package tasks

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"platform/pkg/env"
	"platform/pkg/u"
	"platform/web/events"
	e "platform/web/events"
	g "platform/web/globals"
	m "platform/web/models"
	q "platform/web/queries"
	s "platform/web/services"
	"strings"
	"time"

	"github.com/hibiken/asynq"
	"gorm.io/datatypes"
)

// HandleCompleteTrade decodes an events.CompleteTradeData payload and asks the
// trade service to complete the referenced trade. If completion fails, the
// failure is logged at debug level and the trade is cancelled instead, using
// the current time; only a failure of that cancellation (or of payload
// decoding) is returned to the caller.
func HandleCompleteTrade(_ context.Context, task *asynq.Task) (err error) {
	event := new(events.CompleteTradeData)
	if err := json.Unmarshal(task.Payload(), event); err != nil {
		return fmt.Errorf("解析任务参数失败: %w", err)
	}

	data := &s.ModifyTradeData{
		TradeNo: event.TradeNo,
		Method:  event.Method,
	}

	completeErr := s.Trade.CompleteTrade(data)
	if completeErr == nil {
		return nil
	}
	slog.Debug("完成交易失败[异步结束订单]", "err", completeErr)

	// Completion failed: fall back to cancelling the trade.
	if err := s.Trade.CancelTrade(data, time.Now()); err != nil {
		return fmt.Errorf("取消交易失败[异步结束订单]: %w", err)
	}
	return nil
}

// HandleRemoveChannel decodes an events.RemoveChannelData payload and removes
// the channels it identifies (by batch and IDs) through the channel service.
// Decoding and removal failures are returned wrapped.
func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) {
	data := new(e.RemoveChannelData)
	if err := json.Unmarshal(task.Payload(), data); err != nil {
		return fmt.Errorf("解析任务参数失败: %w", err)
	}

	if err := s.Channel.RemoveChannels(data.Batch, data.IDs); err != nil {
		return fmt.Errorf("删除通道失败: %w", err)
	}
	return nil
}

// HandleFlushGateway recomputes the fallback (auto) configuration of every
// gateway returned by s.Proxy.AllProxies and refreshes the ones that have
// drifted.
//
// For each proxy it counts the channels per "isp:province:city" key, compares
// those counts with the configuration stored in proxy.Meta, and, when they
// are out of sync (see gatewayFallbackInSync), builds a new configuration
// with twice the local count per key. When env.DebugExternalChange is set the
// new configuration is submitted through g.Cloud.CloudConnect; otherwise it is
// only written to the debug log. The configuration is then stored in the
// proxy's Meta column.
//
// Failing to list the proxies or to parse a stored Meta value aborts the task
// with an error. Push and store failures for a single proxy are logged and do
// not fail the task. A warning is logged when the whole run takes longer than
// one second.
func HandleFlushGateway(_ context.Context, task *asynq.Task) error {
	start := time.Now()
	defer func() {
		if elapsed := time.Since(start); elapsed > time.Second {
			slog.Warn("更新代理后备配置耗时过长", "time", elapsed.String())
		}
	}()

	// Fetch all gateways (config group).
	proxies, err := s.Proxy.AllProxies(m.ProxyTypeBaiYin, true)
	if err != nil {
		return fmt.Errorf("获取网关失败: %w", err)
	}

	for _, proxy := range proxies {
		// Current demand: number of channels per isp:province:city key.
		locals := map[string]int{}
		for _, channel := range proxy.Channels {
			isp := channel.FilterISP.String()
			prov := u.Z(channel.FilterProv)
			city := u.Z(channel.FilterCity)
			locals[fmt.Sprintf("%s:%s:%s", isp, prov, city)]++
		}

		// Previously stored fallback configuration, if any. A nil map is fine
		// here: it is only read from.
		var remotes map[string]int
		if proxy.Meta != nil {
			if remotes, err = parseGatewayFallback(proxy.Meta.Data()); err != nil {
				return err
			}
		}

		// Skip proxies whose stored configuration still fits the demand.
		if gatewayFallbackInSync(locals, remotes) {
			continue
		}

		// Build the new fallback configuration: twice the local count per key.
		configs := make([]g.AutoConfig, 0, len(locals))
		for key, local := range locals {
			parts := strings.Split(key, ":")
			configs = append(configs, g.AutoConfig{
				Isp:      parts[0],
				Province: parts[1],
				City:     parts[2],
				Count:    local * 2,
			})
		}

		if env.DebugExternalChange {
			pushErr := g.Cloud.CloudConnect(g.CloudConnectReq{
				Uuid:       proxy.Mac,
				AutoConfig: configs,
			})
			if pushErr != nil {
				slog.Error("提交代理后备配置失败", "error", pushErr)
				// Do not record a configuration that was never applied:
				// otherwise the next run would see stored and local configs
				// in agreement and never retry the push.
				continue
			}
		} else {
			// The marshal error is deliberately ignored: the payload is only
			// used for this debug log line.
			payload, _ := json.Marshal(configs)
			slog.Debug("更新代理后备配置", "config", string(payload))
		}

		_, storeErr := q.Proxy.
			Where(q.Proxy.ID.Eq(proxy.ID)).
			UpdateSimple(q.Proxy.Meta.Value(datatypes.NewJSONType(configs)))
		if storeErr != nil {
			slog.Error("更新代理后备配置失败", "error", storeErr)
		}
	}

	return nil
}

// parseGatewayFallback converts a decoded Meta value into a map from
// "isp:province:city" key to configured count. raw must be a []any whose
// elements are map[string]any with a numeric "count" entry; anything else
// yields an error naming the offending dynamic type instead of panicking.
func parseGatewayFallback(raw any) (map[string]int, error) {
	entries, ok := raw.([]any)
	if !ok {
		return nil, fmt.Errorf("解析网关数据失败: %T", raw)
	}

	remotes := make(map[string]int, len(entries))
	for _, rawEntry := range entries {
		entry, ok := rawEntry.(map[string]any)
		if !ok {
			return nil, fmt.Errorf("解析网关数据失败: %T", rawEntry)
		}
		// Checked assertion: a missing or non-numeric count must not panic
		// the worker.
		count, ok := entry["count"].(float64)
		if !ok {
			return nil, fmt.Errorf("解析网关数据失败: %T", entry["count"])
		}
		key := fmt.Sprintf("%s:%s:%s", entry["isp"], entry["province"], entry["city"])
		remotes[key] = int(count)
	}
	return remotes, nil
}

// gatewayFallbackInSync reports whether every key in locals has a remote
// count within [1.5x, 3x] of its local count.
//
// NOTE(review): keys that exist only in remotes are not considered, so stale
// remote entries never trigger an update on their own — confirm this is
// intended.
func gatewayFallbackInSync(locals, remotes map[string]int) bool {
	for key, local := range locals {
		remote, ok := remotes[key]
		if !ok {
			return false
		}
		want, have := float64(local), float64(remote)
		if have < want*1.5 || have > want*3 {
			return false
		}
	}
	return true
}