package services

import (
	"context"
	"database/sql"
	"fmt"
	"log/slog"
	"math"
	"math/rand/v2"
	"strconv"
	"strings"
	"time"

	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/fiber/v2/middleware/requestid"
	"github.com/redis/go-redis/v9"
	"gorm.io/gen/field"

	"platform/pkg/env"
	"platform/pkg/u"
	"platform/web/auth"
	channel2 "platform/web/domains/channel"
	edge2 "platform/web/domains/edge"
	proxy2 "platform/web/domains/proxy"
	resource2 "platform/web/domains/resource"
	g "platform/web/globals"
	"platform/web/globals/orm"
	m "platform/web/models"
	q "platform/web/queries"
)

// Channel is the package-level channel service instance.
var Channel = &channelService{}

// channelService groups the channel removal and creation operations.
type channelService struct{}

// splitProxySecret splits a proxy secret on ":" and returns its first two
// fields, which are the two credentials handed to g.NewGateway. It returns an
// error instead of panicking when the secret has fewer than two fields.
func splitProxySecret(secret string) (string, string, error) {
	parts := strings.Split(secret, ":")
	if len(parts) < 2 {
		return "", "", ChannelServiceErr("代理密钥格式错误")
	}
	return parts[0], parts[1], nil
}

// sameDay reports whether a and b fall on the same calendar date, each taken
// in its own location.
func sameDay(a, b time.Time) bool {
	ay, am, ad := a.Date()
	by, bm, bd := b.Date()
	return ay == by && am == bm && ad == bd
}

// region RemoveChannel

