重构代码结构与认证体系,集成异步任务消费者

This commit is contained in:
2025-11-17 18:38:10 +08:00
parent a97c970166
commit a245229bc2
70 changed files with 2000 additions and 2334 deletions

View File

@@ -1,166 +1,89 @@
package web
import (
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/logger"
"github.com/gofiber/fiber/v2/middleware/recover"
"github.com/gofiber/fiber/v2/middleware/requestid"
"github.com/google/uuid"
"github.com/jxskiss/base62"
"context"
"fmt"
"log/slog"
"net/http"
_ "net/http/pprof"
"platform/web/auth"
g "platform/web/globals"
q "platform/web/queries"
"runtime"
"strings"
"time"
"platform/web/events"
base "platform/web/globals"
"platform/web/tasks"
"github.com/gofiber/fiber/v2"
"github.com/hibiken/asynq"
"golang.org/x/sync/errgroup"
)
// region web
func RunApp(pCtx context.Context) error {
g, ctx := errgroup.WithContext(pCtx)
type Config struct {
Listen string
}
type Server struct {
config *Config
fiber *fiber.App
}
func New(config *Config) (*Server, error) {
_config := config
if config == nil {
_config = &Config{}
// 初始化依赖
err := base.Init(ctx)
if err != nil {
return fmt.Errorf("初始化依赖失败: %w", err)
}
return &Server{
config: _config,
}, nil
// 运行服务
g.Go(func() error {
return RunWeb(ctx)
})
g.Go(func() error {
return RunTask(ctx)
})
return g.Wait()
}
func (s *Server) Run() error {
func RunWeb(ctx context.Context) error {
// inits
g.Init()
q.SetDefault(g.DB)
// config
s.fiber = fiber.New(fiber.Config{
fiber := fiber.New(fiber.Config{
ProxyHeader: fiber.HeaderXForwardedFor,
ErrorHandler: ErrorHandler,
})
// middlewares
s.fiber.Use(newRecover())
s.fiber.Use(newRequestId())
s.fiber.Use(newLogger())
ApplyMiddlewares(fiber)
ApplyRouters(fiber)
// routes
ApplyRouters(s.fiber)
// pprof
// 停止服务
go func() {
runtime.SetBlockProfileRate(1)
err := http.ListenAndServe(":6060", nil)
<-ctx.Done()
err := fiber.Shutdown()
if err != nil {
slog.Error("pprof 服务错误", slog.Any("err", err))
slog.Error("服务停止失败", "error", err)
}
}()
// listen
slog.Info("服务开始监听 :8080")
err := s.fiber.Listen("0.0.0.0:8080")
// 启动服务
slog.Info("web 服务开始监听 :8080")
err := fiber.Listen("0.0.0.0:8080")
if err != nil {
slog.Error("Failed to start server", slog.Any("err", err))
return fmt.Errorf("web 服务监听失败: %w", err)
}
slog.Info("服务已停止")
slog.Info("web 服务已停止")
return nil
}
func (s *Server) Stop() {
err := g.ExitRedis()
func RunTask(ctx context.Context) error {
var server = asynq.NewServerFromRedisClient(base.Redis, asynq.Config{})
var mux = asynq.NewServeMux()
mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel)
mux.HandleFunc(events.CancelTrade, tasks.HandleCancelTrade)
// 停止服务
go func() {
<-ctx.Done()
server.Shutdown()
}()
// 启动服务
err := server.Run(mux)
if err != nil {
slog.Error("Failed to close Redis connection", slog.Any("err", err))
return fmt.Errorf("任务服务运行失败: %w", err)
}
err = g.ExitOrm()
if err != nil {
slog.Error("Failed to close database connection", slog.Any("err", err))
}
err = s.fiber.Shutdown()
if err != nil {
slog.Error("Failed to shutdown server", slog.Any("err", err))
}
return nil
}
// endregion
// region middlewares
func newRequestId() fiber.Handler {
return requestid.New(requestid.Config{
Generator: func() string {
binary, _ := uuid.New().MarshalBinary()
return base62.EncodeToString(binary)
},
})
}
func newLogger() fiber.Handler {
return logger.New(logger.Config{
DisableColors: true,
Format: "🚀 ${time} | ${locals:authtype} ${locals:authid} | ${method} ${path} | ${status} | ${latency} | ${error}\n",
TimeFormat: "2006-01-02 15:04:05",
TimeZone: "Asia/Shanghai",
Next: func(c *fiber.Ctx) bool {
authCtx, ok := c.Locals("auth").(*auth.Context)
if ok {
c.Locals("authtype", authCtx.Payload.Type.ToStr())
c.Locals("authid", authCtx.Payload.Id)
} else {
c.Locals("authtype", auth.PayloadNone.ToStr())
c.Locals("authid", 0)
}
return false
},
Done: func(c *fiber.Ctx, logBytes []byte) {
var logStr = strings.TrimPrefix(string(logBytes), "🚀")
var logVars = strings.Split(logStr, "|")
var reqTimeStr = strings.TrimSpace(logVars[0])
reqTime, err := time.ParseInLocation("2006-01-02 15:04:05", reqTimeStr, time.Local)
if err != nil {
slog.Error("时间解析错误", slog.Any("err", err))
return
}
var latency = strings.TrimSpace(logVars[4])
var errStr = strings.TrimSpace(logVars[5])
slog.Info("接口请求",
slog.String("identity", c.Locals("authtype").(string)),
slog.Int("visitor", c.Locals("authid").(int)),
slog.String("ip", c.IP()),
slog.String("ua", c.Get("User-Agent")),
slog.String("method", c.Method()),
slog.String("path", c.Path()),
slog.Int("status", c.Response().StatusCode()),
slog.String("error", errStr),
slog.String("latency", latency),
slog.Time("time", reqTime),
)
},
})
}
func newRecover() fiber.Handler {
return recover.New(recover.Config{
EnableStackTrace: true,
})
}
// endregion