diff --git a/README.md b/README.md index 2d219ad..4814996 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,13 @@ ## TODO +创建交易订单后添加一个关闭订单的异步任务 + +支付回调需要判断可能重复调用的情况 + +实现订单状态查询的 SSE 接口 + +考虑将重复量比较大的异步任务修改成定时调度任务 + ### 长期 模型字段修改,特定枚举字段使用自定义类型代替通用 int32 @@ -8,28 +16,16 @@ ## 业务逻辑 -### 代理服务与节点的增量更新 +### 支付处理流程 -代理服务定时提交增量更新数据包,格式为: +1. 创建订单,推送异步检查任务 +2. sse 接口推送订单状态 -| version(1) | proxy_id(4) | count(2) | edge_id(4) | mask(1) | info(...) | -|------------|-------------|----------|------------|---------|-----------| -| 数据包版本 | 代理服务 id | 更新的节点数量 | 节点 id | 节点更新项 | 节点更新内容 | +- 支付回调更新支付状态 +- 异步任务更新支付状态 +- 主动查询更新支付状态 -其中 mask 部分,每个位代表一个节点更新项,具体如下: - -| 位数(从前到后) | 节点更新项 | -|----------|----------| -| 0 | 保留位,预留扩展 | -| 1 | 空,固定为 0 | -| 2 | 空,固定为 0 | -| 3 | 端口 | -| 4 | 省份 | -| 5 | 城市 | -| 6 | 运营商 | -| 7 | 状态 | - -节点更新的具体内容大小视数据结构而定 +更新支付状态后,缓存结果以便查询 ### 产品字典表 @@ -37,14 +33,3 @@ |-------|------| | short | 短效代理 | | long | 长效代理 | - -## 业务流程 - -### 短效动态 - -1. 获取远端连接配置 -2. 查询并计算实际连接状况 -3. 每个网关: - 1. 根据负载分配算法,计算是否需要为此网关分配新的连接 - 2. 如果需要新的连接且预计连接数大于已配置的连接数,则更新远端连接配置 - 3. 为新增连接分配可用端口,并更新网关端口配置 \ No newline at end of file diff --git a/web/handlers/trade.go b/web/handlers/trade.go index 1510e87..7d0c3a4 100644 --- a/web/handlers/trade.go +++ b/web/handlers/trade.go @@ -3,6 +3,7 @@ package handlers import ( "fmt" "github.com/shopspring/decimal" + "github.com/valyala/fasthttp/fasthttpadaptor" "log/slog" "net/http" trade2 "platform/web/domains/trade" @@ -13,13 +14,25 @@ import ( "github.com/gofiber/fiber/v2" "github.com/smartwalle/alipay/v3" - "github.com/valyala/fasthttp/fasthttpadaptor" "github.com/wechatpay-apiv3/wechatpay-go/services/payments" ) +// region TradeCheckSSE + +func TradeCheckSSE(c *fiber.Ctx) error { + // 设置响应头 + c.Set("Content-Type", "text/event-stream") + c.Set("Cache-Control", "no-cache") + c.Set("Connection", "keep-alive") + + return nil +} + +// endregion + // region AlipayCallback -func AlipayCallback(c *fiber.Ctx) error { +func AlipayCallback(c *fiber.Ctx) (err error) { // 解析请求 httpRequest := new(http.Request) @@ -36,21 +49,34 @@ func AlipayCallback(c *fiber.Ctx) error { slog.Debug("支付宝支付回调", "notification", fmt.Sprintf("%+v", notification)) - // todo 退款通知 - if isRefund(notification) { - return act(c) - } - // 查询交易信息 trade, err := q.Q.Trade.Where(q.Trade.InnerNo.Eq(notification.OutTradeNo)).Take() if err != nil { - // 跳过测试通知 - return act(c) + return c.SendString("success") } - switch notification.NotifyType { + switch alipay.TradeStatus(notification.NotifyType) { + + // 等待支付 + case alipay.TradeStatusWaitBuyerPay: + // 不需要处理 + + // 支付关闭 + case alipay.TradeStatusClosed: + switch trade2.Type(trade.Type) { + + // 购买产品 + case trade2.TypePurchase: + err = s.Resource.CancelResource(notification.OutTradeNo, time.Now(), true) + if err != nil { + return err + } + + // 余额充值 + case trade2.TypeRecharge: + } // 支付成功 - case string(alipay.TradeStatusSuccess): + case alipay.TradeStatusSuccess: // 收集交易状态 payment, err := decimal.NewFromString(notification.TotalAmount) @@ -61,7 +87,7 @@ func AlipayCallback(c *fiber.Ctx) error { if err != nil { return err } - verified := &s.TransactionVerifyResult{ + verified := &s.TradeSuccessResult{ TransId: notification.TradeNo, Payment: payment, Time: paidAt, @@ -83,48 +109,18 @@ func AlipayCallback(c *fiber.Ctx) error { } } - // 支付关闭 - case string(alipay.TradeStatusClosed): - switch trade2.Type(trade.Type) { - - // 购买产品 - case trade2.TypePurchase: - err = s.Resource.CancelResource(notification.OutTradeNo, time.Now(), true) - if err != nil { - return err - } - default: - } + // 交易结束 + case alipay.TradeStatusFinished: + // 结束交易状态 } - return act(c) -} - -type AdapterWriter struct { - c *fiber.Ctx -} - -func (a AdapterWriter) Header() http.Header { - panic("implement me") -} - -func (a AdapterWriter) Write(bytes []byte) (int, error) { - return a.c.Write(bytes) -} - -func (a AdapterWriter) WriteHeader(statusCode int) { - a.c.Status(statusCode) + return c.SendString("success") } func isRefund(notification *alipay.Notification) bool { return notification.OutBizNo != "" || notification.RefundFee != "" || notification.GmtRefund != "" } -func act(c *fiber.Ctx) error { - g.Alipay.ACKNotification(AdapterWriter{c: c}) - return nil -} - // endregion // region WechatPayCallback @@ -162,7 +158,7 @@ func WechatPayCallback(c *fiber.Ctx) error { if err != nil { return err } - verified := &s.TransactionVerifyResult{ + verified := &s.TradeSuccessResult{ TransId: *content.TransactionId, Payment: payment, Time: paidAt, diff --git a/web/handlers/user.go b/web/handlers/user.go index 6ae55a1..1d6a0eb 100644 --- a/web/handlers/user.go +++ b/web/handlers/user.go @@ -179,9 +179,9 @@ func RechargePrepareAlipay(c *fiber.Ctx) error { if err != nil { return err } - var result *s.TransactionPrepareResult + var result *s.TradeCreateResult err = q.Q.Transaction(func(tx *q.Query) error { - result, err = s.Transaction.PrepareTransaction(tx, authContext.Payload.Id, now, &s.TransactionPrepareData{ + result, err = s.Trade.SendCreateTradeByQrcode(tx, authContext.Payload.Id, now, &s.TradeCreateData{ Subject: "账户充值 - " + amount.StringFixed(2) + "元", Amount: amount, ExpireAt: time.Now().Add(30 * time.Minute), @@ -215,7 +215,7 @@ func RechargeConfirmAlipay(c *fiber.Ctx) error { } // 验证支付结果 - result, err := s.Transaction.VerifyTransaction(&s.TransactionVerifyData{ + result, err := s.Trade.VerifyTrade(&s.TradeVerifyData{ TradeNo: req.TradeNo, Method: trade2.MethodAlipay, }) @@ -251,9 +251,9 @@ func RechargePrepareWechat(c *fiber.Ctx) error { if err != nil { return err } - var result *s.TransactionPrepareResult + var result *s.TradeCreateResult err = q.Q.Transaction(func(tx *q.Query) error { - result, err = s.Transaction.PrepareTransaction(tx, authContext.Payload.Id, now, &s.TransactionPrepareData{ + result, err = s.Trade.SendCreateTradeByQrcode(tx, authContext.Payload.Id, now, &s.TradeCreateData{ Subject: "账户充值 - " + amount.StringFixed(2) + "元", Amount: amount, ExpireAt: now.Add(30 * time.Minute), @@ -289,7 +289,7 @@ func RechargeConfirmWechat(c *fiber.Ctx) error { } // 验证支付结果 - result, err := s.Transaction.VerifyTransaction(&s.TransactionVerifyData{ + result, err := s.Trade.VerifyTrade(&s.TradeVerifyData{ TradeNo: req.TradeNo, Method: trade2.MethodWeChat, }) diff --git a/web/handlers/verifier.go b/web/handlers/verifier.go index 04b0406..19639c7 100644 --- a/web/handlers/verifier.go +++ b/web/handlers/verifier.go @@ -17,7 +17,7 @@ type VerifierReq struct { func SmsCode(c *fiber.Ctx) error { - _, err := auth.Protect(c, []auth.PayloadType{auth.PayloadSecuredServer}, []string{}) + _, err := auth.Protect(c, []auth.PayloadType{auth.PayloadInternalServer}, []string{}) if err != nil { return err } diff --git a/web/router.go b/web/router.go index ec4bd0a..ed7c0f9 100644 --- a/web/router.go +++ b/web/router.go @@ -56,6 +56,8 @@ func ApplyRouters(app *fiber.App) { // 交易 trade := api.Group("/trade") trade.Post("/callback/alipay", handlers.AlipayCallback) + trade.Post("/callback/wechat", handlers.WechatPayCallback) + trade.Post("/check", handlers.TradeCheckSSE) // 账单 bill := api.Group("/bill") diff --git a/web/services/resource.go b/web/services/resource.go index 8aa347e..5d117e2 100644 --- a/web/services/resource.go +++ b/web/services/resource.go @@ -87,7 +87,7 @@ func (s *resourceService) CreateResource(uid int32, now time.Time, ser *CreateRe return nil } -func (s *resourceService) PrepareResource(uid int32, now time.Time, method trade2.Method, ser *CreateResourceSerializer) (*TransactionPrepareResult, error) { +func (s *resourceService) PrepareResource(uid int32, now time.Time, method trade2.Method, ser *CreateResourceSerializer) (*TradeCreateResult, error) { data, err := ser.ToData() if err != nil { @@ -98,12 +98,12 @@ func (s *resourceService) PrepareResource(uid int32, now time.Time, method trade amount := data.GetPrice() // 保存到数据库 - var result *TransactionPrepareResult + var result *TradeCreateResult err = q.Q.Transaction(func(q *q.Query) error { var err error // 生成交易订单 - result, err = Transaction.PrepareTransaction(q, uid, now, &TransactionPrepareData{ + result, err = Trade.SendCreateTradeByQrcode(q, uid, now, &TradeCreateData{ Subject: "购买套餐 - " + name, Amount: amount, ExpireAt: time.Now().Add(30 * time.Minute), @@ -140,7 +140,7 @@ func (s *resourceService) PrepareResource(uid int32, now time.Time, method trade return result, nil } -func (s *resourceService) CompleteResource(tradeNo string, now time.Time, opResult ...*TransactionVerifyResult) error { +func (s *resourceService) CompleteResource(tradeNo string, now time.Time, opResult ...*TradeSuccessResult) error { // 获取请求缓存 reqStr, err := g.Redis.Get(context.Background(), tradeNo).Result() @@ -153,12 +153,12 @@ func (s *resourceService) CompleteResource(tradeNo string, now time.Time, opResu } // 检查交易结果 - var rs *TransactionVerifyResult + var rs *TradeSuccessResult if len(opResult) > 0 && opResult[0] != nil { rs = opResult[0] } else { var err error - rs, err = Transaction.VerifyTransaction(&TransactionVerifyData{ + rs, err = Trade.VerifyTrade(&TradeVerifyData{ TradeNo: tradeNo, Method: cache.Method, }) @@ -175,10 +175,19 @@ func (s *resourceService) CompleteResource(tradeNo string, now time.Time, opResu // 保存交易信息 err = q.Q.Transaction(func(q *q.Query) error { + // 完成交易 + _, err = Trade.OnTradeCreated(q, &OnTradeCreateData{ + TradeNo: tradeNo, + TradeSuccessResult: *rs, + }) + if err != nil { + return fmt.Errorf("完成交易失败: %w", err) + } + // 保存套餐 resource, err := createResource(q, cache.Uid, now, data) if err != nil { - return err + return fmt.Errorf("创建套餐失败: %w", err) } // 更新账单 @@ -189,22 +198,13 @@ func (s *resourceService) CompleteResource(tradeNo string, now time.Time, opResu ResourceID: &resource.ID, }) if err != nil { - return err - } - - // 完成交易 - _, err = Transaction.CompleteTransaction(q, &TransactionCompleteData{ - TradeNo: tradeNo, - TransactionVerifyResult: *rs, - }) - if err != nil { - return err + return fmt.Errorf("更新账单失败: %w", err) } // 删除缓存 err = g.Redis.Del(context.Background(), tradeNo).Err() if err != nil { - return err + return fmt.Errorf("删除缓存失败: %w", err) } return nil @@ -229,14 +229,14 @@ func (s *resourceService) CancelResource(tradeNo string, now time.Time, opRevoke // 取消交易 if len(opRevoked) <= 0 { - err = Transaction.RevokeTransaction(tradeNo, cache.Method) + err = Trade.SendCancelTrade(tradeNo, cache.Method) if err != nil { return err } } // 更新订单状态 - err = Transaction.FinishTransaction(q.Q, tradeNo, now) + err = Trade.OnTradeCanceled(q.Q, tradeNo, now) if err != nil { return err } diff --git a/web/services/transaction.go b/web/services/trade.go similarity index 84% rename from web/services/transaction.go rename to web/services/trade.go index 00debd0..5b40ddc 100644 --- a/web/services/transaction.go +++ b/web/services/trade.go @@ -16,6 +16,7 @@ import ( "platform/web/globals/orm" m "platform/web/models" q "platform/web/queries" + "platform/web/tasks" "time" "github.com/smartwalle/alipay/v3" @@ -23,12 +24,12 @@ import ( "gorm.io/gorm" ) -var Transaction = &transactionService{} +var Trade = &tradeService{} -type transactionService struct { +type tradeService struct { } -func (s *transactionService) PrepareTransaction(q *q.Query, uid int32, now time.Time, data *TransactionPrepareData) (*TransactionPrepareResult, error) { +func (s *tradeService) SendCreateTradeByQrcode(q *q.Query, uid int32, now time.Time, data *TradeCreateData) (*TradeCreateResult, error) { var subject = data.Subject var expire = data.ExpireAt var tType = data.Type @@ -184,15 +185,118 @@ func (s *transactionService) PrepareTransaction(q *q.Query, uid int32, now time. return nil, err } - return &TransactionPrepareResult{ + // 提交异步任务更新订单状态 + _, err = g.Asynq.Enqueue(tasks.NewUpdateTrade(tradeNo, method)) + if err != nil { + return nil, err + } + + return &TradeCreateResult{ TradeNo: tradeNo, PayURL: payUrl, Bill: &bill, Trade: &trade, }, nil } +func (s *tradeService) SendCreateTradeByRedirect() { + panic("todo") +} -func (s *transactionService) VerifyTransaction(data *TransactionVerifyData) (*TransactionVerifyResult, error) { +func (s *tradeService) OnTradeCreated(q *q.Query, data *OnTradeCreateData) (*m.Trade, error) { + var transId = data.TransId + var tradeNo = data.TradeNo + var payment = data.Payment + var paidAt = data.Time + + // 获取交易信息 + trade, err := q.Trade. + Where(q.Trade.InnerNo.Eq(tradeNo)). + First() + if err != nil { + return nil, err + } + + // 检查交易状态 + switch trade2.Status(trade.Status) { + + // 如果已退款或取消,则返回错误 + case trade2.StatusCanceled, trade2.StatusRefunded: + return nil, errors.New("交易已取消或已退款") + + // 如果是未支付,则更新支付状态 + case trade2.StatusPending: + trade.Status = int32(trade2.StatusSuccess) + trade.OuterNo = &transId + trade.Payment = payment + trade.PaidAt = u.P(orm.LocalDateTime(paidAt)) + trade.PayURL = u.P("") + _, err = q.Trade.Updates(trade) + if err != nil { + return nil, err + } + case trade2.StatusSuccess: + } + + return trade, nil +} + +func (s *tradeService) SendCancelTrade(tradeNo string, method trade2.Method) error { + + switch method { + + case trade2.MethodAlipay: + resp, err := g.Alipay.TradeCancel(context.Background(), alipay.TradeCancel{ + OutTradeNo: tradeNo, + }) + if err != nil { + return err + } + if resp.Code != alipay.CodeSuccess { + slog.Warn("支付宝交易取消失败", "code", resp.Code, "sub_code", resp.SubCode, "msg", resp.Msg) + return errors.New("交易取消失败") + } + + case trade2.MethodWeChat: + resp, err := g.WechatPay.Native.CloseOrder(context.Background(), native.CloseOrderRequest{ + Mchid: &env.WechatPayMchId, + OutTradeNo: &tradeNo, + }) + if err != nil { + return err + } + if resp.Response.StatusCode != http.StatusNoContent { + body, _ := io.ReadAll(resp.Response.Body) + slog.Warn("微信交易取消失败", "code", resp.Response.StatusCode, "body", string(body)) + return errors.New("交易取消失败") + } + } + + return nil +} +func (s *tradeService) OnTradeCanceled(q *q.Query, tradeNo string, now time.Time) error { + _, err := q.Trade. + Where(q.Trade.InnerNo.Eq(tradeNo)). + Select(q.Trade.Status, q.Trade.CancelAt, q.Trade.PayURL). + Updates(m.Trade{ + Status: int32(trade2.StatusCanceled), + CancelAt: u.P(orm.LocalDateTime(now)), + PayURL: u.P(""), + }) + if err != nil { + return err + } + + return nil +} + +func (s *tradeService) SendRefundTrade(tradeNo string, method trade2.Method) error { + panic("todo") +} +func (s *tradeService) OnTradeRefunded(q *q.Query, tradeNo string, now time.Time) error { + panic("todo") +} + +func (s *tradeService) VerifyTrade(data *TradeVerifyData) (*TradeSuccessResult, error) { var tradeNo = data.TradeNo var method = data.Method @@ -253,104 +357,14 @@ func (s *transactionService) VerifyTransaction(data *TransactionVerifyData) (*Tr return nil, ErrTransactionNotSupported } - return &TransactionVerifyResult{ + return &TradeSuccessResult{ TransId: transId, Payment: payment, Time: paidAt, }, nil } -func (s *transactionService) CompleteTransaction(q *q.Query, data *TransactionCompleteData) (*TransactionCompleteResult, error) { - var transId = data.TransId - var tradeNo = data.TradeNo - var payment = data.Payment - var paidAt = data.Time - - // 获取交易信息 - trade, err := q.Trade. - Where(q.Trade.InnerNo.Eq(tradeNo)). - First() - if err != nil { - return nil, err - } - - // 检查交易状态 - switch trade2.Status(trade.Status) { - - // 如果已退款或取消,则返回错误 - case trade2.StatusCanceled, trade2.StatusRefunded: - return nil, errors.New("交易已取消或已退款") - - // 如果是未支付,则更新支付状态 - case trade2.StatusPending: - trade.Status = int32(trade2.StatusSuccess) - trade.OuterNo = &transId - trade.Payment = payment - trade.PaidAt = u.P(orm.LocalDateTime(paidAt)) - trade.PayURL = u.P("") - _, err = q.Trade.Updates(trade) - if err != nil { - return nil, err - } - case trade2.StatusSuccess: - } - - return &TransactionCompleteResult{ - Trade: trade, - }, nil -} - -func (s *transactionService) RevokeTransaction(tradeNo string, method trade2.Method) error { - - switch method { - - case trade2.MethodAlipay: - resp, err := g.Alipay.TradeCancel(context.Background(), alipay.TradeCancel{ - OutTradeNo: tradeNo, - }) - if err != nil { - return err - } - if resp.Code != alipay.CodeSuccess { - slog.Warn("支付宝交易取消失败", "code", resp.Code, "sub_code", resp.SubCode, "msg", resp.Msg) - return errors.New("交易取消失败") - } - - case trade2.MethodWeChat: - resp, err := g.WechatPay.Native.CloseOrder(context.Background(), native.CloseOrderRequest{ - Mchid: &env.WechatPayMchId, - OutTradeNo: &tradeNo, - }) - if err != nil { - return err - } - if resp.Response.StatusCode != http.StatusNoContent { - body, _ := io.ReadAll(resp.Response.Body) - slog.Warn("微信交易取消失败", "code", resp.Response.StatusCode, "body", string(body)) - return errors.New("交易取消失败") - } - } - - return nil -} - -func (s *transactionService) FinishTransaction(q *q.Query, tradeNo string, now time.Time) error { - _, err := q.Trade. - Where(q.Trade.InnerNo.Eq(tradeNo)). - Select(q.Trade.Status, q.Trade.CancelAt, q.Trade.PayURL). - Updates(m.Trade{ - Status: int32(trade2.StatusCanceled), - CancelAt: u.P(orm.LocalDateTime(now)), - PayURL: u.P(""), - }) - if err != nil { - return err - } - - return nil -} - -type TransactionPrepareData struct { +type TradeCreateData struct { Subject string Amount decimal.Decimal ExpireAt time.Time @@ -359,40 +373,44 @@ type TransactionPrepareData struct { CouponCode string } -type TransactionPrepareResult struct { +type TradeCreateResult struct { TradeNo string PayURL string Bill *m.Bill Trade *m.Trade } -type TransactionVerifyData struct { +type TradeVerifyData struct { TradeNo string Method trade2.Method } -type TransactionVerifyResult struct { +type TradeSuccessResult struct { TransId string Payment decimal.Decimal Time time.Time } -type TransactionCompleteData struct { +type OnTradeCreateData struct { TradeNo string - TransactionVerifyResult + TradeSuccessResult } -type TransactionCompleteResult struct { - Trade *m.Trade -} +type TradeResult int -type TransactionErr string +const ( + TradeSuccess TradeResult = iota + 1 + TradeCanceled + TradeClosed +) -func (e TransactionErr) Error() string { +type TradeErr string + +func (e TradeErr) Error() string { return string(e) } var ( - ErrTransactionNotPaid = TransactionErr("交易未支付") - ErrTransactionNotSupported = TransactionErr("不支持的支付方式") + ErrTransactionNotPaid = TradeErr("交易未支付") + ErrTransactionNotSupported = TradeErr("不支持的支付方式") ) diff --git a/web/services/user.go b/web/services/user.go index b7a0cf9..c97a373 100644 --- a/web/services/user.go +++ b/web/services/user.go @@ -8,14 +8,14 @@ var User = &userService{} type userService struct{} -func (s *userService) RechargeConfirm(tradeNo string, verified *TransactionVerifyResult) error { +func (s *userService) RechargeConfirm(tradeNo string, verified *TradeSuccessResult) error { err := q.Q.Transaction(func(tx *q.Query) error { // 更新交易状态 - result, err := Transaction.CompleteTransaction(tx, &TransactionCompleteData{ - TradeNo: tradeNo, - TransactionVerifyResult: *verified, + trade, err := Trade.OnTradeCreated(tx, &OnTradeCreateData{ + TradeNo: tradeNo, + TradeSuccessResult: *verified, }) if err != nil { return err @@ -23,14 +23,14 @@ func (s *userService) RechargeConfirm(tradeNo string, verified *TransactionVerif // 更新用户余额 user, err := tx.User. - Where(tx.User.ID.Eq(result.Trade.UserID)).Take() + Where(tx.User.ID.Eq(trade.UserID)).Take() if err != nil { return err } _, err = tx.User. Where(tx.User.ID.Eq(user.ID)). - UpdateSimple(tx.User.Balance.Value(user.Balance.Add(result.Trade.Amount))) + UpdateSimple(tx.User.Balance.Value(user.Balance.Add(trade.Amount))) if err != nil { return err } diff --git a/web/tasks/trade.go b/web/tasks/trade.go new file mode 100644 index 0000000..accb0c8 --- /dev/null +++ b/web/tasks/trade.go @@ -0,0 +1,19 @@ +package tasks + +import ( + "encoding/json" + "github.com/hibiken/asynq" + "log/slog" + trade2 "platform/web/domains/trade" +) + +const UpdateTrade = "trade:update" + +func NewUpdateTrade(tradeNo string, method trade2.Method) *asynq.Task { + bytes, err := json.Marshal(tradeNo) + if err != nil { + slog.Error("序列化更新交易任务失败", "error", err) + return nil + } + return asynq.NewTask(UpdateTrade, bytes) +}