package fwd import ( "context" "log/slog" "proxy-server/pkg/utils" "proxy-server/server/fwd/core" "sync" ) type Config struct { Id *int32 } type Service struct { Config *Config ctx context.Context cancel context.CancelFunc userConnMap core.ConnMap fwdLesWg utils.CountWaitGroup ctrlConnWg utils.CountWaitGroup dataConnWg utils.CountWaitGroup userConnWg utils.CountWaitGroup fwdPortMap map[uint16]int32 // 转发端口映射,key 为端口号,value 为边缘节点 ID } func New(config *Config) *Service { if config == nil { config = &Config{} } ctx, cancel := context.WithCancel(context.Background()) return &Service{ Config: config, ctx: ctx, cancel: cancel, fwdPortMap: make(map[uint16]int32), } } func (s *Service) Run() error { slog.Info("启动 fwd 服务") errQuit := make(chan struct{}, 2) defer close(errQuit) wg := sync.WaitGroup{} // 控制通道监听 wg.Add(1) go func() { defer wg.Done() err := s.startCtrlTun() if err != nil { slog.Error("fwd 控制通道监听发生错误", "err", err) errQuit <- struct{}{} return } }() // 数据通道监听 wg.Add(1) go func() { defer wg.Done() err := s.startDataTun() if err != nil { slog.Error("fwd 数据通道监听发生错误", "err", err) errQuit <- struct{}{} return } }() // 等待退出 select { case <-s.ctx.Done(): slog.Info("fwd 服务主动退出") case <-errQuit: slog.Warn("fwd 服务异常退出") s.Stop() } wg.Wait() s.dataConnWg.Wait() s.ctrlConnWg.Wait() s.fwdLesWg.Wait() s.userConnWg.Wait() // 清理资源 s.userConnMap.Range(func(key string, value *core.Conn) bool { utils.Close(value) return true }) s.userConnMap.Clear() s.ctrlConnWg.Wait() slog.Debug("控制通道连接已关闭") s.dataConnWg.Wait() slog.Debug("数据通道连接已关闭") s.fwdLesWg.Wait() slog.Debug("转发服务已关闭") wg.Wait() slog.Info("fwd 服务已退出") return nil } func (s *Service) Stop() { s.cancel() }