package gateway import ( "context" "fmt" "log/slog" "os" "os/signal" "proxy-server/gateway/app" "proxy-server/gateway/core" "proxy-server/gateway/debug" "proxy-server/gateway/env" "proxy-server/gateway/fwd" g "proxy-server/gateway/globals" "proxy-server/gateway/log" "proxy-server/gateway/web" "proxy-server/utils" "sync" "time" "github.com/google/uuid" _ "net/http/pprof" ) func Start() (err error) { // 初始化 setup() // 恢复服务状态 err = restore() if err != nil { return err } // 准备子服务 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) defer cancel() wg := sync.WaitGroup{} // 转发服务 wg.Add(1) fwdQuit := make(chan error, 1) defer close(fwdQuit) go func() { defer wg.Done() err = startFwd(ctx) fwdQuit <- err }() // 接口服务 wg.Add(1) webQuit := make(chan error, 1) defer close(webQuit) go func() { defer wg.Done() err := startWeb(ctx) webQuit <- err }() // 调度任务服务 wg.Add(1) go func() { defer wg.Done() err := startTask(ctx) if err != nil { slog.Error("调度任务服务异常退出", "err", err) } }() // debug go func() { debug.Start(ctx) }() // 性能监控 // go func() { // runtime.SetBlockProfileRate(1) // err := http.ListenAndServe(":6060", nil) // if err != nil { // slog.Error("性能监控服务发生错误", "err", err) // } // }() // 报告上线 slog.Info("报告服务上线") err = app.Online(app.Name) if err != nil { return fmt.Errorf("服务上线失败: %w", err) } select { case <-ctx.Done(): case err := <-fwdQuit: if err != nil { slog.Warn("fwd 服务异常退出", "err", err) } case err := <-webQuit: if err != nil { slog.Warn("web 服务异常退出", "err", err) } } cancel() // 主协程退出流程 wg.Add(1) go func() { defer wg.Done() // 报告下线 slog.Debug("报告服务下线") err = app.Offline() if err != nil { slog.Error("服务下线失败", "err", err) } // 关闭服务 g.ExitRedis() g.ExitGeo() }() // 等待其它服务关闭 select { case <-utils.WgWait(&wg): case <-time.After(time.Duration(env.AppExitTimeout) * time.Second): slog.Warn("超时强制关闭") } slog.Info("服务已退出") return nil } func setup() { log.Init() env.Init() g.InitRedis() g.InitGeo() } func restore() error { var file = "proxy.lock" bytes, err := os.ReadFile(file) if err != nil { return err } if len(bytes) == 17 && bytes[0] == core.RestoreMagic { app.Name = uuid.UUID(bytes[1:]).String() slog.Info("恢复服务名称", "name", app.Name) } else { var u = uuid.New() app.Name = u.String() bytes = make([]byte, 17) bytes[0] = core.RestoreMagic copy(bytes[1:], u[:]) err := os.WriteFile(file, bytes, 0644) if err != nil { return err } slog.Info("生成服务名称", "name", app.Name) } return nil } func startFwd(ctx context.Context) error { server := fwd.New() go func() { <-ctx.Done() server.Stop() }() return server.Run() } func startWeb(ctx context.Context) error { server := web.New() go func() { <-ctx.Done() err := server.Stop() if err != nil { slog.Error("web 服务关闭发生错误", "err", err) } }() return server.Run() } func startTask(ctx context.Context) error { // 维护一个修改表,每次提交后清空,每个节点的修改在同一阶段内只有一次 var lock = sync.Mutex{} var updates = make([]*core.Edge, 0) var updatesMap = make(map[int32]*core.Edge) go func() { for data := range app.EdgeUpdates { lock.Lock() if update, ok := updatesMap[data.Id]; ok { if data.Status != update.Status { // 如果状态发生变化,则更新 updatesMap[data.Id] = data updates = append(updates, data) } } else { updatesMap[data.Id] = data updates = append(updates, data) } lock.Unlock() } }() // 每 30 秒批量提交一次更新 var scheduler = time.Tick(30 * time.Second) for { select { case <-ctx.Done(): return nil case <-scheduler: if len(updates) == 0 { continue } lock.Lock() err := app.Update(updates) if err != nil { slog.Error("调度更新任务失败", "err", err) } else { clear(updates) updates = updates[:0] } lock.Unlock() } } }