实现 sse 检查订单,减少请求次数

This commit is contained in:
2025-12-04 09:45:35 +08:00
parent caa997b95c
commit 2d053ddf49
14 changed files with 107 additions and 67 deletions

View File

@@ -1,27 +1,28 @@
## TODO
实现 sse 检查订单,减少请求次数
重新梳理提取逻辑:
- 注册代理 & 注销代理
- 实现两个 set 池,分别保存可用端口和全部端口
- 添加代理时,同时将端口加入全部池和可用池
- 注销代理时,同时将端口从全部池和可用池中移除
- 调整通道分配策略,提供一个 all set 和一个 free set提取时取交集再取出归还时取交集再归还。
- redis channel lease 加一个 zset定时处理没有成功释放的端口
### 低优先
trade/create 性能问题,缩短事务时间,考虑其他方式实现可靠分布式事务
重新实现接口 proxy 注册与注销接口:
- 注册时向 redis ports 可用池中加入端口
- 注销时需要同时移除可用池与租约池中的端口(需要考虑具体实现,考虑正在使用的节点归还问题)
- 调整通道分配策略,提供一个 all set 和一个 free set提取时取交集再取出归还时取交集再归还。
需要确认以下 ID.GenSerial 的分布式并发安全性
网关缩扩容太慢
jsonb 类型转换问题
redis channel lease 加一个 zset定时处理没有成功释放的端口
标准化生产环境 cors 配置
底层调用集成 otel
- redis
- gorm
- 三方接口
### 长期
分离 task 的客户端支持多进程prefork 必要!)
调整目录结构:

2
go.mod
View File

