diff --git a/README.md b/README.md index d9f8286..8fac2a7 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ ## TODO +解决节点断开立即重连会导致 status 覆盖的问题 + +实现一个机制使更新数据的后项能够覆盖前项 + 节点每次发送心跳后提供或计算 loss 和 rtt 信息,并对比 host 和 status,将需要更新的数据提交到更新队列 ### 长期 diff --git a/edge/edge.go b/edge/edge.go index 9c5de84..e30c3ef 100644 --- a/edge/edge.go +++ b/edge/edge.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "encoding/binary" + "encoding/hex" "errors" "fmt" "io" @@ -99,7 +100,7 @@ func ctrl(ctx context.Context, id int32, host string) error { for { // 读取命令 cmd, err := reader.ReadByte() - if ok, err := utils.WarpConnErr(err); !ok { + if _, err := utils.WarpConnErr(err); err != nil { errCh <- err return } @@ -136,24 +137,23 @@ func ctrl(ctx context.Context, id int32, host string) error { } func data(proxy string, destination string, tag [16]byte) error { - slog.Debug("建立数据通道", "tag", tag, "addr", destination) + var tagStr = hex.EncodeToString(tag[:]) + slog.Debug("建立数据通道", "tag", tagStr, "addr", destination) // 向目标地址建立连接 var result = 1 var dstErr error dst, err := net.Dial("tcp", destination) if err != nil { - dstErr = fmt.Errorf("连接目标地址失败: %w", dstErr) + dstErr = ErrDstConnFailed result = 0 } - defer utils.Close(dst) // 向服务端建立连接 src, err := net.Dial("tcp", proxy) if err != nil { return errors.New("连接服务端失败") } - defer utils.Close(src) // 发送连接状态 var buf = make([]byte, 17) @@ -168,39 +168,29 @@ func data(proxy string, destination string, tag [16]byte) error { return dstErr } - var waitSrc = make(chan error) + var waitSrc = make(chan struct{}, 1) go func() { + defer utils.Close(dst) _, err := io.Copy(dst, src) - switch { - case errors.Is(err, net.ErrClosed): - slog.Debug("网关连接意外关闭") - case err != nil: - slog.Error("读取网关数据失败", "err", err) - default: - slog.Debug("网关数据读取完成") + if ok, err := utils.WarpConnErr(err); !ok { + slog.Error("读取网关数据失败", "tag", tagStr, "err", err) } - waitSrc <- err + waitSrc <- struct{}{} }() - var waitDst = make(chan error) + var waitDst = make(chan struct{}, 1) go func() { + defer utils.Close(src) _, err := io.Copy(src, dst) - switch { - case errors.Is(err, net.ErrClosed): - slog.Debug("目标连接意外关闭") - case err != nil && !errors.Is(err, net.ErrClosed): + if ok, err := utils.WarpConnErr(err); !ok { slog.Error("读取目标数据失败", "err", err) - default: - slog.Debug("目标数据读取完成") } - waitDst <- err + waitDst <- struct{}{} }() - // 等待任意一方关闭数据连接 - select { - case <-waitSrc: - case <-waitDst: - } + // 等待连接全部安全关闭或超时 + <-waitSrc + <-waitDst return nil } @@ -261,7 +251,10 @@ func onConn(reader io.Reader, dataAddr string) (err error) { // 异步建立数据通道 go func() { err := data(dataAddr, addr, tag) - if err != nil { + switch { + case errors.Is(err, ErrDstConnFailed): + slog.Debug("目标地址连接失败", "tag", hex.EncodeToString(tag[:]), "addr", addr) + case err != nil: slog.Error("建立数据通道失败", "err", err) } }() @@ -273,3 +266,13 @@ type ConnCmd struct { Tag [16]byte Addr string } + +type Err string + +func (e Err) Error() string { + return string(e) +} + +const ( + ErrDstConnFailed = Err("目标地址连接失败") +) diff --git a/edge/env/env.go b/edge/env/env.go index bbcba58..c731560 100644 --- a/edge/env/env.go +++ b/edge/env/env.go @@ -40,7 +40,7 @@ func Init() error { if Mode == "dev" { slog.SetLogLoggerLevel(slog.LevelDebug) } else { - slog.SetLogLoggerLevel(slog.LevelWarn) + slog.SetLogLoggerLevel(slog.LevelInfo) } return nil diff --git a/gateway/env/env.go b/gateway/env/env.go index 4cd3ab0..1a0fd29 100644 --- a/gateway/env/env.go +++ b/gateway/env/env.go @@ -19,7 +19,7 @@ var ( AppWebPort uint16 = 8848 AppLogMode = "dev" AppExitTimeout = 5 // 等待服务停止的超时时间 - AppDataTimeout = 10 // 等待数据通道连接的超时时间 + AppDataTimeout = 20 // 等待数据通道连接的超时时间 AppUserRWTimeout = 10 // 等待用户连接读写超时时间 AppDataRWTimeout = 10 // 等待数据通道读写超时时间 AppCtrlRWTimeout = 10 // 等待控制通道读写超时时间 diff --git a/gateway/fwd/ctrl.go b/gateway/fwd/ctrl.go index 32449cd..2ef46e1 100644 --- a/gateway/fwd/ctrl.go +++ b/gateway/fwd/ctrl.go @@ -107,7 +107,7 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) { } cmd, err := reader.ReadByte() - if ok, err := utils.WarpConnErr(err); !ok { + if _, err := utils.WarpConnErr(err); err != nil { errCh <- err return } diff --git a/gateway/fwd/data.go b/gateway/fwd/data.go index 420bd75..0a3b092 100644 --- a/gateway/fwd/data.go +++ b/gateway/fwd/data.go @@ -121,7 +121,7 @@ func processDataConn(ctx context.Context, edge net.Conn) error { _, err := io.Copy(user, reader) switch { case errors.Is(err, net.ErrClosed): - slog.Debug("节点连接意外关闭") + slog.Warn("节点连接意外关闭") case err != nil: slog.Error("读取节点数据失败", "err", err) default: @@ -136,7 +136,7 @@ func processDataConn(ctx context.Context, edge net.Conn) error { _, err := io.Copy(edge, teeUser) switch { case errors.Is(err, net.ErrClosed): - slog.Debug("用户连接意外关闭") + slog.Warn("用户连接意外关闭") case err != nil: slog.Error("读取用户数据失败", "err", err) default: diff --git a/gateway/gateway.go b/gateway/gateway.go index aff9b67..e291634 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -210,14 +210,15 @@ func startTask(ctx context.Context) error { if len(updates) == 0 { continue } - err := app.Update(updates) lock.Lock() - clear(updates) - updates = updates[:0] - lock.Unlock() + err := app.Update(updates) if err != nil { slog.Error("调度更新任务失败", "err", err) + } else { + clear(updates) + updates = updates[:0] } + lock.Unlock() } } } diff --git a/utils/conn.go b/utils/conn.go index 066a575..dedb760 100644 --- a/utils/conn.go +++ b/utils/conn.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io" - "log/slog" "net" "syscall" ) @@ -18,23 +17,18 @@ func WarpConnErr(err error) (bool, error) { switch { case errors.Is(err, net.ErrClosed): - slog.Debug("连接已关闭") - return false, nil - + return true, errors.New("连接已关闭") case errors.Is(err, io.EOF): - slog.Debug("连接被对端关闭") - return false, nil + return true, errors.New("连接已结束") case errors.As(err, &opErr): switch { case errors.Is(opErr.Err, syscall.WSAECONNRESET), errors.Is(opErr.Err, syscall.WSAECONNABORTED), errors.Is(opErr.Err, syscall.ECONNRESET), errors.Is(opErr.Err, syscall.ECONNABORTED): - slog.Debug("连接被对端重置") - return false, nil + return false, errors.New("连接已重置或中止") case opErr.Timeout(): - slog.Debug("连接已超时") - return false, nil + return false, errors.New("连接已超时") } } return false, fmt.Errorf("连接发生未处理的错误: %w", err) diff --git a/utils/utils.go b/utils/utils.go index 217e9a9..9b5818d 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -1,8 +1,10 @@ package utils import ( + "fmt" "io" "log/slog" + "runtime" ) func ReadByte(reader io.Reader) (byte, error) { @@ -32,7 +34,12 @@ func Close(v any) { if v, ok := v.(io.Closer); ok { err := v.Close() if err != nil { - slog.Warn("对象关闭失败", "err", err) + var logger = slog.Default() + _, file, line, ok := runtime.Caller(1) + if ok { + logger = logger.With("source", fmt.Sprintf("%s:%d", file, line)) + } + logger.Warn("对象关闭失败", "err", err) } } }