diff --git a/README.md b/README.md index f21f9ab..2477dae 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,67 @@ ## TODO -- 解决提取成功率问题 - - 提交代理端口配置后,并非同步将节点标记为占用状态,导致后续提取请求仍然会选中该节点,直到下一个提取请求才会发现该节点已满,导致提取失败? +### 接口并发问题 + +#### 找可用网关 & 找可用端点 +*(注:找端点依赖于网关状态,二者先后执行,但在并发控制上统一处理)* + +**下线并发问题** +* 提供指定网关的可重入读写锁A。 +* 整个提取期间锁定A直到提取结束。 +* 网关可以并发下线,但是不允许在锁A清空前进行删除或修改。 + +#### 找可用节点 + +##### 找本地 +**并发筛选问题** +多个提取请求筛选到同一个节点,只有一个请求可以占用该节点,其他请求会直接失败。 + +**处理方案:** +* 提供占用锁,持续到提交配置后。 +* 筛选出节点后,锁定该节点,如果锁定失败说明该节点已被占用(包括云端节点)。 +* 如果被占用则等待占用结束后重新筛选(如果在结束前就重新筛选,可能还是会筛选到同样的节点,再次导致失败)。 +* 提取时可能要求多个节点,因此锁定节点时,需要一个 lua 脚本同步锁定同一批节点。 + +##### 找云端 +**重复筛选问题** +云端接口不会自动过滤已连接的节点,有可能筛选到已经连接甚至配置的节点。 + +**处理(缓解)方案:** +* 优先筛选今日未分配的节点,如果没有可用节点,再分配已用节点,这个方案暂时缓解问题。 + +#### 整理配置信息 + +* 该环节**不会有并发问题**。 + +#### 开通通道 +*(注:包含以下三个独立操作,主要关注其执行的原子性与最终一致性保证)* + +##### 提交异步关闭任务 +**后续失败问题:** +* 不考虑回滚,执行时需要考虑后续数据不全的情况。 +* 提交需要 `proxy` 和 `batch` 参数,端口取用是独占的,因此在归还端口前,一定不会有其他连接使用端口。任务信息中需要包含足够的信息以在没有数据库信息时解除配置。 +* 解除连接与归还端口全部成功才算成功。 + +##### 提交配置到云端 +* 该环节**不会有并发问题**。 + +##### 保存到数据库 +**一致性处理:** +* 保存失败后,只会存在孤立占用。异步任务会自动重试,能够实现最终一致性。 + +--- + +- 并发扣减问题 +- 代理选择通过查 redis 实现 + +- 重新考虑取用流程设计,是否可以分离端口归还与通道断开。端口归还后通道即使没有断开,未来也会被其他请求占用,能够实现最终一致性 + +--- + - otel 没有正确记录接口失败信息 筛选和关联展示功能扩展 -channel 管理逻辑优化,需要携带 proxy 信息,考虑到 channel 没有成功创建的情况,或者保证创建操作的原子性 - 错误提示增强,展示整链路信息 ip 提取频率限制,在 ensure 函数加逻辑,通过 redis 或者 pg 计算分钟内提取次数,只允许每分钟提取 30 次 diff --git a/web/handlers/channel.go b/web/handlers/channel.go index fa9f12c..114aaae 100644 --- a/web/handlers/channel.go +++ b/web/handlers/channel.go @@ -257,7 +257,7 @@ func RemoveChannels(c *fiber.Ctx) error { } // 删除通道 - err = s.Channel.RemoveChannels(req.Batch, nil) + err = s.Channel.RemoveChannels(req.Batch) if err != nil { return err } diff --git a/web/services/channel.go b/web/services/channel.go index 08eb18a..d859fdf 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -25,7 +25,7 @@ var Channel = &channelServer{ type ChannelServiceProvider interface { CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error) - RemoveChannels(batch string, proxyId *int32) error + RemoveChannels(batch string) error ClearExpiredChannels(proxyId int32) (int, error) } @@ -37,8 +37,8 @@ func (s *channelServer) CreateChannels(source netip.Addr, resourceId int32, auth return s.provider.CreateChannels(source, resourceId, authWhitelist, authPassword, count, edgeFilter) } -func (s *channelServer) RemoveChannels(batch string, proxyId *int32) error { - return s.provider.RemoveChannels(batch, proxyId) +func (s *channelServer) RemoveChannels(batch string) error { + return s.provider.RemoveChannels(batch) } func (s *channelServer) ClearExpiredChannels(proxyId int32) (int, error) { diff --git a/web/services/channel_baiyin.go b/web/services/channel_baiyin.go index 916a688..343bfee 100644 --- a/web/services/channel_baiyin.go +++ b/web/services/channel_baiyin.go @@ -18,6 +18,7 @@ import ( "time" "github.com/hibiken/asynq" + "gorm.io/gen" "gorm.io/gen/field" ) @@ -29,7 +30,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int } now := time.Now() - batch := ID.GenReadable("bat") + batchNo := ID.GenReadable("bat") // 检查并获取套餐与白名单 resource, whitelists, err := ensure(now, source, resourceId, authWhitelist, count) @@ -40,210 +41,168 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int user := resource.User expire := now.Add(resource.Live) - // 注册异步关闭任务 - _, err = g.Asynq.Enqueue( - e.NewRemoveChannel(batch), - asynq.ProcessAt(expire), - ) - if err != nil { - return nil, core.NewServErr("注册异步关闭通道任务失败", err) - } - // 选择代理 - proxyResult := struct { - m.Proxy - Count int - }{} - err = q.Proxy. - LeftJoin(q.Channel, q.Channel.ProxyID.EqCol(q.Proxy.ID), q.Channel.ExpiredAt.Gt(now)). - Select(q.Proxy.ALL, field.NewUnsafeFieldRaw("10000 - count(*)").As("count")). - Where( - q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)), - q.Proxy.Status.Eq(int(m.ProxyStatusOnline)), - ). - Group(q.Proxy.ID). - Order(field.NewField("", "count")). - Limit(1).Scan(&proxyResult) + proxy, gateway, err := selectProxy(count) if err != nil { - return nil, core.NewBizErr("获取可用代理失败", err) + return nil, err } - if proxyResult.Count < count { - return nil, core.NewBizErr("无可用主机,请稍后再试") - } - proxy := proxyResult.Proxy // 取用端口 - var chans []netip.AddrPort - err = g.Redsync.WithLock(proxyStatusLockKey(proxy.ID), func() error { - lockedProxy, err := q.Proxy.Where(q.Proxy.ID.Eq(proxy.ID)).Take() + chans, err := selectPorts(proxy.ID, batchNo, count, expire) + if err != nil { + return nil, err + } + + // 节点查询到提交,需要锁定防止并发取用 + channels := make([]*m.Channel, count) + err = g.Redsync.WithLock(lockChannelCreateKey(), func() error { + // 取用节点 + edges, err := selectEdges(gateway, filter, count) if err != nil { return err } - if lockedProxy.Status != m.ProxyStatusOnline { - return core.NewBizErr("无可用主机,请稍后再试") + + // 绑定节点端口 + chanConfigs := make([]*g.PortConfigsReq, count) + edgeConfigs := make([]string, 0, count) + for i := range count { + ch := chans[i] + edge := edges[i] + + // 通道数据 + channels[i] = &m.Channel{ + UserID: user.ID, + ResourceID: resourceId, + BatchNo: batchNo, + ProxyID: proxy.ID, + Host: u.Else(proxy.Host, proxy.IP.String()), + Port: ch.Port(), + EdgeRef: u.P(edge.EdgeID), + FilterISP: filter.Isp, + FilterProv: filter.Prov, + FilterCity: filter.City, + ExpiredAt: expire, + Proxy: proxy, + } + + // 通道配置数据 + chanConfigs[i] = &g.PortConfigsReq{ + Port: int(ch.Port()), + Status: true, + Edge: &[]string{edge.EdgeID}, + } + + // 白名单模式 + if authWhitelist { + channels[i].Whitelists = u.P(strings.Join(whitelists, ",")) + chanConfigs[i].Whitelist = &whitelists + } + + // 密码模式 + if authPassword { + username, password := genPassPair() + channels[i].Username = &username + channels[i].Password = &password + chanConfigs[i].Userpass = u.P(username + ":" + password) + } + + // 连接配置数据 + if edge.Type == EdgeInfoCloud { + edgeConfigs = append(edgeConfigs, edge.EdgeID) + } } - chans, err = lockChans(proxy.ID, batch, count) - if err != nil { - return core.NewBizErr("无可用通道,请稍后再试", err) + // 提交配置 + slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs)) + if env.RunMode == env.RunModeProd { + + // 连接节点到网关 + if err := g.Cloud.CloudConnect(&g.CloudConnectReq{Uuid: proxy.Mac, Edge: &edgeConfigs}); err != nil { + return core.NewServErr("连接云平台失败", err) + } + + // 启用网关代理通道 + if err := gateway.GatewayPortConfigs(chanConfigs); err != nil { + slog.Warn("提交代理端口配置失败", "error", err.Error()) + return core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err) + } + } else { + for _, item := range chanConfigs { + str, _ := json.Marshal(item) + fmt.Println(string(str)) + } } - proxy = *lockedProxy - return nil - }) - if err != nil { - return nil, err - } + // 保存数据 + err = q.Q.Transaction(func(q *q.Query) error { + // 更新使用记录 + var result gen.ResultInfo + var err error + switch resource.Type { + case m.ResourceTypeShort: + result, err = q.ResourceShort. + Where( + q.ResourceShort.ID.Eq(*resource.ShortId), + q.ResourceShort.Used.Eq(resource.Used), + q.ResourceShort.Daily.Eq(resource.Daily), + ). + UpdateSimple( + q.ResourceShort.Used.Add(int32(count)), + q.ResourceShort.Daily.Value(int32(resource.Today+count)), + q.ResourceShort.LastAt.Value(now), + ) - // 取用节点 - secret := strings.Split(u.Z(proxy.Secret), ":") - if len(secret) != 2 { - return nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil) - } - gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) + case m.ResourceTypeLong: + result, err = q.ResourceLong. + Where( + q.ResourceLong.ID.Eq(*resource.LongId), + q.ResourceLong.Used.Eq(resource.Used), + q.ResourceLong.Daily.Eq(resource.Daily), + ). + UpdateSimple( + q.ResourceLong.Used.Add(int32(count)), + q.ResourceLong.Daily.Value(int32(resource.Today+count)), + q.ResourceLong.LastAt.Value(now), + ) - edges, err := getAvailableEdges(gateway, filter, count) - if err != nil { - return nil, err - } + default: + return core.NewBizErr("套餐类型不正确,无法更新", nil) + } + if err != nil { + return core.NewServErr("更新套餐使用记录失败", err) + } + if result.RowsAffected == 0 { + return core.NewBizErr("提取太频繁,请稍后再试", nil) + } - // 绑定节点到端口 - channels := make([]*m.Channel, count) - chanConfigs := make([]*g.PortConfigsReq, count) - edgeConfigs := make([]string, 0, count) - for i := range count { - ch := chans[i] - edge := edges[i] + // 保存通道 + err = q.Channel. + Omit(field.AssociationFields). + Create(channels...) + if err != nil { + return core.NewServErr("保存通道失败", err) + } - // 通道数据 - channels[i] = &m.Channel{ - UserID: user.ID, - ResourceID: resourceId, - BatchNo: batch, - ProxyID: proxy.ID, - Host: u.Else(proxy.Host, proxy.IP.String()), - Port: ch.Port(), - EdgeRef: u.P(edge.EdgeID), - FilterISP: filter.Isp, - FilterProv: filter.Prov, - FilterCity: filter.City, - ExpiredAt: expire, - Proxy: &proxy, - } + // 保存提取记录 + err = q.LogsUserUsage.Create(&m.LogsUserUsage{ + UserID: user.ID, + ResourceID: resourceId, + BatchNo: batchNo, + Count: int32(count), + ISP: u.X(filter.Isp.String()), + Prov: filter.Prov, + City: filter.City, + IP: orm.Inet{Addr: source}, + Time: now, + }) + if err != nil { + return core.NewServErr("保存用户使用记录失败", err) + } - // 通道配置数据 - chanConfigs[i] = &g.PortConfigsReq{ - Port: int(ch.Port()), - Status: true, - Edge: &[]string{edge.EdgeID}, - } - - // 白名单模式 - if authWhitelist { - channels[i].Whitelists = u.P(strings.Join(whitelists, ",")) - chanConfigs[i].Whitelist = &whitelists - } - - // 密码模式 - if authPassword { - username, password := genPassPair() - channels[i].Username = &username - channels[i].Password = &password - chanConfigs[i].Userpass = u.P(username + ":" + password) - } - - // 连接配置数据 - if edge.Type == EdgeInfoCloud { - edgeConfigs = append(edgeConfigs, edge.EdgeID) - } - } - - // 提交配置 - slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs)) - if env.RunMode == env.RunModeProd { - - // 连接节点到网关 - err = g.Cloud.CloudConnect(&g.CloudConnectReq{ - Uuid: proxy.Mac, - Edge: &edgeConfigs, + return nil }) if err != nil { - return nil, core.NewServErr("连接云平台失败", err) - } - - // 启用网关代理通道 - err = gateway.GatewayPortConfigs(chanConfigs) - if err != nil { - slog.Warn("提交代理端口配置失败", "error", err.Error()) - return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err) - } - } else { - for _, item := range chanConfigs { - str, _ := json.Marshal(item) - fmt.Println(string(str)) - } - } - - // 保存数据 - err = q.Q.Transaction(func(q *q.Query) error { - // 更新使用记录 - var err error - switch resource.Type { - case m.ResourceTypeShort: - _, err = q.ResourceShort. - Where( - q.ResourceShort.ID.Eq(*resource.ShortId), - q.ResourceShort.Used.Eq(resource.Used), - q.ResourceShort.Daily.Eq(resource.Daily), - ). - UpdateSimple( - q.ResourceShort.Used.Add(int32(count)), - q.ResourceShort.Daily.Value(int32(resource.Today+count)), - q.ResourceShort.LastAt.Value(now), - ) - - case m.ResourceTypeLong: - _, err = q.ResourceLong. - Where( - q.ResourceLong.ID.Eq(*resource.LongId), - q.ResourceLong.Used.Eq(resource.Used), - q.ResourceLong.Daily.Eq(resource.Daily), - ). - UpdateSimple( - q.ResourceLong.Used.Add(int32(count)), - q.ResourceLong.Daily.Value(int32(resource.Today+count)), - q.ResourceLong.LastAt.Value(now), - ) - - default: - return core.NewServErr("套餐类型不正确,无法更新", nil) - } - if err != nil { - return core.NewServErr("更新套餐使用记录失败", err) - } - - // 保存通道 - err = q.Channel. - Omit(field.AssociationFields). - Create(channels...) - if err != nil { - return core.NewServErr("保存通道失败", err) - } - - // 保存提取记录 - err = q.LogsUserUsage.Create(&m.LogsUserUsage{ - UserID: user.ID, - ResourceID: resourceId, - BatchNo: batch, - Count: int32(count), - ISP: u.P(filter.Isp.String()), - Prov: filter.Prov, - City: filter.City, - IP: orm.Inet{Addr: source}, - Time: now, - }) - if err != nil { - return core.NewServErr("保存用户使用记录失败", err) + return err } return nil @@ -255,110 +214,106 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int return channels, nil } -func (s *channelBaiyinProvider) RemoveChannels(batch string, proxyId *int32) error { - return g.Redsync.WithLock(batchRemoveExpiredKey(batch), func() error { +func (s *channelBaiyinProvider) RemoveChannels(batch string) error { + return g.Redsync.WithLock(lockChannelRemoveKey(batch), func() error { start := time.Now() - pid := int32(0) - if proxyId == nil { - // 获取连接数据 - channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find() - if err != nil { - return core.NewServErr(fmt.Sprintf("获取通道数据失败,batch:%s", batch), err) - } - if len(channels) == 0 { - slog.Warn(fmt.Sprintf("未找到通道数据,batch:%s", batch)) - return nil - } + // 获取连接数据 + channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find() + if err != nil { + return core.NewServErr(fmt.Sprintf("获取通道数据失败,batch:%s", batch), err) + } + if len(channels) == 0 { + slog.Warn(fmt.Sprintf("未找到通道数据,batch:%s", batch)) + return nil + } - proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take() - if err != nil { - return core.NewServErr(fmt.Sprintf("获取代理数据失败,batch:%s", batch), err) - } + proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take() + if err != nil { + return core.NewServErr(fmt.Sprintf("获取代理数据失败,batch:%s", batch), err) + } - // 检查通道是否存在 - exist, err := g.Redis.Exists(context.Background(), usedChansKey(proxy.ID, batch)).Result() - if err != nil { - return core.NewServErr("查询使用中通道失败", err) - } - if exist == 0 { - return nil // 没有使用中通道,已经被清理过了 - } - - // 准备配置数据 - edgeConfigs := make([]string, len(channels)) - configs := make([]*g.PortConfigsReq, len(channels)) - for i, channel := range channels { - if channel.EdgeRef != nil { - edgeConfigs[i] = *channel.EdgeRef - } else { - slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID)) - } - - configs[i] = &g.PortConfigsReq{ - Status: false, - Port: int(channel.Port), - Edge: &[]string{}, - } - } - - // 提交配置 - if env.RunMode == env.RunModeProd { - - // 清空通道配置 - secret := strings.Split(u.Z(proxy.Secret), ":") - gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) - err := gateway.GatewayPortConfigs(configs) - if err != nil { - return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err) - } - - // 断开节点连接 - _, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{ - Uuid: proxy.Mac, - Edge: &edgeConfigs, - }) - if err != nil { - slog.Warn("断开云平台连接失败", "error", err.Error()) - return core.NewServErr("断开云平台连接失败", err) - } + // 检查通道是否存在 + exist, err := g.Redis.Exists(context.Background(), usedChansKey(proxy.ID, batch)).Result() + if err != nil { + return core.NewServErr("查询使用中通道失败", err) + } + if exist == 0 { + return nil // 没有使用中通道,已经被清理过了 + } + // 准备配置数据 + edgeConfigs := make([]string, len(channels)) + configs := make([]*g.PortConfigsReq, len(channels)) + for i, channel := range channels { + if channel.EdgeRef != nil { + edgeConfigs[i] = *channel.EdgeRef } else { - for _, item := range configs { - str, _ := json.Marshal(item) - fmt.Println(string(str)) - } + slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID)) + } + + configs[i] = &g.PortConfigsReq{ + Status: false, + Port: int(channel.Port), + Edge: &[]string{}, + } + } + + // 提交配置 + if env.RunMode == env.RunModeProd { + + // 清空通道配置 + secret := strings.Split(u.Z(proxy.Secret), ":") + if len(secret) != 2 { + return core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil) + } + gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) + err := gateway.GatewayPortConfigs(configs) + if err != nil { + return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err) + } + + // 断开节点连接 + _, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{ + Uuid: proxy.Mac, + Edge: &edgeConfigs, + }) + if err != nil { + slog.Warn("断开云平台连接失败", "error", err.Error()) + return core.NewServErr("断开云平台连接失败", err) } - pid = proxy.ID } else { - pid = *proxyId + for _, item := range configs { + str, _ := json.Marshal(item) + fmt.Println(string(str)) + } } // 释放端口 - err := freeChans(pid, batch) + err = freeChans(proxy.ID, batch) if err != nil { return err } - slog.Debug("清除代理端口配置", "proxy", pid, "batch", batch, "duration", time.Since(start).String()) + slog.Debug("清除代理端口配置", "proxy", proxy.ID, "batch", batch, "duration", time.Since(start).String()) return nil }) } -// ClearExpiredChannels 定期清理过期通道,返回清理数量 -// 通道有三种情况: -// - 过期等待清理,过期时间在一小时内,可以等待异步任务回收通道 -// - 过期未清理,过期时间超过一小时,说明异步任务可能失败了,需要强制清理 -// - 异常通道,取用后任务失败,导致通道悬空,需要强制清理 +// ClearExpiredChannels 清理指定代理的过期通道,并返回清理数量(现在理论上不会有需要手动批量清理的通道,未来可以废弃) func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) { now := time.Now() - // 获取使用中通道批次 + // 获取未清理通道 keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result() if err != nil { return 0, core.NewServErr("查询使用中通道失败", err) } + if len(keys) == 0 { + return 0, nil + } + batchList := make([]string, len(keys)) batchSet := make(map[string]struct{}, len(keys)) for i, key := range keys { @@ -390,7 +345,7 @@ func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) // 清理过期通道 slog.Info("批量清理过期通道", "count", len(batchSet)) for batchNo, _ := range batchSet { - err := s.RemoveChannels(batchNo, &proxyId) + err := s.RemoveChannels(batchNo) if err != nil { slog.Error("清理过期通道失败", "batch", batchNo, "error", err) } @@ -399,11 +354,83 @@ func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) return len(batchSet), nil } -func batchRemoveExpiredKey(bid string) string { +func lockChannelCreateKey() string { + return "platform:channel:create" +} + +func lockChannelRemoveKey(bid string) string { return fmt.Sprintf("platform:batch:remove_expired:%s", bid) } -func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]EdgeInfo, error) { +func selectProxy(count int) (*m.Proxy, g.GatewayClient, error) { + // 获取在线节点 + proxies, err := q.Proxy.Where( + q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)), + q.Proxy.Status.Eq(int(m.ProxyStatusOnline)), + ).Find() + if err != nil { + return nil, nil, core.NewBizErr("获取可用代理失败", err) + } + if len(proxies) == 0 { + return nil, nil, core.NewBizErr("无可用代理") + } + + proxyIDs := make([]int32, 0, len(proxies)) + proxyMap := make(map[int32]*m.Proxy, len(proxies)) + for _, item := range proxies { + proxyIDs = append(proxyIDs, item.ID) + proxyMap[item.ID] = item + } + + // 获取最空闲节点 + maxId := int32(0) + maxCount := -1 + for _, id := range proxyIDs { + idCount, err := g.Redis.SCard(context.Background(), freeChansKey(id)).Result() + if err != nil { + return nil, nil, fmt.Errorf("查询可用通道数量失败: %w", err) + } + + if idCount > int64(maxCount) { + maxCount = int(idCount) + maxId = id + } + } + if maxCount < count { + return nil, nil, core.NewBizErr("无可用代理") + } + proxy := proxyMap[maxId] + + secret := strings.Split(u.Z(proxy.Secret), ":") + if len(secret) != 2 { + return nil, nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil) + } + gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) + + return proxy, gateway, nil +} + +func selectPorts(proxyId int32, batchNo string, count int, expire time.Time) ([]netip.AddrPort, error) { + chans, err := lockChans(proxyId, batchNo, count) + if err != nil { + return nil, core.NewBizErr("无可用通道,请稍后再试", err) + } + + _, err = g.Asynq.Enqueue( + e.NewRemoveChannel(batchNo), + asynq.ProcessAt(expire), + ) + if err != nil { + return nil, core.NewServErr("注册异步关闭通道任务失败", err) + } + + return chans, nil +} + +// selectEdges 选择节点,优先本地节点,失败重试,直到达到重试次数限制 +// 本地节点通过 Assigned = false 排除已分配节点 +// 云端节点通过 NoRepeat = true 排除已分配节点 +func selectEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]EdgeInfo, error) { edges := make([]EdgeInfo, 0, count) // 先查本地 @@ -428,7 +455,7 @@ func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ( return edges, nil } - // 再查云端无重复 + // 再查云端 remaining := count - len(edges) cloudEdgesResp, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{ Province: filter.Prov, @@ -449,13 +476,11 @@ func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ( EdgeID: edge.EdgeID, }) } - if len(edges) >= count { - return edges, nil + if len(edges) < count { + return nil, core.NewBizErr("地区可用节点数量不足") } - // 不能和已有的重复,如果有重复则再次查询云端补足,二次提取还有重复则放弃 - - return nil, core.NewBizErr("地区可用节点数量不足") + return edges, nil } type EdgeInfo struct { diff --git a/web/services/proxy.go b/web/services/proxy.go index bf4ee77..e7c6930 100644 --- a/web/services/proxy.go +++ b/web/services/proxy.go @@ -2,7 +2,6 @@ package services import ( "context" - "fmt" "net/netip" "platform/pkg/u" "platform/web/core" @@ -19,10 +18,6 @@ var Proxy = &proxyService{} type proxyService struct{} -func proxyStatusLockKey(id int32) string { - return fmt.Sprintf("platform:proxy:status:%d", id) -} - func hasUsedChans(proxyID int32) (bool, error) { ctx := context.Background() pattern := usedChansKey(proxyID, "*") @@ -191,7 +186,7 @@ type UpdateProxyStatus struct { } func (s *proxyService) UpdateStatus(update *UpdateProxyStatus) error { - return g.Redsync.WithLock(proxyStatusLockKey(update.ID), func() error { + return q.Q.Transaction(func(tx *q.Query) error { proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(update.ID)).Take() if err != nil { return err diff --git a/web/tasks/task.go b/web/tasks/task.go index 7a85400..1540212 100644 --- a/web/tasks/task.go +++ b/web/tasks/task.go @@ -46,7 +46,7 @@ func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) { batch := string(task.Payload()) slog.Info("[event]删除通道", "batch", batch) - err = s.Channel.RemoveChannels(batch, nil) + err = s.Channel.RemoveChannels(batch) if err != nil { return fmt.Errorf("删除通道失败: %w", err) }