From 2fd17dde22b91d6354ad7b7a244e10d0502f4554 Mon Sep 17 00:00:00 2001 From: luorijun Date: Mon, 23 Jun 2025 18:32:20 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BA=A4=E6=98=93=E5=8F=96?= =?UTF-8?q?=E6=B6=88=E9=80=BB=E8=BE=91=E5=B9=B6=E6=9B=B4=E6=96=B0=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E6=95=B0=E6=8D=AE=E7=BB=93=E6=9E=84=EF=BC=9B=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E5=A5=97=E9=A4=90=E7=94=A8=E9=87=8F=E7=BB=9F=E8=AE=A1?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 +++--- web/handlers/resource.go | 4 ++-- web/handlers/trade.go | 40 ++++++++++++++++++++++++++-------------- web/router.go | 5 +++++ web/services/trade.go | 5 ++++- web/tasks/trade.go | 13 +++++++++---- 6 files changed, 49 insertions(+), 24 deletions(-) diff --git a/README.md b/README.md index be42bc8..51439e3 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ ## TODO +tasks 取消交易时,需要判断错误的类型,如果是因为支付已完成导致的取消,则正常结束流程 + pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器 创建交易订单后添加一个关闭订单的异步任务 @@ -12,11 +14,9 @@ pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器 ### 长期 -异步日志记录,统一收集 - 模型字段修改,特定枚举字段使用自定义类型代替通用 int32 -更新接口可以传输更结构化的数据,直接区分不同类型以加快更新速度 +proxy 网关更新接口可以传输更结构化的数据,直接区分不同类型以加快更新速度 ## 业务逻辑 diff --git a/web/handlers/resource.go b/web/handlers/resource.go index 9d58197..7d2a34f 100644 --- a/web/handlers/resource.go +++ b/web/handlers/resource.go @@ -385,14 +385,14 @@ func StatisticResourceUsage(c *fiber.Ctx) error { err = q.LogsUserUsage. Select( q.LogsUserUsage.Count_.Sum().As("count"), - field.NewField("", "date_trunc('day', time)").As("date"), + field.NewUnsafeFieldRaw("date_trunc('day', time)").As("date"), ). Where( q.LogsUserUsage.UserID.Eq(session.Payload.Id), do, ). Group( - field.NewField("", "date_trunc('day', time)"), + field.NewUnsafeFieldRaw("date_trunc('day', time)"), ). Order( field.NewField("", "date").Desc(), diff --git a/web/handlers/trade.go b/web/handlers/trade.go index 7519aca..18d7f1a 100644 --- a/web/handlers/trade.go +++ b/web/handlers/trade.go @@ -6,10 +6,12 @@ import ( "github.com/valyala/fasthttp/fasthttpadaptor" "log/slog" "net/http" + "platform/web/auth" trade2 "platform/web/domains/trade" g "platform/web/globals" q "platform/web/queries" s "platform/web/services" + "platform/web/tasks" "time" "github.com/gofiber/fiber/v2" @@ -17,8 +19,6 @@ import ( "github.com/wechatpay-apiv3/wechatpay-go/services/payments" ) -// region TradeCheckSSE - func TradeCheckSSE(c *fiber.Ctx) error { // 设置响应头 c.Set("Content-Type", "text/event-stream") @@ -28,9 +28,31 @@ func TradeCheckSSE(c *fiber.Ctx) error { return nil } -// endregion +type TradeCheckReq struct { + tasks.CancelTradeData +} -// region AlipayCallback +func TradeCancelByTask(c *fiber.Ctx) error { + // 检查权限 + _, err := auth.Protect(c, []auth.PayloadType{auth.PayloadInternalServer}, []string{}) + if err != nil { + return err + } + + // 解析请求参数 + req := new(TradeCheckReq) + if err := c.BodyParser(req); err != nil { + return err + } + + // 取消支付 + err = s.Trade.CancelTrade(req.TradeNo, req.Method) + if err != nil { + slog.Warn("取消交易失败", "trade_no", req.TradeNo, "method", req.Method, "error", err) + } + + return c.SendStatus(fiber.StatusNoContent) +} func AlipayCallback(c *fiber.Ctx) (err error) { @@ -121,14 +143,6 @@ func AlipayCallback(c *fiber.Ctx) (err error) { return c.SendString("success") } -func isRefund(notification *alipay.Notification) bool { - return notification.OutBizNo != "" || notification.RefundFee != "" || notification.GmtRefund != "" -} - -// endregion - -// region WechatPayCallback - func WechatPayCallback(c *fiber.Ctx) error { // 解析请求参数 @@ -188,5 +202,3 @@ func WechatPayCallback(c *fiber.Ctx) error { return nil } - -// endregion diff --git a/web/router.go b/web/router.go index 247a18f..cd50e0a 100644 --- a/web/router.go +++ b/web/router.go @@ -82,4 +82,9 @@ func ApplyRouters(app *fiber.App) { app.Get("/test", func(c *fiber.Ctx) error { return core.NewBizErr("测试错误") }) + + // 异步任务客户端 + tasks := api.Group("/tasks") + tasks.Post("/channel/remove", handlers.RemoveChannelByTask) + tasks.Post("/trade/cancel", handlers.TradeCancelByTask) } diff --git a/web/services/trade.go b/web/services/trade.go index 8c5fd06..75c4354 100644 --- a/web/services/trade.go +++ b/web/services/trade.go @@ -233,7 +233,10 @@ func (s *tradeService) CreateTrade(q *q.Query, uid int32, now time.Time, data *T } // 提交异步任务更新订单状态 - _, err = g.Asynq.Enqueue(tasks.NewUpdateTrade(tradeNo, method)) + _, err = g.Asynq.Enqueue(tasks.NewCancelTrade(tasks.CancelTradeData{ + TradeNo: tradeNo, + Method: method, + })) if err != nil { return nil, err } diff --git a/web/tasks/trade.go b/web/tasks/trade.go index accb0c8..4ee25e4 100644 --- a/web/tasks/trade.go +++ b/web/tasks/trade.go @@ -7,13 +7,18 @@ import ( trade2 "platform/web/domains/trade" ) -const UpdateTrade = "trade:update" +const CancelTrade = "trade:update" -func NewUpdateTrade(tradeNo string, method trade2.Method) *asynq.Task { - bytes, err := json.Marshal(tradeNo) +type CancelTradeData struct { + TradeNo string `json:"trade_no" validate:"required"` + Method trade2.Method `json:"method" validate:"required"` +} + +func NewCancelTrade(data CancelTradeData) *asynq.Task { + bytes, err := json.Marshal(data) if err != nil { slog.Error("序列化更新交易任务失败", "error", err) return nil } - return asynq.NewTask(UpdateTrade, bytes) + return asynq.NewTask(CancelTrade, bytes) }