支付后异步任务先尝试完成订单,如果无法完成再取消

This commit is contained in:
2025-11-28 19:00:34 +08:00
parent 5b6e50de53
commit 93dfbc92fa
5 changed files with 23 additions and 18 deletions

View File

@@ -1,8 +1,6 @@
## TODO ## TODO
限制提取单次最大量 活体认证回调在后端实现
支付后异步任务,到时间后需要尝试完成订单,如果无法完成再关闭
重新实现接口 proxy 注册与注销接口: 重新实现接口 proxy 注册与注销接口:
- 注册时向 redis ports 可用池中加入端口 - 注册时向 redis ports 可用池中加入端口

View File

@@ -9,18 +9,18 @@ import (
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
) )
const CancelTrade = "trade:update" const CompleteTrade = "trade:update"
type CancelTradeData struct { type CompleteTradeData struct {
TradeNo string `json:"trade_no" validate:"required"` TradeNo string `json:"trade_no" validate:"required"`
Method m.TradeMethod `json:"method" validate:"required"` Method m.TradeMethod `json:"method" validate:"required"`
} }
func NewCancelTrade(data CancelTradeData) *asynq.Task { func NewCancelTrade(data CompleteTradeData) *asynq.Task {
bytes, err := json.Marshal(data) bytes, err := json.Marshal(data)
if err != nil { if err != nil {
slog.Error("序列化更新交易任务失败", "error", err) slog.Error("序列化更新交易任务失败", "error", err)
return nil return nil
} }
return asynq.NewTask(CancelTrade, bytes) return asynq.NewTask(CompleteTrade, bytes)
} }

View File

@@ -240,7 +240,7 @@ func (s *tradeService) CreateTrade(uid int32, now time.Time, data *CreateTradeDa
// 提交异步关闭事件 // 提交异步关闭事件
closeAt := now.Add(time.Duration(env.TradeExpire) * time.Second) closeAt := now.Add(time.Duration(env.TradeExpire) * time.Second)
_, err = g.Asynq.Enqueue(e.NewCancelTrade(e.CancelTradeData{ _, err = g.Asynq.Enqueue(e.NewCancelTrade(e.CompleteTradeData{
TradeNo: tradeNo, TradeNo: tradeNo,
Method: method, Method: method,
}), asynq.ProcessAt(closeAt)) }), asynq.ProcessAt(closeAt))

View File

@@ -18,20 +18,27 @@ import (
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
) )
func HandleCancelTrade(_ context.Context, task *asynq.Task) (err error) { func HandleCompleteTrade(_ context.Context, task *asynq.Task) (err error) {
data := new(events.CancelTradeData) event := new(events.CompleteTradeData)
err = json.Unmarshal(task.Payload(), data) err = json.Unmarshal(task.Payload(), event)
if err != nil { if err != nil {
return fmt.Errorf("解析任务参数失败: %w", err) return fmt.Errorf("解析任务参数失败: %w", err)
} }
err = s.Trade.CancelTrade(&s.ModifyTradeData{ data := &s.ModifyTradeData{
TradeNo: data.TradeNo, TradeNo: event.TradeNo,
Method: data.Method, Method: event.Method,
}, time.Now())
if err != nil {
return fmt.Errorf("取消交易失败: %w", err)
} }
err = s.Trade.CompleteTrade(data)
if err != nil {
slog.Debug("完成交易失败[异步结束订单]", "err", err)
err = s.Trade.CancelTrade(data, time.Now())
if err != nil {
return fmt.Errorf("取消交易失败[异步结束订单]: %w", err)
}
}
return nil return nil
} }

View File

@@ -97,7 +97,7 @@ func RunTask(ctx context.Context) error {
var mux = asynq.NewServeMux() var mux = asynq.NewServeMux()
mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel) mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel)
mux.HandleFunc(events.CancelTrade, tasks.HandleCancelTrade) mux.HandleFunc(events.CompleteTrade, tasks.HandleCompleteTrade)
mux.HandleFunc(events.FlushGateway, tasks.HandleFlushGateway) mux.HandleFunc(events.FlushGateway, tasks.HandleFlushGateway)
// 停止服务 // 停止服务