// RemoveChannels marks the channels with the given ids as deleted (sets
// DeletedAt to the current time) inside a single transaction.
//
// When the caller is a user (auth.PayloadUser) every channel must belong to
// that user, otherwise a fiber 403 error is returned. When
// env.DebugExternalChange is set, the ports of the removed channels are also
// disabled on their gateways and the edges that were active on those ports are
// disconnected; any failure there aborts the transaction.
func (s *channelService) RemoveChannels(ctx context.Context, authCtx *auth.Context, id ...int32) error {
	if len(id) == 0 {
		return nil
	}

	now := time.Now()
	// The request id is only used to correlate debug logs; a missing value
	// must not panic.
	rid, _ := ctx.Value(requestid.ConfigDefault.ContextKey).(string)

	return q.Q.Transaction(func(tx *q.Query) error {
		// Load the channels.
		channels, err := tx.Channel.Where(
			q.Channel.ID.In(id...),
		).Find()
		if err != nil {
			return err
		}

		// Permission check: a user may only delete their own channels.
		for _, channel := range channels {
			if authCtx.Payload.Type == auth.PayloadUser && authCtx.Payload.Id != channel.UserID {
				return fiber.NewError(fiber.StatusForbidden)
			}
		}

		// Load the distinct proxies referenced by the channels.
		proxySet := make(map[int32]struct{})
		proxyIds := make([]int32, 0)
		for _, channel := range channels {
			if _, seen := proxySet[channel.ProxyID]; !seen {
				proxyIds = append(proxyIds, channel.ProxyID)
				proxySet[channel.ProxyID] = struct{}{}
			}
		}
		proxies, err := tx.Proxy.Where(
			q.Proxy.ID.In(proxyIds...),
		).Find()
		if err != nil {
			return err
		}

		// Soft-delete the channels.
		result, err := tx.Channel.Debug().
			Where(q.Channel.ID.In(id...)).
			Update(q.Channel.DeletedAt, now)
		if err != nil {
			return err
		}
		if result.RowsAffected != int64(len(channels)) {
			return ChannelServiceErr("删除通道失败")
		}

		if !env.DebugExternalChange {
			return nil
		}

		// Disable the gateway ports and disconnect the edges that used them.
		step := time.Now()

		// Group one "disable" port config per channel by proxy, and remember
		// every (proxy, port) pair being removed.
		configMap := make(map[int32][]g.PortConfigsReq, len(proxies))
		proxyMap := make(map[int32]*m.Proxy, len(proxies))
		for _, proxy := range proxies {
			configMap[proxy.ID] = make([]g.PortConfigsReq, 0)
			proxyMap[proxy.ID] = proxy
		}
		portMap := make(map[uint64]struct{})
		for _, channel := range channels {
			config := g.PortConfigsReq{
				Port: int(channel.ProxyPort),
				Edge: &[]string{},
				AutoEdgeConfig: &g.AutoEdgeConfig{
					Count: u.P(0),
				},
				Status: false,
			}
			configMap[channel.ProxyID] = append(configMap[channel.ProxyID], config)
			// Key layout: proxy id in the high 32 bits, port in the low bits.
			key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort)
			portMap[key] = struct{}{}
		}
		slog.Debug("组织数据", "rid", rid, "step", time.Since(step))

		// Push the configuration to every affected gateway.
		for proxyId, configs := range configMap {
			if len(configs) == 0 {
				continue
			}

			proxy, ok := proxyMap[proxyId]
			if !ok {
				return ChannelServiceErr("代理不存在")
			}

			user, pass, err := splitProxySecret(proxy.Secret)
			if err != nil {
				return err
			}
			gateway := g.NewGateway(proxy.Host, user, pass)

			// Query the currently active ports before they are disabled.
			step = time.Now()
			actives, err := gateway.GatewayPortActive()
			if err != nil {
				return err
			}
			slog.Debug("查询节点配置", "rid", rid, "step", time.Since(step))

			// Disable the ports.
			step = time.Now()
			if err := gateway.GatewayPortConfigs(configs); err != nil {
				return err
			}
			slog.Debug("更新节点配置", "rid", rid, "step", time.Since(step))

			// Disconnect the edges that were active on the removed ports.
			step = time.Now()
			var edges []string
			for portStr, active := range actives {
				port, err := strconv.Atoi(portStr)
				if err != nil {
					return err
				}
				key := uint64(proxyId)<<32 | uint64(port)
				if _, removed := portMap[key]; removed {
					edges = append(edges, active.Edge...)
				}
			}
			if len(edges) > 0 {
				_, err := g.Cloud.CloudDisconnect(g.CloudDisconnectReq{
					Uuid: proxy.Name,
					Edge: edges,
				})
				if err != nil {
					return err
				}
			}
			slog.Debug("下线对应节点", "rid", rid, "step", time.Since(step))
		}

		return nil
	})
}

// endregion

// region CreateChannel

