From 3f8e48ec68b070d4385c9a80f8107b4f564fa8dc Mon Sep 17 00:00:00 2001 From: luorijun Date: Sat, 17 May 2025 18:59:43 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E9=95=BF=E6=95=88=E5=A5=97?= =?UTF-8?q?=E9=A4=90=E5=88=9B=E5=BB=BA=E9=80=BB=E8=BE=91=EF=BC=8C=E5=B9=B6?= =?UTF-8?q?=E6=95=B4=E5=90=88=E4=B8=8D=E5=90=8C=E5=A5=97=E9=A4=90=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B=E7=9A=84=E5=88=9B=E5=BB=BA=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 11 + cmd/gen/main.go | 10 + web/domains/edge/types.go | 14 ++ web/handlers/channel.go | 1 - web/models/resource.gen.go | 3 +- web/queries/bill.gen.go | 8 + web/queries/resource.gen.go | 83 ++++++- web/services/channel.go | 473 ++++++++++++++++++++---------------- web/services/edge.go | 85 ------- 9 files changed, 391 insertions(+), 297 deletions(-) delete mode 100644 web/services/edge.go diff --git a/README.md b/README.md index 7bfe3c2..5490c65 100644 --- a/README.md +++ b/README.md @@ -7,3 +7,14 @@ | proxy/shared-static | pss | 动态代理 | | proxy/shared-rotate | psr | 隧道代理 | | proxy/private-static | pps | 独享代理 | + +## 业务流程 + +### 短效动态 + +1. 获取远端连接配置 +2. 查询并计算实际连接状况 +3. 每个网关: + 1. 根据负载分配算法,计算是否需要为此网关分配新的连接 + 2. 如果需要新的连接且预计连接数大于已配置的连接数,则更新远端连接配置 + 3. 为新增连接分配可用端口,并更新网关端口配置 \ No newline at end of file diff --git a/cmd/gen/main.go b/cmd/gen/main.go index f07d782..a7fe1bf 100644 --- a/cmd/gen/main.go +++ b/cmd/gen/main.go @@ -46,6 +46,9 @@ func main() { resourceShort := g.GenerateModel("resource_short", common...) customs["resource_short"] = resourceShort + resourceLong := g.GenerateModel("resource_long", common...) + customs["resource_long"] = resourceLong + resource := g.GenerateModel("resource", append(common, gen.FieldRelate(field.HasOne, "Short", resourceShort, &field.RelateConfig{ RelatePointer: true, @@ -54,6 +57,13 @@ func main() { "references": []string{"ID"}, }, }), + gen.FieldRelate(field.HasOne, "Long", resourceLong, &field.RelateConfig{ + RelatePointer: true, + GORMTag: field.GormTag{ + "foreignKey": []string{"ResourceID"}, + "references": []string{"ID"}, + }, + }), )...) customs["resource"] = resource diff --git a/web/domains/edge/types.go b/web/domains/edge/types.go index 564ea38..de0d0a6 100644 --- a/web/domains/edge/types.go +++ b/web/domains/edge/types.go @@ -1,5 +1,7 @@ package edge +import "strings" + type ISP int32 const ( @@ -9,6 +11,18 @@ const ( IspChinaMobile // 中国移动 ) +func ISPFromStr(str string) ISP { + switch { + case strings.Contains(str, "电信"): + return IspChinaTelecom + case strings.Contains(str, "联通"): + return IspChinaUnicom + case strings.Contains(str, "移动"): + return IspChinaMobile + } + return IspUnknown +} + type Type int32 const ( diff --git a/web/handlers/channel.go b/web/handlers/channel.go index 92fa90d..b8239b9 100644 --- a/web/handlers/channel.go +++ b/web/handlers/channel.go @@ -154,7 +154,6 @@ func CreateChannel(c *fiber.Ctx) error { // 创建通道 result, err := s.Channel.CreateChannel( - c.Context(), authContext, req.ResourceId, req.Protocol, diff --git a/web/models/resource.gen.go b/web/models/resource.gen.go index b5af74c..12ab16d 100644 --- a/web/models/resource.gen.go +++ b/web/models/resource.gen.go @@ -18,11 +18,12 @@ type Resource struct { UserID int32 `gorm:"column:user_id;not null;comment:用户ID" json:"user_id"` // 用户ID ResourceNo string `gorm:"column:resource_no;comment:套餐编号" json:"resource_no"` // 套餐编号 Active bool `gorm:"column:active;not null;default:true;comment:套餐状态" json:"active"` // 套餐状态 - Type int32 `gorm:"column:type;not null;comment:套餐类型:1-动态,2-隧道,3-独享" json:"type"` // 套餐类型:1-动态,2-隧道,3-独享 + Type int32 `gorm:"column:type;not null;comment:套餐类型:1-短效动态,2-长效动态" json:"type"` // 套餐类型:1-短效动态,2-长效动态 CreatedAt orm.LocalDateTime `gorm:"column:created_at;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间 UpdatedAt orm.LocalDateTime `gorm:"column:updated_at;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;comment:删除时间" json:"deleted_at"` // 删除时间 Short *ResourceShort `gorm:"foreignKey:ResourceID;references:ID" json:"short"` + Long *ResourceLong `gorm:"foreignKey:ResourceID;references:ID" json:"long"` } // TableName Resource's table name diff --git a/web/queries/bill.gen.go b/web/queries/bill.gen.go index 2e841d3..9f308a9 100644 --- a/web/queries/bill.gen.go +++ b/web/queries/bill.gen.go @@ -60,6 +60,11 @@ func newBill(db *gorm.DB, opts ...gen.DOOption) bill { }{ RelationField: field.NewRelation("Resource.Short", "models.ResourceShort"), }, + Long: struct { + field.RelationField + }{ + RelationField: field.NewRelation("Resource.Long", "models.ResourceLong"), + }, } _bill.fillFieldMap() @@ -308,6 +313,9 @@ type billBelongsToResource struct { Short struct { field.RelationField } + Long struct { + field.RelationField + } } func (a billBelongsToResource) Where(conds ...field.Expr) *billBelongsToResource { diff --git a/web/queries/resource.gen.go b/web/queries/resource.gen.go index be0666c..044c2d2 100644 --- a/web/queries/resource.gen.go +++ b/web/queries/resource.gen.go @@ -41,6 +41,12 @@ func newResource(db *gorm.DB, opts ...gen.DOOption) resource { RelationField: field.NewRelation("Short", "models.ResourceShort"), } + _resource.Long = resourceHasOneLong{ + db: db.Session(&gorm.Session{}), + + RelationField: field.NewRelation("Long", "models.ResourceLong"), + } + _resource.fillFieldMap() return _resource @@ -54,12 +60,14 @@ type resource struct { UserID field.Int32 // 用户ID ResourceNo field.String // 套餐编号 Active field.Bool // 套餐状态 - Type field.Int32 // 套餐类型:1-动态,2-隧道,3-独享 + Type field.Int32 // 套餐类型:1-短效动态,2-长效动态 CreatedAt field.Field // 创建时间 UpdatedAt field.Field // 更新时间 DeletedAt field.Field // 删除时间 Short resourceHasOneShort + Long resourceHasOneLong + fieldMap map[string]field.Expr } @@ -99,7 +107,7 @@ func (r *resource) GetFieldByName(fieldName string) (field.OrderExpr, bool) { } func (r *resource) fillFieldMap() { - r.fieldMap = make(map[string]field.Expr, 9) + r.fieldMap = make(map[string]field.Expr, 10) r.fieldMap["id"] = r.ID r.fieldMap["user_id"] = r.UserID r.fieldMap["resource_no"] = r.ResourceNo @@ -192,6 +200,77 @@ func (a resourceHasOneShortTx) Count() int64 { return a.tx.Count() } +type resourceHasOneLong struct { + db *gorm.DB + + field.RelationField +} + +func (a resourceHasOneLong) Where(conds ...field.Expr) *resourceHasOneLong { + if len(conds) == 0 { + return &a + } + + exprs := make([]clause.Expression, 0, len(conds)) + for _, cond := range conds { + exprs = append(exprs, cond.BeCond().(clause.Expression)) + } + a.db = a.db.Clauses(clause.Where{Exprs: exprs}) + return &a +} + +func (a resourceHasOneLong) WithContext(ctx context.Context) *resourceHasOneLong { + a.db = a.db.WithContext(ctx) + return &a +} + +func (a resourceHasOneLong) Session(session *gorm.Session) *resourceHasOneLong { + a.db = a.db.Session(session) + return &a +} + +func (a resourceHasOneLong) Model(m *models.Resource) *resourceHasOneLongTx { + return &resourceHasOneLongTx{a.db.Model(m).Association(a.Name())} +} + +type resourceHasOneLongTx struct{ tx *gorm.Association } + +func (a resourceHasOneLongTx) Find() (result *models.ResourceLong, err error) { + return result, a.tx.Find(&result) +} + +func (a resourceHasOneLongTx) Append(values ...*models.ResourceLong) (err error) { + targetValues := make([]interface{}, len(values)) + for i, v := range values { + targetValues[i] = v + } + return a.tx.Append(targetValues...) +} + +func (a resourceHasOneLongTx) Replace(values ...*models.ResourceLong) (err error) { + targetValues := make([]interface{}, len(values)) + for i, v := range values { + targetValues[i] = v + } + return a.tx.Replace(targetValues...) +} + +func (a resourceHasOneLongTx) Delete(values ...*models.ResourceLong) (err error) { + targetValues := make([]interface{}, len(values)) + for i, v := range values { + targetValues[i] = v + } + return a.tx.Delete(targetValues...) +} + +func (a resourceHasOneLongTx) Clear() error { + return a.tx.Clear() +} + +func (a resourceHasOneLongTx) Count() int64 { + return a.tx.Count() +} + type resourceDo struct{ gen.DO } func (r resourceDo) Debug() *resourceDo { diff --git a/web/services/channel.go b/web/services/channel.go index 850e97d..609323f 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -3,9 +3,9 @@ package services import ( "context" "database/sql" - "errors" "fmt" "github.com/gofiber/fiber/v2" + "gorm.io/gen/field" "log/slog" "math" "math/rand/v2" @@ -13,7 +13,9 @@ import ( "platform/pkg/u" "platform/web/auth" channel2 "platform/web/domains/channel" + edge2 "platform/web/domains/edge" proxy2 "platform/web/domains/proxy" + resource2 "platform/web/domains/resource" g "platform/web/globals" "platform/web/globals/orm" m "platform/web/models" @@ -24,7 +26,6 @@ import ( "github.com/gofiber/fiber/v2/middleware/requestid" "github.com/redis/go-redis/v9" - "gorm.io/gorm" ) var Channel = &channelService{} @@ -32,20 +33,6 @@ var Channel = &channelService{} type channelService struct { } -type ResourceInfo struct { - Id int32 - UserId int32 - Active bool - Type int32 - Live int32 - DailyLimit int32 - DailyUsed int32 - DailyLast orm.LocalDateTime - Quota int32 - Used int32 - Expire orm.LocalDateTime -} - // region RemoveChannel func (s *channelService) RemoveChannels(ctx context.Context, authCtx *auth.Context, id ...int32) error { @@ -205,16 +192,14 @@ func (s *channelService) RemoveChannels(ctx context.Context, authCtx *auth.Conte // region CreateChannel func (s *channelService) CreateChannel( - ctx context.Context, authCtx *auth.Context, resourceId int32, protocol channel2.Protocol, authType ChannelAuthType, count int, edgeFilter ...EdgeFilterConfig, -) (newChannels []*m.Channel, err error) { +) (channels []*m.Channel, err error) { var now = time.Now() - var rid = ctx.Value(requestid.ConfigDefault.ContextKey).(string) var filter = EdgeFilterConfig{} if len(edgeFilter) > 0 { filter = edgeFilter[0] @@ -223,25 +208,13 @@ func (s *channelService) CreateChannel( err = q.Q.Transaction(func(q *q.Query) (err error) { // 查找套餐 - resource, err := findResource(q, resourceId, authCtx, count) - if err != nil { - return err - } - - // 查找网关 - proxies, err := findProxies(q) - if err != nil { - return err - } - - // 查找已使用的节点 - channels, err := findChannels(q, proxies) + resource, err := findResource(q, resourceId, authCtx.Payload.Id, count, now) if err != nil { return err } // 查找白名单 - var whitelist *[]string + var whitelist []string if authType == ChannelAuthTypeIp { whitelist, err = findWhitelist(q, authCtx.Payload.Id) if err != nil { @@ -250,26 +223,26 @@ func (s *channelService) CreateChannel( } // 分配节点 - var expire = now.Add(time.Duration(resource.Live) * time.Second) - newChannels, err = calcChannels(proxies, channels, whitelist, count, authCtx.Payload.Id, protocol, authType, expire, filter) + var config = ChannelCreateConfig{ + Protocol: protocol, + AuthIp: authType == ChannelAuthTypeIp, + Whitelists: whitelist, + AuthPass: authType == ChannelAuthTypePass, + Expiration: now.Add(time.Duration(resource.Live) * time.Second), + } + + switch resource2.Type(resource.Type) { + case resource2.TypeShort: + channels, err = assignShortChannels(q, authCtx.Payload.Id, count, config, filter, now) + case resource2.TypeLong: + channels, err = assignLongChannels(q, authCtx.Payload.Id, count, config, filter) + } if err != nil { return err } - // 更新套餐使用记录 - err = updateResource(rid, resource, count, now) - if err != nil { - return err - } - - // 保存通道 - err = saveChannels(newChannels) - if err != nil { - return err - } - - // 缓存通道数据 - err = cacheChannels(ctx, rid, newChannels, whitelist) + // 保存通道开通结果 + err = saveAssigns(q, resource, channels, now) if err != nil { return err } @@ -280,93 +253,86 @@ func (s *channelService) CreateChannel( return nil, err } - return newChannels, nil + return channels, nil } -func findResource(q *q.Query, resourceId int32, authCtx *auth.Context, count int) (*ResourceInfo, error) { - var resource = new(ResourceInfo) - data := q.Resource.As("data") - short := q.ResourceShort.As("short") - err := data.Scopes(orm.Alias(data)). - Select( - data.ID, data.UserID, data.Active, - short.Type, short.Live, short.DailyUsed, short.DailyLimit, short.DailyLast, short.Quota, short.Used, short.Expire, +func findResource(q *q.Query, resourceId int32, userId int32, count int, now time.Time) (*ResourceInfo, error) { + + resource, err := q.Resource. + Preload( + q.Resource.Short.On(q.Resource.Type.Eq(int32(resource2.TypeShort))), + q.Resource.Long.On(q.Resource.Type.Eq(int32(resource2.TypeLong))), ). - LeftJoin(q.ResourceShort.As("short"), short.ResourceID.EqCol(data.ID)). Where( - data.ID.Eq(resourceId), - data.UserID.Eq(authCtx.Payload.Id), + q.Resource.ID.Eq(resourceId), + q.Resource.UserID.Eq(userId), ). - Scan(&resource) + Take() if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, ErrResourceNotExist // 防止 id 猜测 - } - return nil, err + return nil, ErrResourceNotExist + } + + var info = &ResourceInfo{ + Id: resource.ID, + Active: resource.Active, + Type: resource2.Type(resource.Type), + } + switch resource2.Type(resource.Type) { + case resource2.TypeShort: + var sub = resource.Short + info.Mode = resource2.Mode(sub.Type) + info.Live = sub.Live + info.DailyLimit = sub.DailyLimit + info.DailyUsed = sub.DailyUsed + info.DailyLast = time.Time(sub.DailyLast) + info.Quota = sub.Quota + info.Used = sub.Used + info.Expire = time.Time(sub.DailyLast) + case resource2.TypeLong: + var sub = resource.Long + info.Mode = resource2.Mode(sub.Type) + info.Live = sub.Live + info.DailyLimit = sub.DailyLimit + info.DailyUsed = sub.DailyUsed + info.DailyLast = time.Time(sub.DailyLast) + info.Quota = sub.Quota + info.Used = sub.Used + info.Expire = time.Time(sub.DailyLast) } // 检查套餐状态 - if !resource.Active { + if !info.Active { return nil, ErrResourceInvalid } // 检查每日限额 - today := time.Now().Format("2006-01-02") == time.Time(resource.DailyLast).Format("2006-01-02") - dailyRemain := int(math.Max(float64(resource.DailyLimit-resource.DailyUsed), 0)) - if today && dailyRemain < count { + used := 0 + if now.Format("2006-01-02") == info.DailyLast.Format("2006-01-02") { + used = int(info.DailyUsed) + } + excess := used+count > int(info.DailyLimit) + if excess { return nil, ErrResourceDailyLimit } // 检查时间或配额 - if resource.Type == 1 { // 包时 - if time.Time(resource.Expire).Before(time.Now()) { + switch info.Mode { + case resource2.ModeTime: + if info.Expire.Before(now) { return nil, ErrResourceExpired } - } else { // 包量 - remain := int(math.Max(float64(resource.Quota-resource.Used), 0)) - if remain < count { + case resource2.ModeCount: + if int(info.Quota)-int(info.Used) < count { return nil, ErrResourceExhausted } + default: + return nil, ChannelServiceErr("不支持的套餐模式") } - return resource, nil + return info, nil } -func findProxies(q *q.Query) (proxies []*m.Proxy, err error) { - proxies, err = q.Proxy. - Where(q.Proxy.Type.Eq(int32(proxy2.TypeThirdParty))). - Find() - if err != nil { - return nil, err - } - - return proxies, nil -} - -func findChannels(q *q.Query, proxies []*m.Proxy) (channels []*m.Channel, err error) { - var proxyIds = make([]int32, len(proxies)) - for i, proxy := range proxies { - proxyIds[i] = proxy.ID - } - channels, err = q.Channel. - Select( - q.Channel.ProxyID, - q.Channel.ProxyPort). - Where( - q.Channel.ProxyID.In(proxyIds...), - q.Channel.Expiration.Gt(orm.LocalDateTime(time.Now()))). - Group( - q.Channel.ProxyPort, - q.Channel.ProxyID). - Find() - if err != nil { - return nil, err - } - - return channels, nil -} - -func findWhitelist(q *q.Query, userId int32) (*[]string, error) { +func findWhitelist(q *q.Query, userId int32) ([]string, error) { var whitelist []string err := q.Whitelist. Where(q.Whitelist.UserID.Eq(userId)). @@ -379,21 +345,45 @@ func findWhitelist(q *q.Query, userId int32) (*[]string, error) { return nil, ChannelServiceErr("用户没有白名单") } - return &whitelist, nil + return whitelist, nil } -func calcChannels( - proxies []*m.Proxy, - allChannels []*m.Channel, - whitelist *[]string, - count int, +func assignShortChannels( + q *q.Query, userId int32, - protocol channel2.Protocol, - authType ChannelAuthType, - expiration time.Time, + count int, + config ChannelCreateConfig, filter EdgeFilterConfig, + now time.Time, ) ([]*m.Channel, error) { - var step = time.Now() + + // 查找网关 + proxies, err := q.Proxy. + Where(q.Proxy.Type.Eq(int32(proxy2.TypeThirdParty))). + Find() + if err != nil { + return nil, err + } + + // 查找已使用的节点 + var proxyIds = make([]int32, len(proxies)) + for i, proxy := range proxies { + proxyIds[i] = proxy.ID + } + allChannels, err := q.Channel. + Select( + q.Channel.ProxyID, + q.Channel.ProxyPort). + Where( + q.Channel.ProxyID.In(proxyIds...), + q.Channel.Expiration.Gt(orm.LocalDateTime(now))). + Group( + q.Channel.ProxyPort, + q.Channel.ProxyID). + Find() + if err != nil { + return nil, err + } // 查询已配置的节点 remoteConfigs, err := g.Cloud.CloudAutoQuery() @@ -415,15 +405,19 @@ func calcChannels( var avg = int(math.Ceil(float64(total) / float64(len(proxies)))) // 分配节点 - var newChannels []*m.Channel + var newChannels = make([]*m.Channel, 0, count) for _, proxy := range proxies { - // 分配前后的节点量 var prev = proxyUses[proxy.ID] var next = int(math.Max(float64(prev), float64(int(math.Min(float64(avg), float64(total)))))) total -= next - // 网关配置的节点量 + var acc = next - prev + if acc <= 0 { + continue + } + + // 获取远端配置量 var count = 0 remoteConfig, ok := remoteConfigs[proxy.Name] if ok { @@ -435,6 +429,7 @@ func calcChannels( } } + // 提交节点配置 if env.DebugExternalChange && next > count { var step = time.Now() @@ -477,13 +472,7 @@ func calcChannels( ) } - // 节点增量 - var acc = next - prev - if acc <= 0 { - continue - } - - // 筛选可用端口 + // 筛选可用端口 todo auth all var portConfigs = make([]g.PortConfigsReq, 0, acc) for port := 10000; port < 20000 && len(portConfigs) < acc; port++ { // 跳过存在的端口 @@ -511,18 +500,18 @@ func calcChannels( ProxyID: proxy.ID, ProxyHost: proxy.Host, ProxyPort: int32(port), - Protocol: int32(protocol), - Expiration: orm.LocalDateTime(expiration), + Protocol: int32(config.Protocol), + Expiration: orm.LocalDateTime(config.Expiration), } - switch authType { + switch { - case ChannelAuthTypeIp: - portConf.Whitelist = whitelist + case config.AuthIp: + portConf.Whitelist = &config.Whitelists portConf.Userpass = u.P("") newChannel.AuthIP = true - case ChannelAuthTypePass: + case config.AuthPass: username, password := genPassPair() portConf.Whitelist = &[]string{} portConf.Userpass = u.P(fmt.Sprintf("%s:%s", username, password)) @@ -541,7 +530,7 @@ func calcChannels( return nil, ChannelServiceErr("网关端口数量到达上限,无法分配") } - // 提交端口配置并更新节点列表 + // 提交端口配置 if env.DebugExternalChange { var step = time.Now() @@ -560,37 +549,99 @@ func calcChannels( } } - slog.Debug("申请节点", "total", time.Since(step)) + if len(newChannels) != count { + return nil, ChannelServiceErr("分配节点失败") + } + return newChannels, nil } -func updateResource(rid string, resource *ResourceInfo, count int, now time.Time) (err error) { - toUpdate := m.ResourceShort{ - Used: resource.Used + int32(count), - DailyLast: orm.LocalDateTime(now), - } - var last = time.Time(resource.DailyLast) - if now.Year() != last.Year() || now.Month() != last.Month() || now.Day() != last.Day() { - toUpdate.DailyUsed = int32(count) - } else { - toUpdate.DailyUsed = resource.DailyUsed + int32(count) - } - _, err = q.ResourceShort. - Where(q.ResourceShort.ResourceID.Eq(resource.Id)). - Select( - q.ResourceShort.Used, - q.ResourceShort.DailyUsed, - q.ResourceShort.DailyLast, +func assignLongChannels(q *q.Query, userId int32, count int, config ChannelCreateConfig, filter EdgeFilterConfig) ([]*m.Channel, error) { + + // 查询符合条件的节点,根据 channel 统计使用次数 + var edges = make([]struct { + m.Edge + Count int + }, 0) + err := q.Edge. + LeftJoin(q.Channel, q.Channel.EdgeID.EqCol(q.Edge.ID)). + Select(q.Edge.ALL, q.Channel.ALL.Count().As("count")). + Group(q.Edge.ID). + Where( + q.Edge.Prov.Eq(filter.Prov), + q.Edge.City.Eq(filter.City), + q.Edge.Isp.Eq(int32(edge2.ISPFromStr(filter.Isp))), + q.Edge.Status.Eq(1), ). - Updates(toUpdate) + Order(field.NewField("", "count").Asc()). + Scan(edges) + if err != nil { + return nil, err + } + fmt.Printf("edges: %v\n", edges) + + // 计算分配负载(考虑去重,维护一个节点使用记录表,优先分配未使用节点,达到算法额定负载后再选择负载最少的节点) + var total = count + for _, edge := range edges { + total += edge.Count + } + var avg = int(math.Ceil(float64(total) / float64(len(edges)))) + var channels = make([]*m.Channel, 0, count) + for _, edge := range edges { + prev := edge.Count + next := int(math.Max(float64(prev), float64(int(math.Min(float64(avg), float64(total)))))) + total -= next + + acc := next - prev + if acc <= 0 { + continue + } + + for range acc { + var channel = &m.Channel{ + UserID: userId, + ProxyID: edge.ProxyID, + EdgeID: edge.ID, + Protocol: int32(config.Protocol), + AuthIP: config.AuthIp, + AuthPass: config.AuthPass, + Expiration: orm.LocalDateTime(config.Expiration), + } + if config.AuthPass { + username, password := genPassPair() + channel.Username = username + channel.Password = password + } + channels = append(channels, channel) + } + } + + // todo 发送配置到网关 + + return channels, nil +} + +func saveAssigns(q *q.Query, resource *ResourceInfo, channels []*m.Channel, now time.Time) (err error) { + + // 缓存通道数据 + pipe := g.Redis.TxPipeline() + + zList := make([]redis.Z, 0, len(channels)) + for _, channel := range channels { + expiration := time.Time(channel.Expiration) + zList = append(zList, redis.Z{ + Score: float64(expiration.Unix()), + Member: channel.ID, + }) + } + pipe.ZAdd(context.Background(), "tasks:channel", zList...) + + _, err = pipe.Exec(context.Background()) if err != nil { return err } - return nil -} - -func saveChannels(channels []*m.Channel) (err error) { + // 保存通道 err = q.Channel. Omit( q.Channel.EdgeID, @@ -602,31 +653,34 @@ func saveChannels(channels []*m.Channel) (err error) { return err } - return nil -} - -func cacheChannels(ctx context.Context, rid string, channels []*m.Channel, whitelists *[]string) (err error) { - if len(channels) == 0 { - return nil + // 更新套餐使用记录 + var count = len(channels) + var last = time.Time(resource.DailyLast) + var dailyUsed int32 + if now.Year() != last.Year() || now.Month() != last.Month() || now.Day() != last.Day() { + dailyUsed = int32(count) + } else { + dailyUsed = resource.DailyUsed + int32(count) } - pipe := g.Redis.TxPipeline() - - zList := make([]redis.Z, 0, len(channels)) - for _, channel := range channels { - expiration := time.Time(channel.Expiration) - keys := chAuthItems(channel, whitelists) - for _, key := range keys { - pipe.Set(ctx, key, true, time.Since(expiration)) - } - zList = append(zList, redis.Z{ - Score: float64(expiration.Unix()), - Member: channel.ID, - }) + switch resource2.Type(resource.Type) { + case resource2.TypeShort: + _, err = q.ResourceShort. + Where(q.ResourceShort.ResourceID.Eq(resource.Id)). + UpdateSimple( + q.ResourceShort.Used.Add(int32(count)), + q.ResourceShort.DailyUsed.Value(dailyUsed), + q.ResourceShort.DailyLast.Value(orm.LocalDateTime(now)), + ) + case resource2.TypeLong: + _, err = q.ResourceLong. + Where(q.ResourceLong.ResourceID.Eq(resource.Id)). + UpdateSimple( + q.ResourceLong.Used.Add(int32(count)), + q.ResourceLong.DailyUsed.Value(dailyUsed), + q.ResourceLong.DailyLast.Value(orm.LocalDateTime(now)), + ) } - pipe.ZAdd(ctx, "tasks:channel", zList...) - - _, err = pipe.Exec(ctx) if err != nil { return err } @@ -655,31 +709,6 @@ func genPassPair() (string, string) { return string(username), string(password) } -func chAuthItems(channel *m.Channel, whitelists *[]string) []string { - var count = 1 - var ips = make([]string, 0) - if channel.AuthIP && whitelists != nil { - count = len(*whitelists) - ips = *whitelists - } - - var proxy = channel.ProxyHost + ":" + strconv.Itoa(int(channel.ProxyPort)) - var sb = strings.Builder{} - var items = make([]string, count) - for i := range items { - // 权限 key 格式:::?:?:? - sb.WriteString(proxy) - if channel.AuthIP { - sb.WriteString(":" + ips[i]) - } - if channel.AuthPass { - sb.WriteString(":" + channel.Username + ":" + channel.Password) - } - } - - return items -} - type ChannelAuthType int const ( @@ -687,6 +716,34 @@ const ( ChannelAuthTypePass ) +type ChannelCreateConfig struct { + Protocol channel2.Protocol + AuthIp bool + Whitelists []string + AuthPass bool + Expiration time.Time +} + +type EdgeFilterConfig struct { + Isp string `json:"isp"` + Prov string `json:"prov"` + City string `json:"city"` +} + +type ResourceInfo struct { + Id int32 + Active bool + Type resource2.Type + Mode resource2.Mode + Live int32 + DailyLimit int32 + DailyUsed int32 + DailyLast time.Time + Quota int32 + Used int32 + Expire time.Time +} + type ChannelServiceErr string func (c ChannelServiceErr) Error() string { diff --git a/web/services/edge.go b/web/services/edge.go deleted file mode 100644 index aca4cfb..0000000 --- a/web/services/edge.go +++ /dev/null @@ -1,85 +0,0 @@ -package services - -import ( - "context" - g "platform/web/globals" - m "platform/web/models" -) - -type EdgeServiceErr string - -func (e EdgeServiceErr) Error() string { - return string(e) -} - -var Edge = &edgeService{} - -type edgeService struct{} - -func (s *edgeService) Filter(ctx context.Context, userId int32, count int, config ...EdgeFilterConfig) ([]*m.Edge, error) { - _config := EdgeFilterConfig{} - if len(config) > 0 { - _config = config[0] - } - - // 筛选符合条件且未分配给用户过的节点 - // 静态条件:省,市,运营商 - // 排序方式,1.分配给该用户的次数 2.分配给全部用户的次数 3.todo 节点的健康状态 - var edges []*FilteredEdge - g.DB.Raw(filterSqlRaw, userId, _config.Isp, _config.Prov, _config.City). - Limit(count). - Find(&edges) - - // todo 异步任务关闭代理 - - // todo 异步任务缩容 - - return nil, nil -} - -type EdgeFilterConfig struct { - Isp string `json:"isp"` - Prov string `json:"prov"` - City string `json:"city"` -} - -type EdgeFilterAsyncTask struct { - Config EdgeFilterConfig `json:"config"` - Count int `json:"count"` -} - -// 筛选节点的SQL语句,暂时用不到 -// 筛选已连接的符合条件且未分配给用户过的节点 -// -// 静态条件:省,市,运营商 -// 排序方式,1.分配给该用户的次数 2.分配给全部用户的次数 -const filterSqlRaw = ` -select - n.id as id, - n.name as name, - n.host as host, - n.fwd_port as fwd_port, - count(c.*) as total, - count(c.*) filter ( where c.user_id = ? ) as assigned -from - edge n - left join public.channel c - on n.id = c.edge_id and c.expiration > now() and c.deleted_at is null -where - n.isp = ? and - n.prov = ? and - n.city = ? -group by - n.id -order by - assigned, total -` - -type FilteredEdge struct { - Id int32 `json:"id"` - Name string `json:"name"` - Host string `json:"host"` - FwdPort int32 `json:"fwd_port"` - Total int32 `json:"total"` - Assigned int32 `json:"assigned"` -}