@@ -20,6 +20,7 @@ require (
github.com/redis/go-redis/v9 v9.17.2
github.com/shopspring/decimal v1.4.0
github.com/smartwalle/alipay/v3 v3.2.28
github.com/valyala/fasthttp v1.68.0
github.com/wechatpay-apiv3/wechatpay-go v0.2.21
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0
@@ -80,7 +81,6 @@ require (
github.com/spf13/cast v1.10.0 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.68.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/contrib v1.38.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect

6
pkg/env/env.go vendored
View File

@@ -20,9 +20,9 @@ const (
var (
RunMode = RunModeProd
LogLevel = slog.LevelDebug
TradeExpire = 30 * 60 // 交易过期时间,单位秒
SessionAccessExpire = 60 * 60 * 2 // 默认 2 小时
SessionRefreshExpire = 60 * 60 * 24 * 7 // 默认 7 天
TradeExpire = 15 * 60 // 交易过期时间,单位秒。默认 15 分钟
SessionAccessExpire = 60 * 60 * 2 // 访问令牌过期时间,单位秒。默认 2 小时
SessionRefreshExpire = 60 * 60 * 24 * 7 // 刷新令牌过期时间,单位秒。默认 7 天
DebugHttpDump = false // 是否打印请求和响应的原始数据
DebugExternalChange = true // 是否实际执行外部非幂等接口调用,在开发调试时可以关闭,避免对外部数据产生影响

View File

@@ -1,33 +0,0 @@
package events
import (
"encoding/json"
"github.com/hibiken/asynq"
"log/slog"
"time"
)
type RequestLog struct {
Type string
User int32
IP string
UA string
Method string
Path string
Status int
Error string
Latency time.Duration
Time time.Time
}
func NewRequestLog(data RequestLog) *asynq.Task {
var rs, err = json.Marshal(data)
if err != nil {
slog.Error("日志数据序列化失败", slog.Any("err", err), slog.Any("data", data))
return nil
}
return asynq.NewTask("logs:request", rs)
}
type LoginLog struct {
}

View File

@@ -3,6 +3,7 @@ package globals
import (
"errors"
"fmt"
"log/slog"
"strings"
"github.com/go-playground/locales/zh"
@@ -19,22 +20,32 @@ type ValidatorClient struct {
translator ut.Translator
}
func (v *ValidatorClient) Validate(c *fiber.Ctx, data any) error {
func (v *ValidatorClient) ParseBody(c *fiber.Ctx, data any) error {
if err := c.BodyParser(data); err != nil {
return err
}
return validate(data)
}
if errs := v.validator.Struct(data); errs != nil {
func (v *ValidatorClient) ParseQuery(c *fiber.Ctx, data any) error {
if err := c.QueryParser(data); err != nil {
return err
}
return validate(data)
}
func validate(data any) error {
if errs := Validator.validator.Struct(data); errs != nil {
var sb = strings.Builder{}
var typeErrs validator.ValidationErrors
errors.As(errs, &typeErrs)
for i, err := range typeErrs {
sb.WriteString(err.Translate(v.translator))
sb.WriteString(err.Translate(Validator.translator))
if i < len(typeErrs)-1 {
sb.WriteString("\n")
}
}
slog.Debug("请求参数验证失败", "msg", sb.String())
return fiber.NewError(fiber.StatusBadRequest, sb.String())
}
return nil

View File

@@ -113,7 +113,7 @@ func CreateChannel(c *fiber.Ctx) error {
// 解析参数
req := new(CreateChannelReq)
if err := g.Validator.Validate(c, req); err != nil {
if err := g.Validator.ParseBody(c, req); err != nil {
return core.NewBizErr("解析参数失败", err)
}

View File

@@ -147,7 +147,7 @@ func ListResourceLong(c *fiber.Ctx) error {
do.Where(q.ResourceLong.As(q.Resource.Long.Name()).Expire.Lte(*req.ExpireBefore))
}
resource, err := q.Resource.Debug().Where(do).
resource, err := q.Resource.Where(do).
Joins(q.Resource.Long).
Order(q.Resource.CreatedAt.Desc()).
Offset(req.GetOffset()).
@@ -354,7 +354,7 @@ func StatisticResourceUsage(c *fiber.Ctx) error {
// 解析请求参数
var req = new(StatisticResourceUsageReq)
if err := g.Validator.Validate(c, req); err != nil {
if err := g.Validator.ParseBody(c, req); err != nil {
return err
}
@@ -416,7 +416,7 @@ func CreateResource(c *fiber.Ctx) error {
// 解析请求参数
var req = new(CreateResourceReq)
if err := g.Validator.Validate(c, req); err != nil {
if err := g.Validator.ParseBody(c, req); err != nil {
return err
}
@@ -438,7 +438,7 @@ func ResourcePrice(c *fiber.Ctx) error {
// 解析请求参数
var req = new(CreateResourceReq)
if err := g.Validator.Validate(c, req); err != nil {
if err := g.Validator.ParseBody(c, req); err != nil {
return err
}

View File

@@ -1,15 +1,20 @@
package handlers
import (
"bufio"
"fmt"
"log/slog"
"platform/pkg/env"
"platform/web/auth"
"platform/web/core"
g "platform/web/globals"
m "platform/web/models"
s "platform/web/services"
"reflect"
"time"
"github.com/gofiber/fiber/v2"
"github.com/valyala/fasthttp"
)
type TradeCreateReq struct {
@@ -33,7 +38,7 @@ func TradeCreate(c *fiber.Ctx) error {
// 解析请求参数
req := new(TradeCreateReq)
if err := g.Validator.Validate(c, req); err != nil {
if err := g.Validator.ParseBody(c, req); err != nil {
return err
}
@@ -76,7 +81,7 @@ func TradeComplete(c *fiber.Ctx) error {
// 解析请求参数
req := new(TradeCompleteReq)
if err := g.Validator.Validate(c, req); err != nil {
if err := g.Validator.ParseBody(c, req); err != nil {
return err
}
@@ -102,7 +107,7 @@ func TradeCancel(c *fiber.Ctx) error {
// 解析请求参数
req := new(TradeCancelReq)
if err := g.Validator.Validate(c, req); err != nil {
if err := g.Validator.ParseBody(c, req); err != nil {
return err
}
@@ -115,3 +120,54 @@ func TradeCancel(c *fiber.Ctx) error {
return c.SendStatus(fiber.StatusNoContent)
}
type TradeCheckReq struct {
s.ModifyTradeData
}
func TradeCheck(c *fiber.Ctx) error {
// 解析请求参数
req := new(TradeCheckReq)
if err := g.Validator.ParseQuery(c, req); err != nil {
return err
}
c.Set(fiber.HeaderContentType, "text/event-stream")
c.Set(fiber.HeaderCacheControl, "no-cache")
c.Set(fiber.HeaderConnection, "keep-alive")
c.Set(fiber.HeaderTransferEncoding, "chunked")
c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) {
expire := env.TradeExpire
interval := 5
for range expire / interval {
// 检查订单状态
result, err := s.Trade.CheckTrade(&req.ModifyTradeData)
if err != nil {
slog.Error("检查订单状态失败", "trade_no", req.TradeNo, "error", err)
return
}
// 写入订单状态
_, err = fmt.Fprintf(w, "data: %d\n\n", result.Status)
if err != nil {
slog.Error("写入订单状态失败", "trade_no", req.TradeNo, "error", err)
return
}
err = w.Flush()
if err != nil {
slog.Error("刷新缓冲区失败", "trade_no", req.TradeNo, "error", err, "errType", reflect.TypeOf(err))
return
}
// 当订单离开支付状态后结束查询
if result.Status != m.TradeStatusPending {
return
}
time.Sleep(time.Duration(interval) * time.Second)
}
}))
return nil
}

View File

@@ -85,7 +85,7 @@ func CreateWhitelist(c *fiber.Ctx) error {
// 解析请求参数
req := new(CreateWhitelistReq)
err = g.Validator.Validate(c, req)
err = g.Validator.ParseBody(c, req)
if err != nil {
return err
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/gofiber/contrib/otelfiber/v2"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/logger"
"github.com/gofiber/fiber/v2/middleware/recover"
"github.com/gofiber/fiber/v2/middleware/requestid"
@@ -19,9 +20,6 @@ func ApplyMiddlewares(app *fiber.App) {
EnableStackTrace: true,
}))
// metric
app.Use(otelfiber.Middleware())
// logger
app.Use(logger.New(logger.Config{
Next: func(c *fiber.Ctx) bool {
@@ -29,6 +27,9 @@ func ApplyMiddlewares(app *fiber.App) {
},
}))
// metric
app.Use(otelfiber.Middleware())
// request id
app.Use(requestid.New(requestid.Config{
Generator: func() string {
@@ -37,6 +38,9 @@ func ApplyMiddlewares(app *fiber.App) {
},
}))
// cors
app.Use(cors.New())
// authenticate
app.Use(auth.Authenticate())
}

View File

@@ -52,6 +52,7 @@ func ApplyRouters(app *fiber.App) {
trade.Post("/create", handlers.TradeCreate)
trade.Post("/complete", handlers.TradeComplete)
trade.Post("/cancel", handlers.TradeCancel)
trade.Get("/check", handlers.TradeCheck)
// 账单
bill := api.Group("/bill")

View File

@@ -283,7 +283,7 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3
}
} else {
bytes, _ := json.Marshal(configs)
slog.Debug("提交代理端口配置", "config", string(bytes))
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "config", string(bytes))
}
}
@@ -348,7 +348,7 @@ func (s *channelBaiyinService) RemoveChannels(batch string, ids []int32) error {
}
} else {
bytes, _ := json.Marshal(configs)
slog.Debug("清除代理端口配置", "config", string(bytes))
slog.Debug("清除代理端口配置", "proxy", ip, "config", string(bytes))
}
}

View File

@@ -668,7 +668,7 @@ type CreateTradeResult struct {
}
type ModifyTradeData struct {
TradeNo string `json:"trade_no" validate:"required"`
TradeNo string `json:"trade_no" query:"trade_no" validate:"required"`
Method m.TradeMethod `json:"method" validate:"required"`
}

View File

@@ -141,7 +141,7 @@ func HandleFlushGateway(_ context.Context, task *asynq.Task) error {
}
} else {
bytes, _ := json.Marshal(configs)
slog.Debug("更新代理后备配置", "config", string(bytes))
slog.Debug("更新代理后备配置", "proxy", proxy.IP.String(), "config", string(bytes))
}
_, err := q.Proxy.