From a0b0be2b8e5835fdedf27d9e020228c51a2a6cc4 Mon Sep 17 00:00:00 2001 From: luorijun Date: Thu, 7 May 2026 14:58:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=AE=9A=E6=97=B6=E9=80=9A?= =?UTF-8?q?=E9=81=93=E8=BF=87=E6=9C=9F=E6=B8=85=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 4 ++ web/core/http.go | 2 +- web/error.go | 1 - web/events/channel.go | 6 +++ web/handlers/channel.go | 2 + web/routes.go | 7 +++ web/services/channel.go | 5 +++ web/services/channel_baiyin.go | 81 +++++++++++++++++++++------------- web/tasks/task.go | 8 ++++ web/web.go | 28 ++++++++++++ 10 files changed, 112 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index fb8e974..be2ba4d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,10 @@ ## TODO +- 选择代理部分,可以检查 redis 中的可用端口数量,无需查数据库 +- 取用端口后直接写入关闭任务,避免中途失败导致端口泄漏 +- +--- 错误提示增强,展示整链路信息 diff --git a/web/core/http.go b/web/core/http.go index 604fb7c..bf72a8e 100644 --- a/web/core/http.go +++ b/web/core/http.go @@ -118,7 +118,7 @@ func Query(in any) url.Values { case int: out.Add(name, strconv.Itoa(value)) case bool: - if tags[1] == "b2i" { + if len(tags) > 1 && tags[1] == "b2i" { out.Add(name, u.Ternary(value, "1", "0")) } else { out.Add(name, strconv.FormatBool(value)) diff --git a/web/error.go b/web/error.go index d5a30e9..93322eb 100644 --- a/web/error.go +++ b/web/error.go @@ -79,7 +79,6 @@ func ErrorHandler(c *fiber.Ctx, err error) error { slog.Warn("未处理的异常", slog.String("type", t.String()), slog.String("error", err.Error())) } - slog.Warn(message) c.Set(fiber.HeaderContentType, fiber.MIMETextPlainCharsetUTF8) return c.Status(code).SendString(message) } diff --git a/web/events/channel.go b/web/events/channel.go index 83fd339..149a622 100644 --- a/web/events/channel.go +++ b/web/events/channel.go @@ -9,3 +9,9 @@ const RemoveChannel = "channel:remove" func NewRemoveChannel(batch string) *asynq.Task { return asynq.NewTask(RemoveChannel, []byte(batch)) } + +const ClearExpiredChannels = "channel:clear_expired" + +func NewClearExpiredChannels() *asynq.Task { + return asynq.NewTask(ClearExpiredChannels, nil) +} diff --git a/web/handlers/channel.go b/web/handlers/channel.go index 0c701bc..5159db6 100644 --- a/web/handlers/channel.go +++ b/web/handlers/channel.go @@ -212,6 +212,7 @@ func CreateChannel(c *fiber.Ctx) error { resp[i] = &CreateChannelRespItem{ Proto: req.Protocol, Host: channel.Host, + IP: channel.Proxy.IP.String(), Port: channel.Port, } if req.AuthType == s.ChannelAuthTypePass { @@ -235,6 +236,7 @@ type CreateChannelReq struct { type CreateChannelRespItem struct { Proto int `json:"-"` Host string `json:"host"` + IP string `json:"ip"` Port uint16 `json:"port"` Username *string `json:"username,omitempty"` Password *string `json:"password,omitempty"` diff --git a/web/routes.go b/web/routes.go index 27a18f3..f7352ea 100644 --- a/web/routes.go +++ b/web/routes.go @@ -4,6 +4,7 @@ import ( "platform/pkg/env" auth2 "platform/web/auth" "platform/web/handlers" + s "platform/web/services" "time" q "platform/web/queries" @@ -33,6 +34,12 @@ func ApplyRouters(app *fiber.App) { } return ctx.JSON(rs) }) + debug.Get("/channel/clear-expired", func(ctx *fiber.Ctx) error { + if err := s.Channel.ClearExpiredChannels(); err != nil { + return err + } + return ctx.SendStatus(fiber.StatusOK) + }) } } diff --git a/web/services/channel.go b/web/services/channel.go index e078091..dcb5453 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -26,6 +26,7 @@ var Channel = &channelServer{ type ChannelServiceProvider interface { CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error) RemoveChannels(batch string) error + ClearExpiredChannels() error } type channelServer struct { @@ -40,6 +41,10 @@ func (s *channelServer) RemoveChannels(batch string) error { return s.provider.RemoveChannels(batch) } +func (s *channelServer) ClearExpiredChannels() error { + return s.provider.ClearExpiredChannels() +} + // 授权方式 type ChannelAuthType int diff --git a/web/services/channel_baiyin.go b/web/services/channel_baiyin.go index b80789f..442f4cb 100644 --- a/web/services/channel_baiyin.go +++ b/web/services/channel_baiyin.go @@ -62,7 +62,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int } proxy := proxyResult.Proxy - // 锁内确认状态并锁定端口,避免与状态切换并发穿透 + // 取用端口 var chans []netip.AddrPort err = g.Redsync.WithLock(proxyStatusLockKey(proxy.ID), func() error { lockedProxy, err := q.Proxy.Where(q.Proxy.ID.Eq(proxy.ID)).Take() @@ -85,18 +85,27 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int return nil, err } - // 获取可用节点 + _, err = g.Asynq.Enqueue( + e.NewRemoveChannel(batch), + asynq.ProcessAt(expire), + ) + if err != nil { + return nil, core.NewServErr("提交关闭通道任务失败", err) + } + + // 取用节点 secret := strings.Split(u.Z(proxy.Secret), ":") if len(secret) != 2 { return nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil) } gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) + edges, err := getAvailableEdges(gateway, filter, count) if err != nil { return nil, err } - // 准备通道数据 + // 绑定节点到端口 channels := make([]*m.Channel, count) chanConfigs := make([]*g.PortConfigsReq, count) edgeConfigs := make([]string, 0, count) @@ -117,6 +126,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int FilterProv: filter.Prov, FilterCity: filter.City, ExpiredAt: expire, + Proxy: &proxy, } // 通道配置数据 @@ -146,23 +156,12 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int } } - // 提交异步任务关闭通道 - _, err = g.Asynq.Enqueue( - e.NewRemoveChannel(batch), - asynq.ProcessAt(expire), - ) - if err != nil { - return nil, core.NewServErr("提交关闭通道任务失败", err) - } - // 保存数据 err = q.Q.Transaction(func(q *q.Query) error { - // 根据套餐类型和模式更新使用记录 - isShortType := resource.Type == m.ResourceTypeShort - isLongType := resource.Type == m.ResourceTypeLong - - switch { - case isShortType: + // 更新使用记录 + var err error + switch resource.Type { + case m.ResourceTypeShort: _, err = q.ResourceShort. Where( q.ResourceShort.ID.Eq(*resource.ShortId), @@ -175,7 +174,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int q.ResourceShort.LastAt.Value(now), ) - case isLongType: + case m.ResourceTypeLong: _, err = q.ResourceLong. Where( q.ResourceLong.ID.Eq(*resource.LongId), @@ -244,7 +243,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err) } } else { - slog.Debug("提交代理端口配置", "proxy", proxy.IP.String()) + slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "count", len(chanConfigs), "remote", len(edgeConfigs)) for _, item := range chanConfigs { str, _ := json.Marshal(item) fmt.Println(string(str)) @@ -292,12 +291,6 @@ func (s *channelBaiyinProvider) RemoveChannels(batch string) error { // 提交配置 if env.RunMode == env.RunModeProd { - // 断开节点连接 - g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{ - Uuid: proxy.Mac, - Edge: &edgeConfigs, - }) - // 清空通道配置 secret := strings.Split(u.Z(proxy.Secret), ":") gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) @@ -305,8 +298,18 @@ func (s *channelBaiyinProvider) RemoveChannels(batch string) error { if err != nil { return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err) } + + // 断开节点连接 + _, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{ + Uuid: proxy.Mac, + Edge: &edgeConfigs, + }) + if err != nil { + slog.Warn("断开云平台连接失败", "error", err.Error()) + return core.NewServErr("断开云平台连接失败", err) + } + } else { - slog.Debug("清除代理端口配置", "proxy", proxy.IP) for _, item := range configs { str, _ := json.Marshal(item) fmt.Println(string(str)) @@ -319,7 +322,25 @@ func (s *channelBaiyinProvider) RemoveChannels(batch string) error { return err } - slog.Debug("清除代理端口配置", "duration", time.Since(start).String()) + slog.Debug("清除代理端口配置", "proxy", proxy.IP.String(), "batch", batch, "duration", time.Since(start).String()) + return nil +} + +func (s *channelBaiyinProvider) ClearExpiredChannels() error { + now := time.Now().Add(time.Hour) + var batches []struct{ BatchNo string } + err := q.Channel.Select(q.Channel.BatchNo).Where(q.Channel.ExpiredAt.Lt(now)).Group(q.Channel.BatchNo).Scan(&batches) + if err != nil { + return core.NewServErr("查询过期通道失败", err) + } + + for _, batch := range batches { + err := s.RemoveChannels(batch.BatchNo) + if err != nil { + slog.Error("清理过期通道失败", "batch", batch.BatchNo, "error", err) + } + } + return nil } @@ -335,7 +356,7 @@ func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ( Assigned: u.P(false), }) if err != nil { - return nil, core.NewBizErr("获取可用节点失败", err) + return nil, core.NewBizErr("获取可用节点失败[1]", err) } for id, _ := range localEdgesResp { @@ -360,7 +381,7 @@ func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ( IpUnchangedTime: u.P(3600), }) if err != nil { - return nil, core.NewBizErr("获取可用节点失败", err) + return nil, core.NewBizErr("获取可用节点失败[2]", err) } for _, edge := range cloudEdgesResp.Edges { diff --git a/web/tasks/task.go b/web/tasks/task.go index 55ef830..aede48f 100644 --- a/web/tasks/task.go +++ b/web/tasks/task.go @@ -49,3 +49,11 @@ func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) { } return nil } + +func HandleClearExpiredChannels(_ context.Context, _ *asynq.Task) (err error) { + err = s.Channel.ClearExpiredChannels() + if err != nil { + return fmt.Errorf("清理过期通道失败: %w", err) + } + return nil +} diff --git a/web/web.go b/web/web.go index ae5e512..f28007f 100644 --- a/web/web.go +++ b/web/web.go @@ -42,6 +42,10 @@ func RunApp(pCtx context.Context) error { return RunTask(ctx) }) + g.Go(func() error { + return RunCron(ctx) + }) + return g.Wait() } @@ -90,6 +94,7 @@ func RunTask(ctx context.Context) error { var mux = asynq.NewServeMux() mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel) mux.HandleFunc(events.CloseTrade, tasks.HandleCompleteTrade) + mux.HandleFunc(events.ClearExpiredChannels, tasks.HandleClearExpiredChannels) // 停止服务 go func() { @@ -105,3 +110,26 @@ func RunTask(ctx context.Context) error { return nil } + +func RunCron(ctx context.Context) error { + server := asynq.NewSchedulerFromRedisClient(deps.Redis, &asynq.SchedulerOpts{ + Location: time.Local, + }) + + // 每小时清理一次一小时之前的过期通道 + server.Register("0 * * * *", asynq.NewTask(events.ClearExpiredChannels, nil)) + + // 停止服务 + go func() { + <-ctx.Done() + server.Shutdown() + }() + + // 启动服务 + err := server.Run() + if err != nil { + return fmt.Errorf("定时任务服务运行失败: %w", err) + } + + return nil +}