diff --git a/.gitignore b/.gitignore index 386239d..b0a9201 100644 --- a/.gitignore +++ b/.gitignore @@ -17,5 +17,5 @@ bin/ scripts/* !scripts/env/ !scripts/env/dev/ -!scripts/env/pre/ +!scripts/pre/ !scripts/sql/ diff --git a/README.md b/README.md index 76ca66f..f801f3f 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,5 @@ ## TODO -tasks 取消交易时,需要判断错误的类型,如果是因为支付已完成导致的取消,则正常结束流程 - pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器 支付回调需要判断可能重复调用的情况 @@ -9,14 +7,11 @@ pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器 实现订单状态查询的 SSE 接口 + ### 长期 -所有 domain 实现 NewXXModel 方法,约束模型创建字段,以防止数据库可空声明变化 - 更新支付状态后,缓存结果以便查询 -考虑将重复量比较大的异步任务修改成定时调度任务 - 模型字段修改,特定枚举字段使用自定义类型代替通用 int32 proxy 网关更新接口可以传输更结构化的数据,直接区分不同类型以加快更新速度 diff --git a/scripts/env/dev/docker-compose.yaml b/docker-compose.yaml similarity index 100% rename from scripts/env/dev/docker-compose.yaml rename to docker-compose.yaml diff --git a/go.mod b/go.mod index 48ce59d..1c3d587 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module platform -go 1.24.0 +go 1.24.5 require ( github.com/alibabacloud-go/darabonba-openapi/v2 v2.1.7 diff --git a/scripts/env/pre/docker-compose.yaml b/scripts/pre/docker-compose.yaml similarity index 97% rename from scripts/env/pre/docker-compose.yaml rename to scripts/pre/docker-compose.yaml index f577827..9d74f69 100644 --- a/scripts/env/pre/docker-compose.yaml +++ b/scripts/pre/docker-compose.yaml @@ -21,7 +21,7 @@ services: platform: build: - context: ../../.. + context: ../.. dockerfile: Dockerfile environment: - RUN_MODE=production diff --git a/scripts/env/pre/vector/vector.toml b/scripts/pre/vector/vector.toml similarity index 100% rename from scripts/env/pre/vector/vector.toml rename to scripts/pre/vector/vector.toml diff --git a/web/handlers/trade.go b/web/handlers/trade.go index a4bc4b4..e1b9ff8 100644 --- a/web/handlers/trade.go +++ b/web/handlers/trade.go @@ -83,15 +83,10 @@ func TradeComplete(c *fiber.Ctx) error { } // 检查订单状态 - result, err := s.Trade.ConfirmTradeCompleted(&s.CheckTradeData{ + err = s.Trade.CompleteTrade(&s.CheckTradeData{ TradeNo: req.TradeNo, Method: req.Method, }) - if err != nil { - return err - } - - err = s.Trade.OnTradeCompleted(&s.OnTradeCompletedData{req.TradeNo, *result}) if err != nil { slog.Error("完成交易失败", "trade_no", req.TradeNo, "method", req.Method, "error", err) return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"error": "完成交易失败"}) diff --git a/web/services/trade.go b/web/services/trade.go index 9efc7a1..bc363d2 100644 --- a/web/services/trade.go +++ b/web/services/trade.go @@ -4,8 +4,6 @@ import ( "context" "errors" "fmt" - "github.com/shopspring/decimal" - wecahtpaycore "github.com/wechatpay-apiv3/wechatpay-go/core" "io" "log/slog" "net/http" @@ -21,16 +19,14 @@ import ( "platform/web/tasks" "time" + "github.com/shopspring/decimal" + wecahtpaycore "github.com/wechatpay-apiv3/wechatpay-go/core" + "github.com/smartwalle/alipay/v3" "github.com/wechatpay-apiv3/wechatpay-go/services/payments/native" "gorm.io/gorm" ) -var ComplementEvents = []trade2.CompleteEvent{ - ResourceOnTradeComplete{}, - UserOnTradeComplete{}, -} - var Trade = &tradeService{} type tradeService struct { @@ -258,67 +254,92 @@ func (s *tradeService) CreateTrade(uid int32, now time.Time, data *CreateTradeDa }, nil } -func (s *tradeService) OnTradeCompleted(data *OnTradeCompletedData) error { - // 更新交易状态 - var trade = new(m.Trade) - var err = g.Redsync.WithLock(tradeLockKey(data.TradeNo), func() error { - return q.Q.Transaction(func(q *q.Query) (err error) { - trade, err = completeTrade(q, data) - return - }) +func (s *tradeService) CompleteTrade(data *CheckTradeData) error { + return g.Redsync.WithLock(tradeLockKey(data.TradeNo), func() error { + + // 检查订单状态 + result, err := s.ConfirmTradeCompleted(data) + if err != nil { + return core.NewServErr("确认交易状态失败", err) + } + + // 更新交易状态 + trade, err := completeTrade(&OnTradeCompletedData{data.TradeNo, *result}) + if err != nil { + return core.NewServErr("处理交易失败", err) + } + + // 处理交易完成事件 + err = afterTradeComplete(trade) + if err != nil { + return core.NewServErr("处理交易完成事件失败", err) + } + + return nil }) - if err != nil { - return core.NewServErr("处理交易失败", err) - } - - // 处理交易完成事件 - err = completeTradeAfter(trade) - if err != nil { - return core.NewServErr("处理交易完成事件失败", err) - } - - return nil } -func completeTrade(q *q.Query, data *OnTradeCompletedData) (*m.Trade, error) { - var tradeNo = data.TradeNo - var transId = data.TransId - var payment = data.Payment - var acquirer = data.Acquirer - var paidAt = data.Time +func (s *tradeService) OnTradeCompleted(data *OnTradeCompletedData) error { + return g.Redsync.WithLock(tradeLockKey(data.TradeNo), func() error { - // 获取交易信息 - trade, err := q.Trade. - Where(q.Trade.InnerNo.Eq(tradeNo)). - Take() - if err != nil { - return nil, core.NewBizErr("获取交易信息失败", err) - } + // 更新交易状态 + trade, err := completeTrade(data) + if err != nil { + return core.NewServErr("处理交易失败", err) + } - // 检查交易状态 - switch trade2.Status(trade.Status) { - case trade2.StatusCanceled: - return nil, core.NewBizErr("交易已取消") - case trade2.StatusSuccess: - return nil, core.NewBizErr("交易已完成") - case trade2.StatusPending: - } + // 处理交易完成事件 + err = afterTradeComplete(trade) + if err != nil { + return core.NewServErr("处理交易完成事件失败", err) + } - // 更新交易信息 - trade.Status = int32(trade2.StatusSuccess) - trade.OuterNo = &transId - trade.Payment = payment - trade.Acquirer = u.P(int32(acquirer)) - trade.CompletedAt = u.P(orm.LocalDateTime(paidAt)) - _, err = q.Trade. - Where(q.Trade.InnerNo.Eq(tradeNo)). - Updates(trade) - if err != nil { - return nil, core.NewServErr("更新交易信息失败", err) - } - - return trade, nil + return nil + }) } -func completeTradeAfter(trade *m.Trade) error { +func completeTrade(data *OnTradeCompletedData) (*m.Trade, error) { + var trade = new(m.Trade) + var err = q.Q.Transaction(func(tx *q.Query) error { + var tradeNo = data.TradeNo + var transId = data.TransId + var payment = data.Payment + var acquirer = data.Acquirer + var paidAt = data.Time + + // 获取交易信息 + trade, err := q.Trade. + Where(q.Trade.InnerNo.Eq(tradeNo)). + Take() + if err != nil { + return core.NewBizErr("获取交易信息失败", err) + } + + // 检查交易状态 + switch trade2.Status(trade.Status) { + case trade2.StatusCanceled: + return core.NewBizErr("交易已取消") + case trade2.StatusSuccess: + return core.NewBizErr("交易已完成") + case trade2.StatusPending: + } + + // 更新交易信息 + trade.Status = int32(trade2.StatusSuccess) + trade.OuterNo = &transId + trade.Payment = payment + trade.Acquirer = u.P(int32(acquirer)) + trade.CompletedAt = u.P(orm.LocalDateTime(paidAt)) + _, err = q.Trade. + Where(q.Trade.InnerNo.Eq(tradeNo)). + Updates(trade) + if err != nil { + return core.NewServErr("更新交易信息失败", err) + } + + return nil + }) + return trade, err +} +func afterTradeComplete(trade *m.Trade) error { // 恢复购买信息 productData, err := g.Redis.Get(context.Background(), tradeProductKey(trade.InnerNo)).Result() @@ -327,6 +348,11 @@ func completeTradeAfter(trade *m.Trade) error { } // 执行资源创建 + var ComplementEvents = []trade2.CompleteEvent{ + ResourceOnTradeComplete{}, + UserOnTradeComplete{}, + } + for _, event := range ComplementEvents { info, ok := event.Check(trade2.Type(trade.Type)) if !ok { @@ -348,7 +374,7 @@ func completeTradeAfter(trade *m.Trade) error { } func (s *tradeService) CancelTrade(tradeNo string, method trade2.Method, now time.Time) error { - err := g.Redsync.WithLock(tradeLockKey(tradeNo), func() error { + return g.Redsync.WithLock(tradeLockKey(tradeNo), func() error { switch method { case trade2.MethodAlipay: @@ -356,11 +382,11 @@ func (s *tradeService) CancelTrade(tradeNo string, method trade2.Method, now tim OutTradeNo: tradeNo, }) if err != nil { - return err + return core.NewServErr("上游取消交易失败", err) } if resp.Code != alipay.CodeSuccess { - slog.Warn("支付宝交易取消失败", "code", resp.Code, "sub_code", resp.SubCode, "msg", resp.Msg) - return errors.New("交易取消失败") + slog.Error("支付宝交易取消失败", "code", resp.Code, "sub_code", resp.SubCode, "msg", resp.Msg) + return errors.New("上游取消交易失败") } case trade2.MethodWeChat: @@ -369,12 +395,16 @@ func (s *tradeService) CancelTrade(tradeNo string, method trade2.Method, now tim OutTradeNo: &tradeNo, }) if err != nil { - return err + return core.NewServErr("上游取消交易失败", err) } if resp.Response.StatusCode != http.StatusNoContent { - body, _ := io.ReadAll(resp.Response.Body) - slog.Warn("微信交易取消失败", "code", resp.Response.StatusCode, "body", string(body)) - return errors.New("交易取消失败") + body, err := io.ReadAll(resp.Response.Body) + if err != nil { + slog.Error("读取微信交易取消响应失败", "error", err) + return core.NewServErr("上游取消交易失败", err) + } + slog.Error("微信交易取消失败", "code", resp.Response.StatusCode, "body", string(body)) + return errors.New("上游取消交易失败") } case trade2.MethodSft, trade2.MethodSftAlipay, trade2.MethodSftWeChat: @@ -390,61 +420,56 @@ func (s *tradeService) CancelTrade(tradeNo string, method trade2.Method, now tim return ErrTransactionNotSupported } - err := q.Q.Transaction(func(q *q.Query) error { - return cancelTrade(q, tradeNo, now) - }) + err := cancelTrade(tradeNo, now) if err != nil { return err } return nil }) - if err != nil { - return core.NewServErr("处理交易取消失败", err) - } - return nil } -func (s *tradeService) OnTradeCanceled(q *q.Query, tradeNo string, now time.Time) error { +func (s *tradeService) OnTradeCanceled(tradeNo string, now time.Time) error { err := g.Redsync.WithLock(tradeLockKey(tradeNo), func() error { - return cancelTrade(q, tradeNo, now) + return cancelTrade(tradeNo, now) }) if err != nil { return core.NewServErr("处理交易取消失败", err) } - return nil } -func cancelTrade(q *q.Query, tradeNo string, now time.Time) error { - // 获取交易信息 - var status trade2.Status - err := q.Trade. - Where(q.Trade.InnerNo.Eq(tradeNo)). - Select(q.Trade.Status). - Scan(&status) - if err != nil { - return core.NewBizErr("获取交易信息失败", err) - } +func cancelTrade(tradeNo string, now time.Time) error { + return q.Q.Transaction(func(q *q.Query) error { + // 获取交易信息 + var status trade2.Status + err := q.Trade. + Where(q.Trade.InnerNo.Eq(tradeNo)). + Select(q.Trade.Status). + Scan(&status) + if err != nil { + return core.NewBizErr("获取交易信息失败", err) + } - // 检查交易状态 - switch status { - case trade2.StatusCanceled: - return core.NewBizErr("交易已取消") - case trade2.StatusSuccess: - return core.NewBizErr("交易已完成") - case trade2.StatusPending: - } + // 检查交易状态 + switch status { + case trade2.StatusCanceled: + return core.NewBizErr("交易已取消") + case trade2.StatusSuccess: + return core.NewBizErr("交易已完成") + case trade2.StatusPending: + } - // 更新交易状态 - _, err = q.Trade. - Where(q.Trade.InnerNo.Eq(tradeNo)). - UpdateSimple( - q.Trade.Status.Value(int32(trade2.StatusCanceled)), - q.Trade.CanceledAt.Value(orm.LocalDateTime(now)), - ) - if err != nil { - return core.NewServErr("更新交易状态失败", err) - } - return nil + // 更新交易状态 + _, err = q.Trade. + Where(q.Trade.InnerNo.Eq(tradeNo)). + UpdateSimple( + q.Trade.Status.Value(int32(trade2.StatusCanceled)), + q.Trade.CanceledAt.Value(orm.LocalDateTime(now)), + ) + if err != nil { + return core.NewServErr("更新交易状态失败", err) + } + return nil + }) } func (s *tradeService) RefundTrade(tradeNo string, method trade2.Method) error { diff --git a/web/tasks/log.go b/web/tasks/log.go new file mode 100644 index 0000000..182e954 --- /dev/null +++ b/web/tasks/log.go @@ -0,0 +1,33 @@ +package tasks + +import ( + "encoding/json" + "github.com/hibiken/asynq" + "log/slog" + "time" +) + +type RequestLog struct { + Type string + User int32 + IP string + UA string + Method string + Path string + Status int + Error string + Latency time.Duration + Time time.Time +} + +func NewRequestLog(data RequestLog) *asynq.Task { + var rs, err = json.Marshal(data) + if err != nil { + slog.Error("日志数据序列化失败", slog.Any("err", err), slog.Any("data", data)) + return nil + } + return asynq.NewTask("logs:request", rs) +} + +type LoginLog struct { +}