package services import ( "context" "encoding/json" "fmt" "log/slog" "net/netip" "platform/pkg/env" "platform/pkg/u" "platform/web/core" e "platform/web/events" g "platform/web/globals" "platform/web/globals/orm" m "platform/web/models" q "platform/web/queries" "strings" "time" "github.com/hibiken/asynq" "gorm.io/gen" "gorm.io/gen/field" ) type channelBaiyinProvider struct{} func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceNo string, authWhitelist bool, authPassword bool, count int, filter *EdgeFilter) ([]*m.Channel, error) { if filter == nil { return nil, core.NewBizErr("缺少节点过滤条件") } now := time.Now() batchNo := ID.GenReadable("bat") channels := make([]*m.Channel, count) // 资源锁,防止并发扣减失败导致的端口悬空问题 err := g.Redsync.WithLock(lockChannelCreateKey(resourceNo), func() error { // 检查并获取套餐与白名单 resource, whitelists, err := ensure(now, source, resourceNo, authWhitelist, count) if err != nil { return err } user := resource.User expire := now.Add(resource.Live) // 选择代理 proxy, gateway, err := selectProxy(count) if err != nil { return err } // 取用端口 chans, err := selectPorts(proxy.ID, batchNo, count, expire) if err != nil { return err } // 绑定节点端口 chanConfigs := make([]*g.PortConfigsReq, count) edgeConfigs := make([]string, 0, count) for i := range count { ch := chans[i] // 通道数据 channels[i] = &m.Channel{ UserID: user.ID, ResourceID: resource.ID, BatchNo: batchNo, ProxyID: proxy.ID, Host: u.Else(proxy.Host, proxy.IP.String()), Port: ch.Port(), FilterISP: filter.Isp, FilterProv: filter.Prov, FilterCity: filter.City, ExpiredAt: expire, Proxy: proxy, } // 通道配置数据 chanConfigs[i] = &g.PortConfigsReq{ Port: int(ch.Port()), Status: true, AutoEdgeConfig: &g.AutoEdgeConfig{ Province: u.Z(filter.Prov), City: u.Z(filter.City), Isp: filter.Isp.String(), Count: u.P(1), }, } // 白名单模式 if authWhitelist { channels[i].Whitelists = u.P(strings.Join(whitelists, ",")) chanConfigs[i].Whitelist = &whitelists } // 密码模式 if authPassword { username, password := genPassPair() channels[i].Username = &username channels[i].Password = &password chanConfigs[i].Userpass = u.P(username + ":" + password) } } // 提交配置 slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs)) if env.RunMode == env.RunModeProd { // 从云端补足节点 err := ensureEdges(proxy, gateway, filter, count) if err != nil { slog.Warn("ensureEdges 失败", "err", err) // 不阻止通道创建,继续走后续流程 } // 启用网关代理通道 if len(chanConfigs) > 0 { if err := gateway.GatewayPortConfigs(chanConfigs); err != nil { slog.Warn("提交代理端口配置失败", "error", err.Error()) return core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err) } } } else { for _, item := range chanConfigs { str, _ := json.Marshal(item) fmt.Println(string(str)) } } // 保存数据 err = q.Q.Transaction(func(q *q.Query) error { // 更新使用记录 var result gen.ResultInfo var err error switch resource.Type { case m.ResourceTypeShort: result, err = q.ResourceShort. Where( q.ResourceShort.ID.Eq(*resource.ShortId), q.ResourceShort.Used.Eq(resource.Used), q.ResourceShort.Daily.Eq(resource.Daily), ). UpdateSimple( q.ResourceShort.Used.Add(int32(count)), q.ResourceShort.Daily.Value(int32(resource.Today+count)), q.ResourceShort.LastAt.Value(now), ) case m.ResourceTypeLong: result, err = q.ResourceLong. Where( q.ResourceLong.ID.Eq(*resource.LongId), q.ResourceLong.Used.Eq(resource.Used), q.ResourceLong.Daily.Eq(resource.Daily), ). UpdateSimple( q.ResourceLong.Used.Add(int32(count)), q.ResourceLong.Daily.Value(int32(resource.Today+count)), q.ResourceLong.LastAt.Value(now), ) default: return core.NewBizErr("套餐类型不正确,无法更新") } if err != nil { return core.NewServErr("更新套餐使用记录失败", err) } if result.RowsAffected == 0 { return core.NewBizErr("套餐状态已过期") } // 保存通道 err = q.Channel. Omit(field.AssociationFields). Create(channels...) if err != nil { return core.NewServErr("保存通道失败", err) } // 保存提取记录 err = q.LogsUserUsage.Create(&m.LogsUserUsage{ UserID: user.ID, ResourceID: resource.ID, BatchNo: batchNo, Count: int32(count), ISP: u.X(filter.Isp.String()), Prov: filter.Prov, City: filter.City, IP: orm.Inet{Addr: source}, Time: now, }) if err != nil { return core.NewServErr("保存用户使用记录失败", err) } return nil }) if err != nil { return err } return nil }) if err != nil { return nil, err } return channels, nil } func (s *channelBaiyinProvider) RemoveChannels(batchNo string) error { return g.Redsync.WithLock(lockChannelRemoveKey(batchNo), func() error { start := time.Now() // 获取连接数据 channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batchNo)).Find() if err != nil { return core.NewServErr(fmt.Sprintf("获取通道数据失败,batch:%s", batchNo), err) } if len(channels) == 0 { slog.Warn(fmt.Sprintf("未找到通道数据,batch:%s", batchNo)) return nil } proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take() if err != nil { return core.NewServErr(fmt.Sprintf("获取代理数据失败,batch:%s", batchNo), err) } // 检查通道是否存在 chans, err := g.Redis.LRange(context.Background(), usedChansKey(proxy.ID, batchNo), 0, -1).Result() if err != nil { return core.NewServErr("查询使用中通道失败", err) } if len(chans) == 0 { slog.Debug("通道为空,跳过清理", "key", usedChansKey(proxy.ID, batchNo)) return nil // 没有使用中通道,已经被清理过了 } // 准备配置数据 configs := make([]*g.PortConfigsReq, len(chans)) for i, ch := range chans { ap, err := netip.ParseAddrPort(ch) if err != nil { return core.NewServErr(fmt.Sprintf("解析通道数据失败: %s", ch), err) } configs[i] = &g.PortConfigsReq{ Port: int(ap.Port()), Edge: &[]string{}, AutoEdgeConfig: &g.AutoEdgeConfig{Count: u.P(0)}, Status: false, } } // 提交配置 if env.RunMode == env.RunModeProd { gateway, err := proxyGateway(proxy) if err != nil { return core.NewServErr("创建代理网关失败", err) } if err = gateway.GatewayPortConfigs(configs); err != nil { return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err) } } else { for _, item := range configs { str, _ := json.Marshal(item) fmt.Println(string(str)) } } // 释放端口 err = freeChans(proxy.ID, batchNo) if err != nil { return err } slog.Debug("清除代理端口配置", "proxy", proxy.ID, "batch", batchNo, "duration", time.Since(start).String()) return nil }) } // ClearExpiredChannels 清理指定代理的过期通道,并返回清理数量(现在理论上不会有需要手动批量清理的通道,未来可以废弃) func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) { now := time.Now() // 获取未清理通道 keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result() if err != nil { return 0, core.NewServErr("查询使用中通道失败", err) } if len(keys) == 0 { return 0, nil } batchList := make([]string, len(keys)) batchSet := make(map[string]struct{}, len(keys)) for i, key := range keys { parts := strings.Split(key, ":") if len(parts) != 4 { return 0, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), nil) } batchList[i] = parts[3] batchSet[parts[3]] = struct{}{} } // 排除未过期通道 var batchQueried []struct{ BatchNo string } err = q.Channel. Select(q.Channel.BatchNo). Where( q.Channel.BatchNo.In(batchList...), q.Channel.ExpiredAt.Gte(now.UTC()), ). Group(q.Channel.BatchNo). Scan(&batchQueried) if err != nil { return 0, core.NewServErr("查询过期通道失败", err) } for _, batch := range batchQueried { delete(batchSet, batch.BatchNo) } // 清理过期通道 slog.Info("批量清理过期通道", "count", len(batchSet)) for batchNo, _ := range batchSet { err := s.RemoveChannels(batchNo) if err != nil { slog.Error("清理过期通道失败", "batch", batchNo, "error", err) } } return len(batchSet), nil } func lockChannelCreateKey(resourceNo string) string { return fmt.Sprintf("platform:channel:create:%s", resourceNo) } func lockChannelRemoveKey(bid string) string { return fmt.Sprintf("platform:batch:remove_expired:%s", bid) } func selectProxy(count int) (*m.Proxy, g.GatewayClient, error) { // 获取在线节点 proxies, err := q.Proxy.Where( q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)), q.Proxy.Status.Eq(int(m.ProxyStatusOnline)), ).Find() if err != nil { return nil, nil, core.NewBizErr("获取可用代理失败", err) } if len(proxies) == 0 { return nil, nil, core.NewBizErr("无可用代理") } proxyIDs := make([]int32, 0, len(proxies)) proxyMap := make(map[int32]*m.Proxy, len(proxies)) for _, item := range proxies { proxyIDs = append(proxyIDs, item.ID) proxyMap[item.ID] = item } // 获取最空闲节点 maxId := int32(0) maxCount := -1 for _, id := range proxyIDs { idCount, err := g.Redis.SCard(context.Background(), freeChansKey(id)).Result() if err != nil { return nil, nil, fmt.Errorf("查询可用通道数量失败: %w", err) } if idCount > int64(maxCount) { maxCount = int(idCount) maxId = id } } if maxCount < count { return nil, nil, core.NewBizErr("无可用代理") } proxy := proxyMap[maxId] gateway, err := proxyGateway(proxy) if err != nil { return nil, nil, core.NewServErr("创建代理网关失败", err) } return proxy, gateway, nil } func selectPorts(proxyId int32, batchNo string, count int, expire time.Time) ([]netip.AddrPort, error) { chans, err := lockChans(proxyId, batchNo, count) if err != nil { return nil, core.NewBizErr("无可用通道,请稍后再试", err) } _, err = g.Asynq.Enqueue( e.NewRemoveChannel(batchNo), asynq.ProcessAt(expire), ) if err != nil { return nil, core.NewServErr("注册异步关闭通道任务失败", err) } return chans, nil } // ensureEdges 检查本地节点是否足够,如果不足从云端连入 // 本地节点通过 Assigned = false 排除已分配节点 // 云端节点通过 NoRepeat = true 排除已分配节点 func ensureEdges(proxy *m.Proxy, gateway g.GatewayClient, filter *EdgeFilter, count int) error { if filter.IsEmpty() { return nil // 没有过滤条件,直接返回空,避免无意义的查询 } // 先查本地 localEdges, err := gateway.GatewayEdge(&g.GatewayEdgeReq{ Province: filter.Prov, City: filter.City, Isp: u.X(filter.Isp.String()), Limit: &count, Assigned: u.P(false), }) if err != nil { return core.NewBizErr("检查可用节点失败[1]", err) } if len(localEdges) >= count { return nil // 本地节点足够,直接返回空,后续逻辑会优先使用本地节点 } // 再查云端 remaining := count - len(localEdges) cloudEdges, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{ Province: filter.Prov, City: filter.City, Isp: u.X(filter.Isp.String()), Limit: &remaining, NoRepeat: u.P(true), ActiveTime: u.P(3600), IpUnchangedTime: u.P(3600), }) if err != nil { return core.NewBizErr("检查可用节点失败[2]", err) } if len(cloudEdges.Edges) < remaining { return core.NewBizErr("地区可用节点数量不足") } // 连入云端节点 edges := make([]string, remaining) for i, edge := range cloudEdges.Edges { edges[i] = edge.EdgeID } if err := g.Cloud.CloudConnect(&g.CloudConnectReq{Uuid: proxy.Mac, Edge: &edges}); err != nil { return core.NewServErr("连接云平台失败", err) } return nil } type EdgeInfo struct { Type EdgeInfoType EdgeID string } type EdgeInfoType string const ( EdgeInfoLocal EdgeInfoType = "local" EdgeInfoCloud EdgeInfoType = "cloud" )