diff --git a/go.mod b/go.mod index 8054b6c..4e1814b 100644 --- a/go.mod +++ b/go.mod @@ -10,17 +10,19 @@ require ( github.com/go-playground/validator/v10 v10.26.0 github.com/gofiber/fiber/v2 v2.52.6 github.com/google/uuid v1.6.0 + github.com/hibiken/asynq v0.25.1 github.com/jdcloud-api/jdcloud-sdk-go v1.64.0 github.com/joho/godotenv v1.5.1 github.com/jxskiss/base62 v1.1.0 github.com/lmittmann/tint v1.0.7 - github.com/redis/go-redis/v9 v9.7.3 + github.com/redis/go-redis/v9 v9.8.0 + github.com/shopspring/decimal v1.4.0 github.com/smartwalle/alipay/v3 v3.2.25 github.com/valyala/fasthttp v1.59.0 github.com/wechatpay-apiv3/wechatpay-go v0.2.20 golang.org/x/crypto v0.36.0 gorm.io/driver/postgres v1.5.11 - gorm.io/gen v0.3.26 + gorm.io/gen v0.3.27 gorm.io/gorm v1.25.12 gorm.io/plugin/dbresolver v1.5.3 ) @@ -57,10 +59,11 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/rivo/uniseg v0.4.7 // indirect - github.com/shopspring/decimal v1.4.0 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/smartwalle/ncrypto v1.0.4 // indirect github.com/smartwalle/ngx v1.0.9 // indirect github.com/smartwalle/nsign v1.0.9 // indirect + github.com/spf13/cast v1.7.0 // indirect github.com/tjfoc/gmsm v1.4.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect golang.org/x/mod v0.24.0 // indirect @@ -68,7 +71,9 @@ require ( golang.org/x/sync v0.12.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/text v0.23.0 // indirect + golang.org/x/time v0.8.0 // indirect golang.org/x/tools v0.31.0 // indirect + google.golang.org/protobuf v1.35.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gorm.io/datatypes v1.2.5 // indirect gorm.io/driver/mysql v1.5.7 // indirect diff --git a/go.sum b/go.sum index c112205..8ad6c9b 100644 --- a/go.sum +++ b/go.sum @@ -75,6 +75,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cu github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/gabriel-vasile/mimetype v1.4.8 h1:FfZ3gj38NjllZIeJAmMhr+qKL8Wu+nOoI3GqacKw1NM= github.com/gabriel-vasile/mimetype v1.4.8/go.mod h1:ByKUIKGjh1ODkGM1asKUbQZOLGrPjydw3hYPU2YU9t8= github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s= @@ -118,6 +120,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= +github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw= +github.com/hibiken/asynq v0.25.1/go.mod h1:pazWNOLBu0FEynQRBvHA26qdIKRSmfdIfUm4HdsLmXg= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -142,8 +146,12 @@ github.com/jxskiss/base62 v1.1.0 h1:A5zbF8v8WXx2xixnAKD2w+abC+sIzYJX+nxmhA6HWFw= github.com/jxskiss/base62 v1.1.0/go.mod h1:HhWAlUXvxKThfOlZbcuFzsqwtF5TcqS9ru3y5GfjWAc= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lmittmann/tint v1.0.7 h1:D/0OqWZ0YOGZ6AyC+5Y2kD8PBEzBk6rFHVSfOqCkF9Y= @@ -171,11 +179,14 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= -github.com/redis/go-redis/v9 v9.7.3 h1:YpPyAayJV+XErNsatSElgRZZVCwXX9QzkKYNvO7x0wM= -github.com/redis/go-redis/v9 v9.7.3/go.mod h1:bGUrSggJ9X9GUmZpZNEOQKaANxSGgOEBRltRTZHSvrA= +github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= github.com/smartwalle/alipay/v3 v3.2.25 h1:cRDN+fpDWTVHnuHIF/vsJETskRXS/S+fDOdAkzXmV/Q= @@ -189,6 +200,8 @@ github.com/smartwalle/nsign v1.0.9/go.mod h1:eY6I4CJlyNdVMP+t6z1H6Jpd4m5/V+8xi44 github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v1.1.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= +github.com/spf13/cast v1.7.0 h1:ntdiHjuueXFgm5nzDRdOS4yfT43P5Fnud6DH50rz/7w= +github.com/spf13/cast v1.7.0/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -215,6 +228,8 @@ github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3i github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.30/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -325,6 +340,8 @@ golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -356,6 +373,8 @@ google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.35.2 h1:8Ar7bF+apOIoThw1EdZl0p1oWvMqTHmpA2fRTyZO8io= +google.golang.org/protobuf v1.35.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/ini.v1 v1.56.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= @@ -377,8 +396,7 @@ gorm.io/driver/sqlite v1.5.7 h1:8NvsrhP0ifM7LX9G4zPB97NwovUakUxc+2V2uuf3Z1I= gorm.io/driver/sqlite v1.5.7/go.mod h1:U+J8craQU6Fzkcvu8oLeAQmi50TkwPEhHDEjQZXDah4= gorm.io/driver/sqlserver v1.5.4 h1:xA+Y1KDNspv79q43bPyjDMUgHoYHLhXYmdFcYPobg8g= gorm.io/driver/sqlserver v1.5.4/go.mod h1:+frZ/qYmuna11zHPlh5oc2O6ZA/lS88Keb0XSH1Zh/g= -gorm.io/gen v0.3.26 h1:sFf1j7vNStimPRRAtH4zz5NiHM+1dr6eA9aaRdplyhY= -gorm.io/gen v0.3.26/go.mod h1:a5lq5y3w4g5LMxBcw0wnO6tYUCdNutWODq5LrIt75LE= +gorm.io/gen v0.3.27 h1:ziocAFLpE7e0g4Rum69pGfB9S6DweTxK8gAun7cU8as= gorm.io/gorm v1.24.7-0.20230306060331-85eaf9eeda11/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= gorm.io/gorm v1.25.0/go.mod h1:L4uxeKpfBml98NYqVqwAdmV1a2nBtAec/cf3fpucW/k= gorm.io/gorm v1.25.7/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= diff --git a/pkg/env/env.go b/pkg/env/env.go index 230b075..3c1744f 100644 --- a/pkg/env/env.go +++ b/pkg/env/env.go @@ -12,16 +12,10 @@ import ( // region app var ( - AppName = "platform" RunMode = "debug" // debug, production ) func loadApp() { - _AppName := os.Getenv("APP_NAME") - if _AppName != "" { - AppName = _AppName - } - _RunMode := os.Getenv("RUN_MODE") switch _RunMode { case "debug", "production": diff --git a/scripts/sql/init.sql b/scripts/sql/init.sql index 74de32e..56a43ad 100644 --- a/scripts/sql/init.sql +++ b/scripts/sql/init.sql @@ -1,22 +1,3 @@ --- 清空数据表 -do -$$ - declare - r record; - begin - for r in ( - select - tablename - from - pg_tables - where - schemaname = 'public' - ) loop - execute 'DROP TABLE IF EXISTS ' || quote_ident(r.tablename) || ' CASCADE'; - end loop; - end -$$; - -- ==================== -- region 日志 -- ==================== @@ -620,39 +601,39 @@ comment on column whitelist.deleted_at is '删除时间'; -- channel drop table if exists channel cascade; create table channel ( - id serial primary key, - user_id int not null references "user" (id) + id serial primary key, + user_id int not null references "user" (id) on update cascade on delete cascade, - proxy_id int not null references proxy (id) -- - on update cascade -- + proxy_id int not null references proxy (id) -- + on update cascade -- on delete set null, - edge_id int references edge (id) -- - on update cascade -- - on delete set null, - proxy_host varchar(255) not null default '', - proxy_port int not null, - edge_host varchar(255), - protocol int, - auth_ip bool not null default false, - whitelists text, - auth_pass bool not null default false, - username varchar(255), - password varchar(255), - expiration timestamp not null, - created_at timestamp default current_timestamp, - updated_at timestamp default current_timestamp, - deleted_at timestamp + edge_id int references edge (id) -- + on update cascade -- + on delete set null, + resource_id int references resource (id) -- + on update cascade -- + on delete set null, + proxy_host varchar(255) not null default '', + proxy_port int not null, + edge_host varchar(255), + protocol int, + auth_ip bool not null default false, + whitelists text, + auth_pass bool not null default false, + username varchar(255), + password varchar(255), + expiration timestamp not null, + created_at timestamp default current_timestamp, + updated_at timestamp default current_timestamp, + deleted_at timestamp ); create index channel_user_id_index on channel (user_id); create index channel_proxy_id_index on channel (proxy_id); create index channel_edge_id_index on channel (edge_id); -create index channel_proxy_host_index on channel (proxy_host); -create index channel_proxy_port_index on channel (proxy_port); -create index channel_edge_host_index on channel (edge_host); +create index channel_resource_id_index on channel (resource_id); create index channel_auth_ip_index on channel (auth_ip); create index channel_auth_pass_index on channel (auth_pass); -create index channel_username_index on channel (username); create index channel_expiration_index on channel (expiration); create index channel_deleted_at_index on channel (deleted_at); @@ -662,6 +643,7 @@ comment on column channel.id is '通道ID'; comment on column channel.user_id is '用户ID'; comment on column channel.proxy_id is '代理ID'; comment on column channel.edge_id is '节点ID'; +comment on column channel.resource_id is '套餐ID'; comment on column channel.proxy_host is '代理地址'; comment on column channel.proxy_port is '转发端口'; comment on column channel.edge_host is '节点地址'; diff --git a/web/globals/asynq.go b/web/globals/asynq.go new file mode 100644 index 0000000..19fe59c --- /dev/null +++ b/web/globals/asynq.go @@ -0,0 +1,22 @@ +package globals + +import ( + "github.com/hibiken/asynq" + "log/slog" +) + +var Asynq *asynq.Client + +func InitAsynq() { + var client = asynq.NewClientFromRedisClient(Redis) + Asynq = client +} + +func CloseAsynq() { + if Asynq != nil { + err := Asynq.Close() + if err != nil { + slog.Error("关闭 Asynq 客户端失败", "error", err) + } + } +} diff --git a/web/globals/init.go b/web/globals/init.go index 27ddd98..7ad04d8 100644 --- a/web/globals/init.go +++ b/web/globals/init.go @@ -9,4 +9,5 @@ func Init() { initRedis() initOrm() initProxy() + InitAsynq() } diff --git a/web/globals/proxy.go b/web/globals/proxy.go index a8c439a..e0edba1 100644 --- a/web/globals/proxy.go +++ b/web/globals/proxy.go @@ -8,6 +8,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "io" "net/http" "strings" "time" @@ -42,6 +43,7 @@ func (p *ProxyClient) Permit(host string, secret string, config []*ProxyPermitCo return fmt.Errorf("加密请求失败: %w", err) } + //goland:noinspection HttpUrlsUsage resp, err := http.Post( fmt.Sprintf("http://%s:8848%s", host, PermitEndpoint), "application/json", @@ -50,7 +52,9 @@ func (p *ProxyClient) Permit(host string, secret string, config []*ProxyPermitCo if err != nil { return err } - defer resp.Body.Close() + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) if resp.StatusCode != http.StatusOK { return fmt.Errorf("配置端口许可失败: %s", resp.Status) diff --git a/web/handlers/channel.go b/web/handlers/channel.go index b8239b9..c7f5b89 100644 --- a/web/handlers/channel.go +++ b/web/handlers/channel.go @@ -154,7 +154,7 @@ func CreateChannel(c *fiber.Ctx) error { // 创建通道 result, err := s.Channel.CreateChannel( - authContext, + authContext.Payload.Id, req.ResourceId, req.Protocol, req.AuthType, @@ -197,7 +197,7 @@ type RemoveChannelsReq struct { func RemoveChannels(c *fiber.Ctx) error { // 检查权限 - authCtx, err := auth.Protect(c, []auth.PayloadType{auth.PayloadUser, auth.PayloadInternalServer}, []string{}) + authCtx, err := auth.NewProtect(c).Payload(auth.PayloadUser).Do() if err != nil { return err } @@ -209,7 +209,31 @@ func RemoveChannels(c *fiber.Ctx) error { } // 删除通道 - err = s.Channel.RemoveChannels(c.Context(), authCtx, req.ByIds...) + err = s.Channel.RemoveChannels(req.ByIds, authCtx.Payload.Id) + if err != nil { + return err + } + + return c.SendStatus(fiber.StatusOK) +} + +type RemoveChannelByTaskReq []int32 + +func RemoveChannelByTask(c *fiber.Ctx) error { + // 检查权限 + _, err := auth.NewProtect(c).Payload(auth.PayloadInternalServer).Do() + if err != nil { + return err + } + + // 解析请求参数 + var req RemoveChannelByTaskReq + if err := c.BodyParser(&req); err != nil { + return err + } + + // 删除通道 + err = s.Channel.RemoveChannels(req) if err != nil { return err } diff --git a/web/models/channel.gen.go b/web/models/channel.gen.go index 634227c..c0c81bf 100644 --- a/web/models/channel.gen.go +++ b/web/models/channel.gen.go @@ -31,6 +31,7 @@ type Channel struct { EdgeHost string `gorm:"column:edge_host;type:character varying(255);comment:节点地址" json:"edge_host"` // 节点地址 EdgeID int32 `gorm:"column:edge_id;type:integer;comment:节点ID" json:"edge_id"` // 节点ID Whitelists string `gorm:"column:whitelists;type:text;comment:IP白名单,逗号分隔" json:"whitelists"` // IP白名单,逗号分隔 + ResourceID int32 `gorm:"column:resource_id;type:integer;comment:套餐ID" json:"resource_id"` // 套餐ID } // TableName Channel's table name diff --git a/web/queries/bill.gen.go b/web/queries/bill.gen.go index c23fd79..7a17bfc 100644 --- a/web/queries/bill.gen.go +++ b/web/queries/bill.gen.go @@ -155,11 +155,20 @@ func (b *bill) fillFieldMap() { func (b bill) clone(db *gorm.DB) bill { b.billDo.ReplaceConnPool(db.Statement.ConnPool) + b.Trade.db = db.Session(&gorm.Session{Initialized: true}) + b.Trade.db.Statement.ConnPool = db.Statement.ConnPool + b.Refund.db = db.Session(&gorm.Session{Initialized: true}) + b.Refund.db.Statement.ConnPool = db.Statement.ConnPool + b.Resource.db = db.Session(&gorm.Session{Initialized: true}) + b.Resource.db.Statement.ConnPool = db.Statement.ConnPool return b } func (b bill) replaceDB(db *gorm.DB) bill { b.billDo.ReplaceDB(db) + b.Trade.db = db.Session(&gorm.Session{}) + b.Refund.db = db.Session(&gorm.Session{}) + b.Resource.db = db.Session(&gorm.Session{}) return b } @@ -196,6 +205,11 @@ func (a billBelongsToTrade) Model(m *models.Bill) *billBelongsToTradeTx { return &billBelongsToTradeTx{a.db.Model(m).Association(a.Name())} } +func (a billBelongsToTrade) Unscoped() *billBelongsToTrade { + a.db = a.db.Unscoped() + return &a +} + type billBelongsToTradeTx struct{ tx *gorm.Association } func (a billBelongsToTradeTx) Find() (result *models.Trade, err error) { @@ -234,6 +248,11 @@ func (a billBelongsToTradeTx) Count() int64 { return a.tx.Count() } +func (a billBelongsToTradeTx) Unscoped() *billBelongsToTradeTx { + a.tx = a.tx.Unscoped() + return &a +} + type billBelongsToRefund struct { db *gorm.DB @@ -267,6 +286,11 @@ func (a billBelongsToRefund) Model(m *models.Bill) *billBelongsToRefundTx { return &billBelongsToRefundTx{a.db.Model(m).Association(a.Name())} } +func (a billBelongsToRefund) Unscoped() *billBelongsToRefund { + a.db = a.db.Unscoped() + return &a +} + type billBelongsToRefundTx struct{ tx *gorm.Association } func (a billBelongsToRefundTx) Find() (result *models.Refund, err error) { @@ -305,6 +329,11 @@ func (a billBelongsToRefundTx) Count() int64 { return a.tx.Count() } +func (a billBelongsToRefundTx) Unscoped() *billBelongsToRefundTx { + a.tx = a.tx.Unscoped() + return &a +} + type billBelongsToResource struct { db *gorm.DB @@ -345,6 +374,11 @@ func (a billBelongsToResource) Model(m *models.Bill) *billBelongsToResourceTx { return &billBelongsToResourceTx{a.db.Model(m).Association(a.Name())} } +func (a billBelongsToResource) Unscoped() *billBelongsToResource { + a.db = a.db.Unscoped() + return &a +} + type billBelongsToResourceTx struct{ tx *gorm.Association } func (a billBelongsToResourceTx) Find() (result *models.Resource, err error) { @@ -383,6 +417,11 @@ func (a billBelongsToResourceTx) Count() int64 { return a.tx.Count() } +func (a billBelongsToResourceTx) Unscoped() *billBelongsToResourceTx { + a.tx = a.tx.Unscoped() + return &a +} + type billDo struct{ gen.DO } func (b billDo) Debug() *billDo { diff --git a/web/queries/channel.gen.go b/web/queries/channel.gen.go index 1821c45..e860d12 100644 --- a/web/queries/channel.gen.go +++ b/web/queries/channel.gen.go @@ -44,6 +44,7 @@ func newChannel(db *gorm.DB, opts ...gen.DOOption) channel { _channel.EdgeHost = field.NewString(tableName, "edge_host") _channel.EdgeID = field.NewInt32(tableName, "edge_id") _channel.Whitelists = field.NewString(tableName, "whitelists") + _channel.ResourceID = field.NewInt32(tableName, "resource_id") _channel.fillFieldMap() @@ -71,6 +72,7 @@ type channel struct { EdgeHost field.String // 节点地址 EdgeID field.Int32 // 节点ID Whitelists field.String // IP白名单,逗号分隔 + ResourceID field.Int32 // 套餐ID fieldMap map[string]field.Expr } @@ -104,6 +106,7 @@ func (c *channel) updateTableName(table string) *channel { c.EdgeHost = field.NewString(table, "edge_host") c.EdgeID = field.NewInt32(table, "edge_id") c.Whitelists = field.NewString(table, "whitelists") + c.ResourceID = field.NewInt32(table, "resource_id") c.fillFieldMap() @@ -120,7 +123,7 @@ func (c *channel) GetFieldByName(fieldName string) (field.OrderExpr, bool) { } func (c *channel) fillFieldMap() { - c.fieldMap = make(map[string]field.Expr, 17) + c.fieldMap = make(map[string]field.Expr, 18) c.fieldMap["id"] = c.ID c.fieldMap["user_id"] = c.UserID c.fieldMap["proxy_id"] = c.ProxyID @@ -138,6 +141,7 @@ func (c *channel) fillFieldMap() { c.fieldMap["edge_host"] = c.EdgeHost c.fieldMap["edge_id"] = c.EdgeID c.fieldMap["whitelists"] = c.Whitelists + c.fieldMap["resource_id"] = c.ResourceID } func (c channel) clone(db *gorm.DB) channel { diff --git a/web/queries/proxy.gen.go b/web/queries/proxy.gen.go index f86413f..f55683a 100644 --- a/web/queries/proxy.gen.go +++ b/web/queries/proxy.gen.go @@ -121,11 +121,14 @@ func (p *proxy) fillFieldMap() { func (p proxy) clone(db *gorm.DB) proxy { p.proxyDo.ReplaceConnPool(db.Statement.ConnPool) + p.Edges.db = db.Session(&gorm.Session{Initialized: true}) + p.Edges.db.Statement.ConnPool = db.Statement.ConnPool return p } func (p proxy) replaceDB(db *gorm.DB) proxy { p.proxyDo.ReplaceDB(db) + p.Edges.db = db.Session(&gorm.Session{}) return p } @@ -162,6 +165,11 @@ func (a proxyHasManyEdges) Model(m *models.Proxy) *proxyHasManyEdgesTx { return &proxyHasManyEdgesTx{a.db.Model(m).Association(a.Name())} } +func (a proxyHasManyEdges) Unscoped() *proxyHasManyEdges { + a.db = a.db.Unscoped() + return &a +} + type proxyHasManyEdgesTx struct{ tx *gorm.Association } func (a proxyHasManyEdgesTx) Find() (result []*models.Edge, err error) { @@ -200,6 +208,11 @@ func (a proxyHasManyEdgesTx) Count() int64 { return a.tx.Count() } +func (a proxyHasManyEdgesTx) Unscoped() *proxyHasManyEdgesTx { + a.tx = a.tx.Unscoped() + return &a +} + type proxyDo struct{ gen.DO } func (p proxyDo) Debug() *proxyDo { diff --git a/web/queries/resource.gen.go b/web/queries/resource.gen.go index 044c2d2..e37f3a8 100644 --- a/web/queries/resource.gen.go +++ b/web/queries/resource.gen.go @@ -121,11 +121,17 @@ func (r *resource) fillFieldMap() { func (r resource) clone(db *gorm.DB) resource { r.resourceDo.ReplaceConnPool(db.Statement.ConnPool) + r.Short.db = db.Session(&gorm.Session{Initialized: true}) + r.Short.db.Statement.ConnPool = db.Statement.ConnPool + r.Long.db = db.Session(&gorm.Session{Initialized: true}) + r.Long.db.Statement.ConnPool = db.Statement.ConnPool return r } func (r resource) replaceDB(db *gorm.DB) resource { r.resourceDo.ReplaceDB(db) + r.Short.db = db.Session(&gorm.Session{}) + r.Long.db = db.Session(&gorm.Session{}) return r } @@ -162,6 +168,11 @@ func (a resourceHasOneShort) Model(m *models.Resource) *resourceHasOneShortTx { return &resourceHasOneShortTx{a.db.Model(m).Association(a.Name())} } +func (a resourceHasOneShort) Unscoped() *resourceHasOneShort { + a.db = a.db.Unscoped() + return &a +} + type resourceHasOneShortTx struct{ tx *gorm.Association } func (a resourceHasOneShortTx) Find() (result *models.ResourceShort, err error) { @@ -200,6 +211,11 @@ func (a resourceHasOneShortTx) Count() int64 { return a.tx.Count() } +func (a resourceHasOneShortTx) Unscoped() *resourceHasOneShortTx { + a.tx = a.tx.Unscoped() + return &a +} + type resourceHasOneLong struct { db *gorm.DB @@ -233,6 +249,11 @@ func (a resourceHasOneLong) Model(m *models.Resource) *resourceHasOneLongTx { return &resourceHasOneLongTx{a.db.Model(m).Association(a.Name())} } +func (a resourceHasOneLong) Unscoped() *resourceHasOneLong { + a.db = a.db.Unscoped() + return &a +} + type resourceHasOneLongTx struct{ tx *gorm.Association } func (a resourceHasOneLongTx) Find() (result *models.ResourceLong, err error) { @@ -271,6 +292,11 @@ func (a resourceHasOneLongTx) Count() int64 { return a.tx.Count() } +func (a resourceHasOneLongTx) Unscoped() *resourceHasOneLongTx { + a.tx = a.tx.Unscoped() + return &a +} + type resourceDo struct{ gen.DO } func (r resourceDo) Debug() *resourceDo { diff --git a/web/router.go b/web/router.go index 9d29de7..e77be75 100644 --- a/web/router.go +++ b/web/router.go @@ -50,6 +50,7 @@ func ApplyRouters(app *fiber.App) { channel.Post("/list", handlers.ListChannels) channel.Post("/create", handlers.CreateChannel) channel.Post("/remove", handlers.RemoveChannels) + channel.Post("/remove/by-task", handlers.RemoveChannelByTask) // 交易 trade := api.Group("/trade") diff --git a/web/services/channel.go b/web/services/channel.go index dfbb577..d110dd0 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -4,13 +4,13 @@ import ( "context" "database/sql" "fmt" + "github.com/hibiken/asynq" "gorm.io/gen/field" "log/slog" "math" "math/rand/v2" "platform/pkg/env" "platform/pkg/u" - "platform/web/auth" channel2 "platform/web/domains/channel" edge2 "platform/web/domains/edge" proxy2 "platform/web/domains/proxy" @@ -19,11 +19,11 @@ import ( "platform/web/globals/orm" m "platform/web/models" q "platform/web/queries" + "platform/web/tasks" "strconv" "strings" "time" - "github.com/gofiber/fiber/v2/middleware/requestid" "github.com/redis/go-redis/v9" ) @@ -32,47 +32,71 @@ var Channel = &channelService{} type channelService struct { } -// region RemoveChannel +// region 删除通道 -func (s *channelService) RemoveChannels(ctx context.Context, authCtx *auth.Context, id ...int32) error { +func (s *channelService) RemoveChannels(id []int32, userId ...int32) error { var now = time.Now() - var rid = ctx.Value(requestid.ConfigDefault.ContextKey).(string) err := q.Q.Transaction(func(tx *q.Query) error { // 查找通道 - channels, err := tx.Channel.Where( - q.Channel.ID.In(id...), - ).Find() + var do = tx.Channel.Where(q.Channel.ID.In(id...)) + if len(userId) > 0 { + do.Where(q.Channel.UserID.Eq(userId[0])) + } + channels, err := tx.Channel.Where(do).Find() if err != nil { return err } - // 检查权限,如果为用户操作的话,则只能删除自己的通道 + proxyMap := make(map[int32]*m.Proxy) + proxyIds := make([]int32, 0) + resourceMap := make(map[int32]*m.Resource) + resourceIds := make([]int32, 0) for _, channel := range channels { - if authCtx.Payload.Type == auth.PayloadUser && authCtx.Payload.Id != channel.UserID { - return ErrRemoveForbidden + if _, ok := proxyMap[channel.ProxyID]; !ok { + proxyIds = append(proxyIds, channel.ProxyID) + proxyMap[channel.ProxyID] = &m.Proxy{} } + if _, ok := resourceMap[channel.ResourceID]; !ok { + resourceIds = append(resourceIds, channel.ResourceID) + resourceMap[channel.ResourceID] = &m.Resource{} + } + } + + // 查找资源 + resources, err := tx.Resource.Where(tx.Resource.ID.In(resourceIds...)).Find() + if err != nil { + return err + } + for _, res := range resources { + resourceMap[res.ID] = res } // 查找代理 - proxySet := make(map[int32]struct{}) - proxyIds := make([]int32, 0) - for _, channel := range channels { - if _, ok := proxySet[channel.ProxyID]; !ok { - proxyIds = append(proxyIds, channel.ProxyID) - proxySet[channel.ProxyID] = struct{}{} - } - } - proxies, err := tx.Proxy.Where( - q.Proxy.ID.In(proxyIds...), - ).Find() + proxies, err := tx.Proxy.Where(q.Proxy.ID.In(proxyIds...)).Find() if err != nil { return err } + for _, proxy := range proxies { + proxyMap[proxy.ID] = proxy + } + + // 区分通道类型 + shortToRemove := make([]*m.Channel, 0) + longToRemove := make([]*m.Channel, 0) + for _, channel := range channels { + resource := resourceMap[channel.ResourceID] + switch resource2.Type(resource.Type) { + case resource2.TypeShort: + shortToRemove = append(shortToRemove, channel) + case resource2.TypeLong: + longToRemove = append(longToRemove, channel) + } + } // 删除指定的通道 - result, err := tx.Channel.Debug(). + result, err := tx.Channel. Where(q.Channel.ID.In(id...)). Update(q.Channel.DeletedAt, now) if err != nil { @@ -86,95 +110,14 @@ func (s *channelService) RemoveChannels(ctx context.Context, authCtx *auth.Conte if env.DebugExternalChange { var step = time.Now() - // 组织数据 - var configMap = make(map[int32][]g.PortConfigsReq, len(proxies)) - var proxyMap = make(map[int32]*m.Proxy, len(proxies)) - for _, proxy := range proxies { - configMap[proxy.ID] = make([]g.PortConfigsReq, 0) - proxyMap[proxy.ID] = proxy - } - var portMap = make(map[uint64]struct{}) - for _, channel := range channels { - var config = g.PortConfigsReq{ - Port: int(channel.ProxyPort), - Edge: &[]string{}, - AutoEdgeConfig: &g.AutoEdgeConfig{ - Count: u.P(0), - }, - Status: false, - } - configMap[channel.ProxyID] = append(configMap[channel.ProxyID], config) - - key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort) - portMap[key] = struct{}{} - } - - slog.Debug("组织数据", "rid", rid, "step", time.Since(step)) - - // 更新配置 - for proxyId, configs := range configMap { - if len(configs) == 0 { - continue - } - proxy, ok := proxyMap[proxyId] - if !ok { - return ChannelServiceErr("代理不存在") - } - - var secret = strings.Split(proxy.Secret, ":") - gateway := g.NewGateway( - proxy.Host, - secret[0], - secret[1], - ) - - // 查询节点配置 - step = time.Now() - - actives, err := gateway.GatewayPortActive() + if len(shortToRemove) > 0 { + err := removeShortChannelExternal(proxies, shortToRemove) if err != nil { return err } - - slog.Debug("查询节点配置", "rid", rid, "step", time.Since(step)) - - // 更新节点配置 - step = time.Now() - - err = gateway.GatewayPortConfigs(configs) - if err != nil { - return err - } - - slog.Debug("更新节点配置", "rid", rid, "step", time.Since(step)) - - // 下线对应节点 - step = time.Now() - - var edges []string - for portStr, active := range actives { - port, err := strconv.Atoi(portStr) - if err != nil { - return err - } - key := uint64(proxyId)<<32 | uint64(port) - if _, ok := portMap[key]; ok { - edges = append(edges, active.Edge...) - } - } - if len(edges) > 0 { - _, err := g.Cloud.CloudDisconnect(g.CloudDisconnectReq{ - Uuid: proxy.Name, - Edge: edges, - }) - if err != nil { - return err - } - } - - slog.Debug("下线对应节点", "rid", rid, "step", time.Since(step)) } + slog.Debug("提交删除通道配置", "step", time.Since(step)) } return nil @@ -186,12 +129,91 @@ func (s *channelService) RemoveChannels(ctx context.Context, authCtx *auth.Conte return nil } +func removeShortChannelExternal(proxies []*m.Proxy, channels []*m.Channel) error { + // 组织数据 + var configMap = make(map[int32][]g.PortConfigsReq, len(proxies)) + var proxyMap = make(map[int32]*m.Proxy, len(proxies)) + for _, proxy := range proxies { + configMap[proxy.ID] = make([]g.PortConfigsReq, 0) + proxyMap[proxy.ID] = proxy + } + var portMap = make(map[uint64]struct{}) + for _, channel := range channels { + var config = g.PortConfigsReq{ + Port: int(channel.ProxyPort), + Edge: &[]string{}, + AutoEdgeConfig: &g.AutoEdgeConfig{ + Count: u.P(0), + }, + Status: false, + } + configMap[channel.ProxyID] = append(configMap[channel.ProxyID], config) + + key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort) + portMap[key] = struct{}{} + } + + // 更新配置 + for proxyId, configs := range configMap { + if len(configs) == 0 { + continue + } + proxy, ok := proxyMap[proxyId] + if !ok { + return ChannelServiceErr("代理不存在") + } + + var secret = strings.Split(proxy.Secret, ":") + gateway := g.NewGateway( + proxy.Host, + secret[0], + secret[1], + ) + + // 查询节点配置 + actives, err := gateway.GatewayPortActive() + if err != nil { + return err + } + + // 更新节点配置 + err = gateway.GatewayPortConfigs(configs) + if err != nil { + return err + } + + // 下线对应节点 + var edges []string + for portStr, active := range actives { + port, err := strconv.Atoi(portStr) + if err != nil { + return err + } + key := uint64(proxyId)<<32 | uint64(port) + if _, ok := portMap[key]; ok { + edges = append(edges, active.Edge...) + } + } + if len(edges) > 0 { + _, err := g.Cloud.CloudDisconnect(g.CloudDisconnectReq{ + Uuid: proxy.Name, + Edge: edges, + }) + if err != nil { + return err + } + } + } + + return nil +} + // endregion -// region CreateChannel +// region 创建通道 func (s *channelService) CreateChannel( - authCtx *auth.Context, + userId int32, resourceId int32, protocol channel2.Protocol, authType ChannelAuthType, @@ -204,10 +226,11 @@ func (s *channelService) CreateChannel( filter = edgeFilter[0] } + var resource *ResourceInfo err = q.Q.Transaction(func(q *q.Query) (err error) { // 查找套餐 - resource, err := findResource(q, resourceId, authCtx.Payload.Id, count, now) + resource, err = findResource(q, resourceId, userId, count, now) if err != nil { return err } @@ -215,7 +238,7 @@ func (s *channelService) CreateChannel( // 查找白名单 var whitelist []string if authType == ChannelAuthTypeIp { - whitelist, err = findWhitelist(q, authCtx.Payload.Id) + whitelist, err = findWhitelist(q, userId) if err != nil { return err } @@ -232,10 +255,10 @@ func (s *channelService) CreateChannel( switch resource.Type { case resource2.TypeShort: config.Expiration = now.Add(time.Duration(resource.Live) * time.Second) - channels, err = assignShortChannels(q, authCtx.Payload.Id, count, config, filter, now) + channels, err = assignShortChannels(q, userId, resourceId, count, config, filter, now) case resource2.TypeLong: config.Expiration = now.Add(time.Duration(resource.Live) * time.Hour) - channels, err = assignLongChannels(q, authCtx.Payload.Id, count, config, filter) + channels, err = assignLongChannels(q, userId, resourceId, count, config, filter) } if err != nil { return err @@ -253,6 +276,27 @@ func (s *channelService) CreateChannel( return nil, err } + // 定时异步删除过期通道 + var duration time.Duration + switch resource.Type { + case resource2.TypeShort: + duration = time.Duration(resource.Live) * time.Second + case resource2.TypeLong: + duration = time.Duration(resource.Live) * time.Minute + } + + var ids = make([]int32, len(channels)) + for i := range channels { + ids[i] = channels[i].ID + } + _, err = g.Asynq.Enqueue( + tasks.NewRemoveChannel(ids), + asynq.ProcessIn(duration), + ) + if err != nil { + return nil, err + } + return channels, nil } @@ -287,7 +331,7 @@ func findResource(q *q.Query, resourceId int32, userId int32, count int, now tim info.DailyLast = time.Time(sub.DailyLast) info.Quota = sub.Quota info.Used = sub.Used - info.Expire = time.Time(sub.DailyLast) + info.Expire = time.Time(sub.Expire) case resource2.TypeLong: var sub = resource.Long info.Mode = resource2.Mode(sub.Type) @@ -297,7 +341,7 @@ func findResource(q *q.Query, resourceId int32, userId int32, count int, now tim info.DailyLast = time.Time(sub.DailyLast) info.Quota = sub.Quota info.Used = sub.Used - info.Expire = time.Time(sub.DailyLast) + info.Expire = time.Time(sub.Expire) } // 检查套餐状态 @@ -353,14 +397,7 @@ func findWhitelist(q *q.Query, userId int32) ([]string, error) { return whitelist, nil } -func assignShortChannels( - q *q.Query, - userId int32, - count int, - config ChannelCreateConfig, - filter EdgeFilterConfig, - now time.Time, -) ([]*m.Channel, error) { +func assignShortChannels(q *q.Query, userId int32, resourceId int32, count int, config ChannelCreateConfig, filter EdgeFilterConfig, now time.Time) ([]*m.Channel, error) { // 查找网关 proxies, err := q.Proxy. @@ -503,6 +540,7 @@ func assignShortChannels( var newChannel = &m.Channel{ UserID: userId, ProxyID: proxy.ID, + ResourceID: resourceId, ProxyHost: proxy.Host, ProxyPort: int32(port), Protocol: int32(config.Protocol), @@ -556,7 +594,7 @@ func assignShortChannels( return newChannels, nil } -func assignLongChannels(q *q.Query, userId int32, count int, config ChannelCreateConfig, filter EdgeFilterConfig) ([]*m.Channel, error) { +func assignLongChannels(q *q.Query, userId int32, resourceId int32, count int, config ChannelCreateConfig, filter EdgeFilterConfig) ([]*m.Channel, error) { // 查询符合条件的节点,根据 channel 统计使用次数 var edges = make([]struct { @@ -631,6 +669,7 @@ func assignLongChannels(q *q.Query, userId int32, count int, config ChannelCreat UserID: userId, ProxyID: edge.ProxyID, EdgeID: edge.ID, + ResourceID: resourceId, Protocol: int32(config.Protocol), AuthIP: config.AuthIp, AuthPass: config.AuthPass, @@ -752,8 +791,6 @@ func saveAssigns(q *q.Query, resource *ResourceInfo, channels []*m.Channel, now return nil } -// endregion - func genPassPair() (string, string) { //goland:noinspection SpellCheckingInspection var alphabet = []rune("abcdefghjkmnpqrstuvwxyz") @@ -773,6 +810,8 @@ func genPassPair() (string, string) { return string(username), string(password) } +// endregion + type ChannelAuthType int const ( @@ -820,6 +859,5 @@ const ( ErrResourceExhausted = ChannelServiceErr("套餐已用完") ErrResourceExpired = ChannelServiceErr("套餐已过期") ErrResourceDailyLimit = ChannelServiceErr("套餐每日配额已用完") - ErrRemoveForbidden = ChannelServiceErr("删除通道失败,当前用户没有权限") ErrEdgesNoAvailable = ChannelServiceErr("没有可用的节点") ) diff --git a/web/tasks/channel.go b/web/tasks/channel.go new file mode 100644 index 0000000..1714c29 --- /dev/null +++ b/web/tasks/channel.go @@ -0,0 +1,18 @@ +package tasks + +import ( + "encoding/json" + "github.com/hibiken/asynq" + "log/slog" +) + +const RemoveChannel = "channel:remove" + +func NewRemoveChannel(ids []int32) *asynq.Task { + bytes, err := json.Marshal(ids) + if err != nil { + slog.Error("序列化删除通道任务失败", "error", err) + return nil + } + return asynq.NewTask(RemoveChannel, bytes) +} diff --git a/web/web.go b/web/web.go index 2b3016f..7db773b 100644 --- a/web/web.go +++ b/web/web.go @@ -94,6 +94,8 @@ func (s *Server) Stop() { slog.Error("Failed to close database connection", slog.Any("err", err)) } + g.CloseAsynq() + err = s.fiber.Shutdown() if err != nil { slog.Error("Failed to shutdown server", slog.Any("err", err))