Files
platform/cmd/tasks/main.go

133 lines
2.6 KiB
Go

package main
import (
"context"
"encoding/json"
"errors"
"io"
"log/slog"
"net/http"
"platform/pkg/env"
"platform/pkg/logs"
"platform/pkg/rds"
"strconv"
"strings"
"sync"
"time"
"github.com/redis/go-redis/v9"
)
var RemoveEndpoint = "http://localhost:8080/api/channel/remove"
func main() {
Start()
}
var taskList = make(map[string]func(ctx context.Context, curr time.Time) error)
func Start() {
ctx := context.Background()
env.Init()
logs.Init()
rds.Init()
taskList["stopChannels"] = stopChannels
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
// 互斥锁确保同一时间只有一个协程运行
// 如果之前的 tick 操作未完成,则跳过当前 tick
var mutex = &sync.Mutex{}
for curr := range ticker.C {
if mutex.TryLock() {
err := process(ctx, curr)
if err != nil {
panic(err)
}
mutex.Unlock()
} else {
slog.Warn("skip tick", slog.String("tick", curr.Format("2006-01-02 15:04:05")))
}
}
}
func process(ctx context.Context, curr time.Time) error {
// todo 异步化
for name, task := range taskList {
err := task(ctx, curr)
if err != nil {
slog.Error("task failed", slog.String("task", name), slog.String("error", err.Error()))
}
}
return nil
}
func stopChannels(ctx context.Context, curr time.Time) error {
// 查询过期的 channel
result, err := rds.Client.ZRangeByScore(ctx, "tasks:channel", &redis.ZRangeBy{
Min: "0",
Max: strconv.FormatInt(curr.Unix(), 10),
}).Result()
if err != nil {
return err
}
if len(result) == 0 {
return nil
}
ids := make([]int32, len(result))
for i, str := range result {
id, err := strconv.ParseInt(str, 10, 32)
if err != nil {
return err
}
ids[i] = int32(id)
}
slog.Debug("stopChannels", slog.String("result", strings.Join(result, ",")))
// 删除过期的 channel
body, err := json.Marshal(map[string]any{
"by_ids": ids,
})
if err != nil {
return err
}
req, err := http.NewRequest(
http.MethodPost,
RemoveEndpoint,
strings.NewReader(string(body)),
)
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth("tasks", "tasks")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
if resp.StatusCode != http.StatusOK {
body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
return errors.New("failed to stop channels: " + string(body))
}
// 删除 redis 中的 channel
_, err = rds.Client.ZRemRangeByScore(ctx, "tasks:channel", "0", strconv.FormatInt(curr.Unix(), 10)).Result()
return nil
}