GET类型通道创建端点;修改完善返回格式处理逻辑;动态刷新remote令牌
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
package main
|
||||
|
||||
func main() {
|
||||
println(float64(166) * 11 / 10)
|
||||
println(rune('w'))
|
||||
println(rune('m'))
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ import (
|
||||
"platform/pkg/env"
|
||||
"platform/pkg/logs"
|
||||
"platform/pkg/rds"
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@@ -19,6 +18,8 @@ import (
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
var RemoveEndpoint = "http://localhost:8080/api/channel/remove"
|
||||
|
||||
func main() {
|
||||
Start()
|
||||
}
|
||||
@@ -68,55 +69,38 @@ func process(ctx context.Context, curr time.Time) error {
|
||||
|
||||
func stopChannels(ctx context.Context, curr time.Time) error {
|
||||
|
||||
// 获取并删除
|
||||
script := redis.NewScript(`
|
||||
local result = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1])
|
||||
if #result > 0 then
|
||||
redis.call('ZREMRANGEBYSCORE', KEYS[1], 0, ARGV[1])
|
||||
end
|
||||
return result
|
||||
`)
|
||||
|
||||
// 执行脚本
|
||||
result, err := script.Run(ctx, rds.Client, []string{"tasks:channel"}, curr.Unix()).Result()
|
||||
// 查询过期的 channel
|
||||
result, err := rds.Client.ZRangeByScore(ctx, "tasks:channel", &redis.ZRangeBy{
|
||||
Min: "0",
|
||||
Max: strconv.FormatInt(curr.Unix(), 10),
|
||||
}).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 处理结果
|
||||
list, ok := result.([]any)
|
||||
if !ok {
|
||||
return errors.New("failed to convert result to []string")
|
||||
}
|
||||
var ids = make([]int, len(list))
|
||||
for i, item := range list {
|
||||
idStr, ok := item.(string)
|
||||
if !ok {
|
||||
slog.Debug(reflect.TypeOf(item).String())
|
||||
return errors.New("failed to convert item to string")
|
||||
}
|
||||
id, err := strconv.Atoi(idStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ids[i] = id
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
if len(result) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var body = map[string]any{
|
||||
"by_ids": ids,
|
||||
ids := make([]int32, len(result))
|
||||
for i, str := range result {
|
||||
id, err := strconv.ParseInt(str, 10, 32)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
ids[i] = int32(id)
|
||||
}
|
||||
bodyStr, err := json.Marshal(body)
|
||||
|
||||
// 删除过期的 channel
|
||||
body, err := json.Marshal(map[string]any{
|
||||
"by_ids": ids,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(
|
||||
http.MethodPost,
|
||||
"http://localhost:8080/api/channel/remove", // todo 环境变量获取服务地址
|
||||
strings.NewReader(string(bodyStr)),
|
||||
RemoveEndpoint,
|
||||
strings.NewReader(string(body)),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -128,6 +112,7 @@ func stopChannels(ctx context.Context, curr time.Time) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
@@ -137,5 +122,8 @@ func stopChannels(ctx context.Context, curr time.Time) error {
|
||||
return errors.New("failed to stop channels: " + string(body))
|
||||
}
|
||||
|
||||
// 删除 redis 中的 channel
|
||||
_, err = rds.Client.ZRemRangeByScore(ctx, "tasks:channel", "0", strconv.FormatInt(curr.Unix(), 10)).Result()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user