From caa997b95cf0dfdffb561a4c5f16bc85c5be3c73 Mon Sep 17 00:00:00 2001 From: luorijun Date: Wed, 3 Dec 2025 15:04:48 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=85=8D=E7=BD=AE=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E4=BB=BB=E5=8A=A1=E8=A7=A3=E6=9E=90=E5=A4=B1=E8=B4=A5?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- web/events/proxy.go | 10 +++++++--- web/tasks/task.go | 21 +++++++++++++++------ web/web.go | 8 ++++++-- 3 files changed, 28 insertions(+), 11 deletions(-) diff --git a/web/events/proxy.go b/web/events/proxy.go index 84837c6..e776332 100644 --- a/web/events/proxy.go +++ b/web/events/proxy.go @@ -1,9 +1,13 @@ package events -import "github.com/hibiken/asynq" +import ( + "time" + + "github.com/hibiken/asynq" +) const FlushGateway = "gateway:flush" -func NewFlushGateway() *asynq.Task { - return asynq.NewTask(FlushGateway, nil) +func NewFlushGateway(ttl time.Duration) *asynq.Task { + return asynq.NewTask(FlushGateway, nil, asynq.MaxRetry(0), asynq.Unique(ttl)) } diff --git a/web/tasks/task.go b/web/tasks/task.go index e7587fe..e63e51c 100644 --- a/web/tasks/task.go +++ b/web/tasks/task.go @@ -59,8 +59,14 @@ func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) { return nil } -func HandleFlushGateway(_ context.Context, task *asynq.Task) (err error) { +func HandleFlushGateway(_ context.Context, task *asynq.Task) error { start := time.Now() + defer func() { + duration := time.Since(start) + if duration > time.Second { + slog.Warn("更新代理后备配置耗时过长", "time", duration.String()) + } + }() // 获取所有网关:配置组 proxies, err := s.Proxy.AllProxies(m.ProxyTypeBaiYin, true) @@ -82,12 +88,16 @@ func HandleFlushGateway(_ context.Context, task *asynq.Task) (err error) { // 获取之前的后备配置 remotes := map[string]int{} if proxy.Meta != nil { - meta, ok := proxy.Meta.Data().([]g.AutoConfig) + meta, ok := proxy.Meta.Data().([]any) if !ok { - return fmt.Errorf("解析网关数据失败") + return fmt.Errorf("解析网关数据失败: %T", proxy.Meta.Data()) } - for _, m := range meta { - remotes[fmt.Sprintf("%s:%s:%s", m.Isp, m.Province, m.City)] = m.Count + for _, rawM := range meta { + m, ok := rawM.(map[string]any) + if !ok { + return fmt.Errorf("解析网关数据失败: %T", rawM) + } + remotes[fmt.Sprintf("%s:%s:%s", m["isp"], m["province"], m["city"])] = int(m["count"].(float64)) } } @@ -142,6 +152,5 @@ func HandleFlushGateway(_ context.Context, task *asynq.Task) (err error) { } } - slog.Debug("更新代理后备配置", "time", time.Since(start).String()) return nil } diff --git a/web/web.go b/web/web.go index 715ede3..6e78eb0 100644 --- a/web/web.go +++ b/web/web.go @@ -81,7 +81,7 @@ func RunSchedule(ctx context.Context) error { Location: time.Local, }) - scheduler.Register("* * * * *", events.NewFlushGateway()) + scheduler.Register("@every 5s", events.NewFlushGateway(5*time.Second)) // 停止服务 go func() { @@ -99,7 +99,11 @@ func RunSchedule(ctx context.Context) error { } func RunTask(ctx context.Context) error { - var server = asynq.NewServerFromRedisClient(base.Redis, asynq.Config{}) + var server = asynq.NewServerFromRedisClient(base.Redis, asynq.Config{ + ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) { + slog.Error("任务执行失败", "task", task.Type(), "error", err) + }), + }) var mux = asynq.NewServeMux() mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel)