package services import ( "context" "database/sql" "fmt" "log/slog" "math" "math/rand/v2" "platform/pkg/env" "platform/pkg/u" "platform/web/core" channel2 "platform/web/domains/channel" edge2 "platform/web/domains/edge" proxy2 "platform/web/domains/proxy" resource2 "platform/web/domains/resource" g "platform/web/globals" "platform/web/globals/orm" m "platform/web/models" q "platform/web/queries" "platform/web/tasks" "strconv" "strings" "time" "github.com/hibiken/asynq" "gorm.io/gen/field" "github.com/redis/go-redis/v9" ) var Channel = &channelService{} type channelService struct{} // region 删除通道 func (s *channelService) RemoveChannels(id []int32, userId ...int32) error { var now = time.Now() err := q.Q.Transaction(func(tx *q.Query) error { // 查找通道 var do = tx.Channel.Where(q.Channel.ID.In(id...)) if len(userId) > 0 { do.Where(q.Channel.UserID.Eq(userId[0])) } channels, err := tx.Channel.Where(do).Find() if err != nil { return core.NewBizErr("查找通道失败", err) } proxyMap := make(map[int32]*m.Proxy) proxyIds := make([]int32, 0) resourceMap := make(map[int32]*m.Resource) resourceIds := make([]int32, 0) for _, channel := range channels { if _, ok := proxyMap[channel.ProxyID]; !ok { proxyIds = append(proxyIds, channel.ProxyID) proxyMap[channel.ProxyID] = &m.Proxy{} } if _, ok := resourceMap[channel.ResourceID]; !ok { resourceIds = append(resourceIds, channel.ResourceID) resourceMap[channel.ResourceID] = &m.Resource{} } } // 查找资源 resources, err := tx.Resource.Where(tx.Resource.ID.In(resourceIds...)).Find() if err != nil { return core.NewBizErr("查找资源失败", err) } for _, res := range resources { resourceMap[res.ID] = res } // 查找代理 proxies, err := tx.Proxy.Where(q.Proxy.ID.In(proxyIds...)).Find() if err != nil { return core.NewBizErr("查找代理失败", err) } for _, proxy := range proxies { proxyMap[proxy.ID] = proxy } // 区分通道类型 shortToRemove := make([]*m.Channel, 0) longToRemove := make([]*m.Channel, 0) for _, channel := range channels { resource := resourceMap[channel.ResourceID] switch resource2.Type(resource.Type) { case resource2.TypeShort: shortToRemove = append(shortToRemove, channel) case resource2.TypeLong: longToRemove = append(longToRemove, channel) } } // 删除指定的通道 result, err := tx.Channel. Where(q.Channel.ID.In(id...)). Update(q.Channel.DeletedAt, now) if err != nil { return core.NewBizErr("删除通道失败", err) } if result.RowsAffected != int64(len(channels)) { return core.NewBizErr("删除通道数量不匹配") } // 禁用代理端口并下线用过的节点 if env.DebugExternalChange { var step = time.Now() if len(shortToRemove) > 0 { err := removeShortChannelExternal(proxies, shortToRemove) if err != nil { return core.NewBizErr("提交删除通道配置失败", err) } } slog.Debug("提交删除通道配置", "step", time.Since(step)) } return nil }) if err != nil { return err } return nil } func removeShortChannelExternal(proxies []*m.Proxy, channels []*m.Channel) error { // 组织数据 var configMap = make(map[int32][]g.PortConfigsReq, len(proxies)) var proxyMap = make(map[int32]*m.Proxy, len(proxies)) for _, proxy := range proxies { configMap[proxy.ID] = make([]g.PortConfigsReq, 0) proxyMap[proxy.ID] = proxy } var portMap = make(map[uint64]struct{}) for _, channel := range channels { var config = g.PortConfigsReq{ Port: int(channel.ProxyPort), Edge: &[]string{}, AutoEdgeConfig: &g.AutoEdgeConfig{ Count: u.P(0), }, Status: false, } configMap[channel.ProxyID] = append(configMap[channel.ProxyID], config) key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort) portMap[key] = struct{}{} } // 更新配置 for proxyId, configs := range configMap { if len(configs) == 0 { continue } proxy, ok := proxyMap[proxyId] if !ok { return core.NewBizErr("代理不存在") } if proxy.Secret == nil { return core.NewBizErr("代理未配置密钥") } var secret = strings.Split(*proxy.Secret, ":") gateway := g.NewGateway( proxy.Host, secret[0], secret[1], ) // 查询节点配置 actives, err := gateway.GatewayPortActive() if err != nil { return core.NewBizErr("查询节点配置失败", err) } // 更新节点配置 err = gateway.GatewayPortConfigs(configs) if err != nil { return core.NewBizErr("提交删除通道配置失败", err) } // 下线对应节点 var edges []string for portStr, active := range actives { port, err := strconv.Atoi(portStr) if err != nil { return core.NewBizErr("端口转换失败", err) } key := uint64(proxyId)<<32 | uint64(port) if _, ok := portMap[key]; ok { edges = append(edges, active.Edge...) } } if len(edges) > 0 { _, err := g.Cloud.CloudDisconnect(g.CloudDisconnectReq{ Uuid: proxy.Name, Edge: edges, }) if err != nil { return core.NewBizErr("下线节点失败", err) } } } return nil } // endregion // region 创建通道 func (s *channelService) CreateChannel( userId int32, resourceId int32, protocol channel2.Protocol, authType ChannelAuthType, count int, edgeFilter ...EdgeFilter, ) (channels []*m.Channel, err error) { var now = time.Now() var filter = EdgeFilter{} if len(edgeFilter) > 0 { filter = edgeFilter[0] } var resource *ResourceInfo err = q.Q.Transaction(func(q *q.Query) (err error) { // 查找套餐 resource, err = findResource(q, resourceId, userId, count, now) if err != nil { return core.NewBizErr("查找套餐失败", err) } // 查找白名单 var whitelist []string if authType == ChannelAuthTypeIp { whitelist, err = findWhitelist(q, userId) if err != nil { return core.NewBizErr("查找白名单失败", err) } } // 分配节点 var config = ChannelCreator{ Protocol: protocol, AuthIp: authType == ChannelAuthTypeIp, Whitelists: whitelist, AuthPass: authType == ChannelAuthTypePass, } switch resource.Type { case resource2.TypeShort: config.Expiration = now.Add(time.Duration(resource.Live) * time.Second) channels, err = assignShortChannels(q, userId, resourceId, count, config, filter, now) case resource2.TypeLong: config.Expiration = now.Add(time.Duration(resource.Live) * time.Hour) channels, err = assignLongChannels(q, userId, resourceId, count, config, filter) } if err != nil { return core.NewBizErr("分配通道失败", err) } // 保存通道开通结果 err = saveAssigns(q, resource, channels, now) if err != nil { return core.NewBizErr("保存通道失败", err) } return nil }, &sql.TxOptions{Isolation: sql.LevelRepeatableRead}) if err != nil { return nil, err } // 定时异步删除过期通道 var duration time.Duration switch resource.Type { case resource2.TypeShort: duration = time.Duration(resource.Live) * time.Second case resource2.TypeLong: duration = time.Duration(resource.Live) * time.Minute } var ids = make([]int32, len(channels)) for i := range channels { ids[i] = channels[i].ID } _, err = g.Asynq.Enqueue( tasks.NewRemoveChannel(ids), asynq.ProcessIn(duration), ) if err != nil { return nil, core.NewBizErr("提交异步删除通道任务失败", err) } return channels, nil } func findResource(q *q.Query, resourceId int32, userId int32, count int, now time.Time) (*ResourceInfo, error) { resource, err := q.Resource. Preload( q.Resource.Short, q.Resource.Long, ). Where( q.Resource.ID.Eq(resourceId), q.Resource.UserID.Eq(userId), ). Take() if err != nil { return nil, ErrResourceNotExist } var info = &ResourceInfo{ Id: resource.ID, Active: resource.Active, Type: resource2.Type(resource.Type), } switch resource2.Type(resource.Type) { case resource2.TypeShort: var sub = resource.Short var dailyLast = time.Time{} if sub.DailyLast != nil { dailyLast = time.Time(*sub.DailyLast) } var expire = time.Time{} if sub.Expire != nil { expire = time.Time(*sub.Expire) } var quota int32 if sub.Quota != nil { quota = *sub.Quota } info.Mode = resource2.Mode(sub.Type) info.Live = sub.Live info.DailyLimit = sub.DailyLimit info.DailyUsed = sub.DailyUsed info.DailyLast = dailyLast info.Expire = expire info.Quota = quota info.Used = sub.Used case resource2.TypeLong: var sub = resource.Long var dailyLast = time.Time{} if sub.DailyLast != nil { dailyLast = time.Time(*sub.DailyLast) } var expire = time.Time{} if sub.Expire != nil { expire = time.Time(*sub.Expire) } var quota int32 if sub.Quota != nil { quota = *sub.Quota } info.Mode = resource2.Mode(sub.Type) info.Live = sub.Live info.DailyLimit = sub.DailyLimit info.DailyUsed = sub.DailyUsed info.DailyLast = dailyLast info.Expire = expire info.Quota = quota info.Used = sub.Used } // 检查套餐状态 if !info.Active { return nil, ErrResourceInvalid } // 检查套餐使用情况 switch info.Mode { default: return nil, core.NewBizErr("不支持的套餐模式") // 包时 case resource2.ModeTime: // 检查过期时间 if info.Expire.Before(now) { return nil, ErrResourceExpired } // 检查每日限额 used := 0 if now.Format("2006-01-02") == info.DailyLast.Format("2006-01-02") { used = int(info.DailyUsed) } excess := used+count > int(info.DailyLimit) if excess { return nil, ErrResourceDailyLimit } // 包量 case resource2.ModeCount: // 检查可用配额 if int(info.Quota)-int(info.Used) < count { return nil, ErrResourceExhausted } } return info, nil } func findWhitelist(q *q.Query, userId int32) ([]string, error) { var whitelist []string err := q.Whitelist. Where(q.Whitelist.UserID.Eq(userId)). Select(q.Whitelist.Host). Scan(&whitelist) if err != nil { return nil, core.NewBizErr("查询白名单失败", err) } if len(whitelist) == 0 { return nil, core.NewBizErr("没有配置白名单") } return whitelist, nil } func assignShortChannels(q *q.Query, userId int32, resourceId int32, count int, config ChannelCreator, filter EdgeFilter, now time.Time) ([]*m.Channel, error) { // 查找网关 proxies, err := q.Proxy. Where(q.Proxy.Type.Eq(int32(proxy2.TypeThirdParty))). Find() if err != nil { return nil, core.NewBizErr("查找网关失败", err) } // 查找已使用的节点 var proxyIds = make([]int32, len(proxies)) for i, proxy := range proxies { proxyIds[i] = proxy.ID } allChannels, err := q.Channel. Select( q.Channel.ProxyID, q.Channel.ProxyPort). Where( q.Channel.ProxyID.In(proxyIds...), q.Channel.Expiration.Gt(orm.LocalDateTime(now))). Group( q.Channel.ProxyPort, q.Channel.ProxyID). Find() if err != nil { return nil, core.NewBizErr("查找已使用的节点失败", err) } // 查询已配置的节点 remoteConfigs, err := g.Cloud.CloudAutoQuery() if err != nil { return nil, core.NewBizErr("查询远端节点配置失败", err) } // 统计已用节点量与端口查找表 var proxyUses = make(map[int32]int, len(allChannels)) var portsMap = make(map[uint64]struct{}) for _, channel := range allChannels { proxyUses[channel.ProxyID]++ key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort) portsMap[key] = struct{}{} } // 计算分配额度 var total = len(allChannels) + count var avg = int(math.Ceil(float64(total) / float64(len(proxies)))) // 分配节点 var newChannels = make([]*m.Channel, 0, count) for _, proxy := range proxies { var prev = proxyUses[proxy.ID] var next = int(math.Max(float64(prev), float64(int(math.Min(float64(avg), float64(total)))))) total -= next var acc = next - prev if acc <= 0 { continue } // 获取远端配置量 var count = 0 remoteConfig, ok := remoteConfigs[proxy.Name] if ok { for _, config := range remoteConfig { if config.Isp == filter.Isp && config.City == filter.City && config.Province == filter.Prov { count = config.Count break } } } // 提交节点配置 if env.DebugExternalChange && next > count { var step = time.Now() var multiple float64 = 2 // 扩张倍数 var newConfig = g.AutoConfig{ Province: filter.Prov, City: filter.City, Isp: filter.Isp, Count: int(math.Ceil(float64(next) * multiple)), } var newConfigs []g.AutoConfig if count == 0 { newConfigs = append(newConfigs, newConfig) } else { newConfigs = make([]g.AutoConfig, len(remoteConfig)) for i, config := range remoteConfig { if config.Isp == filter.Isp && config.City == filter.City && config.Province == filter.Prov { count = config.Count break } newConfigs[i] = config } } err := g.Cloud.CloudConnect(g.CloudConnectReq{ Uuid: proxy.Name, Edge: nil, AutoConfig: newConfigs, }) if err != nil { return nil, core.NewBizErr("提交节点配置失败", err) } slog.Debug("提交节点配置", slog.Duration("step", time.Since(step)), slog.String("proxy", proxy.Name), slog.Int("used", prev), slog.Int("count", next), ) } // 筛选可用端口 var portConfigs = make([]g.PortConfigsReq, 0, acc) for port := 10000; port < 20000 && len(portConfigs) < acc; port++ { // 跳过存在的端口 key := uint64(proxy.ID)<<32 | uint64(port) _, ok := portsMap[key] if ok { continue } // 配置新端口 var portConf = g.PortConfigsReq{ Port: port, Edge: nil, Status: true, AutoEdgeConfig: &g.AutoEdgeConfig{ Province: filter.Prov, City: filter.City, Isp: filter.Isp, Count: u.P(1), PacketLoss: 30, }, } var newChannel = &m.Channel{ UserID: userId, ProxyID: proxy.ID, ResourceID: resourceId, ProxyHost: proxy.Host, ProxyPort: int32(port), Protocol: u.P(int32(config.Protocol)), Expiration: orm.LocalDateTime(config.Expiration), } if config.AuthIp { portConf.Whitelist = &config.Whitelists newChannel.AuthIP = true newChannel.Whitelists = u.P(strings.Join(config.Whitelists, ",")) } if config.AuthPass { username, password := genPassPair() portConf.Userpass = u.P(fmt.Sprintf("%s:%s", username, password)) newChannel.AuthPass = true newChannel.Username = &username newChannel.Password = &password } portConfigs = append(portConfigs, portConf) newChannels = append(newChannels, newChannel) } if len(portConfigs) < acc { return nil, core.NewBizErr("网关端口数量到达上限,无法分配") } // 提交端口配置 if env.DebugExternalChange { var step = time.Now() if proxy.Secret == nil { return nil, core.NewBizErr("代理未配置密钥") } var secret = strings.Split(*proxy.Secret, ":") gateway := g.NewGateway( proxy.Host, secret[0], secret[1], ) err = gateway.GatewayPortConfigs(portConfigs) if err != nil { return nil, core.NewBizErr("提交端口配置失败", err) } slog.Debug("提交端口配置", "step", time.Since(step)) } } if len(newChannels) != count { return nil, core.NewBizErr("分配节点失败") } return newChannels, nil } func assignLongChannels(q *q.Query, userId int32, resourceId int32, count int, config ChannelCreator, filter EdgeFilter) ([]*m.Channel, error) { // 查询符合条件的节点,根据 channel 统计使用次数 var edges = make([]struct { m.Edge Count int ProxyHost string ProxySecret string }, 0) do := q.Edge.Where(q.Edge.Status.Eq(1)) if filter.Prov != "" { do = do.Where(q.Edge.Prov.Eq(filter.Prov)) } if filter.City != "" { do = do.Where(q.Edge.City.Eq(filter.City)) } if filter.Isp != "" { do = do.Where(q.Edge.Isp.Eq(int32(edge2.ISPFromStr(filter.Isp)))) } err := q.Edge. LeftJoin(q.Channel, q.Channel.EdgeID.EqCol(q.Edge.ID)). LeftJoin(q.Proxy, q.Proxy.ID.EqCol(q.Edge.ProxyID)). Select( q.Edge.ALL, q.Channel.ALL.Count().As("count"), q.Proxy.Host.As("proxy_host"), q.Proxy.Secret.As("proxy_secret"), ). Group(q.Edge.ID, q.Proxy.Host, q.Proxy.Secret). Where(do). Order(field.NewField("", "count").Asc()). Limit(count). Scan(&edges) if err != nil { return nil, core.NewBizErr("查询符合条件的节点失败", err) } if len(edges) == 0 { return nil, ErrEdgesNoAvailable } // 计算分配负载(考虑去重,维护一个节点使用记录表,优先分配未使用节点,达到算法额定负载后再选择负载最少的节点) var total = count for _, edge := range edges { total += edge.Count } var avg = int(math.Ceil(float64(total) / float64(len(edges)))) var channels = make([]*m.Channel, 0, count) var proxies = make(map[int32]*m.Proxy) var reqs = make(map[int32][]*g.ProxyPermitConfig) for _, edge := range edges { if edge.ProxyID == nil || edge.ProxyPort == nil { return nil, core.NewBizErr("节点配置不完整,缺少代理信息") } if _, ok := proxies[*edge.ProxyID]; !ok { proxies[*edge.ProxyID] = &m.Proxy{ ID: *edge.ProxyID, Host: edge.ProxyHost, Secret: &edge.ProxySecret, } } prev := edge.Count next := int(math.Max(float64(prev), float64(int(math.Min(float64(avg), float64(total)))))) total -= next acc := next - prev if acc <= 0 { continue } for range acc { var channel = &m.Channel{ UserID: userId, ProxyID: *edge.ProxyID, EdgeID: &edge.ID, ResourceID: resourceId, Protocol: u.P(int32(config.Protocol)), AuthIP: config.AuthIp, AuthPass: config.AuthPass, Expiration: orm.LocalDateTime(config.Expiration), ProxyHost: edge.ProxyHost, ProxyPort: *edge.ProxyPort, } if config.AuthIp { channel.Whitelists = u.P(strings.Join(config.Whitelists, ",")) } if config.AuthPass { username, password := genPassPair() channel.Username = &username channel.Password = &password } channels = append(channels, channel) req := &g.ProxyPermitConfig{ Id: *channel.EdgeID, Expire: time.Time(channel.Expiration), } if channel.AuthIP { req.Whitelists = &config.Whitelists } if channel.AuthPass { req.Username = channel.Username req.Password = channel.Password } reqs[*edge.ProxyID] = append(reqs[*edge.ProxyID], req) } } // 发送配置到网关 if env.DebugExternalChange { var step = time.Now() for id, reqs := range reqs { proxy := proxies[id] err := g.Proxy.Permit(proxy.Host, *proxy.Secret, reqs) if err != nil { return nil, core.NewBizErr("提交端口配置失败", err) } } slog.Debug("提交端口配置", "step", time.Since(step)) } return channels, nil } func saveAssigns(q *q.Query, resource *ResourceInfo, channels []*m.Channel, now time.Time) (err error) { if len(channels) == 0 { return nil } // 缓存通道数据 pipe := g.Redis.TxPipeline() zList := make([]redis.Z, 0, len(channels)) for _, channel := range channels { expiration := time.Time(channel.Expiration) zList = append(zList, redis.Z{ Score: float64(expiration.Unix()), Member: channel.ID, }) } pipe.ZAdd(context.Background(), "tasks:channel", zList...) _, err = pipe.Exec(context.Background()) if err != nil { return core.NewBizErr("缓存通道数据失败", err) } // 保存通道 err = q.Channel. Create(channels...) if err != nil { return core.NewBizErr("保存通道失败", err) } // 更新套餐使用记录 var count = len(channels) var last = resource.DailyLast var dailyUsed int32 if now.Year() != last.Year() || now.Month() != last.Month() || now.Day() != last.Day() { dailyUsed = int32(count) } else { dailyUsed = resource.DailyUsed + int32(count) } switch resource.Type { case resource2.TypeShort: _, err = q.ResourceShort. Where(q.ResourceShort.ResourceID.Eq(resource.Id)). UpdateSimple( q.ResourceShort.Used.Add(int32(count)), q.ResourceShort.DailyUsed.Value(dailyUsed), q.ResourceShort.DailyLast.Value(orm.LocalDateTime(now)), ) case resource2.TypeLong: _, err = q.ResourceLong. Where(q.ResourceLong.ResourceID.Eq(resource.Id)). UpdateSimple( q.ResourceLong.Used.Add(int32(count)), q.ResourceLong.DailyUsed.Value(dailyUsed), q.ResourceLong.DailyLast.Value(orm.LocalDateTime(now)), ) } if err != nil { return core.NewBizErr("更新套餐使用记录失败", err) } return nil } func genPassPair() (string, string) { //goland:noinspection SpellCheckingInspection var alphabet = []rune("abcdefghjkmnpqrstuvwxyz") var numbers = []rune("23456789") var username = make([]rune, 6) var password = make([]rune, 6) for i := range 6 { if i < 2 { username[i] = alphabet[rand.N(len(alphabet))] } else { username[i] = numbers[rand.N(len(numbers))] } password[i] = numbers[rand.N(len(numbers))] } return string(username), string(password) } // endregion type ChannelAuthType int const ( ChannelAuthTypeIp ChannelAuthType = iota + 1 ChannelAuthTypePass ) type ChannelCreator struct { Protocol channel2.Protocol AuthIp bool Whitelists []string AuthPass bool Expiration time.Time } type ResourceInfo struct { Id int32 Active bool Type resource2.Type Mode resource2.Mode Live int32 DailyLimit int32 DailyUsed int32 DailyLast time.Time Quota int32 Used int32 Expire time.Time } var ( ErrResourceNotExist = core.NewBizErr("套餐不存在") ErrResourceInvalid = core.NewBizErr("套餐不可用") ErrResourceExhausted = core.NewBizErr("套餐已用完") ErrResourceExpired = core.NewBizErr("套餐已过期") ErrResourceDailyLimit = core.NewBizErr("套餐每日配额已用完") ErrEdgesNoAvailable = core.NewBizErr("没有可用的节点") )