From f45ab3e89cf3042a0686c0a60778d63347bcc66a Mon Sep 17 00:00:00 2001 From: luorijun Date: Thu, 29 May 2025 15:22:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=8A=82=E7=82=B9=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E5=8F=91=E9=80=81=E5=A4=B1=E8=B4=A5=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=8C=E7=8E=B0=E5=9C=A8=E4=BC=9A=E7=9B=B4=E6=8E=A5=E9=80=80?= =?UTF-8?q?=E5=87=BA=EF=BC=9B=E5=AE=8C=E5=96=84=E6=95=B0=E6=8D=AE=E9=80=9A?= =?UTF-8?q?=E9=81=93=E8=BF=9E=E6=8E=A5=E8=B6=85=E6=97=B6=E7=9A=84=E9=94=99?= =?UTF-8?q?=E8=AF=AF=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 4 ++++ edge/edge.go | 18 ++++++++++-------- gateway/fwd/data.go | 30 +++++++++--------------------- gateway/fwd/user.go | 2 +- utils/utils.go | 4 ++-- 5 files changed, 26 insertions(+), 32 deletions(-) diff --git a/README.md b/README.md index 8fac2a7..eaa9272 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ ## TODO +检查连接被提前关闭的问题 + +访问失败的也返回访问记录 + 解决节点断开立即重连会导致 status 覆盖的问题 实现一个机制使更新数据的后项能够覆盖前项 diff --git a/edge/edge.go b/edge/edge.go index e30c3ef..5c2811a 100644 --- a/edge/edge.go +++ b/edge/edge.go @@ -78,30 +78,32 @@ func ctrl(ctx context.Context, id int32, host string) error { } // 异步定时发送心跳 + var writeErr = make(chan error) go func() { - ticker := time.NewTicker(time.Duration(core.HeartbeatInterval) * time.Second) - defer ticker.Stop() + ticker := time.Tick(time.Duration(core.HeartbeatInterval) * time.Second) for { select { case <-ctx.Done(): + writeErr <- nil return - case tick := <-ticker.C: + case _ = <-ticker: err := sendPing(conn) if err != nil { - slog.Error("发送心跳失败", "time", tick, "err", err) + writeErr <- fmt.Errorf("发送心跳失败: %w", err) + return } } } }() // 异步读取节点命令 - var errCh = make(chan error) + var readErr = make(chan error) go func() { for { // 读取命令 cmd, err := reader.ReadByte() if _, err := utils.WarpConnErr(err); err != nil { - errCh <- err + readErr <- err return } switch cmd { @@ -113,7 +115,7 @@ func ctrl(ctx context.Context, id int32, host string) error { case 5: err := onConn(reader, dataAddr) if err != nil { - errCh <- fmt.Errorf("处理代理命令失败: %w", err) + readErr <- fmt.Errorf("处理代理命令失败: %w", err) return } } @@ -123,7 +125,7 @@ func ctrl(ctx context.Context, id int32, host string) error { // 等待建立数据通道 select { case <-ctx.Done(): - case err = <-errCh: + case err = <-readErr: } // 发送关闭连接 diff --git a/gateway/fwd/data.go b/gateway/fwd/data.go index 0a3b092..7f11107 100644 --- a/gateway/fwd/data.go +++ b/gateway/fwd/data.go @@ -60,10 +60,7 @@ func ListenData(ctx context.Context) error { app.DataConnWg.Add(1) go func() { defer app.DataConnWg.Done() - defer func() { - utils.Close(conn) - slog.Debug("关闭数据通道连接") - }() + defer utils.Close(conn) err := processDataConn(ctx, conn) if err != nil { slog.Error("处理数据通道连接失败", "err", err) @@ -89,12 +86,13 @@ func processDataConn(ctx context.Context, edge net.Conn) error { // 加载用户连接 user, ok := app.UserConnMap.LoadAndDelete(tag) if !ok { - return fmt.Errorf("用户连接已关闭,tag:%s", tag) + if status == 1 { + return fmt.Errorf("用户连接已关闭,tag:%s", tag) + } else { + return nil + } } - defer func() { - utils.Close(user) - slog.Debug("关闭用户连接") - }() + defer utils.Close(user) // 检查状态 if status != 1 { @@ -119,13 +117,8 @@ func processDataConn(ctx context.Context, edge net.Conn) error { var waitEdge = make(chan error) go func() { _, err := io.Copy(user, reader) - switch { - case errors.Is(err, net.ErrClosed): - slog.Warn("节点连接意外关闭") - case err != nil: + if ok, err := utils.WarpConnErr(err); !ok { slog.Error("读取节点数据失败", "err", err) - default: - slog.Debug("节点数据读取完成") } waitEdge <- err }() @@ -134,13 +127,8 @@ func processDataConn(ctx context.Context, edge net.Conn) error { var waitUser = make(chan error) go func() { _, err := io.Copy(edge, teeUser) - switch { - case errors.Is(err, net.ErrClosed): - slog.Warn("用户连接意外关闭") - case err != nil: + if ok, err := utils.WarpConnErr(err); !ok { slog.Error("读取用户数据失败", "err", err) - default: - slog.Debug("用户数据读取完成") } waitUser <- err }() diff --git a/gateway/fwd/user.go b/gateway/fwd/user.go index 70defe6..64f3569 100644 --- a/gateway/fwd/user.go +++ b/gateway/fwd/user.go @@ -85,7 +85,7 @@ func processUserConn(ctx context.Context, user *core.Conn, ctrl io.Writer) (err if ok { utils.Close(user) if errors.Is(err, context.DeadlineExceeded) { - slog.Error("用户连接超时", "tag", tag, "addr", user.RemoteAddr().String()) + slog.Warn("建立数据通道超时", "tag", tag, "addr", user.RemoteAddr().String()) } } diff --git a/utils/utils.go b/utils/utils.go index 9b5818d..e3ff899 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -31,8 +31,8 @@ func ReadBuffer(reader io.Reader, size int) ([]byte, error) { // Close 关闭对象,传入值绝对不能为 nil func Close(v any) { - if v, ok := v.(io.Closer); ok { - err := v.Close() + if c, ok := v.(io.Closer); ok { + err := c.Close() if err != nil { var logger = slog.Default() _, file, line, ok := runtime.Caller(1)