diff --git a/README.md b/README.md index 88bfadb..4bfbc50 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ ## TODO -考虑一个方案限制接口请求速率,无侵入更好 +优化中间件,配置通用限速 trade/create 性能问题,缩短事务时间,考虑其他方式实现可靠分布式事务 @@ -8,20 +8,17 @@ jsonb 类型转换问题,考虑一个高效的 any 到 struct 转换工具 端口资源池的 gc 实现 +channel 服务代码结构,用 provider 代替整个 service 的复用 -标准化生产环境 cors 配置 +用反射实现环境变量解析,以简化函数签名 -底层调用集成 otel -- redis -- gorm -- 三方接口 +--- 分离 task 的客户端,支持多进程(prefork 必要!) 调整目录结构: ``` -- /core 核心概念 - /util 工具函数 - /models 模型 @@ -31,7 +28,7 @@ jsonb 类型转换问题,考虑一个高效的 any 到 struct 转换工具 - /services 服务层 - /auth 认证相关,特化服务 -- /app 应用相关,初始化日志,环境变量等 +- /app 应用相关,初始化日志,环境变量,错误类型等 - /http 协议层,http 服务 - /cmd 主函数 @@ -40,21 +37,15 @@ cmd 调用 app, http 的初始化函数 http 调用 clients 的初始化函数 ``` -开号流程事务化 +--- -开号: -- 提交关闭任务 -- 保存数据 -- 开通端口 - -过期: -- 接口 +慢速请求底层调用埋点监控 - redis +- gorm +- 三方接口 冷数据迁移方案 -proxy 网关更新接口可以传输更结构化的数据,直接区分不同类型以加快更新速度 - ## 业务逻辑 ### 订单关闭的几种方式 @@ -73,22 +64,18 @@ proxy 网关更新接口可以传输更结构化的数据,直接区分不同 ### 节点分配与存储逻辑 -添加: -- 检查用户 ip 是否在白名单内 -- 取用端口,不够则返回失败 -- 将分配结果转写成配置发送到网关 -- 保存通道信息和分配记录,其中通道信息以网关为主体,分配记录以用户为主体 -- 添加异步任务,当时间结束后释放取用的端口并清空网关配置 +提取: +- 检查用户套餐与白名单 +- 选中代理 + - 找到当前可用端口最多的代理 + - 不考虑分割端口,不够加机器 +- 获取可用端口 +- 获取可用节点 +- 生成批次号,提交到期释放任务 +- 绑定节点与端口,保存到数据库 +- 分别提交连接与配置请求 -删除: -- 如果传入用户信息,检查要删除的连接是否属于该用户 -- 释放可用端口 - - redis 脚本中检查,如果端口所属节点已下线则直接忽略 -- 提交清空配置到网关 - -缩扩容: -- 通过调度任务实现缩扩容 -- 每分钟检查一次全部配置,按代理分组 -- 获取所有代理后备配置 -- 后备配置/当前配置 - - 当比例 < 1.5 或 > 3 时,重新更新为 2 倍 +释放: +- 根据批次查出所有端口与相关节点 +- 分别提交断开与关闭请求 +- 释放端口 diff --git a/pkg/env/env.go b/pkg/env/env.go index 1dc4d4b..0bd0a1d 100644 --- a/pkg/env/env.go +++ b/pkg/env/env.go @@ -36,7 +36,7 @@ var ( RedisPort = "6379" RedisPassword = "" - BaiyinAddr = "http://103.139.212.110:9989" + BaiyinCloudUrl string BaiyinTokenUrl string IdenCallbackUrl string @@ -115,7 +115,7 @@ func Init() { errs = append(errs, parse(&RedisPort, "REDIS_PORT", true, nil)) errs = append(errs, parse(&RedisPassword, "REDIS_PASS", true, nil)) - errs = append(errs, parse(&BaiyinAddr, "BAIYIN_ADDR", true, nil)) + errs = append(errs, parse(&BaiyinCloudUrl, "BAIYIN_CLOUD_URL", false, nil)) errs = append(errs, parse(&BaiyinTokenUrl, "BAIYIN_TOKEN_URL", false, nil)) errs = append(errs, parse(&IdenCallbackUrl, "IDEN_CALLBACK_URL", false, nil)) diff --git a/pkg/u/u.go b/pkg/u/u.go index bebadea..f99e2a8 100644 --- a/pkg/u/u.go +++ b/pkg/u/u.go @@ -25,6 +25,15 @@ func ElseTo[A any, B any](a *A, f func(A) B) *B { } } +// 三元表达式 +func Ternary[T any](condition bool, trueValue T, falseValue T) T { + if condition { + return trueValue + } else { + return falseValue + } +} + // ==================== // 指针 // ==================== diff --git a/scripts/sql/init.sql b/scripts/sql/init.sql index 504ba1e..ec4be59 100644 --- a/scripts/sql/init.sql +++ b/scripts/sql/init.sql @@ -497,12 +497,13 @@ comment on column link_client_permission.permission_id is '权限ID'; drop table if exists proxy cascade; create table proxy ( id int generated by default as identity primary key, - version int not null, - mac text not null, - ip inet not null, + version int not null, + mac text not null, + ip inet not null, + host text, secret text, - type int not null, - status int not null, + type int not null, + status int not null, meta jsonb, created_at timestamptz default current_timestamp, updated_at timestamptz default current_timestamp, @@ -518,8 +519,9 @@ comment on column proxy.id is '代理服务ID'; comment on column proxy.version is '代理服务版本'; comment on column proxy.mac is '代理服务名称'; comment on column proxy.ip is '代理服务地址'; -comment on column proxy.type is '代理服务类型:1-自有,2-白银'; +comment on column proxy.host is '代理服务域名'; comment on column proxy.secret is '代理服务密钥'; +comment on column proxy.type is '代理服务类型:1-自有,2-白银'; comment on column proxy.status is '代理服务状态:0-离线,1-在线'; comment on column proxy.meta is '代理服务元信息'; comment on column proxy.created_at is '创建时间'; @@ -600,8 +602,10 @@ create table channel ( resource_id int not null, batch_no text not null, proxy_id int not null, + host text not null, port int not null, edge_id int, + edge_ref text, filter_isp int, filter_prov text, filter_city text, @@ -626,8 +630,10 @@ comment on column channel.user_id is '用户ID'; comment on column channel.resource_id is '套餐ID'; comment on column channel.batch_no is '批次编号'; comment on column channel.proxy_id is '代理ID'; +comment on column channel.host is '代理主机(快照)'; comment on column channel.port is '代理端口'; comment on column channel.edge_id is '节点ID(手动配置)'; +comment on column channel.edge_ref is '外部节点引用,用于索引没有ID的外部非受控节点'; comment on column channel.filter_isp is '运营商过滤(自动配置):参考 edge.isp'; comment on column channel.filter_prov is '省份过滤(自动配置)'; comment on column channel.filter_city is '城市过滤(自动配置)'; diff --git a/web/core/http.go b/web/core/http.go index 1dadb22..b26a174 100644 --- a/web/core/http.go +++ b/web/core/http.go @@ -1,5 +1,17 @@ package core +import ( + "fmt" + "net/http" + "net/http/httputil" + "net/url" + "platform/pkg/env" + "platform/pkg/u" + "reflect" + "strconv" + "strings" +) + // PageReq 分页请求参数 type PageReq struct { RawPage int `json:"page"` @@ -38,3 +50,83 @@ type PageResp struct { Size int `json:"size"` List any `json:"list"` } + +// Fetch 发送HTTP请求并返回响应 +func Fetch(req *http.Request) (*http.Response, error) { + if env.DebugHttpDump { + str, err := httputil.DumpRequest(req, true) + if err != nil { + return nil, err + } + fmt.Printf("===== REQUEST ===== %s\n", req.URL) + fmt.Println(string(str)) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + + if env.DebugHttpDump { + str, err := httputil.DumpResponse(resp, true) + if err != nil { + return nil, err + } + fmt.Printf("===== RESPONSE ===== %s\n", req.URL) + fmt.Println(string(str)) + } + + return resp, nil +} + +func Query(in any) url.Values { + out := url.Values{} + + if in == nil { + return out + } + + ref := reflect.ValueOf(in) + if ref.Kind() == reflect.Pointer { + ref = ref.Elem() + } + + if ref.Kind() != reflect.Struct { + return out + } + + for i := 0; i < ref.NumField(); i++ { + field := ref.Type().Field(i) + value := ref.Field(i) + + if field.Type.Kind() == reflect.Pointer { + if value.IsNil() { + continue + } + value = value.Elem() + } + + name := field.Name + tags := strings.Split(field.Tag.Get("query"), ",") + if len(tags) > 0 && tags[0] != "" { + name = tags[0] + } + + switch value := value.Interface().(type) { + case string: + out.Add(name, url.QueryEscape(value)) + case int: + out.Add(name, strconv.Itoa(value)) + case bool: + if tags[1] == "b2i" { + out.Add(name, u.Ternary(value, "1", "0")) + } else { + out.Add(name, strconv.FormatBool(value)) + } + default: + out.Add(name, fmt.Sprintf("%v", value)) + } + } + + return out +} diff --git a/web/globals/baiyin.go b/web/globals/baiyin.go index ca25807..570807e 100644 --- a/web/globals/baiyin.go +++ b/web/globals/baiyin.go @@ -10,6 +10,7 @@ import ( "net/http/httputil" "net/url" "platform/pkg/env" + "platform/web/core" "strconv" "strings" "time" @@ -17,18 +18,12 @@ import ( // CloudClient 定义云服务接口 type CloudClient interface { - CloudEdges(param CloudEdgesReq) (*CloudEdgesResp, error) - CloudConnect(param CloudConnectReq) error - CloudDisconnect(param CloudDisconnectReq) (int, error) + CloudEdges(param *CloudEdgesReq) (*CloudEdgesResp, error) + CloudConnect(param *CloudConnectReq) error + CloudDisconnect(param *CloudDisconnectReq) (int, error) CloudAutoQuery() (CloudConnectResp, error) } -// GatewayClient 定义网关接口 -type GatewayClient interface { - GatewayPortConfigs(params []PortConfigsReq) error - GatewayPortActive(param ...PortActiveReq) (map[string]PortData, error) -} - type cloud struct { url string } @@ -37,59 +32,14 @@ var Cloud CloudClient func initBaiyin() error { Cloud = &cloud{ - url: env.BaiyinAddr, + url: env.BaiyinCloudUrl, } return nil } -type AutoConfig struct { - Province string `json:"province"` - City string `json:"city"` - Isp string `json:"isp"` - Count int `json:"count"` -} - -// region cloud:/edges - -type CloudEdgesReq struct { - Province string - City string - Isp string - Offset int - Limit int -} - -type CloudEdgesResp struct { - Edges []Edge `json:"edges"` - Total int `json:"total"` - Offset int `json:"offset"` - Limit int `json:"limit"` -} - -type Edge struct { - EdgesId int `json:"edges_id"` - Province string `json:"province"` - City string `json:"city"` - Isp string `json:"isp"` - Ip string `json:"ip"` - Rtt int `json:"rtt"` - PacketLoss int `json:"packet_loss"` -} - -func (c *cloud) CloudEdges(param CloudEdgesReq) (*CloudEdgesResp, error) { - data := strings.Builder{} - data.WriteString("province=") - data.WriteString(param.Province) - data.WriteString("&city=") - data.WriteString(param.City) - data.WriteString("&isp=") - data.WriteString(param.Isp) - data.WriteString("&offset=") - data.WriteString(strconv.Itoa(param.Offset)) - data.WriteString("&limit=") - data.WriteString(strconv.Itoa(param.Limit)) - - resp, err := c.requestCloud("GET", "/edges?"+data.String(), "") +// cloud:/edges 筛选查询边缘节点 +func (c *cloud) CloudEdges(param *CloudEdgesReq) (*CloudEdgesResp, error) { + resp, err := c.requestCloud("GET", "/edges?"+core.Query(param).Encode(), "") if err != nil { return nil, err } @@ -115,17 +65,46 @@ func (c *cloud) CloudEdges(param CloudEdgesReq) (*CloudEdgesResp, error) { return &result, nil } -// endregion - -// region cloud:/connect - -type CloudConnectReq struct { - Uuid string `json:"uuid"` - Edge []string `json:"edge,omitempty"` - AutoConfig []AutoConfig `json:"auto_config,omitempty"` +type CloudEdgesReq struct { + Province *string `query:"province"` + City *string `query:"city"` + Isp *string `query:"isp"` + Offset *int `query:"offset"` + Limit *int `query:"limit"` + NoRepeat *bool `query:"norepeat,b2i"` + NoDayRepeat *bool `query:"nodayrepeat,b2i"` + IpUnchangedTime *int `query:"ip_unchanged_time"` // 单位秒 + ActiveTime *int `query:"active_time"` // 单位秒 + // 排序方式,可选值: + // - create_time_asc 设备创建时间顺序 + // - create_time_desc 设备创建时间倒序 + // - ip_unchanged_time_asc ip持续没变化时间顺序 + // - ip_unchanged_time_desc ip持续没变化时间倒序 + // - active_time_asc 连续活跃时间顺序 + // - active_time_desc 连续活跃时间倒序 + // - rand 随机排序 (默认) + Sort *string `query:"sort"` } -func (c *cloud) CloudConnect(param CloudConnectReq) error { +type CloudEdgesResp struct { + Edges []Edge `json:"edges"` + Total int `json:"total"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +type Edge struct { + EdgeID string `json:"edge_id"` + Province string `json:"province"` + City string `json:"city"` + Isp string `json:"isp"` + Ip string `json:"ip"` + Rtt int `json:"rtt"` + PacketLoss int `json:"packet_loss"` +} + +// cloud:/connect 连接边缘节点到网关 +func (c *cloud) CloudConnect(param *CloudConnectReq) error { data, err := json.Marshal(param) if err != nil { return err @@ -162,25 +141,21 @@ func (c *cloud) CloudConnect(param CloudConnectReq) error { return nil } -// endregion - -// region cloud:/disconnect - -type CloudDisconnectReq struct { - Uuid string `json:"uuid"` - Edge []string `json:"edge,omitempty"` - Config []Config `json:"auto_config,omitempty"` +type CloudConnectReq struct { + Uuid string `json:"uuid"` + Edge *[]string `json:"edge,omitempty"` + AutoConfig *[]AutoConfig `json:"auto_config,omitempty"` } -type Config struct { +type AutoConfig struct { Province string `json:"province"` City string `json:"city"` Isp string `json:"isp"` Count int `json:"count"` - Online bool `json:"online"` } -func (c *cloud) CloudDisconnect(param CloudDisconnectReq) (int, error) { +// cloud:/disconnect 解除连接边缘节点到网关 +func (c *cloud) CloudDisconnect(param *CloudDisconnectReq) (int, error) { data, err := json.Marshal(param) if err != nil { return 0, err @@ -217,12 +192,21 @@ func (c *cloud) CloudDisconnect(param CloudDisconnectReq) (int, error) { return int(result["disconnected_edges"].(float64)), nil } -// endregion +type CloudDisconnectReq struct { + Uuid string `json:"uuid"` + Edge *[]string `json:"edge,omitempty"` + Config *[]Config `json:"auto_config,omitempty"` +} -// region cloud:/auto_query - -type CloudConnectResp map[string][]AutoConfig +type Config struct { + Province string `json:"province"` + City string `json:"city"` + Isp string `json:"isp"` + Count int `json:"count"` + Online bool `json:"online"` +} +// cloud:/auto_query 自动连接配置查询 func (c *cloud) CloudAutoQuery() (CloudConnectResp, error) { resp, err := c.requestCloud("GET", "/auto_query", "") if err != nil { @@ -250,7 +234,7 @@ func (c *cloud) CloudAutoQuery() (CloudConnectResp, error) { return result, nil } -// endregion +type CloudConnectResp map[string][]AutoConfig func (c *cloud) requestCloud(method string, url string, data string) (*http.Response, error) { @@ -263,7 +247,7 @@ func (c *cloud) requestCloud(method string, url string, data string) (*http.Resp req.Header.Set("Content-Type", "application/json") var resp *http.Response - for i := 0; i < 2; i++ { + for i := range 2 { token, err := c.token(i == 1) if err != nil { return nil, err @@ -304,7 +288,7 @@ func (c *cloud) requestCloud(method string, url string, data string) (*http.Resp func (c *cloud) token(refresh bool) (string, error) { // redis 获取令牌 if !refresh { - token, err := Redis.Get(context.Background(), "remote:token").Result() + token, err := Redis.Get(context.Background(), BaiyinToken).Result() if err == nil && token != "" { return token, nil } @@ -338,7 +322,7 @@ func (c *cloud) token(refresh bool) (string, error) { var result map[string]any err = json.Unmarshal(body, &result) if err != nil { - return "", err + return "", fmt.Errorf("解析响应 [%s] 失败: %w", string(body), err) } if result["code"].(float64) != 1 { @@ -347,7 +331,7 @@ func (c *cloud) token(refresh bool) (string, error) { // redis 设置令牌 token := result["token"].(string) - err = Redis.Set(context.Background(), "remote:token", token, 1*time.Hour).Err() + err = Redis.Set(context.Background(), BaiyinToken, token, 1*time.Hour).Err() if err != nil { return "", err } @@ -355,6 +339,15 @@ func (c *cloud) token(refresh bool) (string, error) { return token, nil } +const BaiyinToken = "clients:baiyin:token" + +// GatewayClient 定义网关接口 +type GatewayClient interface { + GatewayPortConfigs(params []*PortConfigsReq) error + GatewayPortActive(param ...*PortActiveReq) (map[string]PortData, error) + GatewayEdge(params *GatewayEdgeReq) (map[string]GatewayEdgeInfo, error) +} + type gateway struct { url string username string @@ -373,6 +366,68 @@ func NewGateway(url, username, password string) GatewayClient { return GatewayInitializer(url, username, password) } +type GatewayEdgeReq struct { + EdgeID *string `query:"edge_id"` + Province *string `query:"province"` + City *string `query:"city"` + Isp *string `query:"isp"` + Connected *bool `query:"connected"` + Assigned *bool `query:"assigned"` + GetRand *int `query:"getRand"` + IpUnchangedTimeStart *int `query:"ip_unchanged_time_start"` + IpUnchangedTimeEnd *int `query:"ip_unchanged_time_end"` + OnlineTimeStart *int `query:"online_time_start"` + OnlineTimeEnd *int `query:"online_time_end"` + Rtt *int `query:"rtt"` + MinRtt *int `query:"min_rtt"` + RttBaidu *int `query:"rtt_baidu"` + PacketLoss *int `query:"packet_loss"` + PacketLossBaidu *int `query:"packet_loss_baidu"` + IP *string `query:"ip"` + Limit *int `query:"limit"` + Offset *int `query:"offset"` +} + +type GatewayEdgeResp struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data map[string]GatewayEdgeInfo `json:"data"` + Total int `json:"total"` +} + +type GatewayEdgeInfo struct { + IP string `json:"ip"` + Connected bool `json:"connected"` + Assigned bool `json:"assigned"` + AssignedTo string `json:"assignedto"` + PacketLoss int `json:"packet_loss"` + PacketLossBaidu int `json:"packet_loss_baidu"` + Rtt int `json:"rtt"` + RttBaidu int `json:"rtt_baidu"` + OfflineTime int `json:"offline_time"` + OnlineTime int `json:"online_time"` + IpUnchangedTime int `json:"ip_unchanged_time"` +} + +func (c *gateway) GatewayEdge(req *GatewayEdgeReq) (map[string]GatewayEdgeInfo, error) { + resp, err := c.get("/edge", core.Query(req)) + if err != nil { + return nil, fmt.Errorf("查询可用节点失败:%w", err) + } + defer resp.Body.Close() + + body := new(GatewayEdgeResp) + if err = json.NewDecoder(resp.Body).Decode(body); err != nil { + return nil, fmt.Errorf("解析响应内容失败:%w", err) + } + + if body.Code != 0 { + return nil, fmt.Errorf("接口业务响应异常: %d %s", body.Code, body.Msg) + } + + return body.Data, nil +} + // region gateway:/port/configs type PortConfigsReq struct { @@ -395,7 +450,7 @@ type AutoEdgeConfig struct { PacketLoss int `json:"packet_loss,omitempty"` } -func (c *gateway) GatewayPortConfigs(params []PortConfigsReq) error { +func (c *gateway) GatewayPortConfigs(params []*PortConfigsReq) error { if len(params) == 0 { return errors.New("params is empty") } @@ -461,10 +516,10 @@ type PortData struct { Userpass string `json:"userpass"` } -func (c *gateway) GatewayPortActive(param ...PortActiveReq) (map[string]PortData, error) { +func (c *gateway) GatewayPortActive(param ...*PortActiveReq) (map[string]PortData, error) { _param := PortActiveReq{} - if len(param) != 0 { - _param = param[0] + if len(param) != 0 && param[0] != nil { + _param = *param[0] } path := strings.Builder{} @@ -520,38 +575,33 @@ func (c *gateway) GatewayPortActive(param ...PortActiveReq) (map[string]PortData // endregion +func (c *gateway) get(url string, params url.Values) (*http.Response, error) { + url = fmt.Sprintf("http://%s:%s@%s:9990%s?%s", c.username, c.password, c.url, url, params.Encode()) + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("创建请求失败:%w", err) + } + + res, err := core.Fetch(req) + if err != nil { + return nil, fmt.Errorf("获取数据失败:%w", err) + } + + if res.StatusCode != http.StatusOK { + bytes, _ := io.ReadAll(res.Body) + return nil, fmt.Errorf("接口响应异常: %d %s", res.StatusCode, string(bytes)) + } + + return res, nil +} + func (c *gateway) requestGateway(method string, url string, data string) (*http.Response, error) { - //goland:noinspection ALL url = fmt.Sprintf("http://%s:%s@%s:9990%s", c.username, c.password, c.url, url) req, err := http.NewRequest(method, url, strings.NewReader(data)) if err != nil { return nil, err } - req.Header.Set("Content-Type", "application/json") - if env.DebugHttpDump { - str, err := httputil.DumpRequest(req, true) - if err != nil { - return nil, err - } - fmt.Println("==============================") - fmt.Println(string(str)) - } - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - - if env.DebugHttpDump { - str, err := httputil.DumpResponse(resp, true) - if err != nil { - return nil, err - } - fmt.Println("------------------------------") - fmt.Println(string(str)) - } - - return resp, nil + return core.Fetch(req) } diff --git a/web/handlers/channel.go b/web/handlers/channel.go index b188c92..0e3881c 100644 --- a/web/handlers/channel.go +++ b/web/handlers/channel.go @@ -57,7 +57,6 @@ func ListChannels(c *fiber.Ctx) error { // 查询数据 channels, err := q.Channel. - Preload(q.Channel.Proxy). Where(cond). Order(q.Channel.CreatedAt.Desc()). Offset(req.GetOffset()). @@ -144,7 +143,7 @@ func CreateChannel(c *fiber.Ctx) error { for i, channel := range result { resp[i] = &CreateChannelRespItem{ Proto: req.Protocol, - Host: channel.Proxy.IP.String(), + Host: channel.Host, Port: channel.Port, } if req.AuthType == s.ChannelAuthTypePass { diff --git a/web/middlewares.go b/web/middlewares.go index 3be700a..2b4a45d 100644 --- a/web/middlewares.go +++ b/web/middlewares.go @@ -5,7 +5,6 @@ import ( "github.com/gofiber/contrib/otelfiber/v2" "github.com/gofiber/fiber/v2" - "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/logger" "github.com/gofiber/fiber/v2/middleware/recover" "github.com/gofiber/fiber/v2/middleware/requestid" @@ -38,9 +37,6 @@ func ApplyMiddlewares(app *fiber.App) { }, })) - // cors - app.Use(cors.New()) - // authenticate app.Use(auth.Authenticate()) } diff --git a/web/models/bill.go b/web/models/bill.go index fc64d7f..9f31e24 100644 --- a/web/models/bill.go +++ b/web/models/bill.go @@ -18,10 +18,10 @@ type Bill struct { Type BillType `json:"type" gorm:"column:type"` // 账单类型:1-消费,2-退款,3-充值 Amount decimal.Decimal `json:"amount" gorm:"column:amount"` // 账单金额 - User *User `json:"user" gorm:"foreignKey:UserID"` - Trade *Trade `json:"trade" gorm:"foreignKey:TradeID"` - Resource *Resource `json:"resource" gorm:"foreignKey:ResourceID"` - Refund *Refund `json:"refund" gorm:"foreignKey:RefundID"` + User *User `json:"user,omitempty" gorm:"foreignKey:UserID"` + Trade *Trade `json:"trade,omitempty" gorm:"foreignKey:TradeID"` + Resource *Resource `json:"resource,omitempty" gorm:"foreignKey:ResourceID"` + Refund *Refund `json:"refund,omitempty" gorm:"foreignKey:RefundID"` } // BillType 账单类型枚举 diff --git a/web/models/channel.go b/web/models/channel.go index 460ef8b..a40522d 100644 --- a/web/models/channel.go +++ b/web/models/channel.go @@ -11,10 +11,12 @@ type Channel struct { core.Model UserID int32 `json:"user_id" gorm:"column:user_id"` // 用户ID ResourceID int32 `json:"resource_id" gorm:"column:resource_id"` // 套餐ID - ProxyID int32 `json:"proxy_id" gorm:"column:proxy_id"` // 代理ID BatchNo string `json:"batch_no" gorm:"column:batch_no"` // 批次编号 + ProxyID int32 `json:"proxy_id" gorm:"column:proxy_id"` // 代理ID + Host string `json:"host" gorm:"column:host"` // 代理主机 Port uint16 `json:"port" gorm:"column:port"` // 代理端口 EdgeID *int32 `json:"edge_id" gorm:"column:edge_id"` // 节点ID(手动配置) + EdgeRef *string `json:"edge_ref" gorm:"column:edge_ref"` // 外部节点引用,用于索引没有ID的外部非受控节点 FilterISP *EdgeISP `json:"filter_isp" gorm:"column:filter_isp"` // 运营商过滤(自动配置):参考 edge.isp FilterProv *string `json:"filter_prov" gorm:"column:filter_prov"` // 省份过滤(自动配置) FilterCity *string `json:"filter_city" gorm:"column:filter_city"` // 城市过滤(自动配置) @@ -24,8 +26,8 @@ type Channel struct { Password *string `json:"password" gorm:"column:password"` // 密码 ExpiredAt time.Time `json:"expired_at" gorm:"column:expired_at"` // 过期时间 - User User `json:"user" gorm:"foreignKey:UserID"` - Resource Resource `json:"resource" gorm:"foreignKey:ResourceID"` - Proxy Proxy `json:"proxy" gorm:"foreignKey:ProxyID"` - Edge *Edge `json:"edge" gorm:"foreignKey:EdgeID"` + User *User `json:"user,omitempty" gorm:"foreignKey:UserID"` + Resource *Resource `json:"resource,omitempty" gorm:"foreignKey:ResourceID"` + Proxy *Proxy `json:"proxy,omitempty" gorm:"foreignKey:ProxyID"` + Edge *Edge `json:"edge,omitempty" gorm:"foreignKey:EdgeID"` } diff --git a/web/models/logs_login.go b/web/models/logs_login.go index d76ef79..fdc779b 100644 --- a/web/models/logs_login.go +++ b/web/models/logs_login.go @@ -16,7 +16,7 @@ type LogsLogin struct { UserID *int32 `json:"user_id" gorm:"column:user_id"` // 用户ID Time time.Time `json:"time" gorm:"column:time"` // 登录时间 - User *User `json:"user" gorm:"foreignKey:UserID"` + User *User `json:"user,omitempty" gorm:"foreignKey:UserID"` } // GrantType 授权类型枚举 diff --git a/web/models/logs_request.go b/web/models/logs_request.go index cd6c643..ba5d984 100644 --- a/web/models/logs_request.go +++ b/web/models/logs_request.go @@ -19,6 +19,6 @@ type LogsRequest struct { Time time.Time `json:"time" gorm:"column:time"` // 请求时间 Latency string `json:"latency" gorm:"column:latency"` // 请求延迟 - User *User `json:"user" gorm:"foreignKey:UserID"` - Client *Client `json:"client" gorm:"foreignKey:ClientID"` + User *User `json:"user,omitempty" gorm:"foreignKey:UserID"` + Client *Client `json:"client,omitempty" gorm:"foreignKey:ClientID"` } diff --git a/web/models/permission.go b/web/models/permission.go index 7f07191..7ec326e 100644 --- a/web/models/permission.go +++ b/web/models/permission.go @@ -9,6 +9,6 @@ type Permission struct { Name string `json:"name" gorm:"column:name"` // 权限名称 Description *string `json:"description" gorm:"column:description"` // 权限描述 - Parent *Permission `json:"parent" gorm:"foreignKey:ParentID"` - Children []*Permission `json:"children" gorm:"foreignKey:ParentID"` + Parent *Permission `json:"parent,omitempty" gorm:"foreignKey:ParentID"` + Children []*Permission `json:"children,omitempty" gorm:"foreignKey:ParentID"` } diff --git a/web/models/proxy.go b/web/models/proxy.go index 52d9bda..83da91f 100644 --- a/web/models/proxy.go +++ b/web/models/proxy.go @@ -13,12 +13,13 @@ type Proxy struct { Version int32 `json:"version" gorm:"column:version"` // 代理服务版本 Mac string `json:"mac" gorm:"column:mac"` // 代理服务名称 IP orm.Inet `json:"ip" gorm:"column:ip;not null"` // 代理服务地址 + Host *string `json:"host" gorm:"column:host"` // 代理服务域名 Secret *string `json:"secret" gorm:"column:secret"` // 代理服务密钥 Type ProxyType `json:"type" gorm:"column:type"` // 代理服务类型:1-自有,2-白银 Status ProxyStatus `json:"status" gorm:"column:status"` // 代理服务状态:0-离线,1-在线 Meta *datatypes.JSONType[any] `json:"meta" gorm:"column:meta"` // 代理服务元信息 - Channels []Channel `json:"channels" gorm:"foreignkey:ProxyID"` + Channels []Channel `json:"channels,omitempty" gorm:"foreignkey:ProxyID"` } // ProxyType 代理服务类型枚举 diff --git a/web/models/resource.go b/web/models/resource.go index 1b76cee..cb4f7b4 100644 --- a/web/models/resource.go +++ b/web/models/resource.go @@ -12,9 +12,9 @@ type Resource struct { Active bool `json:"active" gorm:"column:active"` // 套餐状态 Type ResourceType `json:"type" gorm:"column:type"` // 套餐类型:1-短效动态,2-长效动态 - User User `json:"user" gorm:"foreignKey:UserID"` - Short *ResourceShort `json:"short" gorm:"foreignKey:ResourceID"` - Long *ResourceLong `json:"long" gorm:"foreignKey:ResourceID"` + User *User `json:"user,omitempty" gorm:"foreignKey:UserID"` + Short *ResourceShort `json:"short,omitempty" gorm:"foreignKey:ResourceID"` + Long *ResourceLong `json:"long,omitempty" gorm:"foreignKey:ResourceID"` } // ResourceType 套餐类型枚举 diff --git a/web/models/session.go b/web/models/session.go index f734c50..2529d68 100644 --- a/web/models/session.go +++ b/web/models/session.go @@ -20,7 +20,7 @@ type Session struct { RefreshTokenExpires *time.Time `json:"refresh_token_expires" gorm:"column:refresh_token_expires"` // 刷新令牌过期时间 Scopes *string `json:"scopes" gorm:"column:scopes"` // 权限范围 - User *User `json:"user" gorm:"foreignKey:UserID"` - Admin *Admin `json:"admin" gorm:"foreignKey:AdminID"` - Client *Client `json:"client" gorm:"foreignKey:ClientID;belongsTo:ID"` + User *User `json:"user,omitempty" gorm:"foreignKey:UserID"` + Admin *Admin `json:"admin,omitempty" gorm:"foreignKey:AdminID"` + Client *Client `json:"client,omitempty" gorm:"foreignKey:ClientID;belongsTo:ID"` } diff --git a/web/models/user.go b/web/models/user.go index 97360be..1e4768c 100644 --- a/web/models/user.go +++ b/web/models/user.go @@ -29,7 +29,7 @@ type User struct { LastLoginIP *orm.Inet `json:"last_login_ip" gorm:"column:last_login_ip"` // 最后登录地址 LastLoginUA *string `json:"last_login_ua" gorm:"column:last_login_ua"` // 最后登录代理 - Admin Admin `json:"admin" gorm:"foreignKey:AdminID"` + Admin *Admin `json:"admin,omitempty" gorm:"foreignKey:AdminID"` } // UserStatus 用户状态枚举 diff --git a/web/queries/channel.gen.go b/web/queries/channel.gen.go index 9f807be..9ef0390 100644 --- a/web/queries/channel.gen.go +++ b/web/queries/channel.gen.go @@ -33,10 +33,12 @@ func newChannel(db *gorm.DB, opts ...gen.DOOption) channel { _channel.DeletedAt = field.NewField(tableName, "deleted_at") _channel.UserID = field.NewInt32(tableName, "user_id") _channel.ResourceID = field.NewInt32(tableName, "resource_id") - _channel.ProxyID = field.NewInt32(tableName, "proxy_id") _channel.BatchNo = field.NewString(tableName, "batch_no") + _channel.ProxyID = field.NewInt32(tableName, "proxy_id") + _channel.Host = field.NewString(tableName, "host") _channel.Port = field.NewUint16(tableName, "port") _channel.EdgeID = field.NewInt32(tableName, "edge_id") + _channel.EdgeRef = field.NewString(tableName, "edge_ref") _channel.FilterISP = field.NewInt(tableName, "filter_isp") _channel.FilterProv = field.NewString(tableName, "filter_prov") _channel.FilterCity = field.NewString(tableName, "filter_city") @@ -141,10 +143,12 @@ type channel struct { DeletedAt field.Field UserID field.Int32 ResourceID field.Int32 - ProxyID field.Int32 BatchNo field.String + ProxyID field.Int32 + Host field.String Port field.Uint16 EdgeID field.Int32 + EdgeRef field.String FilterISP field.Int FilterProv field.String FilterCity field.String @@ -182,10 +186,12 @@ func (c *channel) updateTableName(table string) *channel { c.DeletedAt = field.NewField(table, "deleted_at") c.UserID = field.NewInt32(table, "user_id") c.ResourceID = field.NewInt32(table, "resource_id") - c.ProxyID = field.NewInt32(table, "proxy_id") c.BatchNo = field.NewString(table, "batch_no") + c.ProxyID = field.NewInt32(table, "proxy_id") + c.Host = field.NewString(table, "host") c.Port = field.NewUint16(table, "port") c.EdgeID = field.NewInt32(table, "edge_id") + c.EdgeRef = field.NewString(table, "edge_ref") c.FilterISP = field.NewInt(table, "filter_isp") c.FilterProv = field.NewString(table, "filter_prov") c.FilterCity = field.NewString(table, "filter_city") @@ -210,17 +216,19 @@ func (c *channel) GetFieldByName(fieldName string) (field.OrderExpr, bool) { } func (c *channel) fillFieldMap() { - c.fieldMap = make(map[string]field.Expr, 22) + c.fieldMap = make(map[string]field.Expr, 24) c.fieldMap["id"] = c.ID c.fieldMap["created_at"] = c.CreatedAt c.fieldMap["updated_at"] = c.UpdatedAt c.fieldMap["deleted_at"] = c.DeletedAt c.fieldMap["user_id"] = c.UserID c.fieldMap["resource_id"] = c.ResourceID - c.fieldMap["proxy_id"] = c.ProxyID c.fieldMap["batch_no"] = c.BatchNo + c.fieldMap["proxy_id"] = c.ProxyID + c.fieldMap["host"] = c.Host c.fieldMap["port"] = c.Port c.fieldMap["edge_id"] = c.EdgeID + c.fieldMap["edge_ref"] = c.EdgeRef c.fieldMap["filter_isp"] = c.FilterISP c.fieldMap["filter_prov"] = c.FilterProv c.fieldMap["filter_city"] = c.FilterCity diff --git a/web/queries/proxy.gen.go b/web/queries/proxy.gen.go index f4f5fd9..bb61cfc 100644 --- a/web/queries/proxy.gen.go +++ b/web/queries/proxy.gen.go @@ -34,6 +34,7 @@ func newProxy(db *gorm.DB, opts ...gen.DOOption) proxy { _proxy.Version = field.NewInt32(tableName, "version") _proxy.Mac = field.NewString(tableName, "mac") _proxy.IP = field.NewField(tableName, "ip") + _proxy.Host = field.NewString(tableName, "host") _proxy.Secret = field.NewString(tableName, "secret") _proxy.Type = field.NewInt(tableName, "type") _proxy.Status = field.NewInt(tableName, "status") @@ -120,6 +121,7 @@ type proxy struct { Version field.Int32 Mac field.String IP field.Field + Host field.String Secret field.String Type field.Int Status field.Int @@ -148,6 +150,7 @@ func (p *proxy) updateTableName(table string) *proxy { p.Version = field.NewInt32(table, "version") p.Mac = field.NewString(table, "mac") p.IP = field.NewField(table, "ip") + p.Host = field.NewString(table, "host") p.Secret = field.NewString(table, "secret") p.Type = field.NewInt(table, "type") p.Status = field.NewInt(table, "status") @@ -168,7 +171,7 @@ func (p *proxy) GetFieldByName(fieldName string) (field.OrderExpr, bool) { } func (p *proxy) fillFieldMap() { - p.fieldMap = make(map[string]field.Expr, 12) + p.fieldMap = make(map[string]field.Expr, 13) p.fieldMap["id"] = p.ID p.fieldMap["created_at"] = p.CreatedAt p.fieldMap["updated_at"] = p.UpdatedAt @@ -176,6 +179,7 @@ func (p *proxy) fillFieldMap() { p.fieldMap["version"] = p.Version p.fieldMap["mac"] = p.Mac p.fieldMap["ip"] = p.IP + p.fieldMap["host"] = p.Host p.fieldMap["secret"] = p.Secret p.fieldMap["type"] = p.Type p.fieldMap["status"] = p.Status diff --git a/web/routes.go b/web/routes.go index e7ff573..a46e452 100644 --- a/web/routes.go +++ b/web/routes.go @@ -73,6 +73,7 @@ func ApplyRouters(app *fiber.App) { edge.Post("/assign", handlers.AssignEdge) edge.Post("/all", handlers.AllEdgesAvailable) + // 回调 callbacks := app.Group("/callback") callbacks.Get("/identify", handlers.IdentifyCallbackNew) diff --git a/web/services/channel.go b/web/services/channel.go index c3155ff..caeba5d 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -2,25 +2,42 @@ package services import ( "context" + "fmt" "math/rand/v2" "net/netip" "platform/web/core" g "platform/web/globals" m "platform/web/models" q "platform/web/queries" + "strconv" "time" + "github.com/redis/go-redis/v9" "gorm.io/gen/field" ) -var Channel ChannelService = &channelBaiyinService{} - // 通道服务 -type ChannelService interface { +var Channel = &channelServer{ + provider: &channelBaiyinService{}, +} + +type ChanProviderAdapter interface { CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter ...EdgeFilter) ([]*m.Channel, error) RemoveChannels(batch string) error } +type channelServer struct { + provider ChanProviderAdapter +} + +func (s *channelServer) CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter ...EdgeFilter) ([]*m.Channel, error) { + return s.provider.CreateChannels(source, resourceId, authWhitelist, authPassword, count, edgeFilter...) +} + +func (s *channelServer) RemoveChannels(batch string) error { + return s.provider.RemoveChannels(batch) +} + // 授权方式 type ChannelAuthType int @@ -60,12 +77,14 @@ func findResource(resourceId int32) (*ResourceView, error) { if err != nil { return nil, ErrResourceNotExist } - + if resource.User == nil { + return nil, ErrResourceNotExist + } var info = &ResourceView{ Id: resource.ID, Active: resource.Active, Type: resource.Type, - User: resource.User, + User: *resource.User, } switch resource.Type { @@ -135,34 +154,127 @@ type ResourceView struct { User m.User } +// 检查用户是否可提取 +func ensure(now time.Time, source netip.Addr, resourceId int32, count int) (*ResourceView, []string, error) { + if count > 400 { + return nil, nil, core.NewBizErr("单次最多提取 400 个") + } + + // 获取用户套餐 + resource, err := findResource(resourceId) + if err != nil { + return nil, nil, err + } + + // 检查用户 + user := resource.User + if user.IDToken == nil || *user.IDToken == "" { + return nil, nil, core.NewBizErr("账号未实名") + } + + // 获取用户白名单并检查用户 ip 地址 + whitelists, err := q.Whitelist.Where( + q.Whitelist.UserID.Eq(user.ID), + ).Find() + if err != nil { + return nil, nil, err + } + + ips := make([]string, len(whitelists)) + pass := false + for i, item := range whitelists { + ips[i] = item.IP.String() + if item.IP.Addr == source { + pass = true + } + } + if !pass { + return nil, nil, core.NewBizErr(fmt.Sprintf("IP 地址 %s 不在白名单内", source.String())) + } + + // 检查套餐使用情况 + switch resource.Mode { + default: + return nil, nil, core.NewBizErr("不支持的套餐模式") + + // 包时 + case m.ResourceModeTime: + // 检查过期时间 + if resource.Expire.Before(now) { + return nil, nil, ErrResourceExpired + } + // 检查每日限额 + used := 0 + if now.Format("2006-01-02") == resource.DailyLast.Format("2006-01-02") { + used = int(resource.DailyUsed) + } + excess := used+count > int(resource.DailyLimit) + if excess { + return nil, nil, ErrResourceDailyLimit + } + + // 包量 + case m.ResourceModeQuota: + // 检查可用配额 + if int(resource.Quota)-int(resource.Used) < count { + return nil, nil, ErrResourceExhausted + } + } + + return resource, ips, nil +} + var ( - allChansKey = "channel:all" freeChansKey = "channel:free" usedChansKey = "channel:used" ) +// 扩容通道 +func regChans(proxy int32, chans []netip.AddrPort) error { + strs := make([]any, len(chans)) + for i, ch := range chans { + strs[i] = ch.String() + } + + key := freeChansKey + ":" + strconv.Itoa(int(proxy)) + err := g.Redis.SAdd(context.Background(), key, strs...).Err() + if err != nil { + return fmt.Errorf("扩容通道失败: %w", err) + } + return nil +} + +// 缩容通道 +func remChans(proxy int32) error { + key := freeChansKey + ":" + strconv.Itoa(int(proxy)) + err := g.Redis.SRem(context.Background(), key).Err() + if err != nil { + return fmt.Errorf("缩容通道失败: %w", err) + } + return nil +} + // 取用通道 -func lockChans(batch string, count int, expire time.Time) ([]netip.AddrPort, error) { - chans, err := g.Redis.Eval( +func lockChans(proxy int32, batch string, count int) ([]netip.AddrPort, error) { + pid := strconv.Itoa(int(proxy)) + chans, err := RedisScriptLockChans.Run( context.Background(), - RedisScriptLockChans, + g.Redis, []string{ - freeChansKey, - usedChansKey, - usedChansKey + ":" + batch, + freeChansKey + ":" + pid, + usedChansKey + ":" + pid + ":" + batch, }, count, - expire.Unix(), ).StringSlice() if err != nil { - return nil, core.NewBizErr("获取通道失败", err) + return nil, fmt.Errorf("获取通道失败: %w", err) } addrs := make([]netip.AddrPort, len(chans)) for i, ch := range chans { addr, err := netip.ParseAddrPort(ch) if err != nil { - return nil, core.NewServErr("解析通道数据失败", err) + return nil, fmt.Errorf("解析通道数据失败: %w", err) } addrs[i] = addr } @@ -170,41 +282,31 @@ func lockChans(batch string, count int, expire time.Time) ([]netip.AddrPort, err return addrs, nil } -var RedisScriptLockChans = ` +var RedisScriptLockChans = redis.NewScript(` local free_key = KEYS[1] -local used_key = KEYS[2] -local batch_key = KEYS[3] +local batch_key = KEYS[2] local count = tonumber(ARGV[1]) -local expire = tonumber(ARGV[2]) if redis.call("SCARD", free_key) < count then return nil end local ports = redis.call("SPOP", free_key, count) -redis.call("ZADD", used_key, expire, batch_key) redis.call("RPUSH", batch_key, unpack(ports)) return ports -` +`) // 归还通道 -func freeChans(batch string, chans []string) error { - values := make([]any, len(chans)) - for i, ch := range chans { - values[i] = ch - } - - err := g.Redis.Eval( +func freeChans(proxy int32, batch string) error { + pid := strconv.Itoa(int(proxy)) + err := RedisScriptFreeChans.Run( context.Background(), - RedisScriptFreeChans, + g.Redis, []string{ - freeChansKey, - usedChansKey, - usedChansKey + ":" + batch, - allChansKey, + freeChansKey + ":" + pid, + usedChansKey + ":" + pid + ":" + batch, }, - values..., ).Err() if err != nil { return core.NewBizErr("释放通道失败", err) @@ -213,92 +315,19 @@ func freeChans(batch string, chans []string) error { return nil } -var RedisScriptFreeChans = ` +var RedisScriptFreeChans = redis.NewScript(` local free_key = KEYS[1] -local used_key = KEYS[2] -local batch_key = KEYS[3] -local all_key = KEYS[4] -local chans = ARGV +local batch_key = KEYS[2] -local count = 0 -for i, chan in ipairs(chans) do - if redis.call("SISMEMBER", all_key, chan) == 1 then - redis.call("SADD", free_key, chan) - count = count + 1 - end -end -redis.call("ZREM", used_key, batch_key) +local chans = redis.call("LRANGE", batch_key, 0, -1) redis.call("DEL", batch_key) -return count -` - -// 扩容通道 -func addChans(chans []netip.AddrPort) error { - strs := make([]string, len(chans)) - for i, ch := range chans { - strs[i] = ch.String() - } - - err := g.Redis.Eval( - context.Background(), - RedisScriptAddChans, - []string{ - freeChansKey, - allChansKey, - }, - strs, - ).Err() - if err != nil { - return core.NewBizErr("扩容通道失败", err) - } - - return nil -} - -var RedisScriptAddChans = ` -local free_key = KEYS[1] -local all_key = KEYS[2] -local chans = ARGV - -local batch_size = 5000 -for i = 1, #chans, batch_size do - local end_index = math.min(i + batch_size - 1, #chans) - redis.call("SADD", free_key, unpack(chans, i, end_index)) - redis.call("SADD", all_key, unpack(chans, i, end_index)) +if redis.call("EXISTS", free_key) == 1 then + redis.call("SADD", free_key, unpack(chans)) end return 1 -` - -// 缩容通道 -func removeChans(chans []string) error { - err := g.Redis.Eval( - context.Background(), - RedisScriptRemoveChans, - []string{ - freeChansKey, - allChansKey, - }, - chans, - ).Err() - if err != nil { - return core.NewBizErr("缩容通道失败", err) - } - - return nil -} - -var RedisScriptRemoveChans = ` -local free_key = KEYS[1] -local all_key = KEYS[2] -local chans = ARGV - -redis.call("SREM", free_key, unpack(chans)) -redis.call("SREM", all_key, unpack(chans)) - -return 1 -` +`) // 错误信息 var ( diff --git a/web/services/channel_baiyin.go b/web/services/channel_baiyin.go index 82535a1..5ae20b0 100644 --- a/web/services/channel_baiyin.go +++ b/web/services/channel_baiyin.go @@ -1,7 +1,6 @@ package services import ( - "database/sql/driver" "encoding/json" "fmt" "log/slog" @@ -24,10 +23,6 @@ import ( type channelBaiyinService struct{} func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter ...EdgeFilter) ([]*m.Channel, error) { - if count > 400 { - return nil, core.NewBizErr("单次最多提取 400 个") - } - var filter *EdgeFilter = nil if len(edgeFilter) > 0 { filter = &edgeFilter[0] @@ -36,148 +31,114 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3 now := time.Now() batch := ID.GenReadable("bat") - // 获取用户套餐 - resource, err := findResource(resourceId) + // 检查并获取套餐与白名单 + resource, whitelists, err := ensure(now, source, resourceId, count) if err != nil { return nil, err } - // 检查用户 user := resource.User - if user.IDToken == nil || *user.IDToken == "" { - return nil, core.NewBizErr("账号未实名") - } - - // 获取用户白名单并检查用户 ip 地址 - whitelists, err := q.Whitelist.Where( - q.Whitelist.UserID.Eq(user.ID), - ).Find() - if err != nil { - return nil, err - } - - whitelistIPs := make([]string, len(whitelists)) - pass := false - for i, item := range whitelists { - whitelistIPs[i] = item.IP.String() - if item.IP.Addr == source { - pass = true - } - } - if !pass { - return nil, core.NewBizErr(fmt.Sprintf("IP 地址 %s 不在白名单内", source.String())) - } - - // 检查套餐使用情况 - switch resource.Mode { - default: - return nil, core.NewBizErr("不支持的套餐模式") - - // 包时 - case m.ResourceModeTime: - // 检查过期时间 - if resource.Expire.Before(now) { - return nil, ErrResourceExpired - } - // 检查每日限额 - used := 0 - if now.Format("2006-01-02") == resource.DailyLast.Format("2006-01-02") { - used = int(resource.DailyUsed) - } - excess := used+count > int(resource.DailyLimit) - if excess { - return nil, ErrResourceDailyLimit - } - - // 包量 - case m.ResourceModeQuota: - // 检查可用配额 - if int(resource.Quota)-int(resource.Used) < count { - return nil, ErrResourceExhausted - } - } - expire := now.Add(resource.Live) + // 选择代理 + proxyResult := struct { + m.Proxy + Count int + }{} + err = q.Proxy. + LeftJoin(q.Channel, q.Channel.ProxyID.EqCol(q.Proxy.ID), q.Channel.ExpiredAt.Gt(now)). + Select(q.Proxy.ALL, field.NewUnsafeFieldRaw("10000 - count(*)").As("count")). + Where( + q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)), + q.Proxy.Status.Eq(int(m.ProxyStatusOnline)), + ). + Group(q.Proxy.ID). + Order(field.NewField("", "count")). + Limit(1).Scan(&proxyResult) + if err != nil { + return nil, core.NewBizErr("获取可用代理失败", err) + } + if proxyResult.Count < count { + return nil, core.NewBizErr("无可用主机,请稍后再试") + } + proxy := proxyResult.Proxy + // 获取可用通道 - chans, err := lockChans(batch, count, expire) + chans, err := lockChans(proxy.ID, batch, count) if err != nil { - return nil, err + return nil, core.NewBizErr("无可用通道,请稍后再试", err) } - // 获取对应代理 - ips := make([]driver.Valuer, 0) - findProxy := make(map[orm.Inet]*m.Proxy) - for _, ch := range chans { - ip := orm.Inet{Addr: ch.Addr()} - if _, ok := findProxy[ip]; !ok { - ips = append(ips, ip) - findProxy[ip] = nil - } - } - - proxies, err := q.Proxy.Where( - q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)), - q.Proxy.Status.Eq(int(m.ProxyStatusOnline)), - q.Proxy.IP.In(ips...), - ).Find() + // 获取可用节点 + edgesResp, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{ + Province: filter.Prov, + City: filter.City, + Isp: u.X(filter.Isp.String()), + Limit: &count, + NoRepeat: u.P(true), + NoDayRepeat: u.P(true), + ActiveTime: u.P(3600), + IpUnchangedTime: u.P(3600), + Sort: u.P("ip_unchanged_time_asc"), + }) if err != nil { - return nil, core.NewBizErr("获取代理失败", err) + return nil, core.NewBizErr("获取可用节点失败", err) } - - groups := make(map[*m.Proxy][]*m.Channel) - for _, proxy := range proxies { - findProxy[proxy.IP] = proxy - groups[proxy] = make([]*m.Channel, 0) + if edgesResp.Total != count && len(edgesResp.Edges) != count { + return nil, core.NewBizErr("无可用节点,请稍后再试") } + edges := edgesResp.Edges // 准备通道数据 - actions := make([]*m.LogsUserUsage, len(chans)) - channels := make([]*m.Channel, len(chans)) - for i, ch := range chans { + channels := make([]*m.Channel, count) + chanConfigs := make([]*g.PortConfigsReq, count) + edgeConfigs := make([]string, count) + for i := range count { + ch := chans[i] + edge := edges[i] + if err != nil { return nil, core.NewBizErr("解析通道地址失败", err) } - // 使用记录 - actions[i] = &m.LogsUserUsage{ - UserID: user.ID, - ResourceID: resourceId, - BatchNo: batch, - Count: int32(count), - ISP: u.P(filter.Isp.String()), - Prov: filter.Prov, - City: filter.City, - IP: orm.Inet{Addr: source}, - Time: now, - } - // 通道数据 - inet := orm.Inet{Addr: ch.Addr()} channels[i] = &m.Channel{ UserID: user.ID, ResourceID: resourceId, BatchNo: batch, - ProxyID: findProxy[inet].ID, + ProxyID: proxy.ID, + Host: u.Else(proxy.Host, proxy.IP.String()), Port: ch.Port(), + EdgeRef: u.P(edge.EdgeID), FilterISP: filter.Isp, FilterProv: filter.Prov, FilterCity: filter.City, ExpiredAt: expire, - Proxy: *findProxy[inet], } + + // 通道配置数据 + chanConfigs[i] = &g.PortConfigsReq{ + Port: int(ch.Port()), + Status: true, + Edge: &[]string{edge.EdgeID}, + } + + // 白名单模式 if authWhitelist { - channels[i].Whitelists = u.P(strings.Join(whitelistIPs, ",")) + channels[i].Whitelists = u.P(strings.Join(whitelists, ",")) + chanConfigs[i].Whitelist = &whitelists } + + // 密码模式 if authPassword { username, password := genPassPair() channels[i].Username = &username channels[i].Password = &password + chanConfigs[i].Userpass = u.P(username + ":" + password) } - // 关联代理 - proxy := findProxy[inet] - groups[proxy] = append(groups[proxy], channels[i]) + // 连接配置数据 + edgeConfigs[i] = edge.EdgeID } // 提交异步任务关闭通道 @@ -230,7 +191,7 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3 return core.NewServErr("更新套餐使用记录失败", err) } - // 保存通道和分配记录 + // 保存通道 err = q.Channel. Omit(field.AssociationFields). Create(channels...) @@ -238,7 +199,18 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3 return core.NewServErr("保存通道失败", err) } - err = q.LogsUserUsage.Create(actions...) + // 保存提取记录 + err = q.LogsUserUsage.Create(&m.LogsUserUsage{ + UserID: user.ID, + ResourceID: resourceId, + BatchNo: batch, + Count: int32(count), + ISP: u.P(filter.Isp.String()), + Prov: filter.Prov, + City: filter.City, + IP: orm.Inet{Addr: source}, + Time: now, + }) if err != nil { return core.NewServErr("保存用户使用记录失败", err) } @@ -250,37 +222,29 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3 } // 提交配置 - for proxy, chanels := range groups { - secret := strings.Split(u.Z(proxy.Secret), ":") - gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) + secret := strings.Split(u.Z(proxy.Secret), ":") + gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) + if env.DebugExternalChange { - configs := make([]g.PortConfigsReq, len(chanels)) - for i, channel := range chanels { - configs[i] = g.PortConfigsReq{ - Port: int(channel.Port), - Status: true, - AutoEdgeConfig: &g.AutoEdgeConfig{ - Isp: channel.FilterISP.String(), - Province: u.Z(channel.FilterProv), - City: u.Z(channel.FilterCity), - }, - } - if authWhitelist { - configs[i].Whitelist = &whitelistIPs - } - if authPassword { - configs[i].Userpass = u.P(fmt.Sprintf("%s:%s", *channel.Username, *channel.Password)) - } + // 连接节点到网关 + err = g.Cloud.CloudConnect(&g.CloudConnectReq{ + Uuid: proxy.Mac, + Edge: &edgeConfigs, + }) + if err != nil { + return nil, core.NewServErr("连接云平台失败", err) } - if env.DebugExternalChange { - err := gateway.GatewayPortConfigs(configs) - if err != nil { - return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err) - } - } else { - bytes, _ := json.Marshal(configs) - slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "config", string(bytes)) + // 启用网关代理通道 + err = gateway.GatewayPortConfigs(chanConfigs) + if err != nil { + return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err) + } + } else { + slog.Debug("提交代理端口配置", "proxy", proxy.IP.String()) + for _, item := range chanConfigs { + str, _ := json.Marshal(item) + fmt.Println(string(str)) } } @@ -291,57 +255,58 @@ func (s *channelBaiyinService) RemoveChannels(batch string) error { start := time.Now() // 获取连接数据 - channels, err := q.Channel. - Preload(q.Channel.Proxy). - Where(q.Channel.BatchNo.Eq(batch)). - Find() + channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find() if err != nil { return core.NewServErr("获取通道数据失败", err) } - proxies := make(map[string]*m.Proxy, len(channels)) - groups := make(map[string][]*m.Channel, len(channels)) - chans := make([]string, len(channels)) - for i, channel := range channels { - ip := channel.Proxy.IP.String() - groups[ip] = append(groups[ip], channel) - proxies[ip] = &channel.Proxy - chans[i] = fmt.Sprintf("%s:%d", ip, channel.Port) + proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take() + if err != nil { + return core.NewServErr("获取代理数据失败", err) } - addrs := make([]netip.AddrPort, len(channels)) + // 准备配置数据 + edgeConfigs := make([]string, len(channels)) + configs := make([]*g.PortConfigsReq, len(channels)) for i, channel := range channels { - addrs[i] = netip.AddrPortFrom(channel.Proxy.IP.Addr, channel.Port) - } - - // 清空配置 - for ip, channels := range groups { - proxy := proxies[ip] - secret := strings.Split(*proxy.Secret, ":") - gateway := g.NewGateway(ip, secret[0], secret[1]) - - configs := make([]g.PortConfigsReq, len(channels)) - for i, channel := range channels { - configs[i] = g.PortConfigsReq{ - Status: false, - Port: int(channel.Port), - Edge: &[]string{}, - } + if channel.EdgeRef != nil { + edgeConfigs[i] = *channel.EdgeRef + } else { + slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID)) } - if env.DebugExternalChange { - err := gateway.GatewayPortConfigs(configs) - if err != nil { - return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err) - } - } else { - bytes, _ := json.Marshal(configs) - slog.Debug("清除代理端口配置", "proxy", ip, "config", string(bytes)) + configs[i] = &g.PortConfigsReq{ + Status: false, + Port: int(channel.Port), + Edge: &[]string{}, + } + } + + // 提交配置 + if env.DebugExternalChange { + // 断开节点连接 + g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{ + Uuid: proxy.Mac, + Edge: &edgeConfigs, + }) + + // 清空通道配置 + secret := strings.Split(*proxy.Secret, ":") + gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) + err := gateway.GatewayPortConfigs(configs) + if err != nil { + return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err) + } + } else { + slog.Debug("清除代理端口配置", "proxy", proxy.IP) + for _, item := range configs { + str, _ := json.Marshal(item) + fmt.Println(string(str)) } } // 释放端口 - err = freeChans(batch, chans) + err = freeChans(proxy.ID, batch) if err != nil { return err } diff --git a/web/services/proxy.go b/web/services/proxy.go index e77cbc4..de60d5c 100644 --- a/web/services/proxy.go +++ b/web/services/proxy.go @@ -33,28 +33,29 @@ func (s *proxyService) AllProxies(proxyType m.ProxyType, channels bool) ([]*m.Pr // RegisterBaiyin 注册新代理服务 func (s *proxyService) RegisterBaiyin(Mac string, IP netip.Addr, username, password string) error { - // 添加可用通道到 redis - chans := make([]netip.AddrPort, 10000) - for i := range 10000 { - chans[i] = netip.AddrPortFrom(IP, uint16(i+10000)) - } - err := addChans(chans) - if err != nil { - return core.NewServErr("添加通道失败", err) - } - // 保存代理信息 - if err := q.Proxy.Create(&m.Proxy{ + proxy := &m.Proxy{ Version: 0, Mac: Mac, IP: orm.Inet{Addr: IP}, Secret: u.P(fmt.Sprintf("%s:%s", username, password)), Type: m.ProxyTypeBaiYin, Status: m.ProxyStatusOnline, - }); err != nil { + } + if err := q.Proxy.Create(proxy); err != nil { return core.NewServErr("保存通道数据失败") } + // 添加可用通道到 redis + chans := make([]netip.AddrPort, 10000) + for i := range 10000 { + chans[i] = netip.AddrPortFrom(IP, uint16(i+10000)) + } + err := regChans(proxy.ID, chans) + if err != nil { + return core.NewServErr("添加通道失败", err) + } + return nil } diff --git a/web/tasks/task.go b/web/tasks/task.go index 26bbd65..8424183 100644 --- a/web/tasks/task.go +++ b/web/tasks/task.go @@ -5,18 +5,11 @@ import ( "encoding/json" "fmt" "log/slog" - "platform/pkg/env" - "platform/pkg/u" "platform/web/events" - g "platform/web/globals" - m "platform/web/models" - q "platform/web/queries" s "platform/web/services" - "strings" "time" "github.com/hibiken/asynq" - "gorm.io/datatypes" ) func HandleCompleteTrade(_ context.Context, task *asynq.Task) (err error) { @@ -51,99 +44,3 @@ func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) { } return nil } - -func HandleFlushGateway(_ context.Context, task *asynq.Task) error { - start := time.Now() - defer func() { - duration := time.Since(start) - if duration > time.Second { - slog.Warn("更新代理后备配置耗时过长", "time", duration.String()) - } - }() - - // 获取所有网关:配置组 - proxies, err := s.Proxy.AllProxies(m.ProxyTypeBaiYin, true) - if err != nil { - return fmt.Errorf("获取网关失败: %w", err) - } - - for _, proxy := range proxies { - - // 获取当前后备配置 - locals := map[string]int{} - for _, channel := range proxy.Channels { - isp := channel.FilterISP.String() - prov := u.Z(channel.FilterProv) - city := u.Z(channel.FilterCity) - locals[fmt.Sprintf("%s:%s:%s", isp, prov, city)]++ - } - - // 获取之前的后备配置 - remotes := map[string]int{} - if proxy.Meta != nil { - meta, ok := proxy.Meta.Data().([]any) - if !ok { - return fmt.Errorf("解析网关数据失败: %T", proxy.Meta.Data()) - } - for _, rawM := range meta { - m, ok := rawM.(map[string]any) - if !ok { - return fmt.Errorf("解析网关数据失败: %T", rawM) - } - remotes[fmt.Sprintf("%s:%s:%s", m["isp"], m["province"], m["city"])] = int(m["count"].(float64)) - } - } - - // 检查是否需要更新 - pass := true - for k, local := range locals { - remote, ok := remotes[k] - if !ok { - pass = false - } else { - local, remote := float64(local), float64(remote) - if remote < local*1.5 || remote > local*3 { - pass = false - } - } - } - if pass { - continue - } - - // 更新后备配置 - configs := make([]g.AutoConfig, 0) - for k, local := range locals { - arr := strings.Split(k, ":") - isp, prov, city := arr[0], arr[1], arr[2] - configs = append(configs, g.AutoConfig{ - Isp: isp, - Province: prov, - City: city, - Count: local * 2, - }) - } - - if env.DebugExternalChange { - err := g.Cloud.CloudConnect(g.CloudConnectReq{ - Uuid: proxy.Mac, - AutoConfig: configs, - }) - if err != nil { - slog.Error("提交代理后备配置失败", "error", err) - } - } else { - bytes, _ := json.Marshal(configs) - slog.Debug("更新代理后备配置", "proxy", proxy.IP.String(), "config", string(bytes)) - } - - _, err := q.Proxy. - Where(q.Proxy.ID.Eq(proxy.ID)). - UpdateSimple(q.Proxy.Meta.Value(datatypes.NewJSONType(configs))) - if err != nil { - slog.Error("更新代理后备配置失败", "error", err) - } - } - - return nil -} diff --git a/web/web.go b/web/web.go index 6e78eb0..78b9e01 100644 --- a/web/web.go +++ b/web/web.go @@ -6,7 +6,7 @@ import ( "log/slog" _ "net/http/pprof" "platform/web/events" - base "platform/web/globals" + deps "platform/web/globals" "platform/web/tasks" "time" @@ -19,12 +19,12 @@ func RunApp(pCtx context.Context) error { g, ctx := errgroup.WithContext(pCtx) // 初始化依赖 - err := base.Init(ctx) + err := deps.Init(ctx) if err != nil { return fmt.Errorf("初始化依赖失败: %w", err) } defer func() { - err := base.Close() + err := deps.Close() if err != nil { slog.Error("关闭依赖失败", "error", err) } @@ -39,10 +39,6 @@ func RunApp(pCtx context.Context) error { return RunTask(ctx) }) - g.Go(func() error { - return RunSchedule(ctx) - }) - return g.Wait() } @@ -76,30 +72,9 @@ func RunWeb(ctx context.Context) error { return nil } -func RunSchedule(ctx context.Context) error { - var scheduler = asynq.NewSchedulerFromRedisClient(base.Redis, &asynq.SchedulerOpts{ - Location: time.Local, - }) - - scheduler.Register("@every 5s", events.NewFlushGateway(5*time.Second)) - - // 停止服务 - go func() { - <-ctx.Done() - scheduler.Shutdown() - }() - - // 启动服务 - err := scheduler.Run() - if err != nil { - return fmt.Errorf("调度服务运行失败: %w", err) - } - - return nil -} - func RunTask(ctx context.Context) error { - var server = asynq.NewServerFromRedisClient(base.Redis, asynq.Config{ + var server = asynq.NewServerFromRedisClient(deps.Redis, asynq.Config{ + ShutdownTimeout: 5 * time.Second, ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) { slog.Error("任务执行失败", "task", task.Type(), "error", err) }), @@ -108,7 +83,6 @@ func RunTask(ctx context.Context) error { var mux = asynq.NewServeMux() mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel) mux.HandleFunc(events.CompleteTrade, tasks.HandleCompleteTrade) - mux.HandleFunc(events.FlushGateway, tasks.HandleFlushGateway) // 停止服务 go func() {