diff --git a/README.md b/README.md index 3d5bf59..2f15604 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,18 @@ ## todo -ProxyConn 直接实现 Conn 相同的接口,不再取出 Conn 使用 +压力测试 读取 conn 时加上超时机制 代理节点超时控制 +在控制通道直接传输目标地址,客户端可以同时开始数据通道和目标地址的连接建立 + 网关根据代理节点对目标服务连接的反馈,决定向用户返回的 socks 响应 数据通道池化 -在控制通道直接传输目标地址,客户端可以同时开始数据通道和目标地址的连接建立 - -检查退出超时的问题 - -补全测试 +协程池化 ### 长期 diff --git a/server/fwd/fwd.go b/server/fwd/fwd.go index efb362d..186fd16 100644 --- a/server/fwd/fwd.go +++ b/server/fwd/fwd.go @@ -27,7 +27,6 @@ type Service struct { cancel context.CancelFunc userConnMap sync.Map - ctrlConnMap sync.Map fwdLesWg utils.CountWaitGroup ctrlConnWg utils.CountWaitGroup @@ -42,11 +41,11 @@ func New(config *Config) *Service { ctx, cancel := context.WithCancel(context.Background()) return &Service{ - Config: config, - ctx: ctx, - cancel: cancel, + Config: config, + ctx: ctx, + cancel: cancel, + userConnMap: sync.Map{}, - ctrlConnMap: sync.Map{}, fwdLesWg: utils.CountWaitGroup{}, ctrlConnWg: utils.CountWaitGroup{}, @@ -101,7 +100,6 @@ func (s *Service) Run() { } wg.Wait() - // 协程建立有先后顺序,不能乱,否则会泄露 s.dataConnWg.Wait() s.ctrlConnWg.Wait() s.fwdLesWg.Wait() @@ -115,13 +113,6 @@ func (s *Service) Run() { }) s.userConnMap.Clear() - s.ctrlConnMap.Range(func(key, value any) bool { - conn := value.(net.Conn) - utils.Close(conn) - return true - }) - s.ctrlConnMap.Clear() - s.ctrlConnWg.Wait() slog.Debug("控制通道连接已关闭") s.dataConnWg.Wait() diff --git a/server/server.go b/server/server.go index b775ead..f3a0fb6 100644 --- a/server/server.go +++ b/server/server.go @@ -38,17 +38,15 @@ func Start() { osQuit := make(chan os.Signal) signal.Notify(osQuit, os.Interrupt, syscall.SIGTERM) - errQuit := make(chan struct{}) - defer close(errQuit) - // 启动服务 slog.Info("启动服务") ctx, cancel := context.WithCancel(context.Background()) defer cancel() wg := sync.WaitGroup{} - wg.Add(1) + errQuit := make(chan struct{}, 1) + defer close(errQuit) go func() { defer wg.Done() err := startFwdServer(ctx) @@ -72,14 +70,14 @@ func Start() { timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() + wg.Wait() + select { case <-utils.ChanWgWait(timeout, &wg): slog.Info("服务已退出") case <-timeout.Done(): slog.Warn("退出超时,强制退出") } - - time.Sleep(3 * time.Second) } func startFwdServer(ctx context.Context) error {