From 2d053ddf49d4bb72e1de9cba34a6202be95b8945 Mon Sep 17 00:00:00 2001 From: luorijun Date: Thu, 4 Dec 2025 09:45:35 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=20sse=20=E6=A3=80=E6=9F=A5?= =?UTF-8?q?=E8=AE=A2=E5=8D=95=EF=BC=8C=E5=87=8F=E5=B0=91=E8=AF=B7=E6=B1=82?= =?UTF-8?q?=E6=AC=A1=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 21 ++++++------ go.mod | 2 +- pkg/env/env.go | 6 ++-- web/events/log.go | 33 ------------------ web/globals/validator.go | 19 ++++++++--- web/handlers/channel.go | 2 +- web/handlers/resource.go | 8 ++--- web/handlers/trade.go | 62 ++++++++++++++++++++++++++++++++-- web/handlers/whitelist.go | 2 +- web/middlewares.go | 10 ++++-- web/routes.go | 1 + web/services/channel_baiyin.go | 4 +-- web/services/trade.go | 2 +- web/tasks/task.go | 2 +- 14 files changed, 107 insertions(+), 67 deletions(-) delete mode 100644 web/events/log.go diff --git a/README.md b/README.md index 3fc2628..ead0490 100644 --- a/README.md +++ b/README.md @@ -1,27 +1,28 @@ ## TODO -实现 sse 检查订单,减少请求次数 +重新梳理提取逻辑: +- 注册代理 & 注销代理 + - 实现两个 set 池,分别保存可用端口和全部端口 + - 添加代理时,同时将端口加入全部池和可用池 + - 注销代理时,同时将端口从全部池和可用池中移除 +- 调整通道分配策略,提供一个 all set 和一个 free set,提取时取交集再取出,归还时取交集再归还。 +- redis channel lease 加一个 zset,定时处理没有成功释放的端口 + +### 低优先 trade/create 性能问题,缩短事务时间,考虑其他方式实现可靠分布式事务 -重新实现接口 proxy 注册与注销接口: -- 注册时向 redis ports 可用池中加入端口 -- 注销时需要同时移除可用池与租约池中的端口(需要考虑具体实现,考虑正在使用的节点归还问题) -- 调整通道分配策略,提供一个 all set 和一个 free set,提取时取交集再取出,归还时取交集再归还。 - 需要确认以下 ID.GenSerial 的分布式并发安全性 -网关缩扩容太慢 +jsonb 类型转换问题 -redis channel lease 加一个 zset,定时处理没有成功释放的端口 +标准化生产环境 cors 配置 底层调用集成 otel - redis - gorm - 三方接口 -### 长期 - 分离 task 的客户端,支持多进程(prefork 必要!) 调整目录结构: diff --git a/go.mod b/go.mod index 903c1bf..1e22672 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/redis/go-redis/v9 v9.17.2 github.com/shopspring/decimal v1.4.0 github.com/smartwalle/alipay/v3 v3.2.28 + github.com/valyala/fasthttp v1.68.0 github.com/wechatpay-apiv3/wechatpay-go v0.2.21 go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 @@ -80,7 +81,6 @@ require ( github.com/spf13/cast v1.10.0 // indirect github.com/tjfoc/gmsm v1.4.1 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.68.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib v1.38.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect diff --git a/pkg/env/env.go b/pkg/env/env.go index 7da8583..1dc4d4b 100644 --- a/pkg/env/env.go +++ b/pkg/env/env.go @@ -20,9 +20,9 @@ const ( var ( RunMode = RunModeProd LogLevel = slog.LevelDebug - TradeExpire = 30 * 60 // 交易过期时间,单位秒 - SessionAccessExpire = 60 * 60 * 2 // 默认 2 小时 - SessionRefreshExpire = 60 * 60 * 24 * 7 // 默认 7 天 + TradeExpire = 15 * 60 // 交易过期时间,单位秒。默认 15 分钟 + SessionAccessExpire = 60 * 60 * 2 // 访问令牌过期时间,单位秒。默认 2 小时 + SessionRefreshExpire = 60 * 60 * 24 * 7 // 刷新令牌过期时间,单位秒。默认 7 天 DebugHttpDump = false // 是否打印请求和响应的原始数据 DebugExternalChange = true // 是否实际执行外部非幂等接口调用,在开发调试时可以关闭,避免对外部数据产生影响 diff --git a/web/events/log.go b/web/events/log.go deleted file mode 100644 index eec2aca..0000000 --- a/web/events/log.go +++ /dev/null @@ -1,33 +0,0 @@ -package events - -import ( - "encoding/json" - "github.com/hibiken/asynq" - "log/slog" - "time" -) - -type RequestLog struct { - Type string - User int32 - IP string - UA string - Method string - Path string - Status int - Error string - Latency time.Duration - Time time.Time -} - -func NewRequestLog(data RequestLog) *asynq.Task { - var rs, err = json.Marshal(data) - if err != nil { - slog.Error("日志数据序列化失败", slog.Any("err", err), slog.Any("data", data)) - return nil - } - return asynq.NewTask("logs:request", rs) -} - -type LoginLog struct { -} diff --git a/web/globals/validator.go b/web/globals/validator.go index 50d79bc..df1311b 100644 --- a/web/globals/validator.go +++ b/web/globals/validator.go @@ -3,6 +3,7 @@ package globals import ( "errors" "fmt" + "log/slog" "strings" "github.com/go-playground/locales/zh" @@ -19,22 +20,32 @@ type ValidatorClient struct { translator ut.Translator } -func (v *ValidatorClient) Validate(c *fiber.Ctx, data any) error { - +func (v *ValidatorClient) ParseBody(c *fiber.Ctx, data any) error { if err := c.BodyParser(data); err != nil { return err } + return validate(data) +} - if errs := v.validator.Struct(data); errs != nil { +func (v *ValidatorClient) ParseQuery(c *fiber.Ctx, data any) error { + if err := c.QueryParser(data); err != nil { + return err + } + return validate(data) +} + +func validate(data any) error { + if errs := Validator.validator.Struct(data); errs != nil { var sb = strings.Builder{} var typeErrs validator.ValidationErrors errors.As(errs, &typeErrs) for i, err := range typeErrs { - sb.WriteString(err.Translate(v.translator)) + sb.WriteString(err.Translate(Validator.translator)) if i < len(typeErrs)-1 { sb.WriteString("\n") } } + slog.Debug("请求参数验证失败", "msg", sb.String()) return fiber.NewError(fiber.StatusBadRequest, sb.String()) } return nil diff --git a/web/handlers/channel.go b/web/handlers/channel.go index 07798d9..b2cbd0b 100644 --- a/web/handlers/channel.go +++ b/web/handlers/channel.go @@ -113,7 +113,7 @@ func CreateChannel(c *fiber.Ctx) error { // 解析参数 req := new(CreateChannelReq) - if err := g.Validator.Validate(c, req); err != nil { + if err := g.Validator.ParseBody(c, req); err != nil { return core.NewBizErr("解析参数失败", err) } diff --git a/web/handlers/resource.go b/web/handlers/resource.go index ea7452c..aba4e1f 100644 --- a/web/handlers/resource.go +++ b/web/handlers/resource.go @@ -147,7 +147,7 @@ func ListResourceLong(c *fiber.Ctx) error { do.Where(q.ResourceLong.As(q.Resource.Long.Name()).Expire.Lte(*req.ExpireBefore)) } - resource, err := q.Resource.Debug().Where(do). + resource, err := q.Resource.Where(do). Joins(q.Resource.Long). Order(q.Resource.CreatedAt.Desc()). Offset(req.GetOffset()). @@ -354,7 +354,7 @@ func StatisticResourceUsage(c *fiber.Ctx) error { // 解析请求参数 var req = new(StatisticResourceUsageReq) - if err := g.Validator.Validate(c, req); err != nil { + if err := g.Validator.ParseBody(c, req); err != nil { return err } @@ -416,7 +416,7 @@ func CreateResource(c *fiber.Ctx) error { // 解析请求参数 var req = new(CreateResourceReq) - if err := g.Validator.Validate(c, req); err != nil { + if err := g.Validator.ParseBody(c, req); err != nil { return err } @@ -438,7 +438,7 @@ func ResourcePrice(c *fiber.Ctx) error { // 解析请求参数 var req = new(CreateResourceReq) - if err := g.Validator.Validate(c, req); err != nil { + if err := g.Validator.ParseBody(c, req); err != nil { return err } diff --git a/web/handlers/trade.go b/web/handlers/trade.go index bf0e6f7..9be38e7 100644 --- a/web/handlers/trade.go +++ b/web/handlers/trade.go @@ -1,15 +1,20 @@ package handlers import ( + "bufio" + "fmt" "log/slog" + "platform/pkg/env" "platform/web/auth" "platform/web/core" g "platform/web/globals" m "platform/web/models" s "platform/web/services" + "reflect" "time" "github.com/gofiber/fiber/v2" + "github.com/valyala/fasthttp" ) type TradeCreateReq struct { @@ -33,7 +38,7 @@ func TradeCreate(c *fiber.Ctx) error { // 解析请求参数 req := new(TradeCreateReq) - if err := g.Validator.Validate(c, req); err != nil { + if err := g.Validator.ParseBody(c, req); err != nil { return err } @@ -76,7 +81,7 @@ func TradeComplete(c *fiber.Ctx) error { // 解析请求参数 req := new(TradeCompleteReq) - if err := g.Validator.Validate(c, req); err != nil { + if err := g.Validator.ParseBody(c, req); err != nil { return err } @@ -102,7 +107,7 @@ func TradeCancel(c *fiber.Ctx) error { // 解析请求参数 req := new(TradeCancelReq) - if err := g.Validator.Validate(c, req); err != nil { + if err := g.Validator.ParseBody(c, req); err != nil { return err } @@ -115,3 +120,54 @@ func TradeCancel(c *fiber.Ctx) error { return c.SendStatus(fiber.StatusNoContent) } + +type TradeCheckReq struct { + s.ModifyTradeData +} + +func TradeCheck(c *fiber.Ctx) error { + // 解析请求参数 + req := new(TradeCheckReq) + if err := g.Validator.ParseQuery(c, req); err != nil { + return err + } + + c.Set(fiber.HeaderContentType, "text/event-stream") + c.Set(fiber.HeaderCacheControl, "no-cache") + c.Set(fiber.HeaderConnection, "keep-alive") + c.Set(fiber.HeaderTransferEncoding, "chunked") + c.Context().SetBodyStreamWriter(fasthttp.StreamWriter(func(w *bufio.Writer) { + + expire := env.TradeExpire + interval := 5 + for range expire / interval { + // 检查订单状态 + result, err := s.Trade.CheckTrade(&req.ModifyTradeData) + if err != nil { + slog.Error("检查订单状态失败", "trade_no", req.TradeNo, "error", err) + return + } + + // 写入订单状态 + _, err = fmt.Fprintf(w, "data: %d\n\n", result.Status) + if err != nil { + slog.Error("写入订单状态失败", "trade_no", req.TradeNo, "error", err) + return + } + err = w.Flush() + if err != nil { + slog.Error("刷新缓冲区失败", "trade_no", req.TradeNo, "error", err, "errType", reflect.TypeOf(err)) + return + } + + // 当订单离开支付状态后结束查询 + if result.Status != m.TradeStatusPending { + return + } + + time.Sleep(time.Duration(interval) * time.Second) + } + })) + + return nil +} diff --git a/web/handlers/whitelist.go b/web/handlers/whitelist.go index fc89771..56f73cd 100644 --- a/web/handlers/whitelist.go +++ b/web/handlers/whitelist.go @@ -85,7 +85,7 @@ func CreateWhitelist(c *fiber.Ctx) error { // 解析请求参数 req := new(CreateWhitelistReq) - err = g.Validator.Validate(c, req) + err = g.Validator.ParseBody(c, req) if err != nil { return err } diff --git a/web/middlewares.go b/web/middlewares.go index 84c5d30..3be700a 100644 --- a/web/middlewares.go +++ b/web/middlewares.go @@ -5,6 +5,7 @@ import ( "github.com/gofiber/contrib/otelfiber/v2" "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/logger" "github.com/gofiber/fiber/v2/middleware/recover" "github.com/gofiber/fiber/v2/middleware/requestid" @@ -19,9 +20,6 @@ func ApplyMiddlewares(app *fiber.App) { EnableStackTrace: true, })) - // metric - app.Use(otelfiber.Middleware()) - // logger app.Use(logger.New(logger.Config{ Next: func(c *fiber.Ctx) bool { @@ -29,6 +27,9 @@ func ApplyMiddlewares(app *fiber.App) { }, })) + // metric + app.Use(otelfiber.Middleware()) + // request id app.Use(requestid.New(requestid.Config{ Generator: func() string { @@ -37,6 +38,9 @@ func ApplyMiddlewares(app *fiber.App) { }, })) + // cors + app.Use(cors.New()) + // authenticate app.Use(auth.Authenticate()) } diff --git a/web/routes.go b/web/routes.go index a988563..3813c99 100644 --- a/web/routes.go +++ b/web/routes.go @@ -52,6 +52,7 @@ func ApplyRouters(app *fiber.App) { trade.Post("/create", handlers.TradeCreate) trade.Post("/complete", handlers.TradeComplete) trade.Post("/cancel", handlers.TradeCancel) + trade.Get("/check", handlers.TradeCheck) // 账单 bill := api.Group("/bill") diff --git a/web/services/channel_baiyin.go b/web/services/channel_baiyin.go index b0ff870..ad1e02b 100644 --- a/web/services/channel_baiyin.go +++ b/web/services/channel_baiyin.go @@ -283,7 +283,7 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3 } } else { bytes, _ := json.Marshal(configs) - slog.Debug("提交代理端口配置", "config", string(bytes)) + slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "config", string(bytes)) } } @@ -348,7 +348,7 @@ func (s *channelBaiyinService) RemoveChannels(batch string, ids []int32) error { } } else { bytes, _ := json.Marshal(configs) - slog.Debug("清除代理端口配置", "config", string(bytes)) + slog.Debug("清除代理端口配置", "proxy", ip, "config", string(bytes)) } } diff --git a/web/services/trade.go b/web/services/trade.go index 7e47836..537c1f3 100644 --- a/web/services/trade.go +++ b/web/services/trade.go @@ -668,7 +668,7 @@ type CreateTradeResult struct { } type ModifyTradeData struct { - TradeNo string `json:"trade_no" validate:"required"` + TradeNo string `json:"trade_no" query:"trade_no" validate:"required"` Method m.TradeMethod `json:"method" validate:"required"` } diff --git a/web/tasks/task.go b/web/tasks/task.go index e63e51c..3f519bb 100644 --- a/web/tasks/task.go +++ b/web/tasks/task.go @@ -141,7 +141,7 @@ func HandleFlushGateway(_ context.Context, task *asynq.Task) error { } } else { bytes, _ := json.Marshal(configs) - slog.Debug("更新代理后备配置", "config", string(bytes)) + slog.Debug("更新代理后备配置", "proxy", proxy.IP.String(), "config", string(bytes)) } _, err := q.Proxy.