diff --git a/cmd/playground/main.go b/cmd/playground/main.go index c4c8328..311277e 100644 --- a/cmd/playground/main.go +++ b/cmd/playground/main.go @@ -1,5 +1,33 @@ package main -func main() { - println(string(rune(122))) +import ( + "fmt" + "platform/pkg/env" + "platform/pkg/logs" + "platform/pkg/orm" + "platform/web/models" + q "platform/web/queries" +) + +type ResourceInfo struct { + data models.Resource + pss models.ResourcePss +} + +func main() { + + env.Init() + logs.Init() + orm.Init() + + var resource = new(ResourceInfo) + data := q.Resource.As("data") + pss := q.ResourcePss.As("pss") + _ = data.Debug().Scopes(orm.Alias(data)). + Select(data.ALL, pss.ALL). + LeftJoin(q.ResourcePss.As("pss"), pss.ResourceID.EqCol(data.ID)). + Where(data.ID.Eq(1)). + Scan(&resource) + + fmt.Printf("%+v\n", resource) } diff --git a/pkg/remote/remote.go b/pkg/remote/remote.go index 0647482..b1725d5 100644 --- a/pkg/remote/remote.go +++ b/pkg/remote/remote.go @@ -3,6 +3,7 @@ package remote import ( "encoding/json" "errors" + "fmt" "io" "net/http" "strconv" @@ -19,7 +20,8 @@ var Client client func Init() { // todo 从环境变量中获取参数 Client = client{ - url: "http://103.139.212.110", + url: "http://103.139.212.110:9989", + token: "tHDarLc1ct6M9NMAxeO98lN2YsEadYSx.anVpcA==.MTc0MzA4MTAwMQ==", } } @@ -106,15 +108,15 @@ type CloudConnectReq struct { AutoConfig []AutoConfig `json:"auto_config,omitempty"` } -func (c *client) CloudConnect(param CloudConnectReq) (int, error) { +func (c *client) CloudConnect(param CloudConnectReq) error { data, err := json.Marshal(param) if err != nil { - return 0, err + return err } resp, err := c.requestCloud("POST", "/connect", string(data)) if err != nil { - return 0, err + return err } defer func(Body io.ReadCloser) { @@ -122,25 +124,25 @@ func (c *client) CloudConnect(param CloudConnectReq) (int, error) { }(resp.Body) if resp.StatusCode != http.StatusOK { - return 0, errors.New("failed to connect") + return errors.New("failed to connect") } body, err := io.ReadAll(resp.Body) if err != nil { - return 0, err + return err } var result map[string]any err = json.Unmarshal(body, &result) if err != nil { - return 0, err + return err } if result["status"] == "error" { - return 0, errors.New(result["details"].(string)) + return errors.New(result["details"].(string)) } - return int(result["connected_edges"].(float64)), nil + return nil } // endregion @@ -202,7 +204,7 @@ func (c *client) CloudDisconnect(param CloudDisconnectReq) (int, error) { // region cloud:/auto_query -type CloudConnectResp map[string]AutoConfig +type CloudConnectResp map[string][]AutoConfig func (c *client) CloudAutoQuery() (CloudConnectResp, error) { resp, err := c.requestCloud("GET", "/auto_query", "") @@ -235,7 +237,8 @@ func (c *client) CloudAutoQuery() (CloudConnectResp, error) { func (c *client) requestCloud(method string, url string, data string) (*http.Response, error) { - req, err := http.NewRequest(method, c.url+url, strings.NewReader(data)) + url = fmt.Sprintf("%s/api%s", c.url, url) + req, err := http.NewRequest(method, url, strings.NewReader(data)) if err != nil { return nil, err } @@ -264,7 +267,7 @@ func InitGateway(url, username, password string) *Gateway { // region gateway:/port/configs type PortConfigsReq struct { - Port string `json:"port"` + Port int `json:"port"` Edge []string `json:"edge,omitempty"` Type string `json:"type,omitempty"` Time int `json:"time,omitempty"` @@ -301,15 +304,15 @@ func (c *Gateway) GatewayPortConfigs(params []PortConfigsReq) error { _ = Body.Close() }(resp.Body) - if resp.StatusCode != http.StatusOK { - return errors.New("failed to configure port") - } - body, err := io.ReadAll(resp.Body) if err != nil { return err } + if resp.StatusCode != http.StatusOK { + return errors.New("failed to get port configs: " + string(body)) + } + var result map[string]any err = json.Unmarshal(body, &result) if err != nil { @@ -398,7 +401,8 @@ func (c *Gateway) requestGateway(method string, url string, data any) (*http.Res return nil, err } - req, err := http.NewRequest(method, c.username+":"+c.password+"@"+c.url+url, strings.NewReader(string(jsonData))) + url = fmt.Sprintf("http://%s:%s@%s:9990%s", c.username, c.password, c.url, url) + req, err := http.NewRequest(method, url, strings.NewReader(string(jsonData))) if err != nil { return nil, err } diff --git a/scripts/sql/init.sql b/scripts/sql/init.sql index 1806b70..a0ed496 100644 --- a/scripts/sql/init.sql +++ b/scripts/sql/init.sql @@ -563,12 +563,12 @@ comment on column product.deleted_at is '删除时间'; drop table if exists resource cascade; create table resource ( id serial primary key, - user_id int not null references "user" (id) + user_id int not null references "user" (id) on update cascade on delete cascade, - active bool default true, - created_at timestamp default current_timestamp, - updated_at timestamp default current_timestamp, + active bool not null default true, + created_at timestamp default current_timestamp, + updated_at timestamp default current_timestamp, deleted_at timestamp ); create index resource_user_id_index on resource (user_id); @@ -595,11 +595,11 @@ create table resource_pss ( quota int, used int, expire timestamp, - daily_limit int, - daily_used int, + daily_limit int not null default 0, + daily_used int not null default 0, daily_last timestamp, - created_at timestamp default current_timestamp, - updated_at timestamp default current_timestamp, + created_at timestamp default current_timestamp, + updated_at timestamp default current_timestamp, deleted_at timestamp ); create index resource_pss_resource_id_index on resource_pss (resource_id); diff --git a/test/test-api.http b/test/test-api.http index ba567a6..fbaa8b7 100644 --- a/test/test-api.http +++ b/test/test-api.http @@ -1,5 +1,44 @@ -GET http://api:123456@110.40.82.248:9990/port/active +### remote 令牌 +GET http://110.40.82.250:18702/server/index/getToken/key/juipbyjdapiverify + +### remote 配置信息 +GET http://103.139.212.110:9989/api/auto_query +token: tHDarLc1ct6M9NMAxeO98lN2YsEadYSx.anVpcA==.MTc0MzA4MTAwMQ== + +### gateway 端口信息 +GET http://api:123456@110.40.82.248:9990/port/active/ + +### remote 配置重置 +POST http://103.139.212.110:9989/api/connect +token: tHDarLc1ct6M9NMAxeO98lN2YsEadYSx.anVpcA==.MTc0MzA4MTAwMQ== +Content-Type: application/json + +{ + "uuid": "7a17e8b4-cdc3-4500-bf16-4a665991a7f6", + "auto_config": [ + { + "count": 200 + } + ] +} + +### 密码模式代理 +POST http://localhost:8080/api/channel/create +Authorization: Bearer b21568ed-09a9-4f1c-add6-d6b24bde7473 +Content-Type: application/json Accept: application/json -### +{ + "resource_id": 1, + "protocol": "http", + "auth_type": 1, + "count": 200, + "prov": "", + "city": "", + "isp": "", + "result_type": "text", + "result_separator": "both" +} + +### 白名单模式代理 diff --git a/web/handlers/channel.go b/web/handlers/channel.go index 29a467b..433adc3 100644 --- a/web/handlers/channel.go +++ b/web/handlers/channel.go @@ -19,8 +19,8 @@ type CreateChannelReq struct { Prov string `json:"prov" validate:"required"` City string `json:"city" validate:"required"` Isp string `json:"isp" validate:"required"` - ResultType CreateChannelResultType `json:"return_type" validate:"required,oneof=json text"` - ResultSeparator CreateChannelResultSeparator `json:"return_separator" validate:"required,oneof=enter line both tab"` + ResultType CreateChannelResultType `json:"result_type" validate:"required,oneof=json text"` + ResultSeparator CreateChannelResultSeparator `json:"result_separator" validate:"required,oneof=enter line both tab"` } func CreateChannel(c *fiber.Ctx) error { @@ -30,12 +30,12 @@ func CreateChannel(c *fiber.Ctx) error { } // 建立连接通道 - auth, ok := c.Locals("user").(*services.AuthContext) + auth, ok := c.Locals("auth").(*services.AuthContext) if !ok { return errors.New("user not found") } - channels, err := services.Channel.RemoteCreateChannel( + assigns, err := services.Channel.RemoteCreateChannel( c.Context(), auth, req.ResourceId, @@ -54,9 +54,13 @@ func CreateChannel(c *fiber.Ctx) error { // 返回连接通道列表 var result []string - for _, channel := range channels { - url := fmt.Sprintf("%s://%s:%d", channel.Protocol, channel.UserHost, channel.ProxyPort) - result = append(result, url) + for _, assign := range assigns { + var proxy = assign.Proxy + var channels = assign.Channels + for _, channel := range channels { + url := fmt.Sprintf("%s://%s:%d", channel.Protocol, proxy.Host, channel.ProxyPort) + result = append(result, url) + } } switch req.ResultType { diff --git a/web/models/resource.gen.go b/web/models/resource.gen.go index 437eb71..28337fa 100644 --- a/web/models/resource.gen.go +++ b/web/models/resource.gen.go @@ -16,7 +16,7 @@ const TableNameResource = "resource" type Resource struct { ID int32 `gorm:"column:id;primaryKey;autoIncrement:true;comment:套餐ID" json:"id"` // 套餐ID UserID int32 `gorm:"column:user_id;not null;comment:用户ID" json:"user_id"` // 用户ID - Active bool `gorm:"column:active;default:true;comment:套餐状态" json:"active"` // 套餐状态 + Active bool `gorm:"column:active;not null;default:true;comment:套餐状态" json:"active"` // 套餐状态 CreatedAt time.Time `gorm:"column:created_at;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间 UpdatedAt time.Time `gorm:"column:updated_at;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;comment:删除时间" json:"deleted_at"` // 删除时间 diff --git a/web/models/resource_pss.gen.go b/web/models/resource_pss.gen.go index a681863..f3f813e 100644 --- a/web/models/resource_pss.gen.go +++ b/web/models/resource_pss.gen.go @@ -21,8 +21,8 @@ type ResourcePss struct { Quota int32 `gorm:"column:quota;comment:配额数量" json:"quota"` // 配额数量 Used int32 `gorm:"column:used;comment:已用数量" json:"used"` // 已用数量 Expire time.Time `gorm:"column:expire;comment:过期时间" json:"expire"` // 过期时间 - DailyLimit int32 `gorm:"column:daily_limit;comment:每日限制" json:"daily_limit"` // 每日限制 - DailyUsed int32 `gorm:"column:daily_used;comment:今日已用数量" json:"daily_used"` // 今日已用数量 + DailyLimit int32 `gorm:"column:daily_limit;not null;comment:每日限制" json:"daily_limit"` // 每日限制 + DailyUsed int32 `gorm:"column:daily_used;not null;comment:今日已用数量" json:"daily_used"` // 今日已用数量 DailyLast time.Time `gorm:"column:daily_last;comment:今日最后使用时间" json:"daily_last"` // 今日最后使用时间 CreatedAt time.Time `gorm:"column:created_at;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间 UpdatedAt time.Time `gorm:"column:updated_at;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 diff --git a/web/router.go b/web/router.go index 789bf87..f3dfca5 100644 --- a/web/router.go +++ b/web/router.go @@ -16,6 +16,6 @@ func ApplyRouters(app *fiber.App) { auth.Post("/token", handlers.Token) // 通道 - channel := api.Group("/channel", PermitUser()) - channel.Post("/create", handlers.CreateChannel) + channel := api.Group("/channel") + channel.Post("/create", PermitUser(), handlers.CreateChannel) } diff --git a/web/services/channel.go b/web/services/channel.go index 69acf83..da3c888 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -2,6 +2,7 @@ package services import ( "context" + "encoding/json" "errors" "fmt" "log/slog" @@ -12,7 +13,6 @@ import ( "platform/web/common" "platform/web/models" q "platform/web/queries" - "strconv" "time" "github.com/google/uuid" @@ -41,10 +41,7 @@ func (s *channelService) CreateChannel( var channels []*models.Channel err := q.Q.Transaction(func(tx *q.Query) error { // 查找套餐 - var resource = struct { - data models.Resource - pss models.ResourcePss - }{} + var resource = ResourceInfo{} err := q.Resource.As("data"). LeftJoin(q.ResourcePss.As("pss"), q.ResourcePss.ResourceID.EqCol(q.Resource.ID)). Where(q.Resource.ID.Eq(resourceId)). @@ -57,29 +54,29 @@ func (s *channelService) CreateChannel( } // 检查使用人 - if auth.Payload.Type == PayloadUser && auth.Payload.Id != resource.data.UserID { + if auth.Payload.Type == PayloadUser && auth.Payload.Id != resource.UserId { return common.AuthForbiddenErr("无权限访问") } // 检查套餐状态 - if !resource.data.Active { + if !resource.Active { return ChannelServiceErr("套餐已失效") } // 检查每日限额 - today := time.Now().Format("2006-01-02") == resource.pss.DailyLast.Format("2006-01-02") - dailyRemain := int(math.Max(float64(resource.pss.DailyLimit-resource.pss.DailyUsed), 0)) + today := time.Now().Format("2006-01-02") == resource.DailyLast.Format("2006-01-02") + dailyRemain := int(math.Max(float64(resource.DailyLimit-resource.DailyUsed), 0)) if today && dailyRemain < count { return ChannelServiceErr("套餐每日配额不足") } // 检查时间或配额 - if resource.pss.Type == 1 { // 包时 - if resource.pss.Expire.Before(time.Now()) { + if resource.Type == 1 { // 包时 + if resource.Expire.Before(time.Now()) { return ChannelServiceErr("套餐已过期") } } else { // 包量 - remain := int(math.Max(float64(resource.pss.Quota-resource.pss.Used), 0)) + remain := int(math.Max(float64(resource.Quota-resource.Used), 0)) if remain < count { return ChannelServiceErr("套餐配额不足") } @@ -115,7 +112,7 @@ func (s *channelService) CreateChannel( AuthPass: authType == ChannelAuthTypePass, Username: username, Password: password, - Expiration: time.Now().Add(time.Duration(resource.pss.Live) * time.Second), + Expiration: time.Now().Add(time.Duration(resource.Live) * time.Second), }) } } @@ -128,18 +125,24 @@ func (s *channelService) CreateChannel( // 更新套餐使用记录 if today { - resource.pss.DailyUsed += int32(count) - resource.pss.Used += int32(count) + resource.DailyUsed += int32(count) + resource.Used += int32(count) } else { - resource.pss.DailyLast = time.Now() - resource.pss.DailyUsed = int32(count) - resource.pss.Used += int32(count) + resource.DailyLast = time.Now() + resource.DailyUsed = int32(count) + resource.Used += int32(count) } err = tx.ResourcePss. - Where(q.ResourcePss.ID.Eq(resource.pss.ID)). - Omit(q.ResourcePss.ResourceID). - Save(&resource.pss) + Where(q.ResourcePss.ID.Eq(resource.Id)). + Select( + q.ResourcePss.Used, + q.ResourcePss.DailyUsed, + q.ResourcePss.DailyLast). + Save(&models.ResourcePss{ + Used: resource.Used, + DailyUsed: resource.DailyUsed, + DailyLast: resource.DailyLast}) if err != nil { return err } @@ -151,10 +154,10 @@ func (s *channelService) CreateChannel( } // 缓存通道信息与异步删除任务 - err = cache(ctx, channels) - if err != nil { - return nil, err - } + // err = cache(ctx, channels) + // if err != nil { + // return nil, err + // } // 返回连接通道列表 return channels, errors.New("not implemented") @@ -234,10 +237,6 @@ func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext, return nil } -func chKey(channel *models.Channel) string { - return fmt.Sprintf("channel:%s:%s", channel.UserHost, channel.NodeHost) -} - type ChannelServiceErr string func (c ChannelServiceErr) Error() string { @@ -254,7 +253,7 @@ func (s *channelService) RemoteCreateChannel( authType ChannelAuthType, count int, nodeFilter ...NodeFilterConfig, -) ([]*models.Channel, error) { +) ([]AssignPortResult, error) { filter := NodeFilterConfig{} if len(nodeFilter) > 0 { @@ -265,8 +264,11 @@ func (s *channelService) RemoteCreateChannel( var resource = new(ResourceInfo) data := q.Resource.As("data") pss := q.ResourcePss.As("pss") - err := data.Scopes(orm.Alias(data)). - Select(data.ALL, pss.ALL). + err := data.Debug().Scopes(orm.Alias(data)). + Select( + data.ID, data.UserID, data.Active, + pss.Type, pss.Live, pss.DailyUsed, pss.DailyLimit, pss.DailyLast, pss.Quota, pss.Used, pss.Expire, + ). LeftJoin(q.ResourcePss.As("pss"), pss.ResourceID.EqCol(data.ID)). Where(data.ID.Eq(resourceId)). Scan(&resource) @@ -282,27 +284,32 @@ func (s *channelService) RemoteCreateChannel( if err != nil { return nil, err } + slog.Debug("检查用户权限完成") // 申请节点 - assigned, err := assignEdge(count, filter) + edgeAssigns, err := assignEdge(count, filter) if err != nil { return nil, err } + debugAssigned := fmt.Sprintf("%+v", edgeAssigns) + slog.Debug("申请节点完成", "edgeAssigns", debugAssigned) // 分配端口 - expiration := time.Now().Add(time.Duration(resource.pss.Live) * time.Second) - channels, err := assignPort(assigned, auth.Payload.Id, protocol, authType, expiration, filter) + expiration := time.Now().Add(time.Duration(resource.Live) * time.Second) + portAssigns, err := assignPort(edgeAssigns, auth.Payload.Id, protocol, authType, expiration, filter) if err != nil { return nil, err } + debugChannels := fmt.Sprintf("%+v", portAssigns) + slog.Debug("分配端口完成", "portAssigns", debugChannels) // 缓存并关闭代理 - err = cache(ctx, channels) + err = cache(ctx, portAssigns) if err != nil { return nil, err } - return channels, nil + return portAssigns, nil } // endregion @@ -310,29 +317,29 @@ func (s *channelService) RemoteCreateChannel( func checkUser(auth *AuthContext, resource *ResourceInfo, count int) error { // 检查使用人 - if auth.Payload.Type == PayloadUser && auth.Payload.Id != resource.data.UserID { + if auth.Payload.Type == PayloadUser && auth.Payload.Id != resource.UserId { return common.AuthForbiddenErr("无权限访问") } // 检查套餐状态 - if !resource.data.Active { + if !resource.Active { return ChannelServiceErr("套餐已失效") } // 检查每日限额 - today := time.Now().Format("2006-01-02") == resource.pss.DailyLast.Format("2006-01-02") - dailyRemain := int(math.Max(float64(resource.pss.DailyLimit-resource.pss.DailyUsed), 0)) + today := time.Now().Format("2006-01-02") == resource.DailyLast.Format("2006-01-02") + dailyRemain := int(math.Max(float64(resource.DailyLimit-resource.DailyUsed), 0)) if today && dailyRemain < count { return ChannelServiceErr("套餐每日配额不足") } // 检查时间或配额 - if resource.pss.Type == 1 { // 包时 - if resource.pss.Expire.Before(time.Now()) { + if resource.Type == 1 { // 包时 + if resource.Expire.Before(time.Now()) { return ChannelServiceErr("套餐已过期") } } else { // 包量 - remain := int(math.Max(float64(resource.pss.Quota-resource.pss.Used), 0)) + remain := int(math.Max(float64(resource.Quota-resource.Used), 0)) if remain < count { return ChannelServiceErr("套餐配额不足") } @@ -342,7 +349,7 @@ func checkUser(auth *AuthContext, resource *ResourceInfo, count int) error { } // assignEdge 分配边缘节点数量 -func assignEdge(count int, filter NodeFilterConfig) ([]AssignEdgeResult, error) { +func assignEdge(count int, filter NodeFilterConfig) ([]*AssignEdgeResult, error) { // 查询现有节点连接情况 edgeConfigs, err := remote.Client.CloudAutoQuery() if err != nil { @@ -356,46 +363,59 @@ func assignEdge(count int, filter NodeFilterConfig) ([]AssignEdgeResult, error) return nil, err } - // 尽量平均分配节点用量 - var total = count - for _, v := range edgeConfigs { - total += v.Count + // 过滤需要变动的连接配置 + type ConfigInfo struct { + proxy *models.Proxy + config *remote.AutoConfig } - avg := int(math.Ceil(float64(total) / float64(len(edgeConfigs)))) - - var result []AssignEdgeResult - var rCount = 0 - for _, proxy := range proxies { - prev, ok := edgeConfigs[proxy.Name] - var nextCount = 0 - if !ok || (prev.Count < avg && prev.Count < total) { - nextCount = int(math.Min(float64(avg), float64(total))) - result = append(result, AssignEdgeResult{ + var total = count + var assigns = make([]*AssignEdgeResult, len(proxies), len(proxies)) + for i, proxy := range proxies { + remoteConfigs := edgeConfigs[proxy.Name] + for _, config := range remoteConfigs { + if config.Isp == filter.Isp && config.City == filter.City && config.Province == filter.Prov { + total += config.Count + assigns[i] = &AssignEdgeResult{ + proxy: proxy, + count: config.Count, + } + } + } + if assigns[i] == nil { + assigns[i] = &AssignEdgeResult{ proxy: proxy, - count: nextCount - prev.Count, - }) - total -= nextCount + count: 0, + } + } + } + avg := int(math.Ceil(float64(total) / float64(len(proxies)))) + + for i, assign := range assigns { + var prev = assign.count + var next = assign.count + if prev < avg && prev < total { + next = int(math.Min(float64(avg), float64(total))) + assigns[i].count = next - prev + total -= next } else { continue } - _rCount, err := remote.Client.CloudConnect(remote.CloudConnectReq{ - Uuid: proxy.Name, - Edge: nil, - AutoConfig: []remote.AutoConfig{{ - Province: filter.Prov, - City: filter.City, - Isp: filter.Isp, - Count: nextCount, - }}, - }) - if err != nil { - return nil, err - } - rCount += _rCount + // err := remote.Client.CloudConnect(remote.CloudConnectReq{ + // Uuid: assign.Proxy.Name, + // Edge: nil, + // AutoConfig: []remote.AutoConfig{{ + // Province: filter.Prov, + // City: filter.City, + // Isp: filter.Isp, + // Count: next, + // }}, + // }) + // if err != nil { + // return nil, err + // } } - slog.Debug("cloud connect", "count", rCount) - return result, nil + return assigns, nil } type AssignEdgeResult struct { @@ -405,13 +425,13 @@ type AssignEdgeResult struct { // assignPort 分配指定数量的端口 func assignPort( - assigns []AssignEdgeResult, + assigns []*AssignEdgeResult, userId int32, protocol ChannelProtocol, authType ChannelAuthType, expiration time.Time, filter NodeFilterConfig, -) ([]*models.Channel, error) { +) ([]AssignPortResult, error) { // 查询代理已配置端口 var proxyIds = make([]int32, 0, len(assigns)) for _, assigned := range assigns { @@ -425,7 +445,8 @@ func assignPort( q.Channel.ProxyID.In(proxyIds...), q.Channel.Expiration.Gt(time.Now())). Group( - q.Channel.ProxyPort). + q.Channel.ProxyPort, + q.Channel.ProxyID). Find() if err != nil { return nil, err @@ -439,14 +460,19 @@ func assignPort( } // 配置启用代理 - var result []*models.Channel - for i := 0; i < len(assigns); i++ { - proxy := assigns[i].proxy - count := assigns[i].count + var result = make([]AssignPortResult, len(assigns)) + for i, assign := range assigns { + var proxy = assign.proxy + var count = assign.count + + result[i] = AssignPortResult{ + Proxy: proxy, + } // 筛选可用端口 - var portConfigs = make([]remote.PortConfigsReq, count) - for port := 10000; port < 20000 || len(portConfigs) < count; port++ { + var channels = result[i].Channels + var configs = make([]remote.PortConfigsReq, 0, count) + for port := 10000; port < 20000 && len(configs) < count; port++ { // 跳过存在的端口 key := uint64(proxy.ID)<<32 | uint64(port) _, ok := proxyPorts[key] @@ -455,8 +481,9 @@ func assignPort( } // 配置新端口 - portConfigs[port] = remote.PortConfigsReq{ - Port: strconv.Itoa(port), + var i = len(configs) + configs = append(configs, remote.PortConfigsReq{ + Port: port, Edge: nil, Status: true, AutoEdgeConfig: remote.AutoEdgeConfig{ @@ -465,7 +492,7 @@ func assignPort( Isp: filter.Isp, Count: 1, }, - } + }) switch authType { case ChannelAuthTypeIp: var whitelist []string @@ -476,9 +503,9 @@ func assignPort( if err != nil { return nil, err } - portConfigs[port].Whitelist = whitelist + configs[i].Whitelist = whitelist for _, item := range whitelist { - result = append(result, &models.Channel{ + channels = append(channels, &models.Channel{ UserID: userId, ProxyID: proxy.ID, UserHost: item, @@ -491,8 +518,8 @@ func assignPort( } case ChannelAuthTypePass: username, password := genPassPair() - portConfigs[port].Userpass = fmt.Sprintf("%s:%s", username, password) - result = append(result, &models.Channel{ + configs[i].Userpass = fmt.Sprintf("%s:%s", username, password) + channels = append(channels, &models.Channel{ UserID: userId, ProxyID: proxy.ID, ProxyPort: int32(port), @@ -506,38 +533,64 @@ func assignPort( } } + result[i].Channels = channels + + if len(configs) < count { + return nil, ChannelServiceErr("网关端口数量到达上限,无法分配") + } + // 提交端口配置 - gateway := remote.InitGateway( - proxy.Host, - "api", - "123456", - ) - err = gateway.GatewayPortConfigs(portConfigs) + // gateway := remote.InitGateway( + // proxy.Host, + // "api", + // "123456", + // ) + // err = gateway.GatewayPortConfigs(configs) + // if err != nil { + // return nil, err + // } + + // 保存到数据库 + err = q.Channel. + Omit(q.Channel.NodeID). + Save(channels...) if err != nil { return nil, err } } - err = q.Channel.Save(result...) - if err != nil { - return nil, err - } - return result, nil } -func cache(ctx context.Context, channels []*models.Channel) error { +type AssignPortResult struct { + Proxy *models.Proxy + Channels []*models.Channel +} + +func chKey(channel *models.Channel) string { + return fmt.Sprintf("channel:%s:%s", channel.UserHost, channel.NodeHost) +} + +func cache(ctx context.Context, assigns []AssignPortResult) error { pipe := rds.Client.TxPipeline() - zList := make([]redis.Z, 0, len(channels)) - for _, channel := range channels { - pipe.Set(ctx, chKey(channel), channel, channel.Expiration.Sub(time.Now())) - zList = append(zList, redis.Z{ - Score: float64(channel.Expiration.Unix()), - Member: channel.ID, - }) + zList := make([]redis.Z, 0, len(assigns)) + for _, assign := range assigns { + var channels = assign.Channels + for _, channel := range channels { + marshal, err := json.Marshal(assign) + if err != nil { + return err + } + + pipe.Set(ctx, chKey(channel), string(marshal), channel.Expiration.Sub(time.Now())) + zList = append(zList, redis.Z{ + Score: float64(channel.Expiration.Unix()), + Member: channel.ID, + }) + } } - pipe.ZAdd(ctx, "tasks:channel", zList...) + pipe.ZAdd(ctx, "tasks:assign", zList...) _, err := pipe.Exec(ctx) if err != nil { @@ -549,7 +602,7 @@ func cache(ctx context.Context, channels []*models.Channel) error { func deleteCache(ctx context.Context, channels []*models.Channel) error { pipe := rds.Client.TxPipeline() - keys := make([]string, len(channels)) + keys := make([]string, 0, len(channels)) for i := range keys { keys[i] = chKey(channels[i]) } @@ -564,6 +617,15 @@ func deleteCache(ctx context.Context, channels []*models.Channel) error { } type ResourceInfo struct { - data models.Resource - pss models.ResourcePss + Id int32 + UserId int32 + Active bool + Type int32 + Live int32 + DailyLimit int32 + DailyUsed int32 + DailyLast time.Time + Quota int32 + Used int32 + Expire time.Time }