修改交易服务函数命名;调整 docker compose 脚本;
This commit is contained in:
2
.gitignore
vendored
2
.gitignore
vendored
@@ -17,5 +17,5 @@ bin/
|
|||||||
scripts/*
|
scripts/*
|
||||||
!scripts/env/
|
!scripts/env/
|
||||||
!scripts/env/dev/
|
!scripts/env/dev/
|
||||||
!scripts/env/pre/
|
!scripts/pre/
|
||||||
!scripts/sql/
|
!scripts/sql/
|
||||||
|
|||||||
@@ -1,7 +1,5 @@
|
|||||||
## TODO
|
## TODO
|
||||||
|
|
||||||
tasks 取消交易时,需要判断错误的类型,如果是因为支付已完成导致的取消,则正常结束流程
|
|
||||||
|
|
||||||
pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器
|
pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器
|
||||||
|
|
||||||
支付回调需要判断可能重复调用的情况
|
支付回调需要判断可能重复调用的情况
|
||||||
@@ -9,14 +7,11 @@ pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器
|
|||||||
实现订单状态查询的 SSE 接口
|
实现订单状态查询的 SSE 接口
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
### 长期
|
### 长期
|
||||||
|
|
||||||
所有 domain 实现 NewXXModel 方法,约束模型创建字段,以防止数据库可空声明变化
|
|
||||||
|
|
||||||
更新支付状态后,缓存结果以便查询
|
更新支付状态后,缓存结果以便查询
|
||||||
|
|
||||||
考虑将重复量比较大的异步任务修改成定时调度任务
|
|
||||||
|
|
||||||
模型字段修改,特定枚举字段使用自定义类型代替通用 int32
|
模型字段修改,特定枚举字段使用自定义类型代替通用 int32
|
||||||
|
|
||||||
proxy 网关更新接口可以传输更结构化的数据,直接区分不同类型以加快更新速度
|
proxy 网关更新接口可以传输更结构化的数据,直接区分不同类型以加快更新速度
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -1,6 +1,6 @@
|
|||||||
module platform
|
module platform
|
||||||
|
|
||||||
go 1.24.0
|
go 1.24.5
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/alibabacloud-go/darabonba-openapi/v2 v2.1.7
|
github.com/alibabacloud-go/darabonba-openapi/v2 v2.1.7
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ services:
|
|||||||
|
|
||||||
platform:
|
platform:
|
||||||
build:
|
build:
|
||||||
context: ../../..
|
context: ../..
|
||||||
dockerfile: Dockerfile
|
dockerfile: Dockerfile
|
||||||
environment:
|
environment:
|
||||||
- RUN_MODE=production
|
- RUN_MODE=production
|
||||||
@@ -83,15 +83,10 @@ func TradeComplete(c *fiber.Ctx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 检查订单状态
|
// 检查订单状态
|
||||||
result, err := s.Trade.ConfirmTradeCompleted(&s.CheckTradeData{
|
err = s.Trade.CompleteTrade(&s.CheckTradeData{
|
||||||
TradeNo: req.TradeNo,
|
TradeNo: req.TradeNo,
|
||||||
Method: req.Method,
|
Method: req.Method,
|
||||||
})
|
})
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.Trade.OnTradeCompleted(&s.OnTradeCompletedData{req.TradeNo, *result})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("完成交易失败", "trade_no", req.TradeNo, "method", req.Method, "error", err)
|
slog.Error("完成交易失败", "trade_no", req.TradeNo, "method", req.Method, "error", err)
|
||||||
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "完成交易失败"})
|
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "完成交易失败"})
|
||||||
|
|||||||
@@ -4,8 +4,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/shopspring/decimal"
|
|
||||||
wecahtpaycore "github.com/wechatpay-apiv3/wechatpay-go/core"
|
|
||||||
"io"
|
"io"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -21,16 +19,14 @@ import (
|
|||||||
"platform/web/tasks"
|
"platform/web/tasks"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/shopspring/decimal"
|
||||||
|
wecahtpaycore "github.com/wechatpay-apiv3/wechatpay-go/core"
|
||||||
|
|
||||||
"github.com/smartwalle/alipay/v3"
|
"github.com/smartwalle/alipay/v3"
|
||||||
"github.com/wechatpay-apiv3/wechatpay-go/services/payments/native"
|
"github.com/wechatpay-apiv3/wechatpay-go/services/payments/native"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ComplementEvents = []trade2.CompleteEvent{
|
|
||||||
ResourceOnTradeComplete{},
|
|
||||||
UserOnTradeComplete{},
|
|
||||||
}
|
|
||||||
|
|
||||||
var Trade = &tradeService{}
|
var Trade = &tradeService{}
|
||||||
|
|
||||||
type tradeService struct {
|
type tradeService struct {
|
||||||
@@ -258,28 +254,51 @@ func (s *tradeService) CreateTrade(uid int32, now time.Time, data *CreateTradeDa
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *tradeService) OnTradeCompleted(data *OnTradeCompletedData) error {
|
func (s *tradeService) CompleteTrade(data *CheckTradeData) error {
|
||||||
|
return g.Redsync.WithLock(tradeLockKey(data.TradeNo), func() error {
|
||||||
|
|
||||||
|
// 检查订单状态
|
||||||
|
result, err := s.ConfirmTradeCompleted(data)
|
||||||
|
if err != nil {
|
||||||
|
return core.NewServErr("确认交易状态失败", err)
|
||||||
|
}
|
||||||
|
|
||||||
// 更新交易状态
|
// 更新交易状态
|
||||||
var trade = new(m.Trade)
|
trade, err := completeTrade(&OnTradeCompletedData{data.TradeNo, *result})
|
||||||
var err = g.Redsync.WithLock(tradeLockKey(data.TradeNo), func() error {
|
|
||||||
return q.Q.Transaction(func(q *q.Query) (err error) {
|
|
||||||
trade, err = completeTrade(q, data)
|
|
||||||
return
|
|
||||||
})
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return core.NewServErr("处理交易失败", err)
|
return core.NewServErr("处理交易失败", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理交易完成事件
|
// 处理交易完成事件
|
||||||
err = completeTradeAfter(trade)
|
err = afterTradeComplete(trade)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return core.NewServErr("处理交易完成事件失败", err)
|
return core.NewServErr("处理交易完成事件失败", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
func completeTrade(q *q.Query, data *OnTradeCompletedData) (*m.Trade, error) {
|
func (s *tradeService) OnTradeCompleted(data *OnTradeCompletedData) error {
|
||||||
|
return g.Redsync.WithLock(tradeLockKey(data.TradeNo), func() error {
|
||||||
|
|
||||||
|
// 更新交易状态
|
||||||
|
trade, err := completeTrade(data)
|
||||||
|
if err != nil {
|
||||||
|
return core.NewServErr("处理交易失败", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 处理交易完成事件
|
||||||
|
err = afterTradeComplete(trade)
|
||||||
|
if err != nil {
|
||||||
|
return core.NewServErr("处理交易完成事件失败", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
func completeTrade(data *OnTradeCompletedData) (*m.Trade, error) {
|
||||||
|
var trade = new(m.Trade)
|
||||||
|
var err = q.Q.Transaction(func(tx *q.Query) error {
|
||||||
var tradeNo = data.TradeNo
|
var tradeNo = data.TradeNo
|
||||||
var transId = data.TransId
|
var transId = data.TransId
|
||||||
var payment = data.Payment
|
var payment = data.Payment
|
||||||
@@ -291,15 +310,15 @@ func completeTrade(q *q.Query, data *OnTradeCompletedData) (*m.Trade, error) {
|
|||||||
Where(q.Trade.InnerNo.Eq(tradeNo)).
|
Where(q.Trade.InnerNo.Eq(tradeNo)).
|
||||||
Take()
|
Take()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, core.NewBizErr("获取交易信息失败", err)
|
return core.NewBizErr("获取交易信息失败", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 检查交易状态
|
// 检查交易状态
|
||||||
switch trade2.Status(trade.Status) {
|
switch trade2.Status(trade.Status) {
|
||||||
case trade2.StatusCanceled:
|
case trade2.StatusCanceled:
|
||||||
return nil, core.NewBizErr("交易已取消")
|
return core.NewBizErr("交易已取消")
|
||||||
case trade2.StatusSuccess:
|
case trade2.StatusSuccess:
|
||||||
return nil, core.NewBizErr("交易已完成")
|
return core.NewBizErr("交易已完成")
|
||||||
case trade2.StatusPending:
|
case trade2.StatusPending:
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -313,12 +332,14 @@ func completeTrade(q *q.Query, data *OnTradeCompletedData) (*m.Trade, error) {
|
|||||||
Where(q.Trade.InnerNo.Eq(tradeNo)).
|
Where(q.Trade.InnerNo.Eq(tradeNo)).
|
||||||
Updates(trade)
|
Updates(trade)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, core.NewServErr("更新交易信息失败", err)
|
return core.NewServErr("更新交易信息失败", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return trade, nil
|
return nil
|
||||||
|
})
|
||||||
|
return trade, err
|
||||||
}
|
}
|
||||||
func completeTradeAfter(trade *m.Trade) error {
|
func afterTradeComplete(trade *m.Trade) error {
|
||||||
|
|
||||||
// 恢复购买信息
|
// 恢复购买信息
|
||||||
productData, err := g.Redis.Get(context.Background(), tradeProductKey(trade.InnerNo)).Result()
|
productData, err := g.Redis.Get(context.Background(), tradeProductKey(trade.InnerNo)).Result()
|
||||||
@@ -327,6 +348,11 @@ func completeTradeAfter(trade *m.Trade) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 执行资源创建
|
// 执行资源创建
|
||||||
|
var ComplementEvents = []trade2.CompleteEvent{
|
||||||
|
ResourceOnTradeComplete{},
|
||||||
|
UserOnTradeComplete{},
|
||||||
|
}
|
||||||
|
|
||||||
for _, event := range ComplementEvents {
|
for _, event := range ComplementEvents {
|
||||||
info, ok := event.Check(trade2.Type(trade.Type))
|
info, ok := event.Check(trade2.Type(trade.Type))
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -348,7 +374,7 @@ func completeTradeAfter(trade *m.Trade) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *tradeService) CancelTrade(tradeNo string, method trade2.Method, now time.Time) error {
|
func (s *tradeService) CancelTrade(tradeNo string, method trade2.Method, now time.Time) error {
|
||||||
err := g.Redsync.WithLock(tradeLockKey(tradeNo), func() error {
|
return g.Redsync.WithLock(tradeLockKey(tradeNo), func() error {
|
||||||
switch method {
|
switch method {
|
||||||
|
|
||||||
case trade2.MethodAlipay:
|
case trade2.MethodAlipay:
|
||||||
@@ -356,11 +382,11 @@ func (s *tradeService) CancelTrade(tradeNo string, method trade2.Method, now tim
|
|||||||
OutTradeNo: tradeNo,
|
OutTradeNo: tradeNo,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return core.NewServErr("上游取消交易失败", err)
|
||||||
}
|
}
|
||||||
if resp.Code != alipay.CodeSuccess {
|
if resp.Code != alipay.CodeSuccess {
|
||||||
slog.Warn("支付宝交易取消失败", "code", resp.Code, "sub_code", resp.SubCode, "msg", resp.Msg)
|
slog.Error("支付宝交易取消失败", "code", resp.Code, "sub_code", resp.SubCode, "msg", resp.Msg)
|
||||||
return errors.New("交易取消失败")
|
return errors.New("上游取消交易失败")
|
||||||
}
|
}
|
||||||
|
|
||||||
case trade2.MethodWeChat:
|
case trade2.MethodWeChat:
|
||||||
@@ -369,12 +395,16 @@ func (s *tradeService) CancelTrade(tradeNo string, method trade2.Method, now tim
|
|||||||
OutTradeNo: &tradeNo,
|
OutTradeNo: &tradeNo,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return core.NewServErr("上游取消交易失败", err)
|
||||||
}
|
}
|
||||||
if resp.Response.StatusCode != http.StatusNoContent {
|
if resp.Response.StatusCode != http.StatusNoContent {
|
||||||
body, _ := io.ReadAll(resp.Response.Body)
|
body, err := io.ReadAll(resp.Response.Body)
|
||||||
slog.Warn("微信交易取消失败", "code", resp.Response.StatusCode, "body", string(body))
|
if err != nil {
|
||||||
return errors.New("交易取消失败")
|
slog.Error("读取微信交易取消响应失败", "error", err)
|
||||||
|
return core.NewServErr("上游取消交易失败", err)
|
||||||
|
}
|
||||||
|
slog.Error("微信交易取消失败", "code", resp.Response.StatusCode, "body", string(body))
|
||||||
|
return errors.New("上游取消交易失败")
|
||||||
}
|
}
|
||||||
|
|
||||||
case trade2.MethodSft, trade2.MethodSftAlipay, trade2.MethodSftWeChat:
|
case trade2.MethodSft, trade2.MethodSftAlipay, trade2.MethodSftWeChat:
|
||||||
@@ -390,31 +420,25 @@ func (s *tradeService) CancelTrade(tradeNo string, method trade2.Method, now tim
|
|||||||
return ErrTransactionNotSupported
|
return ErrTransactionNotSupported
|
||||||
}
|
}
|
||||||
|
|
||||||
err := q.Q.Transaction(func(q *q.Query) error {
|
err := cancelTrade(tradeNo, now)
|
||||||
return cancelTrade(q, tradeNo, now)
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
|
||||||
return core.NewServErr("处理交易取消失败", err)
|
|
||||||
}
|
}
|
||||||
return nil
|
func (s *tradeService) OnTradeCanceled(tradeNo string, now time.Time) error {
|
||||||
}
|
|
||||||
func (s *tradeService) OnTradeCanceled(q *q.Query, tradeNo string, now time.Time) error {
|
|
||||||
err := g.Redsync.WithLock(tradeLockKey(tradeNo), func() error {
|
err := g.Redsync.WithLock(tradeLockKey(tradeNo), func() error {
|
||||||
return cancelTrade(q, tradeNo, now)
|
return cancelTrade(tradeNo, now)
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return core.NewServErr("处理交易取消失败", err)
|
return core.NewServErr("处理交易取消失败", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
func cancelTrade(q *q.Query, tradeNo string, now time.Time) error {
|
func cancelTrade(tradeNo string, now time.Time) error {
|
||||||
|
return q.Q.Transaction(func(q *q.Query) error {
|
||||||
// 获取交易信息
|
// 获取交易信息
|
||||||
var status trade2.Status
|
var status trade2.Status
|
||||||
err := q.Trade.
|
err := q.Trade.
|
||||||
@@ -445,6 +469,7 @@ func cancelTrade(q *q.Query, tradeNo string, now time.Time) error {
|
|||||||
return core.NewServErr("更新交易状态失败", err)
|
return core.NewServErr("更新交易状态失败", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *tradeService) RefundTrade(tradeNo string, method trade2.Method) error {
|
func (s *tradeService) RefundTrade(tradeNo string, method trade2.Method) error {
|
||||||
|
|||||||
33
web/tasks/log.go
Normal file
33
web/tasks/log.go
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
package tasks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/hibiken/asynq"
|
||||||
|
"log/slog"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RequestLog struct {
|
||||||
|
Type string
|
||||||
|
User int32
|
||||||
|
IP string
|
||||||
|
UA string
|
||||||
|
Method string
|
||||||
|
Path string
|
||||||
|
Status int
|
||||||
|
Error string
|
||||||
|
Latency time.Duration
|
||||||
|
Time time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewRequestLog(data RequestLog) *asynq.Task {
|
||||||
|
var rs, err = json.Marshal(data)
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("日志数据序列化失败", slog.Any("err", err), slog.Any("data", data))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return asynq.NewTask("logs:request", rs)
|
||||||
|
}
|
||||||
|
|
||||||
|
type LoginLog struct {
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user