From 93dfbc92fad1ce8c49bc49167ec3e38b3ac457fa Mon Sep 17 00:00:00 2001 From: luorijun Date: Fri, 28 Nov 2025 19:00:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E4=BB=98=E5=90=8E=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=85=88=E5=B0=9D=E8=AF=95=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E8=AE=A2=E5=8D=95=EF=BC=8C=E5=A6=82=E6=9E=9C=E6=97=A0=E6=B3=95?= =?UTF-8?q?=E5=AE=8C=E6=88=90=E5=86=8D=E5=8F=96=E6=B6=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 4 +--- web/events/trade.go | 8 ++++---- web/services/trade.go | 2 +- web/tasks/task.go | 25 ++++++++++++++++--------- web/web.go | 2 +- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index e17f21b..a9f1b11 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,6 @@ ## TODO -限制提取单次最大量 - -支付后异步任务,到时间后需要尝试完成订单,如果无法完成再关闭 +活体认证回调在后端实现 重新实现接口 proxy 注册与注销接口: - 注册时向 redis ports 可用池中加入端口 diff --git a/web/events/trade.go b/web/events/trade.go index 3351f07..6824693 100644 --- a/web/events/trade.go +++ b/web/events/trade.go @@ -9,18 +9,18 @@ import ( "github.com/hibiken/asynq" ) -const CancelTrade = "trade:update" +const CompleteTrade = "trade:update" -type CancelTradeData struct { +type CompleteTradeData struct { TradeNo string `json:"trade_no" validate:"required"` Method m.TradeMethod `json:"method" validate:"required"` } -func NewCancelTrade(data CancelTradeData) *asynq.Task { +func NewCancelTrade(data CompleteTradeData) *asynq.Task { bytes, err := json.Marshal(data) if err != nil { slog.Error("序列化更新交易任务失败", "error", err) return nil } - return asynq.NewTask(CancelTrade, bytes) + return asynq.NewTask(CompleteTrade, bytes) } diff --git a/web/services/trade.go b/web/services/trade.go index 99006bf..7e47836 100644 --- a/web/services/trade.go +++ b/web/services/trade.go @@ -240,7 +240,7 @@ func (s *tradeService) CreateTrade(uid int32, now time.Time, data *CreateTradeDa // 提交异步关闭事件 closeAt := now.Add(time.Duration(env.TradeExpire) * time.Second) - _, err = g.Asynq.Enqueue(e.NewCancelTrade(e.CancelTradeData{ + _, err = g.Asynq.Enqueue(e.NewCancelTrade(e.CompleteTradeData{ TradeNo: tradeNo, Method: method, }), asynq.ProcessAt(closeAt)) diff --git a/web/tasks/task.go b/web/tasks/task.go index 6010915..2a55090 100644 --- a/web/tasks/task.go +++ b/web/tasks/task.go @@ -18,20 +18,27 @@ import ( "github.com/hibiken/asynq" ) -func HandleCancelTrade(_ context.Context, task *asynq.Task) (err error) { - data := new(events.CancelTradeData) - err = json.Unmarshal(task.Payload(), data) +func HandleCompleteTrade(_ context.Context, task *asynq.Task) (err error) { + event := new(events.CompleteTradeData) + err = json.Unmarshal(task.Payload(), event) if err != nil { return fmt.Errorf("解析任务参数失败: %w", err) } - err = s.Trade.CancelTrade(&s.ModifyTradeData{ - TradeNo: data.TradeNo, - Method: data.Method, - }, time.Now()) - if err != nil { - return fmt.Errorf("取消交易失败: %w", err) + data := &s.ModifyTradeData{ + TradeNo: event.TradeNo, + Method: event.Method, } + + err = s.Trade.CompleteTrade(data) + if err != nil { + slog.Debug("完成交易失败[异步结束订单]", "err", err) + err = s.Trade.CancelTrade(data, time.Now()) + if err != nil { + return fmt.Errorf("取消交易失败[异步结束订单]: %w", err) + } + } + return nil } diff --git a/web/web.go b/web/web.go index a666838..e15cc20 100644 --- a/web/web.go +++ b/web/web.go @@ -97,7 +97,7 @@ func RunTask(ctx context.Context) error { var mux = asynq.NewServeMux() mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel) - mux.HandleFunc(events.CancelTrade, tasks.HandleCancelTrade) + mux.HandleFunc(events.CompleteTrade, tasks.HandleCompleteTrade) mux.HandleFunc(events.FlushGateway, tasks.HandleFlushGateway) // 停止服务