diff --git a/README.md b/README.md index 5599c2e..82bec8a 100644 --- a/README.md +++ b/README.md @@ -6,8 +6,6 @@ ProxyConn 直接实现 Conn 相同的接口,不再取出 Conn 使用 -web 服务目录结构,不要 app 那层了 - 配置退出等待时间 log 控制台颜色,输出错误堆栈 @@ -24,6 +22,8 @@ log 控制台颜色,输出错误堆栈 ### 长期 +退出顺序好像有问题,需要检查 + 实现一个 socks context 以在子组件中获取 socks 相关信息 代理端口支持混合端口转发 diff --git a/pkg/utils/chan.go b/pkg/utils/chan.go index 7215563..61f485a 100644 --- a/pkg/utils/chan.go +++ b/pkg/utils/chan.go @@ -13,7 +13,10 @@ func ChanConnAccept(ctx context.Context, ls net.Listener) chan net.Conn { go func() { for { conn, err := ls.Accept() - if err != nil && !errors.Is(err, net.ErrClosed) { + if err != nil { + if errors.Is(err, net.ErrClosed) { + return + } slog.Error("接受连接失败", err) // 临时错误重试连接 var ne net.Error diff --git a/server/fwd/auth.go b/server/fwd/auth.go index 400133b..a8ebc94 100644 --- a/server/fwd/auth.go +++ b/server/fwd/auth.go @@ -8,7 +8,7 @@ import ( "proxy-server/pkg/utils" "proxy-server/server/fwd/socks" "proxy-server/server/pkg/orm" - "proxy-server/server/web/app/models" + "proxy-server/server/web/models" "time" "github.com/pkg/errors" diff --git a/server/fwd/fwd.go b/server/fwd/fwd.go index 9e6024e..2b46fc9 100644 --- a/server/fwd/fwd.go +++ b/server/fwd/fwd.go @@ -28,6 +28,7 @@ type Service struct { ctrlConnWg utils.CountWaitGroup dataConnWg utils.CountWaitGroup + fwdLesWg utils.CountWaitGroup } func New(config *Config) *Service { @@ -43,70 +44,71 @@ func New(config *Config) *Service { userConnMap: make(map[string]socks.ProxyConn), ctrlConnWg: utils.CountWaitGroup{}, dataConnWg: utils.CountWaitGroup{}, + fwdLesWg: utils.CountWaitGroup{}, } } func (s *Service) Close() { + start := time.Now() s.cancel() - for _, conn := range s.userConnMap { - utils.Close(conn) - } - clear(s.userConnMap) + slog.Debug("退出服务", "duration", time.Since(start)) } func (s *Service) Run() { - slog.Debug("启动 fwd 服务") + slog.Info("启动 fwd 服务") errQuit := make(chan struct{}) defer close(errQuit) wg := sync.WaitGroup{} - // 启动工作协程 + // 控制通道监听 wg.Add(1) go func() { defer wg.Done() err := s.startCtrlTun() if err != nil { - slog.Error("控制通道发生错误", "err", err) + slog.Error("fwd 控制通道监听发生错误", "err", err) errQuit <- struct{}{} return } }() + // 数据通道监听 wg.Add(1) go func() { defer wg.Done() err := s.startDataTun() if err != nil { - slog.Error("数据通道发生错误", "err", err) + slog.Error("fwd 数据通道监听发生错误", "err", err) errQuit <- struct{}{} return } }() - // 等待结束 + // 等待退出 select { case <-s.ctx.Done(): - slog.Debug("服务关闭") + slog.Info("fwd 服务主动退出") case <-errQuit: - slog.Debug("服务异常退出") + slog.Warn("fwd 服务异常退出") + s.Close() } - // 退出 - s.Close() - - timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - wgCh := utils.ChanWgWait(timeout, &wg) - defer close(wgCh) - - select { - case <-timeout.Done(): - slog.Warn("关闭超时,强制关闭") - case <-wgCh: - slog.Debug("服务已退出") + // 清理资源 + for _, conn := range s.userConnMap { + utils.Close(conn) } + clear(s.userConnMap) + + s.ctrlConnWg.Wait() + slog.Debug("控制通道连接已关闭") + s.dataConnWg.Wait() + slog.Debug("数据通道连接已关闭") + s.fwdLesWg.Wait() + slog.Debug("转发服务已关闭") + wg.Wait() + slog.Info("fwd 服务已退出") } func (s *Service) startCtrlTun() error { @@ -125,15 +127,15 @@ func (s *Service) startCtrlTun() error { defer close(connCh) // 处理连接 - for loop := true; loop; { + for { select { case <-s.ctx.Done(): - slog.Debug("结束处理连接,由于上下文取消") - loop = false + slog.Debug("服务关闭 startCtrlTun") + return nil case conn, ok := <-connCh: if !ok { slog.Debug("结束处理连接,由于获取连接失败") - loop = false + return errors.New("获取连接失败") } s.ctrlConnWg.Add(1) go func() { @@ -146,26 +148,10 @@ func (s *Service) startCtrlTun() error { }() } } - - // 等待子协程结束 - timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - procCh := utils.ChanWgWait(timeout, &s.ctrlConnWg) - defer close(procCh) - - select { - case <-timeout.Done(): - slog.Warn("等待控制通道子协程结束超时") - case <-procCh: - slog.Debug("控制通道子协程结束") - } - - slog.Debug("关闭控制通道") - return nil } func (s *Service) processCtrlConn(controller net.Conn) error { - slog.Info("客户端连入", "addr", controller.RemoteAddr().String()) + slog.Debug("客户端连入", "addr", controller.RemoteAddr().String()) reader := bufio.NewReader(controller) @@ -177,7 +163,7 @@ func (s *Service) processCtrlConn(controller net.Conn) error { port := binary.BigEndian.Uint16(portBuf) // 开放转发端口 todo 混合转发 - slog.Info("开放转发端口", "port", port) + slog.Debug("开放转发端口", "port", port) proxy, err := socks.New(&socks.Config{ Name: strconv.Itoa(int(port)), Port: port, @@ -191,7 +177,9 @@ func (s *Service) processCtrlConn(controller net.Conn) error { } defer proxy.Close() + s.fwdLesWg.Add(1) go func() { + defer s.fwdLesWg.Done() err := proxy.Run() if err != nil { slog.Error("代理服务启动失败", "err", err) @@ -250,15 +238,15 @@ func (s *Service) startDataTun() error { defer close(connCh) // 处理连接 - for loop := true; loop; { + for { select { case <-s.ctx.Done(): - slog.Debug("结束处理连接,由于上下文取消") - loop = false + slog.Debug("服务关闭 startDataTun") + return nil case conn, ok := <-connCh: if !ok { slog.Debug("结束处理连接,由于获取连接失败") - loop = false + return errors.New("获取连接失败") } s.dataConnWg.Add(1) go func() { @@ -271,22 +259,6 @@ func (s *Service) startDataTun() error { }() } } - - // 等待子协程结束 todo 可配置等待时间 - timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - procCh := utils.ChanWgWait(timeout, &s.dataConnWg) - defer close(procCh) - - select { - case <-timeout.Done(): - slog.Warn("等待数据通道子协程结束超时") - case <-procCh: - slog.Debug("数据通道子协程结束") - } - - slog.Debug("关闭数据通道") - return nil } func (s *Service) processDataConn(client net.Conn) error { @@ -303,22 +275,23 @@ func (s *Service) processDataConn(client net.Conn) error { } tag := string(tagBuf) + // 找到用户连接 + var data socks.ProxyConn + var ok bool select { case <-s.ctx.Done(): return nil default: + data, ok = s.userConnMap[tag] + if !ok { + return errors.New("查找用户连接失败") + } + defer func() { + delete(s.userConnMap, tag) + utils.Close(data) + }() } - // 找到用户连接 - data, ok := s.userConnMap[tag] - if !ok { - return errors.New("查找用户连接失败") - } - defer func() { - delete(s.userConnMap, tag) - utils.Close(data) - }() - // 响应用户 user := data.Conn err = socks.SendSuccess(user, client) diff --git a/server/fwd/socks/request.go b/server/fwd/socks/request.go index e35b9fd..691f2c2 100644 --- a/server/fwd/socks/request.go +++ b/server/fwd/socks/request.go @@ -253,7 +253,13 @@ func (s *Server) handleConnect(ctx context.Context, conn net.Conn, req *Request) } slog.Info("需要向 " + req.DestAddr.Address() + " 建立连接") - s.Conn <- ProxyConn{conn, req.realDestAddr.Address()} + select { + case <-s.ctx.Done(): + if conn != nil { + utils.Close(conn) + } + case s.Conn <- ProxyConn{conn, req.realDestAddr.Address()}: + } return nil } diff --git a/server/fwd/socks/socks.go b/server/fwd/socks/socks.go index 0346dc0..cb83898 100644 --- a/server/fwd/socks/socks.go +++ b/server/fwd/socks/socks.go @@ -122,12 +122,13 @@ func (s *Server) Run() error { for loop := true; loop; { select { case <-s.ctx.Done(): - slog.Debug("服务主动停止") + slog.Debug("socks 服务主动停止") loop = false case conn, ok := <-connCh: if !ok { err = errors.New("意外错误,无法获取连接") loop = false + s.Close() break } s.wg.Add(1) @@ -141,9 +142,6 @@ func (s *Server) Run() error { }() } } - if err != nil { - s.Close() - } // 关闭服务 timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) @@ -157,10 +155,7 @@ func (s *Server) Run() error { case <-wgCh: } - if s.Conn != nil { - close(s.Conn) - } - + close(s.Conn) return err } diff --git a/server/server.go b/server/server.go index c230241..a54f202 100644 --- a/server/server.go +++ b/server/server.go @@ -37,6 +37,7 @@ func Start() { defer close(errQuit) // 启动服务 + slog.Info("启动服务") ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -55,9 +56,9 @@ func Start() { // 等待退出信号 select { case <-osQuit: - slog.Info("服务关闭") + slog.Info("服务主动退出") case <-errQuit: - slog.Error("服务异常退出") + slog.Warn("服务异常退出") } // 退出服务 @@ -69,11 +70,13 @@ func Start() { close(wgCh) select { - case <-timeout.Done(): - slog.Warn("关闭超时,强制关闭") case <-wgCh: - slog.Debug("服务已退出") + slog.Info("服务已退出") + case <-timeout.Done(): + slog.Warn("退出超时,强制退出") } + + time.Sleep(3 * time.Second) } func initLog() { diff --git a/server/web/app/handlers/channel.go b/server/web/handlers/channel.go similarity index 98% rename from server/web/app/handlers/channel.go rename to server/web/handlers/channel.go index 05c7062..4bfff3e 100644 --- a/server/web/app/handlers/channel.go +++ b/server/web/handlers/channel.go @@ -4,7 +4,7 @@ import ( "log/slog" "proxy-server/server/pkg/orm" "proxy-server/server/pkg/resp" - "proxy-server/server/web/app/models" + "proxy-server/server/web/models" "strings" "time" diff --git a/server/web/app/handlers/node.go b/server/web/handlers/node.go similarity index 98% rename from server/web/app/handlers/node.go rename to server/web/handlers/node.go index d9a01f5..4272cb8 100644 --- a/server/web/app/handlers/node.go +++ b/server/web/handlers/node.go @@ -3,7 +3,7 @@ package handlers import ( "os" "proxy-server/server/pkg/orm" - "proxy-server/server/web/app/models" + "proxy-server/server/web/models" "github.com/gin-gonic/gin" "github.com/pkg/errors" diff --git a/server/web/app/handlers/user.go b/server/web/handlers/user.go similarity index 100% rename from server/web/app/handlers/user.go rename to server/web/handlers/user.go diff --git a/server/web/app/models/channel.go b/server/web/models/channel.go similarity index 100% rename from server/web/app/models/channel.go rename to server/web/models/channel.go diff --git a/server/web/app/models/node.go b/server/web/models/node.go similarity index 100% rename from server/web/app/models/node.go rename to server/web/models/node.go diff --git a/server/web/app/models/user-ip.go b/server/web/models/user-ip.go similarity index 100% rename from server/web/app/models/user-ip.go rename to server/web/models/user-ip.go diff --git a/server/web/app/models/user.go b/server/web/models/user.go similarity index 100% rename from server/web/app/models/user.go rename to server/web/models/user.go diff --git a/server/web/router/router.go b/server/web/router/router.go index 33f1b8f..ef77fd4 100644 --- a/server/web/router/router.go +++ b/server/web/router/router.go @@ -1,17 +1,17 @@ package router import ( - "proxy-server/server/web/app/handlers" + handlers2 "proxy-server/server/web/handlers" "github.com/gin-gonic/gin" ) func Apply(r *gin.Engine) { - r.POST("/node/register", handlers.NodeRegister) - r.POST("/node/report", handlers.NodeReport) + r.POST("/node/register", handlers2.NodeRegister) + r.POST("/node/report", handlers2.NodeReport) - r.POST("/chan/request", handlers.ChanRequest) - r.POST("/chan/auth", handlers.ChanAuth) - r.POST("/chan/test", handlers.ChanTest) + r.POST("/chan/request", handlers2.ChanRequest) + r.POST("/chan/auth", handlers2.ChanAuth) + r.POST("/chan/test", handlers2.ChanTest) }