修复提取并发问题 & 修复接口时区问题

This commit is contained in:
2026-05-18 13:54:01 +08:00
parent 8f89503c88
commit 71554da541
16 changed files with 386 additions and 239 deletions

View File

@@ -31,33 +31,27 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
now := time.Now()
batchNo := ID.GenReadable("bat")
// 检查并获取套餐与白名单
resource, whitelists, err := ensure(now, source, resourceId, authWhitelist, count)
if err != nil {
return nil, err
}
user := resource.User
expire := now.Add(resource.Live)
// 选择代理
proxy, gateway, err := selectProxy(count)
if err != nil {
return nil, err
}
// 取用端口
chans, err := selectPorts(proxy.ID, batchNo, count, expire)
if err != nil {
return nil, err
}
// 节点查询到提交,需要锁定防止并发取用
channels := make([]*m.Channel, count)
err = g.Redsync.WithLock(lockChannelCreateKey(), func() error {
// 取用节点
edges, err := selectEdges(gateway, filter, count)
// 资源锁,防止并发扣减失败导致的端口悬空问题
err := g.Redsync.WithLock(lockChannelCreateKey(resourceId), func() error {
// 检查并获取套餐与白名单
resource, whitelists, err := ensure(now, source, resourceId, authWhitelist, count)
if err != nil {
return err
}
user := resource.User
expire := now.Add(resource.Live)
// 选择代理
proxy, gateway, err := selectProxy(count)
if err != nil {
return err
}
// 取用端口
chans, err := selectPorts(proxy.ID, batchNo, count, expire)
if err != nil {
return err
}
@@ -67,7 +61,6 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
edgeConfigs := make([]string, 0, count)
for i := range count {
ch := chans[i]
edge := edges[i]
// 通道数据
channels[i] = &m.Channel{
@@ -77,7 +70,6 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
ProxyID: proxy.ID,
Host: u.Else(proxy.Host, proxy.IP.String()),
Port: ch.Port(),
EdgeRef: u.P(edge.EdgeID),
FilterISP: filter.Isp,
FilterProv: filter.Prov,
FilterCity: filter.City,
@@ -89,7 +81,12 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
chanConfigs[i] = &g.PortConfigsReq{
Port: int(ch.Port()),
Status: true,
Edge: &[]string{edge.EdgeID},
AutoEdgeConfig: &g.AutoEdgeConfig{
Province: u.Z(filter.Prov),
City: u.Z(filter.City),
Isp: filter.Isp.String(),
Count: u.P(1),
},
}
// 白名单模式
@@ -105,26 +102,24 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
channels[i].Password = &password
chanConfigs[i].Userpass = u.P(username + ":" + password)
}
// 连接配置数据
if edge.Type == EdgeInfoCloud {
edgeConfigs = append(edgeConfigs, edge.EdgeID)
}
}
// 提交配置
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs))
if env.RunMode == env.RunModeProd {
// 连接节点到网关
if err := g.Cloud.CloudConnect(&g.CloudConnectReq{Uuid: proxy.Mac, Edge: &edgeConfigs}); err != nil {
return core.NewServErr("连接云平台失败", err)
// 从云端补足节点
err := ensureEdges(proxy, gateway, filter, count)
if err != nil {
slog.Warn("ensureEdges 失败", "err", err) // 不阻止通道创建,继续走后续流程
}
// 启用网关代理通道
if err := gateway.GatewayPortConfigs(chanConfigs); err != nil {
slog.Warn("提交代理端口配置失败", "error", err.Error())
return core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
if len(chanConfigs) > 0 {
if err := gateway.GatewayPortConfigs(chanConfigs); err != nil {
slog.Warn("提交代理端口配置失败", "error", err.Error())
return core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
}
}
} else {
for _, item := range chanConfigs {
@@ -166,13 +161,13 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
)
default:
return core.NewBizErr("套餐类型不正确,无法更新", nil)
return core.NewBizErr("套餐类型不正确,无法更新")
}
if err != nil {
return core.NewServErr("更新套餐使用记录失败", err)
}
if result.RowsAffected == 0 {
return core.NewBizErr("提取太频繁,请稍后再试", nil)
return core.NewBizErr("套餐状态已过期")
}
// 保存通道
@@ -214,75 +209,61 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
return channels, nil
}
func (s *channelBaiyinProvider) RemoveChannels(batch string) error {
return g.Redsync.WithLock(lockChannelRemoveKey(batch), func() error {
func (s *channelBaiyinProvider) RemoveChannels(batchNo string) error {
return g.Redsync.WithLock(lockChannelRemoveKey(batchNo), func() error {
start := time.Now()
// 获取连接数据
channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find()
channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batchNo)).Find()
if err != nil {
return core.NewServErr(fmt.Sprintf("获取通道数据失败batch%s", batch), err)
return core.NewServErr(fmt.Sprintf("获取通道数据失败batch%s", batchNo), err)
}
if len(channels) == 0 {
slog.Warn(fmt.Sprintf("未找到通道数据batch%s", batch))
slog.Warn(fmt.Sprintf("未找到通道数据batch%s", batchNo))
return nil
}
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take()
if err != nil {
return core.NewServErr(fmt.Sprintf("获取代理数据失败batch%s", batch), err)
return core.NewServErr(fmt.Sprintf("获取代理数据失败batch%s", batchNo), err)
}
// 检查通道是否存在
exist, err := g.Redis.Exists(context.Background(), usedChansKey(proxy.ID, batch)).Result()
chans, err := g.Redis.LRange(context.Background(), usedChansKey(proxy.ID, batchNo), 0, -1).Result()
if err != nil {
return core.NewServErr("查询使用中通道失败", err)
}
if exist == 0 {
if len(chans) == 0 {
slog.Debug("通道为空,跳过清理", "key", usedChansKey(proxy.ID, batchNo))
return nil // 没有使用中通道,已经被清理过了
}
// 准备配置数据
edgeConfigs := make([]string, len(channels))
configs := make([]*g.PortConfigsReq, len(channels))
for i, channel := range channels {
if channel.EdgeRef != nil {
edgeConfigs[i] = *channel.EdgeRef
} else {
slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID))
configs := make([]*g.PortConfigsReq, len(chans))
for i, ch := range chans {
ap, err := netip.ParseAddrPort(ch)
if err != nil {
return core.NewServErr(fmt.Sprintf("解析通道数据失败: %s", ch), err)
}
configs[i] = &g.PortConfigsReq{
Status: false,
Port: int(channel.Port),
Edge: &[]string{},
Port: int(ap.Port()),
Edge: &[]string{},
AutoEdgeConfig: &g.AutoEdgeConfig{Count: u.P(0)},
Status: false,
}
}
// 提交配置
if env.RunMode == env.RunModeProd {
// 清空通道配置
secret := strings.Split(u.Z(proxy.Secret), ":")
if len(secret) != 2 {
return core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil)
}
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
err := gateway.GatewayPortConfigs(configs)
gateway, err := proxyGateway(proxy)
if err != nil {
return core.NewServErr("创建代理网关失败", err)
}
if err = gateway.GatewayPortConfigs(configs); err != nil {
return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err)
}
// 断开节点连接
_, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
if err != nil {
slog.Warn("断开云平台连接失败", "error", err.Error())
return core.NewServErr("断开云平台连接失败", err)
}
} else {
for _, item := range configs {
str, _ := json.Marshal(item)
@@ -291,12 +272,12 @@ func (s *channelBaiyinProvider) RemoveChannels(batch string) error {
}
// 释放端口
err = freeChans(proxy.ID, batch)
err = freeChans(proxy.ID, batchNo)
if err != nil {
return err
}
slog.Debug("清除代理端口配置", "proxy", proxy.ID, "batch", batch, "duration", time.Since(start).String())
slog.Debug("清除代理端口配置", "proxy", proxy.ID, "batch", batchNo, "duration", time.Since(start).String())
return nil
})
}
@@ -354,8 +335,8 @@ func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error)
return len(batchSet), nil
}
func lockChannelCreateKey() string {
return "platform:channel:create"
func lockChannelCreateKey(resourceId int32) string {
return fmt.Sprintf("platform:channel:create:%d", resourceId)
}
func lockChannelRemoveKey(bid string) string {
@@ -399,13 +380,12 @@ func selectProxy(count int) (*m.Proxy, g.GatewayClient, error) {
if maxCount < count {
return nil, nil, core.NewBizErr("无可用代理")
}
proxy := proxyMap[maxId]
secret := strings.Split(u.Z(proxy.Secret), ":")
if len(secret) != 2 {
return nil, nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil)
proxy := proxyMap[maxId]
gateway, err := proxyGateway(proxy)
if err != nil {
return nil, nil, core.NewServErr("创建代理网关失败", err)
}
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
return proxy, gateway, nil
}
@@ -427,14 +407,16 @@ func selectPorts(proxyId int32, batchNo string, count int, expire time.Time) ([]
return chans, nil
}
// selectEdges 选择节点,优先本地节点,失败重试,直到达到重试次数限制
// ensureEdges 检查本地节点是否足够,如果不足从云端连入
// 本地节点通过 Assigned = false 排除已分配节点
// 云端节点通过 NoRepeat = true 排除已分配节点
func selectEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]EdgeInfo, error) {
edges := make([]EdgeInfo, 0, count)
func ensureEdges(proxy *m.Proxy, gateway g.GatewayClient, filter *EdgeFilter, count int) error {
if filter.IsEmpty() {
return nil // 没有过滤条件,直接返回空,避免无意义的查询
}
// 先查本地
localEdgesResp, err := gateway.GatewayEdge(&g.GatewayEdgeReq{
localEdges, err := gateway.GatewayEdge(&g.GatewayEdgeReq{
Province: filter.Prov,
City: filter.City,
Isp: u.X(filter.Isp.String()),
@@ -442,22 +424,15 @@ func selectEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]Edge
Assigned: u.P(false),
})
if err != nil {
return nil, core.NewBizErr("获取可用节点失败[1]", err)
return core.NewBizErr("检查可用节点失败[1]", err)
}
for id, _ := range localEdgesResp {
edges = append(edges, EdgeInfo{
Type: EdgeInfoLocal,
EdgeID: id,
})
}
if len(edges) >= count {
return edges, nil
if len(localEdges) >= count {
return nil // 本地节点足够,直接返回空,后续逻辑会优先使用本地节点
}
// 再查云端
remaining := count - len(edges)
cloudEdgesResp, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{
remaining := count - len(localEdges)
cloudEdges, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{
Province: filter.Prov,
City: filter.City,
Isp: u.X(filter.Isp.String()),
@@ -467,20 +442,23 @@ func selectEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]Edge
IpUnchangedTime: u.P(3600),
})
if err != nil {
return nil, core.NewBizErr("获取可用节点失败[2]", err)
return core.NewBizErr("检查可用节点失败[2]", err)
}
if len(cloudEdges.Edges) < remaining {
return core.NewBizErr("地区可用节点数量不足")
}
for _, edge := range cloudEdgesResp.Edges {
edges = append(edges, EdgeInfo{
Type: EdgeInfoCloud,
EdgeID: edge.EdgeID,
})
}
if len(edges) < count {
return nil, core.NewBizErr("地区可用节点数量不足")
// 连入云端节点
edges := make([]string, remaining)
for i, edge := range cloudEdges.Edges {
edges[i] = edge.EdgeID
}
return edges, nil
if err := g.Cloud.CloudConnect(&g.CloudConnectReq{Uuid: proxy.Mac, Edge: &edges}); err != nil {
return core.NewServErr("连接云平台失败", err)
}
return nil
}
type EdgeInfo struct {