diff --git a/.gitignore b/.gitignore index 344333d..386239d 100644 --- a/.gitignore +++ b/.gitignore @@ -15,5 +15,7 @@ bin/ */playground/ scripts/* -!scripts/dev/ +!scripts/env/ +!scripts/env/dev/ +!scripts/env/pre/ !scripts/sql/ diff --git a/README.md b/README.md index 712cbc3..be42bc8 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,7 @@ ## TODO +pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器 + 创建交易订单后添加一个关闭订单的异步任务 支付回调需要判断可能重复调用的情况 diff --git a/cmd/main/main.go b/cmd/main/main.go index 93dfbda..cbb727c 100644 --- a/cmd/main/main.go +++ b/cmd/main/main.go @@ -45,7 +45,7 @@ func main() { select { case err = <-errCh: case <-shutdown: - slog.Info("Received shutdown signal") + slog.Debug("捕获结束信号") app.Stop() err = <-errCh } diff --git a/scripts/dev/docker-compose.yaml b/scripts/env/dev/docker-compose.yaml similarity index 93% rename from scripts/dev/docker-compose.yaml rename to scripts/env/dev/docker-compose.yaml index b6cfbfd..78d2449 100644 --- a/scripts/dev/docker-compose.yaml +++ b/scripts/env/dev/docker-compose.yaml @@ -4,7 +4,6 @@ services: postgres: image: postgres:17 - restart: always environment: POSTGRES_USER: ${DB_USERNAME} POSTGRES_PASSWORD: ${DB_PASSWORD} @@ -16,7 +15,6 @@ services: postgres-migration: image: postgres:17 - restart: always environment: POSTGRES_USER: ${DB_USERNAME} POSTGRES_PASSWORD: ${DB_PASSWORD} diff --git a/scripts/env/pre/docker-compose.yaml b/scripts/env/pre/docker-compose.yaml new file mode 100644 index 0000000..f577827 --- /dev/null +++ b/scripts/env/pre/docker-compose.yaml @@ -0,0 +1,50 @@ +name: server-pre + +services: + + postgres: + image: postgres:17 + environment: + POSTGRES_USER: ${DB_USERNAME} + POSTGRES_PASSWORD: ${DB_PASSWORD} + POSTGRES_DB: ${DB_NAME} + ports: + - "5434:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + + redis: + image: redis:7.4 + restart: always + ports: + - "6380:6379" + + platform: + build: + context: ../../.. + dockerfile: Dockerfile + environment: + - RUN_MODE=production + - DB_PORT=5434 + - REDIS_PORT=6380 + ports: + - "8081:8080" + depends_on: + - postgres + - redis + + vector: + image: timberio/vector:0.47.0-alpine + volumes: + - ./vector/vector.toml:/etc/vector/vector.toml + - vector_data:/var/lib/vector + ports: + - "9000:9000" + command: ["vector", "-c", "/etc/vector/vector.toml"] + depends_on: + - postgres + - platform + +volumes: + postgres_data: + vector_data: diff --git a/scripts/env/pre/vector/vector.toml b/scripts/env/pre/vector/vector.toml new file mode 100644 index 0000000..3b5296e --- /dev/null +++ b/scripts/env/pre/vector/vector.toml @@ -0,0 +1,43 @@ +## 源配置:从 Docker 获取容器日志 + +[sources.platform_logs] +type = "docker_logs" +include_containers = ["platform"] + +## 转换配置:为日志添加元数据 + +[transforms.platform_logs_parse] +type = "remap" +inputs = ["platform_logs"] +source = ''' +.container = "platform" +json, err = parse_json(.message) +if err != null { + log.error("日志转换 json 格式失败: {}", err) + .tag = "error" + return +} +. = merge(., json) +''' + +[transform.platform_logs_route] +type = "route" +inputs = ["platform_logs_parse"] + +[transform.platform_logs_route.route] +request = '.message == "接口请求"' +usage = '.message == "创建通道"' + +## 输出配置:将日志保存到 postgresql + +[sinks.platform_logs_request] +type = "postgres" +inputs = ["platform_logs_route.request"] + +[sinks.platform_logs_login] +type = "postgres" +inputs = ["platform_logs_route.login"] + +[sinks.platform_logs_usage] +type = "postgres" +inputs = ["platform_logs_route.usage"] diff --git a/web/auth/authenticate.go b/web/auth/authenticate.go index fa69752..f0f76df 100644 --- a/web/auth/authenticate.go +++ b/web/auth/authenticate.go @@ -98,8 +98,6 @@ func Protect(c *fiber.Ctx, types []PayloadType, permissions []string) (*Context, func Locals(c *fiber.Ctx, auth *Context) { c.Locals("auth", auth) - c.Locals("authtype", auth.Payload.Type.ToStr()) - c.Locals("authid", auth.Payload.Id) } func authBearer(ctx context.Context, token string) (*Context, error) { diff --git a/web/services/resource.go b/web/services/resource.go index 503192a..5a2041b 100644 --- a/web/services/resource.go +++ b/web/services/resource.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "platform/pkg/u" + "platform/web/core" bill2 "platform/web/domains/bill" resource2 "platform/web/domains/resource" trade2 "platform/web/domains/trade" @@ -124,7 +125,7 @@ func (s *resourceService) PrepareResource(uid int32, now time.Time, ser *Prepare return err } - err = g.Redis.Set(context.Background(), result.TradeNo, &PrepareResourceCache{ + err = g.Redis.Set(context.Background(), resPrepareKey(result.TradeNo), &PrepareResourceCache{ Uid: uid, TradeId: result.Trade.ID, BillId: result.Bill.ID, @@ -146,9 +147,9 @@ func (s *resourceService) PrepareResource(uid int32, now time.Time, ser *Prepare func (s *resourceService) CompleteResource(tradeNo string, now time.Time, opResult ...*TradeSuccessResult) error { // 获取请求缓存 - reqStr, err := g.Redis.Get(context.Background(), tradeNo).Result() + reqStr, err := g.Redis.Get(context.Background(), resPrepareKey(tradeNo)).Result() if err != nil { - return err + return core.NewBizErr("交易不存在或已过期") } cache := new(PrepareResourceCache) if err := json.Unmarshal([]byte(reqStr), cache); err != nil { @@ -293,6 +294,10 @@ func createResource(q *q.Query, uid int32, now time.Time, data CreateTypeResourc return &resource, nil } +func resPrepareKey(tradeNo string) string { + return fmt.Sprintf("resource:prepare:%s", tradeNo) +} + type CreateTypeResourceDataInter interface { GetName() string GetPrice() decimal.Decimal diff --git a/web/web.go b/web/web.go index 550068a..93dfc36 100644 --- a/web/web.go +++ b/web/web.go @@ -10,14 +10,10 @@ import ( "log/slog" "net/http" _ "net/http/pprof" - "platform/pkg/u" "platform/web/auth" g "platform/web/globals" - "platform/web/globals/orm" - m "platform/web/models" q "platform/web/queries" "runtime" - "strconv" "strings" "time" ) @@ -74,13 +70,13 @@ func (s *Server) Run() error { }() // listen - slog.Info("Server started on :8080") + slog.Info("服务开始监听 :8080") err := s.fiber.Listen("0.0.0.0:8080") if err != nil { slog.Error("Failed to start server", slog.Any("err", err)) } - slog.Info("Server stopped") + slog.Info("服务已停止") return nil } @@ -121,55 +117,42 @@ func newLogger() fiber.Handler { TimeFormat: "2006-01-02 15:04:05", TimeZone: "Asia/Shanghai", Next: func(c *fiber.Ctx) bool { - c.Locals("authtype", auth.PayloadNone.ToStr()) - c.Locals("authid", 0) + authCtx, ok := c.Locals("auth").(*auth.Context) + if ok { + c.Locals("authtype", authCtx.Payload.Type.ToStr()) + c.Locals("authid", authCtx.Payload.Id) + } else { + c.Locals("authtype", auth.PayloadNone.ToStr()) + c.Locals("authid", 0) + } return false }, Done: func(c *fiber.Ctx, logBytes []byte) { - go func(ip, ua, method, path string, status int, logBytes []byte) { - var logStr = strings.TrimPrefix(string(logBytes), "🚀") - var logVars = strings.Split(logStr, "|") + var logStr = strings.TrimPrefix(string(logBytes), "🚀") + var logVars = strings.Split(logStr, "|") - var reqTimeStr = strings.TrimSpace(logVars[0]) - reqTime, err := time.ParseInLocation("2006-01-02 15:04:05", reqTimeStr, time.Local) - if err != nil { - slog.Error("时间解析错误", slog.Any("err", err)) - return - } + var reqTimeStr = strings.TrimSpace(logVars[0]) + reqTime, err := time.ParseInLocation("2006-01-02 15:04:05", reqTimeStr, time.Local) + if err != nil { + slog.Error("时间解析错误", slog.Any("err", err)) + return + } - var authInfo = strings.Split(strings.TrimSpace(logVars[1]), " ") - var authType = auth.PayloadTypeFromStr(strings.TrimSpace(authInfo[0])) - authID, err := strconv.Atoi(strings.TrimSpace(authInfo[1])) - if err != nil { - slog.Error("负载ID解析错误", slog.Any("err", err)) - return - } + var latency = strings.TrimSpace(logVars[4]) + var errStr = strings.TrimSpace(logVars[5]) - var latency = strings.TrimSpace(logVars[4]) - - var errStr = strings.TrimSpace(logVars[5]) - - var item = &m.LogsRequest{ - Identity: int32(authType), - IP: ip, - Ua: u.P(ua), - Method: method, - Path: path, - Latency: latency, - Status: int32(status), - Error: &errStr, - Time: orm.LocalDateTime(reqTime), - } - if authID != 0 { - item.Visitor = u.P(int32(authID)) - } - - err = q.LogsRequest.Create(item) - if err != nil { - slog.Error("日志记录错误", slog.Any("err", err)) - return - } - }(c.IP(), c.Get("User-Agent"), c.Method(), c.Path(), c.Response().StatusCode(), logBytes) + slog.Info("接口请求", + slog.String("identity", c.Locals("authtype").(string)), + slog.Int("visitor", c.Locals("authid").(int)), + slog.String("ip", c.IP()), + slog.String("ua", c.Get("User-Agent")), + slog.String("method", c.Method()), + slog.String("path", c.Path()), + slog.Int("status", c.Response().StatusCode()), + slog.String("error", errStr), + slog.String("latency", latency), + slog.Time("time", reqTime), + ) }, }) }