优化节点连接管理逻辑与日志输出,修改连接错误处理函数的返回值以确保错误处理灵活性

This commit is contained in:
2025-05-29 14:44:06 +08:00
parent 1831c792ad
commit ceb381bc9b
9 changed files with 57 additions and 48 deletions

View File

@@ -1,5 +1,9 @@
## TODO ## TODO
解决节点断开立即重连会导致 status 覆盖的问题
实现一个机制使更新数据的后项能够覆盖前项
节点每次发送心跳后提供或计算 loss 和 rtt 信息,并对比 host 和 status将需要更新的数据提交到更新队列 节点每次发送心跳后提供或计算 loss 和 rtt 信息,并对比 host 和 status将需要更新的数据提交到更新队列
### 长期 ### 长期

View File

@@ -4,6 +4,7 @@ import (
"bufio" "bufio"
"context" "context"
"encoding/binary" "encoding/binary"
"encoding/hex"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@@ -99,7 +100,7 @@ func ctrl(ctx context.Context, id int32, host string) error {
for { for {
// 读取命令 // 读取命令
cmd, err := reader.ReadByte() cmd, err := reader.ReadByte()
if ok, err := utils.WarpConnErr(err); !ok { if _, err := utils.WarpConnErr(err); err != nil {
errCh <- err errCh <- err
return return
} }
@@ -136,24 +137,23 @@ func ctrl(ctx context.Context, id int32, host string) error {
} }
func data(proxy string, destination string, tag [16]byte) error { func data(proxy string, destination string, tag [16]byte) error {
slog.Debug("建立数据通道", "tag", tag, "addr", destination) var tagStr = hex.EncodeToString(tag[:])
slog.Debug("建立数据通道", "tag", tagStr, "addr", destination)
// 向目标地址建立连接 // 向目标地址建立连接
var result = 1 var result = 1
var dstErr error var dstErr error
dst, err := net.Dial("tcp", destination) dst, err := net.Dial("tcp", destination)
if err != nil { if err != nil {
dstErr = fmt.Errorf("连接目标地址失败: %w", dstErr) dstErr = ErrDstConnFailed
result = 0 result = 0
} }
defer utils.Close(dst)
// 向服务端建立连接 // 向服务端建立连接
src, err := net.Dial("tcp", proxy) src, err := net.Dial("tcp", proxy)
if err != nil { if err != nil {
return errors.New("连接服务端失败") return errors.New("连接服务端失败")
} }
defer utils.Close(src)
// 发送连接状态 // 发送连接状态
var buf = make([]byte, 17) var buf = make([]byte, 17)
@@ -168,39 +168,29 @@ func data(proxy string, destination string, tag [16]byte) error {
return dstErr return dstErr
} }
var waitSrc = make(chan error) var waitSrc = make(chan struct{}, 1)
go func() { go func() {
defer utils.Close(dst)
_, err := io.Copy(dst, src) _, err := io.Copy(dst, src)
switch { if ok, err := utils.WarpConnErr(err); !ok {
case errors.Is(err, net.ErrClosed): slog.Error("读取网关数据失败", "tag", tagStr, "err", err)
slog.Debug("网关连接意外关闭")
case err != nil:
slog.Error("读取网关数据失败", "err", err)
default:
slog.Debug("网关数据读取完成")
} }
waitSrc <- err waitSrc <- struct{}{}
}() }()
var waitDst = make(chan error) var waitDst = make(chan struct{}, 1)
go func() { go func() {
defer utils.Close(src)
_, err := io.Copy(src, dst) _, err := io.Copy(src, dst)
switch { if ok, err := utils.WarpConnErr(err); !ok {
case errors.Is(err, net.ErrClosed):
slog.Debug("目标连接意外关闭")
case err != nil && !errors.Is(err, net.ErrClosed):
slog.Error("读取目标数据失败", "err", err) slog.Error("读取目标数据失败", "err", err)
default:
slog.Debug("目标数据读取完成")
} }
waitDst <- err waitDst <- struct{}{}
}() }()
// 等待任意一方关闭数据连接 // 等待连接全部安全关闭或超时
select { <-waitSrc
case <-waitSrc: <-waitDst
case <-waitDst:
}
return nil return nil
} }
@@ -261,7 +251,10 @@ func onConn(reader io.Reader, dataAddr string) (err error) {
// 异步建立数据通道 // 异步建立数据通道
go func() { go func() {
err := data(dataAddr, addr, tag) err := data(dataAddr, addr, tag)
if err != nil { switch {
case errors.Is(err, ErrDstConnFailed):
slog.Debug("目标地址连接失败", "tag", hex.EncodeToString(tag[:]), "addr", addr)
case err != nil:
slog.Error("建立数据通道失败", "err", err) slog.Error("建立数据通道失败", "err", err)
} }
}() }()
@@ -273,3 +266,13 @@ type ConnCmd struct {
Tag [16]byte Tag [16]byte
Addr string Addr string
} }
type Err string
func (e Err) Error() string {
return string(e)
}
const (
ErrDstConnFailed = Err("目标地址连接失败")
)

2
edge/env/env.go vendored
View File

@@ -40,7 +40,7 @@ func Init() error {
if Mode == "dev" { if Mode == "dev" {
slog.SetLogLoggerLevel(slog.LevelDebug) slog.SetLogLoggerLevel(slog.LevelDebug)
} else { } else {
slog.SetLogLoggerLevel(slog.LevelWarn) slog.SetLogLoggerLevel(slog.LevelInfo)
} }
return nil return nil

2
gateway/env/env.go vendored
View File

@@ -19,7 +19,7 @@ var (
AppWebPort uint16 = 8848 AppWebPort uint16 = 8848
AppLogMode = "dev" AppLogMode = "dev"
AppExitTimeout = 5 // 等待服务停止的超时时间 AppExitTimeout = 5 // 等待服务停止的超时时间
AppDataTimeout = 10 // 等待数据通道连接的超时时间 AppDataTimeout = 20 // 等待数据通道连接的超时时间
AppUserRWTimeout = 10 // 等待用户连接读写超时时间 AppUserRWTimeout = 10 // 等待用户连接读写超时时间
AppDataRWTimeout = 10 // 等待数据通道读写超时时间 AppDataRWTimeout = 10 // 等待数据通道读写超时时间
AppCtrlRWTimeout = 10 // 等待控制通道读写超时时间 AppCtrlRWTimeout = 10 // 等待控制通道读写超时时间

View File

@@ -107,7 +107,7 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) {
} }
cmd, err := reader.ReadByte() cmd, err := reader.ReadByte()
if ok, err := utils.WarpConnErr(err); !ok { if _, err := utils.WarpConnErr(err); err != nil {
errCh <- err errCh <- err
return return
} }

View File

@@ -121,7 +121,7 @@ func processDataConn(ctx context.Context, edge net.Conn) error {
_, err := io.Copy(user, reader) _, err := io.Copy(user, reader)
switch { switch {
case errors.Is(err, net.ErrClosed): case errors.Is(err, net.ErrClosed):
slog.Debug("节点连接意外关闭") slog.Warn("节点连接意外关闭")
case err != nil: case err != nil:
slog.Error("读取节点数据失败", "err", err) slog.Error("读取节点数据失败", "err", err)
default: default:
@@ -136,7 +136,7 @@ func processDataConn(ctx context.Context, edge net.Conn) error {
_, err := io.Copy(edge, teeUser) _, err := io.Copy(edge, teeUser)
switch { switch {
case errors.Is(err, net.ErrClosed): case errors.Is(err, net.ErrClosed):
slog.Debug("用户连接意外关闭") slog.Warn("用户连接意外关闭")
case err != nil: case err != nil:
slog.Error("读取用户数据失败", "err", err) slog.Error("读取用户数据失败", "err", err)
default: default:

View File

@@ -210,14 +210,15 @@ func startTask(ctx context.Context) error {
if len(updates) == 0 { if len(updates) == 0 {
continue continue
} }
err := app.Update(updates)
lock.Lock() lock.Lock()
clear(updates) err := app.Update(updates)
updates = updates[:0]
lock.Unlock()
if err != nil { if err != nil {
slog.Error("调度更新任务失败", "err", err) slog.Error("调度更新任务失败", "err", err)
} else {
clear(updates)
updates = updates[:0]
} }
lock.Unlock()
} }
} }
} }

View File

@@ -4,7 +4,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log/slog"
"net" "net"
"syscall" "syscall"
) )
@@ -18,23 +17,18 @@ func WarpConnErr(err error) (bool, error) {
switch { switch {
case errors.Is(err, net.ErrClosed): case errors.Is(err, net.ErrClosed):
slog.Debug("连接已关闭") return true, errors.New("连接已关闭")
return false, nil
case errors.Is(err, io.EOF): case errors.Is(err, io.EOF):
slog.Debug("连接被对端关闭") return true, errors.New("连接已结束")
return false, nil
case errors.As(err, &opErr): case errors.As(err, &opErr):
switch { switch {
case case
errors.Is(opErr.Err, syscall.WSAECONNRESET), errors.Is(opErr.Err, syscall.WSAECONNABORTED), errors.Is(opErr.Err, syscall.WSAECONNRESET), errors.Is(opErr.Err, syscall.WSAECONNABORTED),
errors.Is(opErr.Err, syscall.ECONNRESET), errors.Is(opErr.Err, syscall.ECONNABORTED): errors.Is(opErr.Err, syscall.ECONNRESET), errors.Is(opErr.Err, syscall.ECONNABORTED):
slog.Debug("连接被对端重置") return false, errors.New("连接已重置或中止")
return false, nil
case opErr.Timeout(): case opErr.Timeout():
slog.Debug("连接已超时") return false, errors.New("连接已超时")
return false, nil
} }
} }
return false, fmt.Errorf("连接发生未处理的错误: %w", err) return false, fmt.Errorf("连接发生未处理的错误: %w", err)

View File

@@ -1,8 +1,10 @@
package utils package utils
import ( import (
"fmt"
"io" "io"
"log/slog" "log/slog"
"runtime"
) )
func ReadByte(reader io.Reader) (byte, error) { func ReadByte(reader io.Reader) (byte, error) {
@@ -32,7 +34,12 @@ func Close(v any) {
if v, ok := v.(io.Closer); ok { if v, ok := v.(io.Closer); ok {
err := v.Close() err := v.Close()
if err != nil { if err != nil {
slog.Warn("对象关闭失败", "err", err) var logger = slog.Default()
_, file, line, ok := runtime.Caller(1)
if ok {
logger = logger.With("source", fmt.Sprintf("%s:%d", file, line))
}
logger.Warn("对象关闭失败", "err", err)
} }
} }
} }