实现定时通道过期清理

This commit is contained in:
2026-05-07 14:58:11 +08:00
parent 8fc1d30578
commit a0b0be2b8e
10 changed files with 112 additions and 32 deletions

View File

@@ -1,6 +1,10 @@
## TODO
- 选择代理部分,可以检查 redis 中的可用端口数量,无需查数据库
- 取用端口后直接写入关闭任务,避免中途失败导致端口泄漏
-
---
错误提示增强,展示整链路信息

View File

@@ -118,7 +118,7 @@ func Query(in any) url.Values {
case int:
out.Add(name, strconv.Itoa(value))
case bool:
if tags[1] == "b2i" {
if len(tags) > 1 && tags[1] == "b2i" {
out.Add(name, u.Ternary(value, "1", "0"))
} else {
out.Add(name, strconv.FormatBool(value))

View File

@@ -79,7 +79,6 @@ func ErrorHandler(c *fiber.Ctx, err error) error {
slog.Warn("未处理的异常", slog.String("type", t.String()), slog.String("error", err.Error()))
}
slog.Warn(message)
c.Set(fiber.HeaderContentType, fiber.MIMETextPlainCharsetUTF8)
return c.Status(code).SendString(message)
}

View File

@@ -9,3 +9,9 @@ const RemoveChannel = "channel:remove"
func NewRemoveChannel(batch string) *asynq.Task {
return asynq.NewTask(RemoveChannel, []byte(batch))
}
const ClearExpiredChannels = "channel:clear_expired"
func NewClearExpiredChannels() *asynq.Task {
return asynq.NewTask(ClearExpiredChannels, nil)
}

View File

@@ -212,6 +212,7 @@ func CreateChannel(c *fiber.Ctx) error {
resp[i] = &CreateChannelRespItem{
Proto: req.Protocol,
Host: channel.Host,
IP: channel.Proxy.IP.String(),
Port: channel.Port,
}
if req.AuthType == s.ChannelAuthTypePass {
@@ -235,6 +236,7 @@ type CreateChannelReq struct {
type CreateChannelRespItem struct {
Proto int `json:"-"`
Host string `json:"host"`
IP string `json:"ip"`
Port uint16 `json:"port"`
Username *string `json:"username,omitempty"`
Password *string `json:"password,omitempty"`

View File

@@ -4,6 +4,7 @@ import (
"platform/pkg/env"
auth2 "platform/web/auth"
"platform/web/handlers"
s "platform/web/services"
"time"
q "platform/web/queries"
@@ -33,6 +34,12 @@ func ApplyRouters(app *fiber.App) {
}
return ctx.JSON(rs)
})
debug.Get("/channel/clear-expired", func(ctx *fiber.Ctx) error {
if err := s.Channel.ClearExpiredChannels(); err != nil {
return err
}
return ctx.SendStatus(fiber.StatusOK)
})
}
}

View File

@@ -26,6 +26,7 @@ var Channel = &channelServer{
type ChannelServiceProvider interface {
CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error)
RemoveChannels(batch string) error
ClearExpiredChannels() error
}
type channelServer struct {
@@ -40,6 +41,10 @@ func (s *channelServer) RemoveChannels(batch string) error {
return s.provider.RemoveChannels(batch)
}
func (s *channelServer) ClearExpiredChannels() error {
return s.provider.ClearExpiredChannels()
}
// 授权方式
type ChannelAuthType int

View File

@@ -62,7 +62,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
}
proxy := proxyResult.Proxy
// 锁内确认状态并锁定端口,避免与状态切换并发穿透
// 取用端口
var chans []netip.AddrPort
err = g.Redsync.WithLock(proxyStatusLockKey(proxy.ID), func() error {
lockedProxy, err := q.Proxy.Where(q.Proxy.ID.Eq(proxy.ID)).Take()
@@ -85,18 +85,27 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
return nil, err
}
// 获取可用节点
_, err = g.Asynq.Enqueue(
e.NewRemoveChannel(batch),
asynq.ProcessAt(expire),
)
if err != nil {
return nil, core.NewServErr("提交关闭通道任务失败", err)
}
// 取用节点
secret := strings.Split(u.Z(proxy.Secret), ":")
if len(secret) != 2 {
return nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil)
}
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
edges, err := getAvailableEdges(gateway, filter, count)
if err != nil {
return nil, err
}
// 准备通道数据
// 绑定节点到端口
channels := make([]*m.Channel, count)
chanConfigs := make([]*g.PortConfigsReq, count)
edgeConfigs := make([]string, 0, count)
@@ -117,6 +126,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
FilterProv: filter.Prov,
FilterCity: filter.City,
ExpiredAt: expire,
Proxy: &proxy,
}
// 通道配置数据
@@ -146,23 +156,12 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
}
}
// 提交异步任务关闭通道
_, err = g.Asynq.Enqueue(
e.NewRemoveChannel(batch),
asynq.ProcessAt(expire),
)
if err != nil {
return nil, core.NewServErr("提交关闭通道任务失败", err)
}
// 保存数据
err = q.Q.Transaction(func(q *q.Query) error {
// 根据套餐类型和模式更新使用记录
isShortType := resource.Type == m.ResourceTypeShort
isLongType := resource.Type == m.ResourceTypeLong
switch {
case isShortType:
// 更新使用记录
var err error
switch resource.Type {
case m.ResourceTypeShort:
_, err = q.ResourceShort.
Where(
q.ResourceShort.ID.Eq(*resource.ShortId),
@@ -175,7 +174,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
q.ResourceShort.LastAt.Value(now),
)
case isLongType:
case m.ResourceTypeLong:
_, err = q.ResourceLong.
Where(
q.ResourceLong.ID.Eq(*resource.LongId),
@@ -244,7 +243,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
}
} else {
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String())
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "count", len(chanConfigs), "remote", len(edgeConfigs))
for _, item := range chanConfigs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
@@ -292,12 +291,6 @@ func (s *channelBaiyinProvider) RemoveChannels(batch string) error {
// 提交配置
if env.RunMode == env.RunModeProd {
// 断开节点连接
g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
// 清空通道配置
secret := strings.Split(u.Z(proxy.Secret), ":")
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
@@ -305,8 +298,18 @@ func (s *channelBaiyinProvider) RemoveChannels(batch string) error {
if err != nil {
return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err)
}
// 断开节点连接
_, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
if err != nil {
slog.Warn("断开云平台连接失败", "error", err.Error())
return core.NewServErr("断开云平台连接失败", err)
}
} else {
slog.Debug("清除代理端口配置", "proxy", proxy.IP)
for _, item := range configs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
@@ -319,7 +322,25 @@ func (s *channelBaiyinProvider) RemoveChannels(batch string) error {
return err
}
slog.Debug("清除代理端口配置", "duration", time.Since(start).String())
slog.Debug("清除代理端口配置", "proxy", proxy.IP.String(), "batch", batch, "duration", time.Since(start).String())
return nil
}
func (s *channelBaiyinProvider) ClearExpiredChannels() error {
now := time.Now().Add(time.Hour)
var batches []struct{ BatchNo string }
err := q.Channel.Select(q.Channel.BatchNo).Where(q.Channel.ExpiredAt.Lt(now)).Group(q.Channel.BatchNo).Scan(&batches)
if err != nil {
return core.NewServErr("查询过期通道失败", err)
}
for _, batch := range batches {
err := s.RemoveChannels(batch.BatchNo)
if err != nil {
slog.Error("清理过期通道失败", "batch", batch.BatchNo, "error", err)
}
}
return nil
}
@@ -335,7 +356,7 @@ func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) (
Assigned: u.P(false),
})
if err != nil {
return nil, core.NewBizErr("获取可用节点失败", err)
return nil, core.NewBizErr("获取可用节点失败[1]", err)
}
for id, _ := range localEdgesResp {
@@ -360,7 +381,7 @@ func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) (
IpUnchangedTime: u.P(3600),
})
if err != nil {
return nil, core.NewBizErr("获取可用节点失败", err)
return nil, core.NewBizErr("获取可用节点失败[2]", err)
}
for _, edge := range cloudEdgesResp.Edges {

View File

@@ -49,3 +49,11 @@ func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) {
}
return nil
}
func HandleClearExpiredChannels(_ context.Context, _ *asynq.Task) (err error) {
err = s.Channel.ClearExpiredChannels()
if err != nil {
return fmt.Errorf("清理过期通道失败: %w", err)
}
return nil
}

View File

@@ -42,6 +42,10 @@ func RunApp(pCtx context.Context) error {
return RunTask(ctx)
})
g.Go(func() error {
return RunCron(ctx)
})
return g.Wait()
}
@@ -90,6 +94,7 @@ func RunTask(ctx context.Context) error {
var mux = asynq.NewServeMux()
mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel)
mux.HandleFunc(events.CloseTrade, tasks.HandleCompleteTrade)
mux.HandleFunc(events.ClearExpiredChannels, tasks.HandleClearExpiredChannels)
// 停止服务
go func() {
@@ -105,3 +110,26 @@ func RunTask(ctx context.Context) error {
return nil
}
func RunCron(ctx context.Context) error {
server := asynq.NewSchedulerFromRedisClient(deps.Redis, &asynq.SchedulerOpts{
Location: time.Local,
})
// 每小时清理一次一小时之前的过期通道
server.Register("0 * * * *", asynq.NewTask(events.ClearExpiredChannels, nil))
// 停止服务
go func() {
<-ctx.Done()
server.Shutdown()
}()
// 启动服务
err := server.Run()
if err != nil {
return fmt.Errorf("定时任务服务运行失败: %w", err)
}
return nil
}