// CreateChannel opens count channels for the authenticated user against the
// resource (plan) resourceId.
//
// Inside one repeatable-read transaction it validates the resource, loads the
// user's whitelist when IP authentication is requested, allocates channels
// according to the resource type (short or long), and persists the result.
// Only the first element of edgeFilter is used; when it is omitted the zero
// filter is applied. It returns the created channels, or an error if count is
// not positive or any step fails.
func (s *channelService) CreateChannel(
	authCtx *auth.Context,
	resourceId int32,
	protocol channel2.Protocol,
	authType ChannelAuthType,
	count int,
	edgeFilter ...EdgeFilterConfig,
) (channels []*m.Channel, err error) {
	if count <= 0 {
		return nil, ChannelServiceErr("通道数量必须大于0")
	}

	now := time.Now()

	var filter EdgeFilterConfig
	if len(edgeFilter) > 0 {
		filter = edgeFilter[0]
	}

	err = q.Q.Transaction(func(tx *q.Query) error {
		// Load and validate the resource.
		resource, err := findResource(tx, resourceId, authCtx.Payload.Id, count, now)
		if err != nil {
			return err
		}

		// Load the whitelist when authenticating by IP.
		var whitelist []string
		if authType == ChannelAuthTypeIp {
			whitelist, err = findWhitelist(tx, authCtx.Payload.Id)
			if err != nil {
				return err
			}
		}

		// Allocate channels.
		config := ChannelCreateConfig{
			Protocol:   protocol,
			AuthIp:     authType == ChannelAuthTypeIp,
			Whitelists: whitelist,
			AuthPass:   authType == ChannelAuthTypePass,
			Expiration: now.Add(time.Duration(resource.Live) * time.Second),
		}

		switch resource.Type {
		case resource2.TypeShort:
			channels, err = assignShortChannels(tx, authCtx.Payload.Id, count, config, filter, now)
		case resource2.TypeLong:
			channels, err = assignLongChannels(tx, authCtx.Payload.Id, count, config, filter)
		default:
			// Without this, an unknown type would silently allocate nothing.
			err = ChannelServiceErr("不支持的套餐类型")
		}
		if err != nil {
			return err
		}

		// Persist the allocation.
		return saveAssigns(tx, resource, channels, now)
	}, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
	if err != nil {
		return nil, err
	}

	return channels, nil
}

// findResource loads the resource resourceId owned by userId together with its
// short or long sub-record and checks that it can cover count more channels at
// time now.
//
// Any lookup failure is reported as ErrResourceNotExist. An inactive resource
// yields ErrResourceInvalid. In time mode the resource must not be expired and
// today's usage plus count must not exceed the daily limit; in count mode the
// remaining quota must be at least count.
func findResource(tx *q.Query, resourceId int32, userId int32, count int, now time.Time) (*ResourceInfo, error) {
	resource, err := tx.Resource.
		Preload(
			tx.Resource.Short.On(tx.Resource.Type.Eq(int32(resource2.TypeShort))),
			tx.Resource.Long.On(tx.Resource.Type.Eq(int32(resource2.TypeLong))),
		).
		Where(
			tx.Resource.ID.Eq(resourceId),
			tx.Resource.UserID.Eq(userId),
		).
		Take()
	if err != nil {
		return nil, ErrResourceNotExist
	}

	info := &ResourceInfo{
		Id:     resource.ID,
		Active: resource.Active,
		Type:   resource2.Type(resource.Type),
	}

	switch info.Type {
	case resource2.TypeShort:
		sub := resource.Short
		info.Mode = resource2.Mode(sub.Type)
		info.Live = sub.Live
		info.DailyLimit = sub.DailyLimit
		info.DailyUsed = sub.DailyUsed
		info.DailyLast = time.Time(sub.DailyLast)
		info.Quota = sub.Quota
		info.Used = sub.Used
		// NOTE(review): Expire is filled from DailyLast, which looks like a
		// copy-paste slip; confirm which model field holds the expiry.
		info.Expire = time.Time(sub.DailyLast)
	case resource2.TypeLong:
		sub := resource.Long
		info.Mode = resource2.Mode(sub.Type)
		info.Live = sub.Live
		info.DailyLimit = sub.DailyLimit
		info.DailyUsed = sub.DailyUsed
		info.DailyLast = time.Time(sub.DailyLast)
		info.Quota = sub.Quota
		info.Used = sub.Used
		// NOTE(review): same as above — Expire is filled from DailyLast.
		info.Expire = time.Time(sub.DailyLast)
	}

	// Check the resource status.
	if !info.Active {
		return nil, ErrResourceInvalid
	}

	// Check usage according to the billing mode.
	switch info.Mode {
	case resource2.ModeTime:
		// Time-based: check expiry, then the daily limit.
		if info.Expire.Before(now) {
			return nil, ErrResourceExpired
		}
		// Usage recorded on an earlier day does not count against today.
		used := 0
		if sameDay(now, info.DailyLast) {
			used = int(info.DailyUsed)
		}
		if used+count > int(info.DailyLimit) {
			return nil, ErrResourceDailyLimit
		}
	case resource2.ModeCount:
		// Count-based: check the remaining quota.
		if int(info.Quota)-int(info.Used) < count {
			return nil, ErrResourceExhausted
		}
	default:
		return nil, ChannelServiceErr("不支持的套餐模式")
	}

	return info, nil
}

// findWhitelist returns the whitelist hosts of userId. It returns an error
// when the query fails or the user has no whitelist entries.
func findWhitelist(tx *q.Query, userId int32) ([]string, error) {
	var whitelist []string
	err := tx.Whitelist.
		Where(tx.Whitelist.UserID.Eq(userId)).
		Select(tx.Whitelist.Host).
		Scan(&whitelist)
	if err != nil {
		return nil, err
	}
	if len(whitelist) == 0 {
		return nil, ChannelServiceErr("用户没有白名单")
	}
	return whitelist, nil
}

// assignShortChannels allocates count new channels across the third-party
// gateways, levelling the number of unexpired channels per gateway.
//
// For every gateway that receives channels it picks free ports in
// [10000, 20000) and, when env.DebugExternalChange is set, raises the remote
// auto-config for the filter if needed and submits the port configs to the
// gateway. The returned channels are not persisted here. An error is returned
// when there is no gateway, a gateway runs out of ports, an external call
// fails, or the number of allocated channels differs from count.
func assignShortChannels(
	tx *q.Query,
	userId int32,
	count int,
	config ChannelCreateConfig,
	filter EdgeFilterConfig,
	now time.Time,
) ([]*m.Channel, error) {
	// Load the gateways.
	proxies, err := tx.Proxy.
		Where(tx.Proxy.Type.Eq(int32(proxy2.TypeThirdParty))).
		Find()
	if err != nil {
		return nil, err
	}
	if len(proxies) == 0 {
		// Nothing to distribute over; also avoids dividing by zero below.
		return nil, ChannelServiceErr("分配节点失败")
	}

	// Load the (proxy, port) pairs still in use by unexpired channels.
	proxyIds := make([]int32, len(proxies))
	for i, proxy := range proxies {
		proxyIds[i] = proxy.ID
	}
	allChannels, err := tx.Channel.
		Select(
			tx.Channel.ProxyID,
			tx.Channel.ProxyPort).
		Where(
			tx.Channel.ProxyID.In(proxyIds...),
			tx.Channel.Expiration.Gt(orm.LocalDateTime(now))).
		Group(
			tx.Channel.ProxyPort,
			tx.Channel.ProxyID).
		Find()
	if err != nil {
		return nil, err
	}

	// Query the auto-configs already present on the remote side.
	remoteConfigs, err := g.Cloud.CloudAutoQuery()
	if err != nil {
		return nil, err
	}

	// Count used ports per proxy and build the used-port lookup table.
	proxyUses := make(map[int32]int, len(allChannels))
	portsMap := make(map[uint64]struct{}, len(allChannels))
	for _, channel := range allChannels {
		proxyUses[channel.ProxyID]++
		// Key layout: proxy id in the high 32 bits, port in the low bits.
		key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort)
		portsMap[key] = struct{}{}
	}

	// Target load: existing plus requested, averaged over the gateways.
	total := len(allChannels) + count
	avg := int(math.Ceil(float64(total) / float64(len(proxies))))

	matchesFilter := func(c g.AutoConfig) bool {
		return c.Isp == filter.Isp && c.City == filter.City && c.Province == filter.Prov
	}

	// Allocate per gateway.
	newChannels := make([]*m.Channel, 0, count)
	for _, proxy := range proxies {
		// next is this gateway's target: never below what it already has,
		// and at most the average or whatever is left to distribute.
		prev := proxyUses[proxy.ID]
		next := max(prev, min(avg, total))
		total -= next

		acc := next - prev
		if acc <= 0 {
			continue
		}

		// Find the remote count configured for this filter (first match).
		remoteConfig := remoteConfigs[proxy.Name]
		remoteCount := 0
		for _, rc := range remoteConfig {
			if matchesFilter(rc) {
				remoteCount = rc.Count
				break
			}
		}

		// Raise the remote auto-config when the target exceeds it.
		if env.DebugExternalChange && next > remoteCount {
			step := time.Now()

			const multiple = 2.0 // expansion factor
			newConfig := g.AutoConfig{
				Province: filter.Prov,
				City:     filter.City,
				Isp:      filter.Isp,
				Count:    int(math.Ceil(float64(next) * multiple)),
			}

			var newConfigs []g.AutoConfig
			if remoteCount == 0 {
				// NOTE(review): only the new entry is sent in this case, as
				// before; confirm whether other remote entries must be kept.
				newConfigs = append(newConfigs, newConfig)
			} else {
				// Keep every remote entry, replacing the first one that
				// matches the filter with the enlarged config.
				newConfigs = make([]g.AutoConfig, 0, len(remoteConfig))
				replaced := false
				for _, rc := range remoteConfig {
					if !replaced && matchesFilter(rc) {
						rc = newConfig
						replaced = true
					}
					newConfigs = append(newConfigs, rc)
				}
			}

			err := g.Cloud.CloudConnect(g.CloudConnectReq{
				Uuid:       proxy.Name,
				Edge:       nil,
				AutoConfig: newConfigs,
			})
			if err != nil {
				return nil, err
			}

			slog.Debug("提交节点配置",
				slog.Duration("step", time.Since(step)),
				slog.String("proxy", proxy.Name),
				slog.Int("used", prev),
				slog.Int("count", next),
			)
		}

		// Pick free ports.
		portConfigs := make([]g.PortConfigsReq, 0, acc)
		for port := 10000; port < 20000 && len(portConfigs) < acc; port++ {
			// Skip ports that are already in use.
			key := uint64(proxy.ID)<<32 | uint64(port)
			if _, used := portsMap[key]; used {
				continue
			}

			// Configure the new port.
			portConf := g.PortConfigsReq{
				Port:   port,
				Edge:   nil,
				Status: true,
				AutoEdgeConfig: &g.AutoEdgeConfig{
					Province:   filter.Prov,
					City:       filter.City,
					Isp:        filter.Isp,
					Count:      u.P(1),
					PacketLoss: 30,
				},
			}
			newChannel := &m.Channel{
				UserID:     userId,
				ProxyID:    proxy.ID,
				ProxyHost:  proxy.Host,
				ProxyPort:  int32(port),
				Protocol:   int32(config.Protocol),
				Expiration: orm.LocalDateTime(config.Expiration),
			}
			if config.AuthIp {
				portConf.Whitelist = &config.Whitelists
				newChannel.AuthIP = true
			}
			if config.AuthPass {
				username, password := genPassPair()
				portConf.Userpass = u.P(fmt.Sprintf("%s:%s", username, password))
				newChannel.AuthPass = true
				newChannel.Username = username
				newChannel.Password = password
			}

			portConfigs = append(portConfigs, portConf)
			newChannels = append(newChannels, newChannel)
		}

		if len(portConfigs) < acc {
			return nil, ChannelServiceErr("网关端口数量到达上限,无法分配")
		}

		// Submit the port configs to the gateway.
		if env.DebugExternalChange {
			step := time.Now()

			user, pass, err := splitProxySecret(proxy.Secret)
			if err != nil {
				return nil, err
			}
			gateway := g.NewGateway(proxy.Host, user, pass)
			if err := gateway.GatewayPortConfigs(portConfigs); err != nil {
				return nil, err
			}

			slog.Debug("提交端口配置", "step", time.Since(step))
		}
	}

	if len(newChannels) != count {
		return nil, ChannelServiceErr("分配节点失败")
	}

	return newChannels, nil
}

