优化表结构,重构模型,重新实现基于白银网关的提取节点流程
This commit is contained in:
@@ -4,8 +4,15 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"platform/pkg/env"
|
||||
"platform/pkg/u"
|
||||
"platform/web/events"
|
||||
e "platform/web/events"
|
||||
g "platform/web/globals"
|
||||
m "platform/web/models"
|
||||
s "platform/web/services"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
@@ -29,16 +36,93 @@ func HandleCancelTrade(_ context.Context, task *asynq.Task) (err error) {
|
||||
}
|
||||
|
||||
func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) {
|
||||
data := make([]int32, 0)
|
||||
err = json.Unmarshal(task.Payload(), &data)
|
||||
data := new(e.RemoveChannelData)
|
||||
err = json.Unmarshal(task.Payload(), data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("解析任务参数失败: %w", err)
|
||||
}
|
||||
|
||||
err = s.Channel.RemoveChannels(data)
|
||||
err = s.Channel.RemoveChannels(data.Batch, data.IDs)
|
||||
if err != nil {
|
||||
return fmt.Errorf("删除通道失败: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func HandleFlushGateway(_ context.Context, task *asynq.Task) (err error) {
|
||||
now := time.Now()
|
||||
|
||||
// 获取所有网关:配置组
|
||||
proxies, err := s.Proxy.AllProxies(m.ProxyTypeBaiYin, true)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取网关失败: %w", err)
|
||||
}
|
||||
|
||||
for _, proxy := range proxies {
|
||||
|
||||
// 获取当前后备配置
|
||||
locals := map[string]int{}
|
||||
for _, channel := range proxy.Channels {
|
||||
isp := channel.FilterISP.String()
|
||||
prov := u.Z(channel.FilterProv)
|
||||
city := u.Z(channel.FilterCity)
|
||||
locals[fmt.Sprintf("%s:%s:%s", isp, prov, city)]++
|
||||
}
|
||||
|
||||
// 获取之前的后备配置
|
||||
remotes := map[string]int{}
|
||||
if proxy.Meta != nil {
|
||||
meta, ok := proxy.Meta.Data().([]g.AutoConfig)
|
||||
if !ok {
|
||||
return fmt.Errorf("解析网关数据失败")
|
||||
}
|
||||
for _, m := range meta {
|
||||
remotes[fmt.Sprintf("%s:%s:%s", m.Isp, m.Province, m.City)] = m.Count
|
||||
}
|
||||
}
|
||||
|
||||
// 检查是否需要更新
|
||||
pass := true
|
||||
for k, local := range locals {
|
||||
remote, ok := remotes[k]
|
||||
if !ok {
|
||||
pass = false
|
||||
} else {
|
||||
local, remote := float64(local), float64(remote)
|
||||
if remote < local*1.5 || remote > local*3 {
|
||||
pass = false
|
||||
}
|
||||
}
|
||||
}
|
||||
if pass {
|
||||
continue
|
||||
}
|
||||
|
||||
// 更新后备配置
|
||||
configs := make([]g.AutoConfig, 0)
|
||||
for k, local := range locals {
|
||||
arr := strings.Split(k, ":")
|
||||
isp, prov, city := arr[0], arr[1], arr[2]
|
||||
configs = append(configs, g.AutoConfig{
|
||||
Isp: isp,
|
||||
Province: prov,
|
||||
City: city,
|
||||
Count: local * 2,
|
||||
})
|
||||
}
|
||||
|
||||
if env.DebugExternalChange {
|
||||
g.Cloud.CloudConnect(g.CloudConnectReq{
|
||||
Uuid: proxy.Mac,
|
||||
AutoConfig: configs,
|
||||
})
|
||||
} else {
|
||||
bytes, _ := json.Marshal(configs)
|
||||
slog.Debug("更新代理后备配置", "config", string(bytes))
|
||||
}
|
||||
}
|
||||
|
||||
slog.Debug("更新代理后备配置", "time", time.Since(now).String())
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user