diff --git a/README.md b/README.md index be2ba4d..e8cc4f0 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,10 @@ ## TODO -- 选择代理部分,可以检查 redis 中的可用端口数量,无需查数据库 -- 取用端口后直接写入关闭任务,避免中途失败导致端口泄漏 -- +- 解决提取成功率问题 +- otel 没有正确记录接口失败信息 +- 购买数低于最小限制 ---- +创建 channel 时以 defer 的思路执行 fallback 函数,确保执行幂等性 错误提示增强,展示整链路信息 diff --git a/web/core/scopes.go b/web/core/scopes.go index 30770e2..a423d95 100644 --- a/web/core/scopes.go +++ b/web/core/scopes.go @@ -63,10 +63,11 @@ const ( ScopeBatchReadOfUser = string("batch:read:of_user") // 读取指定用户的批次列表 ScopeBatchWrite = string("batch:write") // 写入批次 - ScopeChannel = string("channel") // IP - ScopeChannelRead = string("channel:read") // 读取 IP 列表 - ScopeChannelReadOfUser = string("channel:read:of_user") // 读取指定用户的 IP 列表 - ScopeChannelWrite = string("channel:write") // 写入 IP + ScopeChannel = string("channel") // IP + ScopeChannelRead = string("channel:read") // 读取 IP 列表 + ScopeChannelReadOfUser = string("channel:read:of_user") // 读取指定用户的 IP 列表 + ScopeChannelWrite = string("channel:write") // 写入 IP + ScopeChannelWriteClearExpired = string("channel:write:clear_expired") // 清理过期 IP ScopeProxy = string("proxy") // 代理 ScopeProxyRead = string("proxy:read") // 读取代理列表 diff --git a/web/events/channel.go b/web/events/channel.go index 149a622..83fd339 100644 --- a/web/events/channel.go +++ b/web/events/channel.go @@ -9,9 +9,3 @@ const RemoveChannel = "channel:remove" func NewRemoveChannel(batch string) *asynq.Task { return asynq.NewTask(RemoveChannel, []byte(batch)) } - -const ClearExpiredChannels = "channel:clear_expired" - -func NewClearExpiredChannels() *asynq.Task { - return asynq.NewTask(ClearExpiredChannels, nil) -} diff --git a/web/handlers/channel.go b/web/handlers/channel.go index 5159db6..fa9f12c 100644 --- a/web/handlers/channel.go +++ b/web/handlers/channel.go @@ -257,7 +257,7 @@ func RemoveChannels(c *fiber.Ctx) error { } // 删除通道 - err = s.Channel.RemoveChannels(req.Batch) + err = s.Channel.RemoveChannels(req.Batch, nil) if err != nil { return err } @@ -341,3 +341,32 @@ type PageChannelOfUserByAdminReq struct { ExpiredAtStart *time.Time `json:"expired_at_start"` ExpiredAtEnd *time.Time `json:"expired_at_end"` } + +// SyncChannelClearExpiredByAdmin 清理过期通道 +func SyncChannelClearExpiredByAdmin(c *fiber.Ctx) error { + if _, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeChannelWriteClearExpired); err != nil { + return err + } + + var req SyncChannelClearExpiredByAdminReq + if err := g.Validator.ParseBody(c, &req); err != nil { + return err + } + + count, err := s.Channel.ClearExpiredChannels(req.ProxyID) + if err != nil { + return err + } + + return c.JSON(SyncChannelClearExpiredByAdminResp{ + Count: count, + }) +} + +type SyncChannelClearExpiredByAdminReq struct { + ProxyID int32 `json:"proxy_id" validate:"required"` +} + +type SyncChannelClearExpiredByAdminResp struct { + Count int `json:"count"` +} diff --git a/web/handlers/proxy.go b/web/handlers/proxy.go index ef7672e..618ff7e 100644 --- a/web/handlers/proxy.go +++ b/web/handlers/proxy.go @@ -120,6 +120,8 @@ func RemoveProxy(c *fiber.Ctx) error { return c.JSON(nil) } +// ==================== + // region 报告上线 func ProxyReportOnline(c *fiber.Ctx) (err error) { return c.JSON(map[string]any{ diff --git a/web/middlewares.go b/web/middlewares.go index 7b74d07..6bc5eaf 100644 --- a/web/middlewares.go +++ b/web/middlewares.go @@ -11,6 +11,8 @@ import ( "github.com/gofiber/fiber/v2/middleware/requestid" "github.com/google/uuid" "github.com/jxskiss/base62" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) func ApplyMiddlewares(app *fiber.App) { @@ -20,14 +22,6 @@ func ApplyMiddlewares(app *fiber.App) { EnableStackTrace: true, })) - // cors - app.Use(cors.New(cors.Config{ - AllowCredentials: true, - AllowOriginsFunc: func(origin string) bool { - return true - }, - })) - // logger app.Use(logger.New(logger.Config{ Next: func(c *fiber.Ctx) bool { @@ -37,6 +31,34 @@ func ApplyMiddlewares(app *fiber.App) { // metric app.Use(otelfiber.Middleware()) + app.Use(func(c *fiber.Ctx) error { + err := c.Next() + + span := trace.SpanFromContext(c.UserContext()) + if !span.IsRecording() { + return err + } + + status := c.Response().StatusCode() + body := []byte{} + if status < 200 || status >= 300 { + body = c.Response().Body() + if len(body) > 1024 { + body = body[:1024] + } + } + + span.SetAttributes(attribute.String("http.response.error", string(body))) + return err + }) + + // cors + app.Use(cors.New(cors.Config{ + AllowCredentials: true, + AllowOriginsFunc: func(origin string) bool { + return true + }, + })) // request id app.Use(requestid.New(requestid.Config{ diff --git a/web/routes.go b/web/routes.go index f7352ea..19e28f6 100644 --- a/web/routes.go +++ b/web/routes.go @@ -3,8 +3,8 @@ package web import ( "platform/pkg/env" auth2 "platform/web/auth" + "platform/web/core" "platform/web/handlers" - s "platform/web/services" "time" q "platform/web/queries" @@ -34,11 +34,8 @@ func ApplyRouters(app *fiber.App) { } return ctx.JSON(rs) }) - debug.Get("/channel/clear-expired", func(ctx *fiber.Ctx) error { - if err := s.Channel.ClearExpiredChannels(); err != nil { - return err - } - return ctx.SendStatus(fiber.StatusOK) + debug.Get("/test/err", func(ctx *fiber.Ctx) error { + return core.NewBizErr("测试错误") }) } } @@ -208,6 +205,7 @@ func adminRouter(api fiber.Router) { var channel = api.Group("/channel") channel.Post("/page", handlers.PageChannelByAdmin) channel.Post("/page/of-user", handlers.PageChannelOfUserByAdmin) + channel.Post("/sync/clear-expired", handlers.SyncChannelClearExpiredByAdmin) // proxy 代理 var proxy = api.Group("/proxy") diff --git a/web/services/channel.go b/web/services/channel.go index dcb5453..08eb18a 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -25,8 +25,8 @@ var Channel = &channelServer{ type ChannelServiceProvider interface { CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error) - RemoveChannels(batch string) error - ClearExpiredChannels() error + RemoveChannels(batch string, proxyId *int32) error + ClearExpiredChannels(proxyId int32) (int, error) } type channelServer struct { @@ -37,12 +37,12 @@ func (s *channelServer) CreateChannels(source netip.Addr, resourceId int32, auth return s.provider.CreateChannels(source, resourceId, authWhitelist, authPassword, count, edgeFilter) } -func (s *channelServer) RemoveChannels(batch string) error { - return s.provider.RemoveChannels(batch) +func (s *channelServer) RemoveChannels(batch string, proxyId *int32) error { + return s.provider.RemoveChannels(batch, proxyId) } -func (s *channelServer) ClearExpiredChannels() error { - return s.provider.ClearExpiredChannels() +func (s *channelServer) ClearExpiredChannels(proxyId int32) (int, error) { + return s.provider.ClearExpiredChannels(proxyId) } // 授权方式 @@ -220,10 +220,13 @@ func ensure(now time.Time, source netip.Addr, resourceId int32, authWhitelist bo return resource, ips, nil } -var ( - freeChansKey = "channel:free" - usedChansKey = "channel:used" -) +func freeChansKey(proxy int32) string { + return "channel:free:" + strconv.Itoa(int(proxy)) +} + +func usedChansKey(proxy int32, batch string) string { + return "channel:used:" + strconv.Itoa(int(proxy)) + ":" + batch +} // 扩容通道 func regChans(proxy int32, chans []netip.AddrPort) error { @@ -232,7 +235,7 @@ func regChans(proxy int32, chans []netip.AddrPort) error { strs[i] = ch.String() } - key := freeChansKey + ":" + strconv.Itoa(int(proxy)) + key := freeChansKey(proxy) err := g.Redis.SAdd(context.Background(), key, strs...).Err() if err != nil { return fmt.Errorf("扩容通道失败: %w", err) @@ -242,7 +245,7 @@ func regChans(proxy int32, chans []netip.AddrPort) error { // 缩容通道 func remChans(proxy int32) error { - key := freeChansKey + ":" + strconv.Itoa(int(proxy)) + key := freeChansKey(proxy) err := g.Redis.Del(context.Background(), key).Err() if err != nil { return fmt.Errorf("缩容通道失败: %w", err) @@ -252,13 +255,12 @@ func remChans(proxy int32) error { // 取用通道 func lockChans(proxy int32, batch string, count int) ([]netip.AddrPort, error) { - pid := strconv.Itoa(int(proxy)) chans, err := RedisScriptLockChans.Run( context.Background(), g.Redis, []string{ - freeChansKey + ":" + pid, - usedChansKey + ":" + pid + ":" + batch, + freeChansKey(proxy), + usedChansKey(proxy, batch), }, count, ).StringSlice() @@ -296,13 +298,12 @@ return ports // 归还通道 func freeChans(proxy int32, batch string) error { - pid := strconv.Itoa(int(proxy)) err := RedisScriptFreeChans.Run( context.Background(), g.Redis, []string{ - freeChansKey + ":" + pid, - usedChansKey + ":" + pid + ":" + batch, + freeChansKey(proxy), + usedChansKey(proxy, batch), }, ).Err() if err != nil { diff --git a/web/services/channel_baiyin.go b/web/services/channel_baiyin.go index 442f4cb..2aa8f57 100644 --- a/web/services/channel_baiyin.go +++ b/web/services/channel_baiyin.go @@ -1,6 +1,7 @@ package services import ( + "context" "encoding/json" "fmt" "log/slog" @@ -39,6 +40,15 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int user := resource.User expire := now.Add(resource.Live) + // 注册异步关闭任务 + _, err = g.Asynq.Enqueue( + e.NewRemoveChannel(batch), + asynq.ProcessAt(expire), + ) + if err != nil { + return nil, core.NewServErr("注册异步关闭通道任务失败", err) + } + // 选择代理 proxyResult := struct { m.Proxy @@ -85,14 +95,6 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int return nil, err } - _, err = g.Asynq.Enqueue( - e.NewRemoveChannel(batch), - asynq.ProcessAt(expire), - ) - if err != nil { - return nil, core.NewServErr("提交关闭通道任务失败", err) - } - // 取用节点 secret := strings.Split(u.Z(proxy.Secret), ":") if len(secret) != 2 { @@ -156,6 +158,32 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int } } + // 提交配置 + slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs)) + if env.RunMode == env.RunModeProd { + + // 连接节点到网关 + err = g.Cloud.CloudConnect(&g.CloudConnectReq{ + Uuid: proxy.Mac, + Edge: &edgeConfigs, + }) + if err != nil { + return nil, core.NewServErr("连接云平台失败", err) + } + + // 启用网关代理通道 + err = gateway.GatewayPortConfigs(chanConfigs) + if err != nil { + slog.Warn("提交代理端口配置失败", "error", err.Error()) + return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err) + } + } else { + for _, item := range chanConfigs { + str, _ := json.Marshal(item) + fmt.Println(string(str)) + } + } + // 保存数据 err = q.Q.Transaction(func(q *q.Query) error { // 更新使用记录 @@ -224,124 +252,155 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int return nil, err } - // 提交配置 - if env.RunMode == env.RunModeProd { - - // 连接节点到网关 - err = g.Cloud.CloudConnect(&g.CloudConnectReq{ - Uuid: proxy.Mac, - Edge: &edgeConfigs, - }) - if err != nil { - return nil, core.NewServErr("连接云平台失败", err) - } - - // 启用网关代理通道 - err = gateway.GatewayPortConfigs(chanConfigs) - if err != nil { - slog.Warn("提交代理端口配置失败", "error", err.Error()) - return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err) - } - } else { - slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "count", len(chanConfigs), "remote", len(edgeConfigs)) - for _, item := range chanConfigs { - str, _ := json.Marshal(item) - fmt.Println(string(str)) - } - } - return channels, nil } -func (s *channelBaiyinProvider) RemoveChannels(batch string) error { - start := time.Now() +func (s *channelBaiyinProvider) RemoveChannels(batch string, proxyId *int32) error { + return g.Redsync.WithLock(batchRemoveExpiredKey(batch), func() error { + start := time.Now() - // 获取连接数据 - channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find() - if err != nil { - return core.NewServErr(fmt.Sprintf("获取通道数据失败,batch:%s", batch), err) - } - if len(channels) == 0 { - slog.Warn(fmt.Sprintf("未找到通道数据,batch:%s", batch)) - return nil - } + pid := int32(0) + if proxyId == nil { + // 获取连接数据 + channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find() + if err != nil { + return core.NewServErr(fmt.Sprintf("获取通道数据失败,batch:%s", batch), err) + } + if len(channels) == 0 { + slog.Warn(fmt.Sprintf("未找到通道数据,batch:%s", batch)) + return nil + } - proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take() - if err != nil { - return core.NewServErr(fmt.Sprintf("获取代理数据失败,batch:%s", batch), err) - } + proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take() + if err != nil { + return core.NewServErr(fmt.Sprintf("获取代理数据失败,batch:%s", batch), err) + } - // 准备配置数据 - edgeConfigs := make([]string, len(channels)) - configs := make([]*g.PortConfigsReq, len(channels)) - for i, channel := range channels { - if channel.EdgeRef != nil { - edgeConfigs[i] = *channel.EdgeRef + // 检查通道是否存在 + exist, err := g.Redis.Exists(context.Background(), usedChansKey(proxy.ID, batch)).Result() + if err != nil { + return core.NewServErr("查询使用中通道失败", err) + } + if exist == 0 { + return nil // 没有使用中通道,已经被清理过了 + } + + // 准备配置数据 + edgeConfigs := make([]string, len(channels)) + configs := make([]*g.PortConfigsReq, len(channels)) + for i, channel := range channels { + if channel.EdgeRef != nil { + edgeConfigs[i] = *channel.EdgeRef + } else { + slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID)) + } + + configs[i] = &g.PortConfigsReq{ + Status: false, + Port: int(channel.Port), + Edge: &[]string{}, + } + } + + // 提交配置 + if env.RunMode == env.RunModeProd { + + // 清空通道配置 + secret := strings.Split(u.Z(proxy.Secret), ":") + gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) + err := gateway.GatewayPortConfigs(configs) + if err != nil { + return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err) + } + + // 断开节点连接 + _, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{ + Uuid: proxy.Mac, + Edge: &edgeConfigs, + }) + if err != nil { + slog.Warn("断开云平台连接失败", "error", err.Error()) + return core.NewServErr("断开云平台连接失败", err) + } + + } else { + for _, item := range configs { + str, _ := json.Marshal(item) + fmt.Println(string(str)) + } + } + + pid = proxy.ID } else { - slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID)) + pid = *proxyId } - configs[i] = &g.PortConfigsReq{ - Status: false, - Port: int(channel.Port), - Edge: &[]string{}, - } - } - - // 提交配置 - if env.RunMode == env.RunModeProd { - - // 清空通道配置 - secret := strings.Split(u.Z(proxy.Secret), ":") - gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) - err := gateway.GatewayPortConfigs(configs) + // 释放端口 + err := freeChans(pid, batch) if err != nil { - return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err) + return err } - // 断开节点连接 - _, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{ - Uuid: proxy.Mac, - Edge: &edgeConfigs, - }) - if err != nil { - slog.Warn("断开云平台连接失败", "error", err.Error()) - return core.NewServErr("断开云平台连接失败", err) - } - - } else { - for _, item := range configs { - str, _ := json.Marshal(item) - fmt.Println(string(str)) - } - } - - // 释放端口 - err = freeChans(proxy.ID, batch) - if err != nil { - return err - } - - slog.Debug("清除代理端口配置", "proxy", proxy.IP.String(), "batch", batch, "duration", time.Since(start).String()) - return nil + slog.Debug("清除代理端口配置", "proxy", pid, "batch", batch, "duration", time.Since(start).String()) + return nil + }) } -func (s *channelBaiyinProvider) ClearExpiredChannels() error { - now := time.Now().Add(time.Hour) - var batches []struct{ BatchNo string } - err := q.Channel.Select(q.Channel.BatchNo).Where(q.Channel.ExpiredAt.Lt(now)).Group(q.Channel.BatchNo).Scan(&batches) +// ClearExpiredChannels 定期清理过期通道,返回清理数量 +// 通道有三种情况: +// - 过期等待清理,过期时间在一小时内,可以等待异步任务回收通道 +// - 过期未清理,过期时间超过一小时,说明异步任务可能失败了,需要强制清理 +// - 异常通道,取用后任务失败,导致通道悬空,需要强制清理 +func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) { + now := time.Now() + + // 获取使用中通道批次 + keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result() if err != nil { - return core.NewServErr("查询过期通道失败", err) + return 0, core.NewServErr("查询使用中通道失败", err) + } + batchList := make([]string, len(keys)) + batchSet := make(map[string]struct{}, len(keys)) + for i, key := range keys { + parts := strings.Split(key, ":") + if len(parts) != 4 { + return 0, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), nil) + } + batchList[i] = parts[3] + batchSet[parts[3]] = struct{}{} } - for _, batch := range batches { - err := s.RemoveChannels(batch.BatchNo) + // 排除未过期通道 + var batchQueried []struct{ BatchNo string } + err = q.Channel.Debug(). + Select(q.Channel.BatchNo). + Where( + q.Channel.BatchNo.In(batchList...), + q.Channel.ExpiredAt.Gte(now), + ). + Group(q.Channel.BatchNo). + Scan(&batchQueried) + if err != nil { + return 0, core.NewServErr("查询过期通道失败", err) + } + for _, batch := range batchQueried { + delete(batchSet, batch.BatchNo) + } + + // 清理过期通道 + slog.Info("批量清理过期通道", "count", len(batchSet)) + for batchNo, _ := range batchSet { + err := s.RemoveChannels(batchNo, &proxyId) if err != nil { - slog.Error("清理过期通道失败", "batch", batch.BatchNo, "error", err) + slog.Error("清理过期通道失败", "batch", batchNo, "error", err) } } - return nil + return len(batchSet), nil +} + +func batchRemoveExpiredKey(bid string) string { + return fmt.Sprintf("platform:batch:remove_expired:%s", bid) } func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]EdgeInfo, error) { diff --git a/web/services/proxy.go b/web/services/proxy.go index 6157c65..bf4ee77 100644 --- a/web/services/proxy.go +++ b/web/services/proxy.go @@ -10,7 +10,6 @@ import ( "platform/web/globals/orm" m "platform/web/models" q "platform/web/queries" - "strconv" "time" "gorm.io/gen/field" @@ -26,7 +25,7 @@ func proxyStatusLockKey(id int32) string { func hasUsedChans(proxyID int32) (bool, error) { ctx := context.Background() - pattern := usedChansKey + ":" + strconv.Itoa(int(proxyID)) + ":*" + pattern := usedChansKey(proxyID, "*") keys, _, err := g.Redis.Scan(ctx, 0, pattern, 1).Result() if err != nil { return false, err diff --git a/web/tasks/task.go b/web/tasks/task.go index aede48f..7a85400 100644 --- a/web/tasks/task.go +++ b/web/tasks/task.go @@ -13,6 +13,7 @@ import ( ) func HandleCompleteTrade(_ context.Context, task *asynq.Task) error { + slog.Info("[event]尝试结束交易") var event events.CloseTradeData if err := json.Unmarshal(task.Payload(), &event); err != nil { return fmt.Errorf("解析任务参数失败: %w", err) @@ -30,11 +31,11 @@ func HandleCompleteTrade(_ context.Context, task *asynq.Task) error { } if err := s.Trade.CompleteTrade(user, &data); err != nil { - slog.Debug("完成交易失败[异步结束订单]", "err", err) + slog.Debug("结束交易失败:完成交易失败", "err", err) // 交易无法完成,关闭交易 if err := s.Trade.CancelTrade(&data); err != nil { - return fmt.Errorf("取消交易失败[异步结束订单]: %w", err) + return fmt.Errorf("结束交易失败:取消交易失败: %w", err) } } @@ -43,17 +44,11 @@ func HandleCompleteTrade(_ context.Context, task *asynq.Task) error { func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) { batch := string(task.Payload()) - err = s.Channel.RemoveChannels(batch) + slog.Info("[event]删除通道", "batch", batch) + + err = s.Channel.RemoveChannels(batch, nil) if err != nil { return fmt.Errorf("删除通道失败: %w", err) } return nil } - -func HandleClearExpiredChannels(_ context.Context, _ *asynq.Task) (err error) { - err = s.Channel.ClearExpiredChannels() - if err != nil { - return fmt.Errorf("清理过期通道失败: %w", err) - } - return nil -} diff --git a/web/web.go b/web/web.go index 94ec63c..84c8469 100644 --- a/web/web.go +++ b/web/web.go @@ -42,10 +42,6 @@ func RunApp(pCtx context.Context) error { return RunTask(ctx) }) - g.Go(func() error { - return RunCron(ctx) - }) - return g.Wait() } @@ -95,7 +91,6 @@ func RunTask(ctx context.Context) error { var mux = asynq.NewServeMux() mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel) mux.HandleFunc(events.CloseTrade, tasks.HandleCompleteTrade) - mux.HandleFunc(events.ClearExpiredChannels, tasks.HandleClearExpiredChannels) // 停止服务 go func() { @@ -112,30 +107,6 @@ func RunTask(ctx context.Context) error { return nil } -func RunCron(ctx context.Context) error { - server := asynq.NewSchedulerFromRedisClient(deps.Redis, &asynq.SchedulerOpts{ - Location: time.Local, - Logger: &AppAsynqLogger{}, - }) - - // 每小时清理一次一小时之前的过期通道 - server.Register("0 * * * *", asynq.NewTask(events.ClearExpiredChannels, nil)) - - // 停止服务 - go func() { - <-ctx.Done() - server.Shutdown() - }() - - // 启动服务 - err := server.Run() - if err != nil { - return fmt.Errorf("定时任务服务运行失败: %w", err) - } - - return nil -} - type AppAsynqLogger struct{} func (l *AppAsynqLogger) Debug(args ...any) {