优化日志输出信息,调整 Docker Compose 配置,新增 Vector 日志收集配置

This commit is contained in:
2025-06-23 16:28:02 +08:00
parent fda1a2de0e
commit 20aa3d929c
9 changed files with 139 additions and 58 deletions

4
.gitignore vendored
View File

@@ -15,5 +15,7 @@ bin/
*/playground/ */playground/
scripts/* scripts/*
!scripts/dev/ !scripts/env/
!scripts/env/dev/
!scripts/env/pre/
!scripts/sql/ !scripts/sql/

View File

@@ -1,5 +1,7 @@
## TODO ## TODO
pre 环境屏蔽外部配置后启动,主要用于检查和配置采集器
创建交易订单后添加一个关闭订单的异步任务 创建交易订单后添加一个关闭订单的异步任务
支付回调需要判断可能重复调用的情况 支付回调需要判断可能重复调用的情况

View File

@@ -45,7 +45,7 @@ func main() {
select { select {
case err = <-errCh: case err = <-errCh:
case <-shutdown: case <-shutdown:
slog.Info("Received shutdown signal") slog.Debug("捕获结束信号")
app.Stop() app.Stop()
err = <-errCh err = <-errCh
} }

View File

@@ -4,7 +4,6 @@ services:
postgres: postgres:
image: postgres:17 image: postgres:17
restart: always
environment: environment:
POSTGRES_USER: ${DB_USERNAME} POSTGRES_USER: ${DB_USERNAME}
POSTGRES_PASSWORD: ${DB_PASSWORD} POSTGRES_PASSWORD: ${DB_PASSWORD}
@@ -16,7 +15,6 @@ services:
postgres-migration: postgres-migration:
image: postgres:17 image: postgres:17
restart: always
environment: environment:
POSTGRES_USER: ${DB_USERNAME} POSTGRES_USER: ${DB_USERNAME}
POSTGRES_PASSWORD: ${DB_PASSWORD} POSTGRES_PASSWORD: ${DB_PASSWORD}

50
scripts/env/pre/docker-compose.yaml vendored Normal file
View File

@@ -0,0 +1,50 @@
name: server-pre
services:
postgres:
image: postgres:17
environment:
POSTGRES_USER: ${DB_USERNAME}
POSTGRES_PASSWORD: ${DB_PASSWORD}
POSTGRES_DB: ${DB_NAME}
ports:
- "5434:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7.4
restart: always
ports:
- "6380:6379"
platform:
build:
context: ../../..
dockerfile: Dockerfile
environment:
- RUN_MODE=production
- DB_PORT=5434
- REDIS_PORT=6380
ports:
- "8081:8080"
depends_on:
- postgres
- redis
vector:
image: timberio/vector:0.47.0-alpine
volumes:
- ./vector/vector.toml:/etc/vector/vector.toml
- vector_data:/var/lib/vector
ports:
- "9000:9000"
command: ["vector", "-c", "/etc/vector/vector.toml"]
depends_on:
- postgres
- platform
volumes:
postgres_data:
vector_data:

43
scripts/env/pre/vector/vector.toml vendored Normal file
View File

@@ -0,0 +1,43 @@
## 源配置:从 Docker 获取容器日志
[sources.platform_logs]
type = "docker_logs"
include_containers = ["platform"]
## 转换配置:为日志添加元数据
[transforms.platform_logs_parse]
type = "remap"
inputs = ["platform_logs"]
source = '''
.container = "platform"
json, err = parse_json(.message)
if err != null {
log.error("日志转换 json 格式失败: {}", err)
.tag = "error"
return
}
. = merge(., json)
'''
[transform.platform_logs_route]
type = "route"
inputs = ["platform_logs_parse"]
[transform.platform_logs_route.route]
request = '.message == "接口请求"'
usage = '.message == "创建通道"'
## 输出配置:将日志保存到 postgresql
[sinks.platform_logs_request]
type = "postgres"
inputs = ["platform_logs_route.request"]
[sinks.platform_logs_login]
type = "postgres"
inputs = ["platform_logs_route.login"]
[sinks.platform_logs_usage]
type = "postgres"
inputs = ["platform_logs_route.usage"]

View File

@@ -98,8 +98,6 @@ func Protect(c *fiber.Ctx, types []PayloadType, permissions []string) (*Context,
func Locals(c *fiber.Ctx, auth *Context) { func Locals(c *fiber.Ctx, auth *Context) {
c.Locals("auth", auth) c.Locals("auth", auth)
c.Locals("authtype", auth.Payload.Type.ToStr())
c.Locals("authid", auth.Payload.Id)
} }
func authBearer(ctx context.Context, token string) (*Context, error) { func authBearer(ctx context.Context, token string) (*Context, error) {

View File

@@ -6,6 +6,7 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"platform/pkg/u" "platform/pkg/u"
"platform/web/core"
bill2 "platform/web/domains/bill" bill2 "platform/web/domains/bill"
resource2 "platform/web/domains/resource" resource2 "platform/web/domains/resource"
trade2 "platform/web/domains/trade" trade2 "platform/web/domains/trade"
@@ -124,7 +125,7 @@ func (s *resourceService) PrepareResource(uid int32, now time.Time, ser *Prepare
return err return err
} }
err = g.Redis.Set(context.Background(), result.TradeNo, &PrepareResourceCache{ err = g.Redis.Set(context.Background(), resPrepareKey(result.TradeNo), &PrepareResourceCache{
Uid: uid, Uid: uid,
TradeId: result.Trade.ID, TradeId: result.Trade.ID,
BillId: result.Bill.ID, BillId: result.Bill.ID,
@@ -146,9 +147,9 @@ func (s *resourceService) PrepareResource(uid int32, now time.Time, ser *Prepare
func (s *resourceService) CompleteResource(tradeNo string, now time.Time, opResult ...*TradeSuccessResult) error { func (s *resourceService) CompleteResource(tradeNo string, now time.Time, opResult ...*TradeSuccessResult) error {
// 获取请求缓存 // 获取请求缓存
reqStr, err := g.Redis.Get(context.Background(), tradeNo).Result() reqStr, err := g.Redis.Get(context.Background(), resPrepareKey(tradeNo)).Result()
if err != nil { if err != nil {
return err return core.NewBizErr("交易不存在或已过期")
} }
cache := new(PrepareResourceCache) cache := new(PrepareResourceCache)
if err := json.Unmarshal([]byte(reqStr), cache); err != nil { if err := json.Unmarshal([]byte(reqStr), cache); err != nil {
@@ -293,6 +294,10 @@ func createResource(q *q.Query, uid int32, now time.Time, data CreateTypeResourc
return &resource, nil return &resource, nil
} }
func resPrepareKey(tradeNo string) string {
return fmt.Sprintf("resource:prepare:%s", tradeNo)
}
type CreateTypeResourceDataInter interface { type CreateTypeResourceDataInter interface {
GetName() string GetName() string
GetPrice() decimal.Decimal GetPrice() decimal.Decimal

View File

@@ -10,14 +10,10 @@ import (
"log/slog" "log/slog"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"platform/pkg/u"
"platform/web/auth" "platform/web/auth"
g "platform/web/globals" g "platform/web/globals"
"platform/web/globals/orm"
m "platform/web/models"
q "platform/web/queries" q "platform/web/queries"
"runtime" "runtime"
"strconv"
"strings" "strings"
"time" "time"
) )
@@ -74,13 +70,13 @@ func (s *Server) Run() error {
}() }()
// listen // listen
slog.Info("Server started on :8080") slog.Info("服务开始监听 :8080")
err := s.fiber.Listen("0.0.0.0:8080") err := s.fiber.Listen("0.0.0.0:8080")
if err != nil { if err != nil {
slog.Error("Failed to start server", slog.Any("err", err)) slog.Error("Failed to start server", slog.Any("err", err))
} }
slog.Info("Server stopped") slog.Info("服务已停止")
return nil return nil
} }
@@ -121,55 +117,42 @@ func newLogger() fiber.Handler {
TimeFormat: "2006-01-02 15:04:05", TimeFormat: "2006-01-02 15:04:05",
TimeZone: "Asia/Shanghai", TimeZone: "Asia/Shanghai",
Next: func(c *fiber.Ctx) bool { Next: func(c *fiber.Ctx) bool {
c.Locals("authtype", auth.PayloadNone.ToStr()) authCtx, ok := c.Locals("auth").(*auth.Context)
c.Locals("authid", 0) if ok {
c.Locals("authtype", authCtx.Payload.Type.ToStr())
c.Locals("authid", authCtx.Payload.Id)
} else {
c.Locals("authtype", auth.PayloadNone.ToStr())
c.Locals("authid", 0)
}
return false return false
}, },
Done: func(c *fiber.Ctx, logBytes []byte) { Done: func(c *fiber.Ctx, logBytes []byte) {
go func(ip, ua, method, path string, status int, logBytes []byte) { var logStr = strings.TrimPrefix(string(logBytes), "🚀")
var logStr = strings.TrimPrefix(string(logBytes), "🚀") var logVars = strings.Split(logStr, "|")
var logVars = strings.Split(logStr, "|")
var reqTimeStr = strings.TrimSpace(logVars[0]) var reqTimeStr = strings.TrimSpace(logVars[0])
reqTime, err := time.ParseInLocation("2006-01-02 15:04:05", reqTimeStr, time.Local) reqTime, err := time.ParseInLocation("2006-01-02 15:04:05", reqTimeStr, time.Local)
if err != nil { if err != nil {
slog.Error("时间解析错误", slog.Any("err", err)) slog.Error("时间解析错误", slog.Any("err", err))
return return
} }
var authInfo = strings.Split(strings.TrimSpace(logVars[1]), " ") var latency = strings.TrimSpace(logVars[4])
var authType = auth.PayloadTypeFromStr(strings.TrimSpace(authInfo[0])) var errStr = strings.TrimSpace(logVars[5])
authID, err := strconv.Atoi(strings.TrimSpace(authInfo[1]))
if err != nil {
slog.Error("负载ID解析错误", slog.Any("err", err))
return
}
var latency = strings.TrimSpace(logVars[4]) slog.Info("接口请求",
slog.String("identity", c.Locals("authtype").(string)),
var errStr = strings.TrimSpace(logVars[5]) slog.Int("visitor", c.Locals("authid").(int)),
slog.String("ip", c.IP()),
var item = &m.LogsRequest{ slog.String("ua", c.Get("User-Agent")),
Identity: int32(authType), slog.String("method", c.Method()),
IP: ip, slog.String("path", c.Path()),
Ua: u.P(ua), slog.Int("status", c.Response().StatusCode()),
Method: method, slog.String("error", errStr),
Path: path, slog.String("latency", latency),
Latency: latency, slog.Time("time", reqTime),
Status: int32(status), )
Error: &errStr,
Time: orm.LocalDateTime(reqTime),
}
if authID != 0 {
item.Visitor = u.P(int32(authID))
}
err = q.LogsRequest.Create(item)
if err != nil {
slog.Error("日志记录错误", slog.Any("err", err))
return
}
}(c.IP(), c.Get("User-Agent"), c.Method(), c.Path(), c.Response().StatusCode(), logBytes)
}, },
}) })
} }