package tasks import ( "context" "encoding/json" "fmt" "log/slog" "platform/pkg/env" "platform/pkg/u" "platform/web/events" e "platform/web/events" g "platform/web/globals" m "platform/web/models" s "platform/web/services" "strings" "time" "github.com/hibiken/asynq" ) func HandleCancelTrade(_ context.Context, task *asynq.Task) (err error) { data := new(events.CancelTradeData) err = json.Unmarshal(task.Payload(), data) if err != nil { return fmt.Errorf("解析任务参数失败: %w", err) } err = s.Trade.CancelTrade(&s.ModifyTradeData{ TradeNo: data.TradeNo, Method: data.Method, }, time.Now()) if err != nil { return fmt.Errorf("取消交易失败: %w", err) } return nil } func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) { data := new(e.RemoveChannelData) err = json.Unmarshal(task.Payload(), data) if err != nil { return fmt.Errorf("解析任务参数失败: %w", err) } err = s.Channel.RemoveChannels(data.Batch, data.IDs) if err != nil { return fmt.Errorf("删除通道失败: %w", err) } return nil } func HandleFlushGateway(_ context.Context, task *asynq.Task) (err error) { now := time.Now() // 获取所有网关:配置组 proxies, err := s.Proxy.AllProxies(m.ProxyTypeBaiYin, true) if err != nil { return fmt.Errorf("获取网关失败: %w", err) } for _, proxy := range proxies { // 获取当前后备配置 locals := map[string]int{} for _, channel := range proxy.Channels { isp := channel.FilterISP.String() prov := u.Z(channel.FilterProv) city := u.Z(channel.FilterCity) locals[fmt.Sprintf("%s:%s:%s", isp, prov, city)]++ } // 获取之前的后备配置 remotes := map[string]int{} if proxy.Meta != nil { meta, ok := proxy.Meta.Data().([]g.AutoConfig) if !ok { return fmt.Errorf("解析网关数据失败") } for _, m := range meta { remotes[fmt.Sprintf("%s:%s:%s", m.Isp, m.Province, m.City)] = m.Count } } // 检查是否需要更新 pass := true for k, local := range locals { remote, ok := remotes[k] if !ok { pass = false } else { local, remote := float64(local), float64(remote) if remote < local*1.5 || remote > local*3 { pass = false } } } if pass { continue } // 更新后备配置 configs := make([]g.AutoConfig, 0) for k, local := range locals { arr := strings.Split(k, ":") isp, prov, city := arr[0], arr[1], arr[2] configs = append(configs, g.AutoConfig{ Isp: isp, Province: prov, City: city, Count: local * 2, }) } if env.DebugExternalChange { g.Cloud.CloudConnect(g.CloudConnectReq{ Uuid: proxy.Mac, AutoConfig: configs, }) } else { bytes, _ := json.Marshal(configs) slog.Debug("更新代理后备配置", "config", string(bytes)) } } slog.Debug("更新代理后备配置", "time", time.Since(now).String()) return nil }