diff --git a/README.md b/README.md index e17f21b..a9f1b11 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,6 @@ ## TODO -限制提取单次最大量 - -支付后异步任务,到时间后需要尝试完成订单,如果无法完成再关闭 +活体认证回调在后端实现 重新实现接口 proxy 注册与注销接口: - 注册时向 redis ports 可用池中加入端口 diff --git a/web/events/trade.go b/web/events/trade.go index 3351f07..6824693 100644 --- a/web/events/trade.go +++ b/web/events/trade.go @@ -9,18 +9,18 @@ import ( "github.com/hibiken/asynq" ) -const CancelTrade = "trade:update" +const CompleteTrade = "trade:update" -type CancelTradeData struct { +type CompleteTradeData struct { TradeNo string `json:"trade_no" validate:"required"` Method m.TradeMethod `json:"method" validate:"required"` } -func NewCancelTrade(data CancelTradeData) *asynq.Task { +func NewCancelTrade(data CompleteTradeData) *asynq.Task { bytes, err := json.Marshal(data) if err != nil { slog.Error("序列化更新交易任务失败", "error", err) return nil } - return asynq.NewTask(CancelTrade, bytes) + return asynq.NewTask(CompleteTrade, bytes) } diff --git a/web/services/trade.go b/web/services/trade.go index 99006bf..7e47836 100644 --- a/web/services/trade.go +++ b/web/services/trade.go @@ -240,7 +240,7 @@ func (s *tradeService) CreateTrade(uid int32, now time.Time, data *CreateTradeDa // 提交异步关闭事件 closeAt := now.Add(time.Duration(env.TradeExpire) * time.Second) - _, err = g.Asynq.Enqueue(e.NewCancelTrade(e.CancelTradeData{ + _, err = g.Asynq.Enqueue(e.NewCancelTrade(e.CompleteTradeData{ TradeNo: tradeNo, Method: method, }), asynq.ProcessAt(closeAt)) diff --git a/web/tasks/task.go b/web/tasks/task.go index 6010915..2a55090 100644 --- a/web/tasks/task.go +++ b/web/tasks/task.go @@ -18,20 +18,27 @@ import ( "github.com/hibiken/asynq" ) -func HandleCancelTrade(_ context.Context, task *asynq.Task) (err error) { - data := new(events.CancelTradeData) - err = json.Unmarshal(task.Payload(), data) +func HandleCompleteTrade(_ context.Context, task *asynq.Task) (err error) { + event := new(events.CompleteTradeData) + err = json.Unmarshal(task.Payload(), event) if err != nil { return fmt.Errorf("解析任务参数失败: %w", err) } - err = s.Trade.CancelTrade(&s.ModifyTradeData{ - TradeNo: data.TradeNo, - Method: data.Method, - }, time.Now()) - if err != nil { - return fmt.Errorf("取消交易失败: %w", err) + data := &s.ModifyTradeData{ + TradeNo: event.TradeNo, + Method: event.Method, } + + err = s.Trade.CompleteTrade(data) + if err != nil { + slog.Debug("完成交易失败[异步结束订单]", "err", err) + err = s.Trade.CancelTrade(data, time.Now()) + if err != nil { + return fmt.Errorf("取消交易失败[异步结束订单]: %w", err) + } + } + return nil } diff --git a/web/web.go b/web/web.go index a666838..e15cc20 100644 --- a/web/web.go +++ b/web/web.go @@ -97,7 +97,7 @@ func RunTask(ctx context.Context) error { var mux = asynq.NewServeMux() mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel) - mux.HandleFunc(events.CancelTrade, tasks.HandleCancelTrade) + mux.HandleFunc(events.CompleteTrade, tasks.HandleCompleteTrade) mux.HandleFunc(events.FlushGateway, tasks.HandleFlushGateway) // 停止服务