完善通道管理机制 & 增强 otel 记录字段

This commit is contained in:
2026-05-08 17:30:51 +08:00
parent 042c8d1a51
commit 65f8ee360b
12 changed files with 265 additions and 194 deletions

View File

@@ -25,8 +25,8 @@ var Channel = &channelServer{
type ChannelServiceProvider interface {
CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error)
RemoveChannels(batch string) error
ClearExpiredChannels() error
RemoveChannels(batch string, proxyId *int32) error
ClearExpiredChannels(proxyId int32) (int, error)
}
type channelServer struct {
@@ -37,12 +37,12 @@ func (s *channelServer) CreateChannels(source netip.Addr, resourceId int32, auth
return s.provider.CreateChannels(source, resourceId, authWhitelist, authPassword, count, edgeFilter)
}
func (s *channelServer) RemoveChannels(batch string) error {
return s.provider.RemoveChannels(batch)
func (s *channelServer) RemoveChannels(batch string, proxyId *int32) error {
return s.provider.RemoveChannels(batch, proxyId)
}
func (s *channelServer) ClearExpiredChannels() error {
return s.provider.ClearExpiredChannels()
func (s *channelServer) ClearExpiredChannels(proxyId int32) (int, error) {
return s.provider.ClearExpiredChannels(proxyId)
}
// 授权方式
@@ -220,10 +220,13 @@ func ensure(now time.Time, source netip.Addr, resourceId int32, authWhitelist bo
return resource, ips, nil
}
var (
freeChansKey = "channel:free"
usedChansKey = "channel:used"
)
func freeChansKey(proxy int32) string {
return "channel:free:" + strconv.Itoa(int(proxy))
}
func usedChansKey(proxy int32, batch string) string {
return "channel:used:" + strconv.Itoa(int(proxy)) + ":" + batch
}
// 扩容通道
func regChans(proxy int32, chans []netip.AddrPort) error {
@@ -232,7 +235,7 @@ func regChans(proxy int32, chans []netip.AddrPort) error {
strs[i] = ch.String()
}
key := freeChansKey + ":" + strconv.Itoa(int(proxy))
key := freeChansKey(proxy)
err := g.Redis.SAdd(context.Background(), key, strs...).Err()
if err != nil {
return fmt.Errorf("扩容通道失败: %w", err)
@@ -242,7 +245,7 @@ func regChans(proxy int32, chans []netip.AddrPort) error {
// 缩容通道
func remChans(proxy int32) error {
key := freeChansKey + ":" + strconv.Itoa(int(proxy))
key := freeChansKey(proxy)
err := g.Redis.Del(context.Background(), key).Err()
if err != nil {
return fmt.Errorf("缩容通道失败: %w", err)
@@ -252,13 +255,12 @@ func remChans(proxy int32) error {
// 取用通道
func lockChans(proxy int32, batch string, count int) ([]netip.AddrPort, error) {
pid := strconv.Itoa(int(proxy))
chans, err := RedisScriptLockChans.Run(
context.Background(),
g.Redis,
[]string{
freeChansKey + ":" + pid,
usedChansKey + ":" + pid + ":" + batch,
freeChansKey(proxy),
usedChansKey(proxy, batch),
},
count,
).StringSlice()
@@ -296,13 +298,12 @@ return ports
// 归还通道
func freeChans(proxy int32, batch string) error {
pid := strconv.Itoa(int(proxy))
err := RedisScriptFreeChans.Run(
context.Background(),
g.Redis,
[]string{
freeChansKey + ":" + pid,
usedChansKey + ":" + pid + ":" + batch,
freeChansKey(proxy),
usedChansKey(proxy, batch),
},
).Err()
if err != nil {

View File

@@ -1,6 +1,7 @@
package services
import (
"context"
"encoding/json"
"fmt"
"log/slog"
@@ -39,6 +40,15 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
user := resource.User
expire := now.Add(resource.Live)
// 注册异步关闭任务
_, err = g.Asynq.Enqueue(
e.NewRemoveChannel(batch),
asynq.ProcessAt(expire),
)
if err != nil {
return nil, core.NewServErr("注册异步关闭通道任务失败", err)
}
// 选择代理
proxyResult := struct {
m.Proxy
@@ -85,14 +95,6 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
return nil, err
}
_, err = g.Asynq.Enqueue(
e.NewRemoveChannel(batch),
asynq.ProcessAt(expire),
)
if err != nil {
return nil, core.NewServErr("提交关闭通道任务失败", err)
}
// 取用节点
secret := strings.Split(u.Z(proxy.Secret), ":")
if len(secret) != 2 {
@@ -156,6 +158,32 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
}
}
// 提交配置
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs))
if env.RunMode == env.RunModeProd {
// 连接节点到网关
err = g.Cloud.CloudConnect(&g.CloudConnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
if err != nil {
return nil, core.NewServErr("连接云平台失败", err)
}
// 启用网关代理通道
err = gateway.GatewayPortConfigs(chanConfigs)
if err != nil {
slog.Warn("提交代理端口配置失败", "error", err.Error())
return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
}
} else {
for _, item := range chanConfigs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
}
}
// 保存数据
err = q.Q.Transaction(func(q *q.Query) error {
// 更新使用记录
@@ -224,124 +252,155 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
return nil, err
}
// 提交配置
if env.RunMode == env.RunModeProd {
// 连接节点到网关
err = g.Cloud.CloudConnect(&g.CloudConnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
if err != nil {
return nil, core.NewServErr("连接云平台失败", err)
}
// 启用网关代理通道
err = gateway.GatewayPortConfigs(chanConfigs)
if err != nil {
slog.Warn("提交代理端口配置失败", "error", err.Error())
return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
}
} else {
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "count", len(chanConfigs), "remote", len(edgeConfigs))
for _, item := range chanConfigs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
}
}
return channels, nil
}
func (s *channelBaiyinProvider) RemoveChannels(batch string) error {
start := time.Now()
func (s *channelBaiyinProvider) RemoveChannels(batch string, proxyId *int32) error {
return g.Redsync.WithLock(batchRemoveExpiredKey(batch), func() error {
start := time.Now()
// 获取连接数据
channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find()
if err != nil {
return core.NewServErr(fmt.Sprintf("获取通道数据失败batch%s", batch), err)
}
if len(channels) == 0 {
slog.Warn(fmt.Sprintf("未找到通道数据batch%s", batch))
return nil
}
pid := int32(0)
if proxyId == nil {
// 获取连接数据
channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find()
if err != nil {
return core.NewServErr(fmt.Sprintf("获取通道数据失败batch%s", batch), err)
}
if len(channels) == 0 {
slog.Warn(fmt.Sprintf("未找到通道数据batch%s", batch))
return nil
}
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take()
if err != nil {
return core.NewServErr(fmt.Sprintf("获取代理数据失败batch%s", batch), err)
}
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take()
if err != nil {
return core.NewServErr(fmt.Sprintf("获取代理数据失败batch%s", batch), err)
}
// 准备配置数据
edgeConfigs := make([]string, len(channels))
configs := make([]*g.PortConfigsReq, len(channels))
for i, channel := range channels {
if channel.EdgeRef != nil {
edgeConfigs[i] = *channel.EdgeRef
// 检查通道是否存在
exist, err := g.Redis.Exists(context.Background(), usedChansKey(proxy.ID, batch)).Result()
if err != nil {
return core.NewServErr("查询使用中通道失败", err)
}
if exist == 0 {
return nil // 没有使用中通道,已经被清理过了
}
// 准备配置数据
edgeConfigs := make([]string, len(channels))
configs := make([]*g.PortConfigsReq, len(channels))
for i, channel := range channels {
if channel.EdgeRef != nil {
edgeConfigs[i] = *channel.EdgeRef
} else {
slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID))
}
configs[i] = &g.PortConfigsReq{
Status: false,
Port: int(channel.Port),
Edge: &[]string{},
}
}
// 提交配置
if env.RunMode == env.RunModeProd {
// 清空通道配置
secret := strings.Split(u.Z(proxy.Secret), ":")
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
err := gateway.GatewayPortConfigs(configs)
if err != nil {
return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err)
}
// 断开节点连接
_, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
if err != nil {
slog.Warn("断开云平台连接失败", "error", err.Error())
return core.NewServErr("断开云平台连接失败", err)
}
} else {
for _, item := range configs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
}
}
pid = proxy.ID
} else {
slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID))
pid = *proxyId
}
configs[i] = &g.PortConfigsReq{
Status: false,
Port: int(channel.Port),
Edge: &[]string{},
}
}
// 提交配置
if env.RunMode == env.RunModeProd {
// 清空通道配置
secret := strings.Split(u.Z(proxy.Secret), ":")
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
err := gateway.GatewayPortConfigs(configs)
// 释放端口
err := freeChans(pid, batch)
if err != nil {
return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err)
return err
}
// 断开节点连接
_, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
if err != nil {
slog.Warn("断开云平台连接失败", "error", err.Error())
return core.NewServErr("断开云平台连接失败", err)
}
} else {
for _, item := range configs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
}
}
// 释放端口
err = freeChans(proxy.ID, batch)
if err != nil {
return err
}
slog.Debug("清除代理端口配置", "proxy", proxy.IP.String(), "batch", batch, "duration", time.Since(start).String())
return nil
slog.Debug("清除代理端口配置", "proxy", pid, "batch", batch, "duration", time.Since(start).String())
return nil
})
}
func (s *channelBaiyinProvider) ClearExpiredChannels() error {
now := time.Now().Add(time.Hour)
var batches []struct{ BatchNo string }
err := q.Channel.Select(q.Channel.BatchNo).Where(q.Channel.ExpiredAt.Lt(now)).Group(q.Channel.BatchNo).Scan(&batches)
// ClearExpiredChannels 定期清理过期通道,返回清理数量
// 通道有三种情况:
// - 过期等待清理,过期时间在一小时内,可以等待异步任务回收通道
// - 过期未清理,过期时间超过一小时,说明异步任务可能失败了,需要强制清理
// - 异常通道,取用后任务失败,导致通道悬空,需要强制清理
func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) {
now := time.Now()
// 获取使用中通道批次
keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result()
if err != nil {
return core.NewServErr("查询过期通道失败", err)
return 0, core.NewServErr("查询使用中通道失败", err)
}
batchList := make([]string, len(keys))
batchSet := make(map[string]struct{}, len(keys))
for i, key := range keys {
parts := strings.Split(key, ":")
if len(parts) != 4 {
return 0, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), nil)
}
batchList[i] = parts[3]
batchSet[parts[3]] = struct{}{}
}
for _, batch := range batches {
err := s.RemoveChannels(batch.BatchNo)
// 排除未过期通道
var batchQueried []struct{ BatchNo string }
err = q.Channel.Debug().
Select(q.Channel.BatchNo).
Where(
q.Channel.BatchNo.In(batchList...),
q.Channel.ExpiredAt.Gte(now),
).
Group(q.Channel.BatchNo).
Scan(&batchQueried)
if err != nil {
return 0, core.NewServErr("查询过期通道失败", err)
}
for _, batch := range batchQueried {
delete(batchSet, batch.BatchNo)
}
// 清理过期通道
slog.Info("批量清理过期通道", "count", len(batchSet))
for batchNo, _ := range batchSet {
err := s.RemoveChannels(batchNo, &proxyId)
if err != nil {
slog.Error("清理过期通道失败", "batch", batch.BatchNo, "error", err)
slog.Error("清理过期通道失败", "batch", batchNo, "error", err)
}
}
return nil
return len(batchSet), nil
}
func batchRemoveExpiredKey(bid string) string {
return fmt.Sprintf("platform:batch:remove_expired:%s", bid)
}
func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]EdgeInfo, error) {

View File

@@ -10,7 +10,6 @@ import (
"platform/web/globals/orm"
m "platform/web/models"
q "platform/web/queries"
"strconv"
"time"
"gorm.io/gen/field"
@@ -26,7 +25,7 @@ func proxyStatusLockKey(id int32) string {
func hasUsedChans(proxyID int32) (bool, error) {
ctx := context.Background()
pattern := usedChansKey + ":" + strconv.Itoa(int(proxyID)) + ":*"
pattern := usedChansKey(proxyID, "*")
keys, _, err := g.Redis.Scan(ctx, 0, pattern, 1).Result()
if err != nil {
return false, err