diff --git a/client/client.go b/client/client.go index b701947..130bac1 100644 --- a/client/client.go +++ b/client/client.go @@ -43,27 +43,43 @@ func Start() error { return fmt.Errorf("注册节点失败: %w", err) } - // 建立控制通道 + // 连接到网关 var ctx, cancel = signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) defer cancel() + var errCh = make(chan error) go func() { for { err = ctrl(ctx, id, host) if err == nil { + errCh <- nil return } + select { case <-ctx.Done(): return default: + slog.Error("建立控制通道失败", "err", err) + slog.Info(fmt.Sprintf("%d 秒后重试", core.RetryInterval)) + } + + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(core.RetryInterval) * time.Second): } - slog.Error("建立控制通道失败", "err", err) - slog.Info(fmt.Sprintf("%d 秒后重试", core.RetryInterval)) - time.Sleep(time.Duration(core.RetryInterval) * time.Second) } }() + // 等待退出 + select { + case err := <-errCh: + if err != nil { + slog.Error("控制通道发生错误", "err", err) + } + } + // 下线节点 slog.Debug("下线节点...") err = report.Offline() diff --git a/client/report/report.go b/client/report/report.go index 4e4c9ec..44b2e20 100644 --- a/client/report/report.go +++ b/client/report/report.go @@ -69,5 +69,27 @@ func Online(prov, city, isp string) (id int32, host string, err error) { } func Offline() error { + var bytes, err = json.Marshal(map[string]any{ + "name": env.Name, + }) + if err != nil { + return err + } + var body = strings.NewReader(string(bytes)) + + req, err := http.NewRequest("POST", env.EndpointOffline, body) + if err != nil { + return fmt.Errorf("创建请求失败: %w", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("执行请求失败: %w", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return errors.New("状态码: " + resp.Status) + } + return nil } diff --git a/pkg/utils/chan.go b/pkg/utils/chan.go index ef7b4be..c2f3057 100644 --- a/pkg/utils/chan.go +++ b/pkg/utils/chan.go @@ -1,44 +1,5 @@ package utils -import ( - "context" - "log/slog" - "net" - - "errors" -) - -func ChanConnAccept(ctx context.Context, ls net.Listener) chan net.Conn { - ch := make(chan net.Conn) - go func() { - defer close(ch) - for { - conn, err := ls.Accept() - if err != nil { - if errors.Is(err, net.ErrClosed) { - return - } - // 临时错误重试连接 - var ne net.Error - if errors.As(err, &ne) && ne.Temporary() { - slog.Debug("临时错误重试") - continue - } - slog.Error("接受连接失败", err) - return - } - // ctx 取消后退出 - select { - case <-ctx.Done(): - Close(conn) - return - case ch <- conn: - } - } - }() - return ch -} - func WgWait[T WaitGroup](wg T) <-chan struct{} { ch := make(chan struct{}) go func() { diff --git a/server/fwd/ctrl.go b/server/fwd/ctrl.go index 1fa17cd..2afe5c8 100644 --- a/server/fwd/ctrl.go +++ b/server/fwd/ctrl.go @@ -38,17 +38,34 @@ func (s *Service) listenCtrl() error { defer utils.Close(ls) // 处理连接 - connCh := utils.ChanConnAccept(s.ctx, ls) + // 异步等待连接 + var connCh = make(chan net.Conn) + go func() { + for { + conn, err := ls.Accept() + if errors.Is(err, net.ErrClosed) { + slog.Debug("控制通道监听关闭") + return + } + if err != nil { + slog.Error("接受控制通道连接失败", "err", err) + return + } + select { + case connCh <- conn: + case <-s.ctx.Done(): + utils.Close(conn) + return + } + } + }() + err = nil - for loop := true; loop; { + for { select { case <-s.ctx.Done(): - loop = false - case conn, ok := <-connCh: - if !ok { - err = errors.New("获取连接失败") - loop = false - } + return nil + case conn := <-connCh: s.ctrlConnWg.Add(1) go func() { defer s.ctrlConnWg.Done() @@ -60,8 +77,6 @@ func (s *Service) listenCtrl() error { }() } } - - return err } func (s *Service) processCtrlConn(ctx context.Context, conn net.Conn) (err error) { diff --git a/server/fwd/data.go b/server/fwd/data.go index b083250..160b72d 100644 --- a/server/fwd/data.go +++ b/server/fwd/data.go @@ -2,6 +2,7 @@ package fwd import ( "bufio" + "errors" "fmt" "github.com/google/uuid" "io" @@ -14,8 +15,6 @@ import ( "strconv" "sync" "time" - - "errors" ) func (s *Service) listenData() error { @@ -29,29 +28,41 @@ func (s *Service) listenData() error { } defer utils.Close(ls) + // 异步等待连接 + var connCh = make(chan net.Conn) go func() { - <-s.ctx.Done() - utils.Close(ls) + for { + conn, err := ls.Accept() + if errors.Is(err, net.ErrClosed) { + slog.Debug("数据通道监听关闭") + return + } + if err != nil { + slog.Error("接受数据通道连接失败", "err", err) + return + } + select { + case connCh <- conn: + case <-s.ctx.Done(): + utils.Close(conn) + return + } + } }() + // 处理连接 for { - conn, err := ls.Accept() - if err != nil { - return fmt.Errorf("监听数据通道失败: %w", err) - } - select { case <-s.ctx.Done(): - utils.Close(conn) return nil - default: + case conn := <-connCh: s.dataConnWg.Add(1) go func() { defer s.dataConnWg.Done() defer utils.Close(conn) err := s.processDataConn(conn) if err != nil { - slog.Error("建立数据通道失败失败", "err", err) + slog.Error("处理数据通道连接失败", "err", err) } }() } @@ -89,6 +100,7 @@ func (s *Service) processDataConn(client net.Conn) error { userPipeReader, userPipeWriter := io.Pipe() defer utils.Close(userPipeWriter) + teeUser := io.TeeReader(user, userPipeWriter) go func() { err := analysisAndLog(user, userPipeReader) @@ -115,38 +127,40 @@ func (s *Service) processDataConn(client net.Conn) error { }() select { + case <-s.ctx.Done(): + return nil + case <-utils.WgWait(&wg): + proxy := time.Now() + + start, startOk := metrics.TimerStart.Load(user.Conn) + auth, authOk := metrics.TimerAuth.Load(user.Conn) + + var authDuration time.Duration + if startOk && authOk { + authDuration = auth.(time.Time).Sub(start.(time.Time)) + } + + var dataDuration time.Duration + if authOk { + dataDuration = data.Sub(auth.(time.Time)) + } + + proxyDuration := proxy.Sub(data) + + var totalDuration time.Duration + if startOk { + totalDuration = proxy.Sub(start.(time.Time)) + } + + debug.ConsumingCh <- debug.Consuming{ + Auth: authDuration, + Data: dataDuration, + Proxy: proxyDuration, + Total: totalDuration, + } + + return nil } - - proxy := time.Now() - - start, startOk := metrics.TimerStart.Load(user.Conn) - auth, authOk := metrics.TimerAuth.Load(user.Conn) - - var authDuration time.Duration - if startOk && authOk { - authDuration = auth.(time.Time).Sub(start.(time.Time)) - } - - var dataDuration time.Duration - if authOk { - dataDuration = data.Sub(auth.(time.Time)) - } - - proxyDuration := proxy.Sub(data) - - var totalDuration time.Duration - if startOk { - totalDuration = proxy.Sub(start.(time.Time)) - } - - debug.ConsumingCh <- debug.Consuming{ - Auth: authDuration, - Data: dataDuration, - Proxy: proxyDuration, - Total: totalDuration, - } - - return nil } diff --git a/server/fwd/fwd.go b/server/fwd/fwd.go index b0a80f2..9fa47ab 100644 --- a/server/fwd/fwd.go +++ b/server/fwd/fwd.go @@ -29,14 +29,14 @@ func New() *Service { } func (s *Service) Run() error { - slog.Info("启动 fwd 服务") + slog.Debug("启动转发服务") errQuit := make(chan struct{}, 2) defer close(errQuit) wg := sync.WaitGroup{} - // 控制通道监听 + // 控制通道 wg.Add(1) go func() { defer wg.Done() @@ -48,7 +48,7 @@ func (s *Service) Run() error { } }() - // 数据通道监听 + // 数据通道 wg.Add(1) go func() { defer wg.Done() @@ -63,26 +63,16 @@ func (s *Service) Run() error { // 等待退出 select { case <-s.ctx.Done(): - slog.Info("fwd 服务主动退出") case <-errQuit: slog.Warn("fwd 服务异常退出") s.Stop() } wg.Wait() - s.dataConnWg.Wait() - s.ctrlConnWg.Wait() s.fwdLesWg.Wait() + s.ctrlConnWg.Wait() s.userConnWg.Wait() - - s.ctrlConnWg.Wait() - slog.Debug("控制通道连接已关闭") s.dataConnWg.Wait() - slog.Debug("数据通道连接已关闭") - s.fwdLesWg.Wait() - slog.Debug("转发服务已关闭") - wg.Wait() - slog.Info("fwd 服务已退出") return nil } diff --git a/server/server.go b/server/server.go index db2c9c4..1401889 100644 --- a/server/server.go +++ b/server/server.go @@ -17,7 +17,6 @@ import ( "proxy-server/server/report" "proxy-server/server/web" "sync" - "syscall" "time" "github.com/google/uuid" @@ -49,7 +48,7 @@ func (s *server) Run() (err error) { } // 准备子服务 - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) defer cancel() wg := sync.WaitGroup{} @@ -95,17 +94,16 @@ func (s *server) Run() (err error) { return fmt.Errorf("服务上线失败: %w", err) } - // 等待退出信号 - osQuit := make(chan os.Signal, 1) - signal.Notify(osQuit, os.Interrupt, syscall.SIGTERM) - select { - case <-osQuit: - slog.Info("服务主动退出") + case <-ctx.Done(): case err := <-fwdQuit: - slog.Warn("fwd 服务异常退出", "err", err) + if err != nil { + slog.Warn("fwd 服务异常退出", "err", err) + } case err := <-apiQuit: - slog.Warn("web 服务异常退出", "err", err) + if err != nil { + slog.Warn("web 服务异常退出", "err", err) + } } cancel()