// Command main runs a ticker-driven loop that periodically executes the
// registered background tasks (currently only stopChannels).
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"platform/pkg/env"
	"platform/pkg/logs"
	"platform/pkg/rds"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/redis/go-redis/v9"
)

// RemoveEndpoint is the URL that stopChannels POSTs the expired channel IDs to.
var RemoveEndpoint = "http://localhost:8080/api/channel/remove"

const (
	// tickInterval is how often the registered tasks are run.
	tickInterval = 30 * time.Second

	// removeTimeout bounds the HTTP call to RemoveEndpoint so that an
	// unresponsive server cannot stall the tick loop indefinitely.
	removeTimeout = 30 * time.Second

	// channelTasksKey is the redis sorted set read and trimmed by
	// stopChannels. Its members are parsed as int32 channel IDs and its
	// scores are compared against Unix seconds.
	channelTasksKey = "tasks:channel"
)

func main() {
	Start()
}

// taskList maps a task name to the function executed on every tick. The name
// is only used when logging a failure.
var taskList = make(map[string]func(ctx context.Context, curr time.Time) error)

// Start initializes the env, logs and rds packages, registers the tasks, and
// then runs them on every tick. The loop ranges over the ticker channel, so
// Start does not return during normal operation.
func Start() {
	ctx := context.Background()

	env.Init()
	logs.Init()
	rds.Init()

	taskList["stopChannels"] = stopChannels

	ticker := time.NewTicker(tickInterval)
	defer ticker.Stop()

	// The mutex ensures only one run is in flight at a time: if the previous
	// tick's work has not finished, the current tick is skipped. While process
	// is called synchronously from this loop the lock is always free here; the
	// guard becomes relevant once the tasks are run asynchronously.
	var mutex sync.Mutex
	for curr := range ticker.C {
		if !mutex.TryLock() {
			slog.Warn("skip tick", slog.String("tick", curr.Format("2006-01-02 15:04:05")))
			continue
		}

		err := process(ctx, curr)
		// Release the lock before acting on the error so it is never left held.
		mutex.Unlock()
		if err != nil {
			panic(err)
		}
	}
}

// process runs every task in taskList once, passing curr as the tick time.
// A failing task is logged and does not stop the remaining tasks; process
// itself always returns nil.
func process(ctx context.Context, curr time.Time) error {
	// TODO: run the tasks asynchronously.
	for name, task := range taskList {
		if err := task(ctx, curr); err != nil {
			slog.Error("task failed", slog.String("task", name), slog.String("error", err.Error()))
		}
	}
	return nil
}

// stopChannels looks up the members of the channelTasksKey sorted set whose
// score lies between 0 and curr (as Unix seconds), asks RemoveEndpoint to
// remove those channels, and then deletes the same score range from redis.
//
// It returns nil when nothing is due. Any failure - the redis query, parsing a
// member as an int32 ID, the HTTP call, or the final redis cleanup - is
// returned to the caller. The redis entries are left in place when the HTTP
// call fails, so they are picked up again on a later tick.
func stopChannels(ctx context.Context, curr time.Time) error {
	maxScore := strconv.FormatInt(curr.Unix(), 10)

	// Query the expired channels.
	members, err := rds.Client.ZRangeByScore(ctx, channelTasksKey, &redis.ZRangeBy{
		Min: "0",
		Max: maxScore,
	}).Result()
	if err != nil {
		return fmt.Errorf("query expired channels: %w", err)
	}
	if len(members) == 0 {
		return nil
	}

	ids := make([]int32, len(members))
	for i, member := range members {
		id, err := strconv.ParseInt(member, 10, 32)
		if err != nil {
			return fmt.Errorf("parse channel id %q: %w", member, err)
		}
		ids[i] = int32(id)
	}

	// Remove the expired channels.
	if err := removeChannels(ctx, ids); err != nil {
		return err
	}

	// Delete the channels from redis. The error must be propagated: returning
	// nil here would report success even though the entries are still queued.
	if _, err := rds.Client.ZRemRangeByScore(ctx, channelTasksKey, "0", maxScore).Result(); err != nil {
		return fmt.Errorf("delete expired channels from redis: %w", err)
	}
	return nil
}

// removeChannels POSTs ids as the JSON body {"by_ids": [...]} to
// RemoveEndpoint using basic auth, and returns an error unless the response
// status is 200 OK. The call is bounded by removeTimeout and by ctx.
func removeChannels(ctx context.Context, ids []int32) error {
	payload, err := json.Marshal(map[string]any{
		"by_ids": ids,
	})
	if err != nil {
		return fmt.Errorf("encode remove request: %w", err)
	}

	// http.DefaultClient has no timeout of its own, so bound the request here.
	ctx, cancel := context.WithTimeout(ctx, removeTimeout)
	defer cancel()

	req, err := http.NewRequestWithContext(
		ctx,
		http.MethodPost,
		RemoveEndpoint,
		strings.NewReader(string(payload)),
	)
	if err != nil {
		return fmt.Errorf("build remove request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")
	// NOTE(review): credentials are hard-coded; consider loading them from configuration.
	req.SetBasicAuth("tasks", "tasks")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("send remove request: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		msg, err := io.ReadAll(resp.Body)
		if err != nil {
			return fmt.Errorf("read remove response (status %d): %w", resp.StatusCode, err)
		}
		return errors.New("failed to stop channels: " + string(msg))
	}

	// Best-effort drain so the transport can reuse the connection; a failure
	// here does not affect the outcome of the request.
	_, _ = io.Copy(io.Discard, resp.Body)
	return nil
}