// assignLongChannels allocates count new channels across the enabled edges
// that match filter, levelling the number of channels per edge.
//
// Edges are read together with their channel count and proxy host, ordered by
// ascending count. When env.DebugExternalChange is set the permit configs are
// sent to each proxy host. The returned channels are not persisted here. An
// error is returned when no edge matches, an external call fails, or the
// number of allocated channels differs from count.
func assignLongChannels(tx *q.Query, userId int32, count int, config ChannelCreateConfig, filter EdgeFilterConfig) ([]*m.Channel, error) {
	// Load matching edges with their usage count (derived from channels).
	var edges []struct {
		m.Edge
		Count int
		Host  string
	}
	err := tx.Edge.
		LeftJoin(tx.Channel, tx.Channel.EdgeID.EqCol(tx.Edge.ID)).
		LeftJoin(tx.Proxy, tx.Proxy.ID.EqCol(tx.Edge.ProxyID)).
		Select(
			tx.Edge.ALL,
			tx.Channel.ALL.Count().As("count"),
			tx.Proxy.Host,
		).
		Group(tx.Edge.ID).
		Where(
			tx.Edge.Prov.Eq(filter.Prov),
			tx.Edge.City.Eq(filter.City),
			tx.Edge.Isp.Eq(int32(edge2.ISPFromStr(filter.Isp))),
			tx.Edge.Status.Eq(1),
		).
		Order(field.NewField("", "count").Asc()).
		Scan(&edges) // Scan needs a pointer to fill the slice.
	if err != nil {
		return nil, err
	}
	if len(edges) == 0 {
		// Nothing to distribute over; also avoids dividing by zero below.
		return nil, ChannelServiceErr("分配节点失败")
	}

	// Target load: existing plus requested, averaged over the edges.
	total := count
	for _, edge := range edges {
		total += edge.Count
	}
	avg := int(math.Ceil(float64(total) / float64(len(edges))))

	channels := make([]*m.Channel, 0, count)
	permits := make(map[string][]*g.ProxyPermitConfig)
	for _, edge := range edges {
		// next is this edge's target: never below what it already has, and
		// at most the average or whatever is left to distribute.
		prev := edge.Count
		next := max(prev, min(avg, total))
		total -= next

		acc := next - prev
		if acc <= 0 {
			continue
		}

		for range acc {
			channel := &m.Channel{
				UserID:     userId,
				ProxyID:    edge.ProxyID,
				EdgeID:     edge.ID,
				Protocol:   int32(config.Protocol),
				AuthIP:     config.AuthIp,
				AuthPass:   config.AuthPass,
				Expiration: orm.LocalDateTime(config.Expiration),
			}
			if config.AuthPass {
				channel.Username, channel.Password = genPassPair()
			}
			channels = append(channels, channel)

			req := &g.ProxyPermitConfig{
				Id:     channel.EdgeID,
				Expire: time.Time(channel.Expiration),
			}
			if channel.AuthIP {
				req.Whitelists = &config.Whitelists
			}
			if channel.AuthPass {
				req.Username = &channel.Username
				req.Password = &channel.Password
			}
			permits[edge.Host] = append(permits[edge.Host], req)
		}
	}

	if len(channels) != count {
		return nil, ChannelServiceErr("分配节点失败")
	}

	// Send the permit configs to the gateways.
	if env.DebugExternalChange {
		step := time.Now()
		for host, hostPermits := range permits {
			if err := g.Proxy.Permit(host, hostPermits); err != nil {
				return nil, err
			}
		}
		slog.Debug("提交端口配置", "step", time.Since(step))
	}

	return channels, nil
}

