完善提取处理流程,解决提取并发问题

This commit is contained in:
2026-05-13 16:17:57 +08:00
parent d273731e31
commit ccbc6f0b67
6 changed files with 361 additions and 287 deletions

View File

@@ -1,13 +1,67 @@
## TODO ## TODO
- 解决提取成功率问题 ### 接口并发问题
- 提交代理端口配置后,并非同步将节点标记为占用状态,导致后续提取请求仍然会选中该节点,直到下一个提取请求才会发现该节点已满,导致提取失败?
#### 找可用网关 & 找可用端点
*(注:找端点依赖于网关状态,二者先后执行,但在并发控制上统一处理)*
**下线并发问题**
* 提供指定网关的可重入读写锁A。
* 整个提取期间锁定A直到提取结束。
* 网关可以并发下线但是不允许在锁A清空前进行删除或修改。
#### 找可用节点
##### 找本地
**并发筛选问题**
多个提取请求筛选到同一个节点,只有一个请求可以占用该节点,其他请求会直接失败。
**处理方案:**
* 提供占用锁,持续到提交配置后。
* 筛选出节点后,锁定该节点,如果锁定失败说明该节点已被占用(包括云端节点)。
* 如果被占用则等待占用结束后重新筛选(如果在结束前就重新筛选,可能还是会筛选到同样的节点,再次导致失败)。
* 提取时可能要求多个节点,因此锁定节点时,需要一个 lua 脚本同步锁定同一批节点。
##### 找云端
**重复筛选问题**
云端接口不会自动过滤已连接的节点,有可能筛选到已经连接甚至配置的节点。
**处理(缓解)方案:**
* 优先筛选今日未分配的节点,如果没有可用节点,再分配已用节点,这个方案暂时缓解问题。
#### 整理配置信息
* 该环节**不会有并发问题**。
#### 开通通道
*(注:包含以下三个独立操作,主要关注其执行的原子性与最终一致性保证)*
##### 提交异步关闭任务
**后续失败问题:**
* 不考虑回滚,执行时需要考虑后续数据不全的情况。
* 提交需要 `proxy``batch` 参数,端口取用是独占的,因此在归还端口前,一定不会有其他连接使用端口。任务信息中需要包含足够的信息以在没有数据库信息时解除配置。
* 解除连接与归还端口全部成功才算成功。
##### 提交配置到云端
* 该环节**不会有并发问题**。
##### 保存到数据库
**一致性处理:**
* 保存失败后,只会存在孤立占用。异步任务会自动重试,能够实现最终一致性。
---
- 并发扣减问题
- 代理选择通过查 redis 实现
- 重新考虑取用流程设计,是否可以分离端口归还与通道断开。端口归还后通道即使没有断开,未来也会被其他请求占用,能够实现最终一致性
---
- otel 没有正确记录接口失败信息 - otel 没有正确记录接口失败信息
筛选和关联展示功能扩展 筛选和关联展示功能扩展
channel 管理逻辑优化,需要携带 proxy 信息,考虑到 channel 没有成功创建的情况,或者保证创建操作的原子性
错误提示增强,展示整链路信息 错误提示增强,展示整链路信息
ip 提取频率限制,在 ensure 函数加逻辑,通过 redis 或者 pg 计算分钟内提取次数,只允许每分钟提取 30 次 ip 提取频率限制,在 ensure 函数加逻辑,通过 redis 或者 pg 计算分钟内提取次数,只允许每分钟提取 30 次

View File

