重构 Dockerfile 添加构建过程
This commit is contained in:
@@ -1,132 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"platform/pkg/env"
|
||||
"platform/pkg/logs"
|
||||
"platform/pkg/rds"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
var RemoveEndpoint = "http://localhost:8080/api/channel/remove"
|
||||
|
||||
func main() {
|
||||
Start()
|
||||
}
|
||||
|
||||
var taskList = make(map[string]func(ctx context.Context, curr time.Time) error)
|
||||
|
||||
func Start() {
|
||||
ctx := context.Background()
|
||||
|
||||
env.Init()
|
||||
logs.Init()
|
||||
rds.Init()
|
||||
|
||||
taskList["stopChannels"] = stopChannels
|
||||
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
// 互斥锁确保同一时间只有一个协程运行
|
||||
// 如果之前的 tick 操作未完成,则跳过当前 tick
|
||||
var mutex = &sync.Mutex{}
|
||||
for curr := range ticker.C {
|
||||
if mutex.TryLock() {
|
||||
err := process(ctx, curr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
mutex.Unlock()
|
||||
} else {
|
||||
slog.Warn("skip tick", slog.String("tick", curr.Format("2006-01-02 15:04:05")))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func process(ctx context.Context, curr time.Time) error {
|
||||
|
||||
// todo 异步化
|
||||
for name, task := range taskList {
|
||||
err := task(ctx, curr)
|
||||
if err != nil {
|
||||
slog.Error("task failed", slog.String("task", name), slog.String("error", err.Error()))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func stopChannels(ctx context.Context, curr time.Time) error {
|
||||
|
||||
// 查询过期的 channel
|
||||
result, err := rds.Client.ZRangeByScore(ctx, "tasks:channel", &redis.ZRangeBy{
|
||||
Min: "0",
|
||||
Max: strconv.FormatInt(curr.Unix(), 10),
|
||||
}).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(result) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
ids := make([]int32, len(result))
|
||||
for i, str := range result {
|
||||
id, err := strconv.ParseInt(str, 10, 32)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ids[i] = int32(id)
|
||||
}
|
||||
slog.Debug("stopChannels", slog.String("result", strings.Join(result, ",")))
|
||||
|
||||
// 删除过期的 channel
|
||||
body, err := json.Marshal(map[string]any{
|
||||
"by_ids": ids,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req, err := http.NewRequest(
|
||||
http.MethodPost,
|
||||
RemoveEndpoint,
|
||||
strings.NewReader(string(body)),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.SetBasicAuth("tasks", "tasks")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func(Body io.ReadCloser) {
|
||||
_ = Body.Close()
|
||||
}(resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return errors.New("failed to stop channels: " + string(body))
|
||||
}
|
||||
|
||||
// 删除 redis 中的 channel
|
||||
_, err = rds.Client.ZRemRangeByScore(ctx, "tasks:channel", "0", strconv.FormatInt(curr.Unix(), 10)).Result()
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user