// saveAssigns persists newly allocated channels, updates the usage counters of
// the resource they were charged to, and registers each channel in the Redis
// sorted set "tasks:channel" scored by its expiration (Unix seconds).
//
// The channels are inserted first so that their ids are populated before they
// are used as Redis members, and Redis is written last so that a database
// error leaves no Redis entries behind.
// NOTE(review): the Redis write is not undone if the surrounding transaction
// later fails to commit.
func saveAssigns(tx *q.Query, resource *ResourceInfo, channels []*m.Channel, now time.Time) (err error) {
	// Save the channels.
	err = tx.Channel.
		Omit(
			tx.Channel.EdgeID,
			tx.Channel.EdgeHost,
			tx.Channel.DeletedAt,
		).
		Create(channels...)
	if err != nil {
		return err
	}

	// Update the resource usage; the daily counter restarts on a new day.
	count := int32(len(channels))
	dailyUsed := count
	if sameDay(now, resource.DailyLast) {
		dailyUsed += resource.DailyUsed
	}

	switch resource.Type {
	case resource2.TypeShort:
		_, err = tx.ResourceShort.
			Where(tx.ResourceShort.ResourceID.Eq(resource.Id)).
			UpdateSimple(
				tx.ResourceShort.Used.Add(count),
				tx.ResourceShort.DailyUsed.Value(dailyUsed),
				tx.ResourceShort.DailyLast.Value(orm.LocalDateTime(now)),
			)
	case resource2.TypeLong:
		_, err = tx.ResourceLong.
			Where(tx.ResourceLong.ResourceID.Eq(resource.Id)).
			UpdateSimple(
				tx.ResourceLong.Used.Add(count),
				tx.ResourceLong.DailyUsed.Value(dailyUsed),
				tx.ResourceLong.DailyLast.Value(orm.LocalDateTime(now)),
			)
	}
	if err != nil {
		return err
	}

	// Cache the channels for the expiry task.
	zList := make([]redis.Z, 0, len(channels))
	for _, channel := range channels {
		expiration := time.Time(channel.Expiration)
		zList = append(zList, redis.Z{
			Score:  float64(expiration.Unix()),
			Member: channel.ID,
		})
	}
	pipe := g.Redis.TxPipeline()
	pipe.ZAdd(context.Background(), "tasks:channel", zList...)
	_, err = pipe.Exec(context.Background())
	return err
}