@@ -257,7 +257,7 @@ func RemoveChannels(c *fiber.Ctx) error {
} }
// 删除通道 // 删除通道
err = s.Channel.RemoveChannels(req.Batch, nil) err = s.Channel.RemoveChannels(req.Batch)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -25,7 +25,7 @@ var Channel = &channelServer{
type ChannelServiceProvider interface { type ChannelServiceProvider interface {
CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error) CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error)
RemoveChannels(batch string, proxyId *int32) error RemoveChannels(batch string) error
ClearExpiredChannels(proxyId int32) (int, error) ClearExpiredChannels(proxyId int32) (int, error)
} }
@@ -37,8 +37,8 @@ func (s *channelServer) CreateChannels(source netip.Addr, resourceId int32, auth
return s.provider.CreateChannels(source, resourceId, authWhitelist, authPassword, count, edgeFilter) return s.provider.CreateChannels(source, resourceId, authWhitelist, authPassword, count, edgeFilter)
} }
func (s *channelServer) RemoveChannels(batch string, proxyId *int32) error { func (s *channelServer) RemoveChannels(batch string) error {
return s.provider.RemoveChannels(batch, proxyId) return s.provider.RemoveChannels(batch)
} }
func (s *channelServer) ClearExpiredChannels(proxyId int32) (int, error) { func (s *channelServer) ClearExpiredChannels(proxyId int32) (int, error) {

View File

@@ -18,6 +18,7 @@ import (
"time" "time"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"gorm.io/gen"
"gorm.io/gen/field" "gorm.io/gen/field"
) )
@@ -29,7 +30,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
} }
now := time.Now() now := time.Now()
batch := ID.GenReadable("bat") batchNo := ID.GenReadable("bat")
// 检查并获取套餐与白名单 // 检查并获取套餐与白名单
resource, whitelists, err := ensure(now, source, resourceId, authWhitelist, count) resource, whitelists, err := ensure(now, source, resourceId, authWhitelist, count)
@@ -40,210 +41,168 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
user := resource.User user := resource.User
expire := now.Add(resource.Live) expire := now.Add(resource.Live)
// 注册异步关闭任务
_, err = g.Asynq.Enqueue(
e.NewRemoveChannel(batch),
asynq.ProcessAt(expire),
)
if err != nil {
return nil, core.NewServErr("注册异步关闭通道任务失败", err)
}
// 选择代理 // 选择代理
proxyResult := struct { proxy, gateway, err := selectProxy(count)
m.Proxy
Count int
}{}
err = q.Proxy.
LeftJoin(q.Channel, q.Channel.ProxyID.EqCol(q.Proxy.ID), q.Channel.ExpiredAt.Gt(now)).
Select(q.Proxy.ALL, field.NewUnsafeFieldRaw("10000 - count(*)").As("count")).
Where(
q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)),
q.Proxy.Status.Eq(int(m.ProxyStatusOnline)),
).
Group(q.Proxy.ID).
Order(field.NewField("", "count")).
Limit(1).Scan(&proxyResult)
if err != nil { if err != nil {
return nil, core.NewBizErr("获取可用代理失败", err) return nil, err
} }
if proxyResult.Count < count {
return nil, core.NewBizErr("无可用主机,请稍后再试")
}
proxy := proxyResult.Proxy
// 取用端口 // 取用端口
var chans []netip.AddrPort chans, err := selectPorts(proxy.ID, batchNo, count, expire)
err = g.Redsync.WithLock(proxyStatusLockKey(proxy.ID), func() error { if err != nil {
lockedProxy, err := q.Proxy.Where(q.Proxy.ID.Eq(proxy.ID)).Take() return nil, err
}
// 节点查询到提交,需要锁定防止并发取用
channels := make([]*m.Channel, count)
err = g.Redsync.WithLock(lockChannelCreateKey(), func() error {
// 取用节点
edges, err := selectEdges(gateway, filter, count)
if err != nil { if err != nil {
return err return err
} }
if lockedProxy.Status != m.ProxyStatusOnline {
return core.NewBizErr("无可用主机,请稍后再试") // 绑定节点端口
chanConfigs := make([]*g.PortConfigsReq, count)
edgeConfigs := make([]string, 0, count)
for i := range count {
ch := chans[i]
edge := edges[i]
// 通道数据
channels[i] = &m.Channel{
UserID: user.ID,
ResourceID: resourceId,
BatchNo: batchNo,
ProxyID: proxy.ID,
Host: u.Else(proxy.Host, proxy.IP.String()),
Port: ch.Port(),
EdgeRef: u.P(edge.EdgeID),
FilterISP: filter.Isp,
FilterProv: filter.Prov,
FilterCity: filter.City,
ExpiredAt: expire,
Proxy: proxy,
}
// 通道配置数据
chanConfigs[i] = &g.PortConfigsReq{
Port: int(ch.Port()),
Status: true,
Edge: &[]string{edge.EdgeID},
}
// 白名单模式
if authWhitelist {
channels[i].Whitelists = u.P(strings.Join(whitelists, ","))
chanConfigs[i].Whitelist = &whitelists
}
// 密码模式
if authPassword {
username, password := genPassPair()
channels[i].Username = &username
channels[i].Password = &password
chanConfigs[i].Userpass = u.P(username + ":" + password)
}
// 连接配置数据
if edge.Type == EdgeInfoCloud {
edgeConfigs = append(edgeConfigs, edge.EdgeID)
}
} }
chans, err = lockChans(proxy.ID, batch, count) // 提交配置
if err != nil { slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs))
return core.NewBizErr("无可用通道,请稍后再试", err) if env.RunMode == env.RunModeProd {
// 连接节点到网关
if err := g.Cloud.CloudConnect(&g.CloudConnectReq{Uuid: proxy.Mac, Edge: &edgeConfigs}); err != nil {
return core.NewServErr("连接云平台失败", err)
}
// 启用网关代理通道
if err := gateway.GatewayPortConfigs(chanConfigs); err != nil {
slog.Warn("提交代理端口配置失败", "error", err.Error())
return core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
}
} else {
for _, item := range chanConfigs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
}
} }
proxy = *lockedProxy // 保存数据
return nil err = q.Q.Transaction(func(q *q.Query) error {
}) // 更新使用记录
if err != nil { var result gen.ResultInfo
return nil, err var err error
} switch resource.Type {
case m.ResourceTypeShort:
result, err = q.ResourceShort.
Where(
q.ResourceShort.ID.Eq(*resource.ShortId),
q.ResourceShort.Used.Eq(resource.Used),
q.ResourceShort.Daily.Eq(resource.Daily),
).
UpdateSimple(
q.ResourceShort.Used.Add(int32(count)),
q.ResourceShort.Daily.Value(int32(resource.Today+count)),
q.ResourceShort.LastAt.Value(now),
)
// 取用节点 case m.ResourceTypeLong:
secret := strings.Split(u.Z(proxy.Secret), ":") result, err = q.ResourceLong.
if len(secret) != 2 { Where(
return nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil) q.ResourceLong.ID.Eq(*resource.LongId),
} q.ResourceLong.Used.Eq(resource.Used),
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) q.ResourceLong.Daily.Eq(resource.Daily),
).
UpdateSimple(
q.ResourceLong.Used.Add(int32(count)),
q.ResourceLong.Daily.Value(int32(resource.Today+count)),
q.ResourceLong.LastAt.Value(now),
)
edges, err := getAvailableEdges(gateway, filter, count) default:
if err != nil { return core.NewBizErr("套餐类型不正确,无法更新", nil)
return nil, err }
} if err != nil {
return core.NewServErr("更新套餐使用记录失败", err)
}
if result.RowsAffected == 0 {
return core.NewBizErr("提取太频繁,请稍后再试", nil)
}
// 绑定节点到端口 // 保存通道
channels := make([]*m.Channel, count) err = q.Channel.
chanConfigs := make([]*g.PortConfigsReq, count) Omit(field.AssociationFields).
edgeConfigs := make([]string, 0, count) Create(channels...)
for i := range count { if err != nil {
ch := chans[i] return core.NewServErr("保存通道失败", err)
edge := edges[i] }
// 通道数据 // 保存提取记录
channels[i] = &m.Channel{ err = q.LogsUserUsage.Create(&m.LogsUserUsage{
UserID: user.ID, UserID: user.ID,
ResourceID: resourceId, ResourceID: resourceId,
BatchNo: batch, BatchNo: batchNo,
ProxyID: proxy.ID, Count: int32(count),
Host: u.Else(proxy.Host, proxy.IP.String()), ISP: u.X(filter.Isp.String()),
Port: ch.Port(), Prov: filter.Prov,
EdgeRef: u.P(edge.EdgeID), City: filter.City,
FilterISP: filter.Isp, IP: orm.Inet{Addr: source},
FilterProv: filter.Prov, Time: now,
FilterCity: filter.City, })
ExpiredAt: expire, if err != nil {
Proxy: &proxy, return core.NewServErr("保存用户使用记录失败", err)
} }
// 通道配置数据 return nil
chanConfigs[i] = &g.PortConfigsReq{
Port: int(ch.Port()),
Status: true,
Edge: &[]string{edge.EdgeID},
}
// 白名单模式
if authWhitelist {
channels[i].Whitelists = u.P(strings.Join(whitelists, ","))
chanConfigs[i].Whitelist = &whitelists
}
// 密码模式
if authPassword {
username, password := genPassPair()
channels[i].Username = &username
channels[i].Password = &password
chanConfigs[i].Userpass = u.P(username + ":" + password)
}
// 连接配置数据
if edge.Type == EdgeInfoCloud {
edgeConfigs = append(edgeConfigs, edge.EdgeID)
}
}
// 提交配置
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs))
if env.RunMode == env.RunModeProd {
// 连接节点到网关
err = g.Cloud.CloudConnect(&g.CloudConnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
}) })
if err != nil { if err != nil {
return nil, core.NewServErr("连接云平台失败", err) return err
}
// 启用网关代理通道
err = gateway.GatewayPortConfigs(chanConfigs)
if err != nil {
slog.Warn("提交代理端口配置失败", "error", err.Error())
return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
}
} else {
for _, item := range chanConfigs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
}
}
// 保存数据
err = q.Q.Transaction(func(q *q.Query) error {
// 更新使用记录
var err error
switch resource.Type {
case m.ResourceTypeShort:
_, err = q.ResourceShort.
Where(
q.ResourceShort.ID.Eq(*resource.ShortId),
q.ResourceShort.Used.Eq(resource.Used),
q.ResourceShort.Daily.Eq(resource.Daily),
).
UpdateSimple(
q.ResourceShort.Used.Add(int32(count)),
q.ResourceShort.Daily.Value(int32(resource.Today+count)),
q.ResourceShort.LastAt.Value(now),
)
case m.ResourceTypeLong:
_, err = q.ResourceLong.
Where(
q.ResourceLong.ID.Eq(*resource.LongId),
q.ResourceLong.Used.Eq(resource.Used),
q.ResourceLong.Daily.Eq(resource.Daily),
).
UpdateSimple(
q.ResourceLong.Used.Add(int32(count)),
q.ResourceLong.Daily.Value(int32(resource.Today+count)),
q.ResourceLong.LastAt.Value(now),
)
default:
return core.NewServErr("套餐类型不正确,无法更新", nil)
}
if err != nil {
return core.NewServErr("更新套餐使用记录失败", err)
}
// 保存通道
err = q.Channel.
Omit(field.AssociationFields).
Create(channels...)
if err != nil {
return core.NewServErr("保存通道失败", err)
}
// 保存提取记录
err = q.LogsUserUsage.Create(&m.LogsUserUsage{
UserID: user.ID,
ResourceID: resourceId,
BatchNo: batch,
Count: int32(count),
ISP: u.P(filter.Isp.String()),
Prov: filter.Prov,
City: filter.City,
IP: orm.Inet{Addr: source},
Time: now,
})
if err != nil {
return core.NewServErr("保存用户使用记录失败", err)
} }
return nil return nil
@@ -255,110 +214,106 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
return channels, nil return channels, nil
} }
func (s *channelBaiyinProvider) RemoveChannels(batch string, proxyId *int32) error { func (s *channelBaiyinProvider) RemoveChannels(batch string) error {
return g.Redsync.WithLock(batchRemoveExpiredKey(batch), func() error { return g.Redsync.WithLock(lockChannelRemoveKey(batch), func() error {
start := time.Now() start := time.Now()
pid := int32(0) // 获取连接数据
if proxyId == nil { channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find()
// 获取连接数据 if err != nil {
channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find() return core.NewServErr(fmt.Sprintf("获取通道数据失败batch%s", batch), err)
if err != nil { }
return core.NewServErr(fmt.Sprintf("获取通道数据失败batch%s", batch), err) if len(channels) == 0 {
} slog.Warn(fmt.Sprintf("未找到通道数据batch%s", batch))
if len(channels) == 0 { return nil
slog.Warn(fmt.Sprintf("未找到通道数据batch%s", batch)) }
return nil
}
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take() proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take()
if err != nil { if err != nil {
return core.NewServErr(fmt.Sprintf("获取代理数据失败batch%s", batch), err) return core.NewServErr(fmt.Sprintf("获取代理数据失败batch%s", batch), err)
} }
// 检查通道是否存在 // 检查通道是否存在
exist, err := g.Redis.Exists(context.Background(), usedChansKey(proxy.ID, batch)).Result() exist, err := g.Redis.Exists(context.Background(), usedChansKey(proxy.ID, batch)).Result()
if err != nil { if err != nil {
return core.NewServErr("查询使用中通道失败", err) return core.NewServErr("查询使用中通道失败", err)
} }
if exist == 0 { if exist == 0 {
return nil // 没有使用中通道,已经被清理过了 return nil // 没有使用中通道,已经被清理过了
} }
// 准备配置数据
edgeConfigs := make([]string, len(channels))
configs := make([]*g.PortConfigsReq, len(channels))
for i, channel := range channels {
if channel.EdgeRef != nil {
edgeConfigs[i] = *channel.EdgeRef
} else {
slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID))
}
configs[i] = &g.PortConfigsReq{
Status: false,
Port: int(channel.Port),
Edge: &[]string{},
}
}
// 提交配置
if env.RunMode == env.RunModeProd {
// 清空通道配置
secret := strings.Split(u.Z(proxy.Secret), ":")
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
err := gateway.GatewayPortConfigs(configs)
if err != nil {
return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err)
}
// 断开节点连接
_, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
if err != nil {
slog.Warn("断开云平台连接失败", "error", err.Error())
return core.NewServErr("断开云平台连接失败", err)
}
// 准备配置数据
edgeConfigs := make([]string, len(channels))
configs := make([]*g.PortConfigsReq, len(channels))
for i, channel := range channels {
if channel.EdgeRef != nil {
edgeConfigs[i] = *channel.EdgeRef
} else { } else {
for _, item := range configs { slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID))
str, _ := json.Marshal(item) }
fmt.Println(string(str))
} configs[i] = &g.PortConfigsReq{
Status: false,
Port: int(channel.Port),
Edge: &[]string{},
}
}
// 提交配置
if env.RunMode == env.RunModeProd {
// 清空通道配置
secret := strings.Split(u.Z(proxy.Secret), ":")
if len(secret) != 2 {
return core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil)
}
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
err := gateway.GatewayPortConfigs(configs)
if err != nil {
return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err)
}
// 断开节点连接
_, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
if err != nil {
slog.Warn("断开云平台连接失败", "error", err.Error())
return core.NewServErr("断开云平台连接失败", err)
} }
pid = proxy.ID
} else { } else {
pid = *proxyId for _, item := range configs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
}
} }
// 释放端口 // 释放端口
err := freeChans(pid, batch) err = freeChans(proxy.ID, batch)
if err != nil { if err != nil {
return err return err
} }
slog.Debug("清除代理端口配置", "proxy", pid, "batch", batch, "duration", time.Since(start).String()) slog.Debug("清除代理端口配置", "proxy", proxy.ID, "batch", batch, "duration", time.Since(start).String())
return nil return nil
}) })
} }
// ClearExpiredChannels 定期清理过期通道,返回清理数量 // ClearExpiredChannels 清理指定代理的过期通道,返回清理数量(现在理论上不会有需要手动批量清理的通道,未来可以废弃)
// 通道有三种情况:
// - 过期等待清理,过期时间在一小时内,可以等待异步任务回收通道
// - 过期未清理,过期时间超过一小时,说明异步任务可能失败了,需要强制清理
// - 异常通道,取用后任务失败,导致通道悬空,需要强制清理
func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) { func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) {
now := time.Now() now := time.Now()
// 获取使用中通道批次 // 获取未清理通道
keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result() keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result()
if err != nil { if err != nil {
return 0, core.NewServErr("查询使用中通道失败", err) return 0, core.NewServErr("查询使用中通道失败", err)
} }
if len(keys) == 0 {
return 0, nil
}
batchList := make([]string, len(keys)) batchList := make([]string, len(keys))
batchSet := make(map[string]struct{}, len(keys)) batchSet := make(map[string]struct{}, len(keys))
for i, key := range keys { for i, key := range keys {
@@ -390,7 +345,7 @@ func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error)
// 清理过期通道 // 清理过期通道
slog.Info("批量清理过期通道", "count", len(batchSet)) slog.Info("批量清理过期通道", "count", len(batchSet))
for batchNo, _ := range batchSet { for batchNo, _ := range batchSet {
err := s.RemoveChannels(batchNo, &proxyId) err := s.RemoveChannels(batchNo)
if err != nil { if err != nil {
slog.Error("清理过期通道失败", "batch", batchNo, "error", err) slog.Error("清理过期通道失败", "batch", batchNo, "error", err)
} }
@@ -399,11 +354,83 @@ func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error)
return len(batchSet), nil return len(batchSet), nil
} }
func batchRemoveExpiredKey(bid string) string { func lockChannelCreateKey() string {
return "platform:channel:create"
}
func lockChannelRemoveKey(bid string) string {
return fmt.Sprintf("platform:batch:remove_expired:%s", bid) return fmt.Sprintf("platform:batch:remove_expired:%s", bid)
} }
func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]EdgeInfo, error) { func selectProxy(count int) (*m.Proxy, g.GatewayClient, error) {
// 获取在线节点
proxies, err := q.Proxy.Where(
q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)),
q.Proxy.Status.Eq(int(m.ProxyStatusOnline)),
).Find()
if err != nil {
return nil, nil, core.NewBizErr("获取可用代理失败", err)
}
if len(proxies) == 0 {
return nil, nil, core.NewBizErr("无可用代理")
}
proxyIDs := make([]int32, 0, len(proxies))
proxyMap := make(map[int32]*m.Proxy, len(proxies))
for _, item := range proxies {
proxyIDs = append(proxyIDs, item.ID)
proxyMap[item.ID] = item
}
// 获取最空闲节点
maxId := int32(0)
maxCount := -1
for _, id := range proxyIDs {
idCount, err := g.Redis.SCard(context.Background(), freeChansKey(id)).Result()
if err != nil {
return nil, nil, fmt.Errorf("查询可用通道数量失败: %w", err)
}
if idCount > int64(maxCount) {
maxCount = int(idCount)
maxId = id
}
}
if maxCount < count {
return nil, nil, core.NewBizErr("无可用代理")
}
proxy := proxyMap[maxId]
secret := strings.Split(u.Z(proxy.Secret), ":")
if len(secret) != 2 {
return nil, nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil)
}
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
return proxy, gateway, nil
}
func selectPorts(proxyId int32, batchNo string, count int, expire time.Time) ([]netip.AddrPort, error) {
chans, err := lockChans(proxyId, batchNo, count)
if err != nil {
return nil, core.NewBizErr("无可用通道,请稍后再试", err)
}
_, err = g.Asynq.Enqueue(
e.NewRemoveChannel(batchNo),
asynq.ProcessAt(expire),
)
if err != nil {
return nil, core.NewServErr("注册异步关闭通道任务失败", err)
}
return chans, nil
}
// selectEdges 选择节点,优先本地节点,失败重试,直到达到重试次数限制
// 本地节点通过 Assigned = false 排除已分配节点
// 云端节点通过 NoRepeat = true 排除已分配节点
func selectEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]EdgeInfo, error) {
edges := make([]EdgeInfo, 0, count) edges := make([]EdgeInfo, 0, count)
// 先查本地 // 先查本地
@@ -428,7 +455,7 @@ func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) (
return edges, nil return edges, nil
} }
// 再查云端无重复 // 再查云端
remaining := count - len(edges) remaining := count - len(edges)
cloudEdgesResp, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{ cloudEdgesResp, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{
Province: filter.Prov, Province: filter.Prov,
@@ -449,13 +476,11 @@ func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) (
EdgeID: edge.EdgeID, EdgeID: edge.EdgeID,
}) })
} }
if len(edges) >= count { if len(edges) < count {
return edges, nil return nil, core.NewBizErr("地区可用节点数量不足")
} }
// 不能和已有的重复,如果有重复则再次查询云端补足,二次提取还有重复则放弃 return edges, nil
return nil, core.NewBizErr("地区可用节点数量不足")
} }
type EdgeInfo struct { type EdgeInfo struct {

View File

@@ -2,7 +2,6 @@ package services
import ( import (
"context" "context"
"fmt"
"net/netip" "net/netip"
"platform/pkg/u" "platform/pkg/u"
"platform/web/core" "platform/web/core"
@@ -19,10 +18,6 @@ var Proxy = &proxyService{}
type proxyService struct{} type proxyService struct{}
func proxyStatusLockKey(id int32) string {
return fmt.Sprintf("platform:proxy:status:%d", id)
}
func hasUsedChans(proxyID int32) (bool, error) { func hasUsedChans(proxyID int32) (bool, error) {
ctx := context.Background() ctx := context.Background()
pattern := usedChansKey(proxyID, "*") pattern := usedChansKey(proxyID, "*")
@@ -191,7 +186,7 @@ type UpdateProxyStatus struct {
} }
func (s *proxyService) UpdateStatus(update *UpdateProxyStatus) error { func (s *proxyService) UpdateStatus(update *UpdateProxyStatus) error {
return g.Redsync.WithLock(proxyStatusLockKey(update.ID), func() error { return q.Q.Transaction(func(tx *q.Query) error {
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(update.ID)).Take() proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(update.ID)).Take()
if err != nil { if err != nil {
return err return err

View File

@@ -46,7 +46,7 @@ func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) {
batch := string(task.Payload()) batch := string(task.Payload())
slog.Info("[event]删除通道", "batch", batch) slog.Info("[event]删除通道", "batch", batch)
err = s.Channel.RemoveChannels(batch, nil) err = s.Channel.RemoveChannels(batch)
if err != nil { if err != nil {
return fmt.Errorf("删除通道失败: %w", err) return fmt.Errorf("删除通道失败: %w", err)
} }