优化交易取消逻辑并更新相关数据结构;修复套餐用量统计接口问题
This commit is contained in:
@@ -1,5 +1,7 @@
|
|||||||
## TODO
|
## TODO
|
||||||
|
|
||||||
|
tasks 取消交易时,需要判断错误的类型,如果是因为支付已完成导致的取消,则正常结束流程
|
||||||
|
|
||||||
pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器
|
pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器
|
||||||
|
|
||||||
创建交易订单后添加一个关闭订单的异步任务
|
创建交易订单后添加一个关闭订单的异步任务
|
||||||
@@ -12,11 +14,9 @@ pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器
|
|||||||
|
|
||||||
### 长期
|
### 长期
|
||||||
|
|
||||||
异步日志记录,统一收集
|
|
||||||
|
|
||||||
模型字段修改,特定枚举字段使用自定义类型代替通用 int32
|
模型字段修改,特定枚举字段使用自定义类型代替通用 int32
|
||||||
|
|
||||||
更新接口可以传输更结构化的数据,直接区分不同类型以加快更新速度
|
proxy 网关更新接口可以传输更结构化的数据,直接区分不同类型以加快更新速度
|
||||||
|
|
||||||
## 业务逻辑
|
## 业务逻辑
|
||||||
|
|
||||||
|
|||||||
@@ -385,14 +385,14 @@ func StatisticResourceUsage(c *fiber.Ctx) error {
|
|||||||
err = q.LogsUserUsage.
|
err = q.LogsUserUsage.
|
||||||
Select(
|
Select(
|
||||||
q.LogsUserUsage.Count_.Sum().As("count"),
|
q.LogsUserUsage.Count_.Sum().As("count"),
|
||||||
field.NewField("", "date_trunc('day', time)").As("date"),
|
field.NewUnsafeFieldRaw("date_trunc('day', time)").As("date"),
|
||||||
).
|
).
|
||||||
Where(
|
Where(
|
||||||
q.LogsUserUsage.UserID.Eq(session.Payload.Id),
|
q.LogsUserUsage.UserID.Eq(session.Payload.Id),
|
||||||
do,
|
do,
|
||||||
).
|
).
|
||||||
Group(
|
Group(
|
||||||
field.NewField("", "date_trunc('day', time)"),
|
field.NewUnsafeFieldRaw("date_trunc('day', time)"),
|
||||||
).
|
).
|
||||||
Order(
|
Order(
|
||||||
field.NewField("", "date").Desc(),
|
field.NewField("", "date").Desc(),
|
||||||
|
|||||||
@@ -6,10 +6,12 @@ import (
|
|||||||
"github.com/valyala/fasthttp/fasthttpadaptor"
|
"github.com/valyala/fasthttp/fasthttpadaptor"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"platform/web/auth"
|
||||||
trade2 "platform/web/domains/trade"
|
trade2 "platform/web/domains/trade"
|
||||||
g "platform/web/globals"
|
g "platform/web/globals"
|
||||||
q "platform/web/queries"
|
q "platform/web/queries"
|
||||||
s "platform/web/services"
|
s "platform/web/services"
|
||||||
|
"platform/web/tasks"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gofiber/fiber/v2"
|
"github.com/gofiber/fiber/v2"
|
||||||
@@ -17,8 +19,6 @@ import (
|
|||||||
"github.com/wechatpay-apiv3/wechatpay-go/services/payments"
|
"github.com/wechatpay-apiv3/wechatpay-go/services/payments"
|
||||||
)
|
)
|
||||||
|
|
||||||
// region TradeCheckSSE
|
|
||||||
|
|
||||||
func TradeCheckSSE(c *fiber.Ctx) error {
|
func TradeCheckSSE(c *fiber.Ctx) error {
|
||||||
// 设置响应头
|
// 设置响应头
|
||||||
c.Set("Content-Type", "text/event-stream")
|
c.Set("Content-Type", "text/event-stream")
|
||||||
@@ -28,9 +28,31 @@ func TradeCheckSSE(c *fiber.Ctx) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// endregion
|
type TradeCheckReq struct {
|
||||||
|
tasks.CancelTradeData
|
||||||
|
}
|
||||||
|
|
||||||
// region AlipayCallback
|
func TradeCancelByTask(c *fiber.Ctx) error {
|
||||||
|
// 检查权限
|
||||||
|
_, err := auth.Protect(c, []auth.PayloadType{auth.PayloadInternalServer}, []string{})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 解析请求参数
|
||||||
|
req := new(TradeCheckReq)
|
||||||
|
if err := c.BodyParser(req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 取消支付
|
||||||
|
err = s.Trade.CancelTrade(req.TradeNo, req.Method)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("取消交易失败", "trade_no", req.TradeNo, "method", req.Method, "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return c.SendStatus(fiber.StatusNoContent)
|
||||||
|
}
|
||||||
|
|
||||||
func AlipayCallback(c *fiber.Ctx) (err error) {
|
func AlipayCallback(c *fiber.Ctx) (err error) {
|
||||||
|
|
||||||
@@ -121,14 +143,6 @@ func AlipayCallback(c *fiber.Ctx) (err error) {
|
|||||||
return c.SendString("success")
|
return c.SendString("success")
|
||||||
}
|
}
|
||||||
|
|
||||||
func isRefund(notification *alipay.Notification) bool {
|
|
||||||
return notification.OutBizNo != "" || notification.RefundFee != "" || notification.GmtRefund != ""
|
|
||||||
}
|
|
||||||
|
|
||||||
// endregion
|
|
||||||
|
|
||||||
// region WechatPayCallback
|
|
||||||
|
|
||||||
func WechatPayCallback(c *fiber.Ctx) error {
|
func WechatPayCallback(c *fiber.Ctx) error {
|
||||||
|
|
||||||
// 解析请求参数
|
// 解析请求参数
|
||||||
@@ -188,5 +202,3 @@ func WechatPayCallback(c *fiber.Ctx) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// endregion
|
|
||||||
|
|||||||
@@ -82,4 +82,9 @@ func ApplyRouters(app *fiber.App) {
|
|||||||
app.Get("/test", func(c *fiber.Ctx) error {
|
app.Get("/test", func(c *fiber.Ctx) error {
|
||||||
return core.NewBizErr("测试错误")
|
return core.NewBizErr("测试错误")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// 异步任务客户端
|
||||||
|
tasks := api.Group("/tasks")
|
||||||
|
tasks.Post("/channel/remove", handlers.RemoveChannelByTask)
|
||||||
|
tasks.Post("/trade/cancel", handlers.TradeCancelByTask)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -233,7 +233,10 @@ func (s *tradeService) CreateTrade(q *q.Query, uid int32, now time.Time, data *T
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 提交异步任务更新订单状态
|
// 提交异步任务更新订单状态
|
||||||
_, err = g.Asynq.Enqueue(tasks.NewUpdateTrade(tradeNo, method))
|
_, err = g.Asynq.Enqueue(tasks.NewCancelTrade(tasks.CancelTradeData{
|
||||||
|
TradeNo: tradeNo,
|
||||||
|
Method: method,
|
||||||
|
}))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,13 +7,18 @@ import (
|
|||||||
trade2 "platform/web/domains/trade"
|
trade2 "platform/web/domains/trade"
|
||||||
)
|
)
|
||||||
|
|
||||||
const UpdateTrade = "trade:update"
|
const CancelTrade = "trade:update"
|
||||||
|
|
||||||
func NewUpdateTrade(tradeNo string, method trade2.Method) *asynq.Task {
|
type CancelTradeData struct {
|
||||||
bytes, err := json.Marshal(tradeNo)
|
TradeNo string `json:"trade_no" validate:"required"`
|
||||||
|
Method trade2.Method `json:"method" validate:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewCancelTrade(data CancelTradeData) *asynq.Task {
|
||||||
|
bytes, err := json.Marshal(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
slog.Error("序列化更新交易任务失败", "error", err)
|
slog.Error("序列化更新交易任务失败", "error", err)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return asynq.NewTask(UpdateTrade, bytes)
|
return asynq.NewTask(CancelTrade, bytes)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user