// endregion

// genPassPair returns a random username and password, each six characters
// long. The username is two lowercase letters followed by four digits and the
// password is six digits; both alphabets leave out some characters
// (i, l, o and 0, 1).
func genPassPair() (string, string) {
	//goland:noinspection SpellCheckingInspection
	var alphabet = []rune("abcdefghjkmnpqrstuvwxyz")
	var numbers = []rune("23456789")

	username := make([]rune, 6)
	password := make([]rune, 6)
	for i := range 6 {
		if i < 2 {
			username[i] = alphabet[rand.N(len(alphabet))]
		} else {
			username[i] = numbers[rand.N(len(numbers))]
		}
		password[i] = numbers[rand.N(len(numbers))]
	}

	return string(username), string(password)
}

// ChannelAuthType selects how a channel authenticates its clients.
type ChannelAuthType int

const (
	// ChannelAuthTypeIp authenticates by the user's IP whitelist.
	ChannelAuthTypeIp ChannelAuthType = iota + 1
	// ChannelAuthTypePass authenticates by a generated username and password.
	ChannelAuthTypePass
)

// ChannelCreateConfig carries the settings shared by every channel created in
// one request.
type ChannelCreateConfig struct {
	Protocol   channel2.Protocol
	AuthIp     bool
	Whitelists []string
	AuthPass   bool
	Expiration time.Time
}

// EdgeFilterConfig restricts allocation to edges of a given ISP, province and
// city.
type EdgeFilterConfig struct {
	Isp  string `json:"isp"`
	Prov string `json:"prov"`
	City string `json:"city"`
}

// ResourceInfo is the flattened view of a resource and its short or long
// sub-record, as built by findResource.
type ResourceInfo struct {
	Id         int32
	Active     bool
	Type       resource2.Type
	Mode       resource2.Mode
	Live       int32
	DailyLimit int32
	DailyUsed  int32
	DailyLast  time.Time
	Quota      int32
	Used       int32
	Expire     time.Time
}

// ChannelServiceErr is a string-based error type for channel service failures.
type ChannelServiceErr string

// Error returns the error message.
func (c ChannelServiceErr) Error() string {
	return string(c)
}

// Errors returned by findResource when a resource cannot be used.
const (
	ErrResourceNotExist   = ChannelServiceErr("套餐不存在")
	ErrResourceInvalid    = ChannelServiceErr("套餐不可用")
	ErrResourceExhausted  = ChannelServiceErr("套餐已用完")
	ErrResourceExpired    = ChannelServiceErr("套餐已过期")
	ErrResourceDailyLimit = ChannelServiceErr("套餐每日配额已用完")
)