修改节点心跳发送失败逻辑,现在会直接退出;完善数据通道连接超时的错误处理
This commit is contained in:
18
edge/edge.go
18
edge/edge.go
@@ -78,30 +78,32 @@ func ctrl(ctx context.Context, id int32, host string) error {
|
||||
}
|
||||
|
||||
// 异步定时发送心跳
|
||||
var writeErr = make(chan error)
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Duration(core.HeartbeatInterval) * time.Second)
|
||||
defer ticker.Stop()
|
||||
ticker := time.Tick(time.Duration(core.HeartbeatInterval) * time.Second)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
writeErr <- nil
|
||||
return
|
||||
case tick := <-ticker.C:
|
||||
case _ = <-ticker:
|
||||
err := sendPing(conn)
|
||||
if err != nil {
|
||||
slog.Error("发送心跳失败", "time", tick, "err", err)
|
||||
writeErr <- fmt.Errorf("发送心跳失败: %w", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// 异步读取节点命令
|
||||
var errCh = make(chan error)
|
||||
var readErr = make(chan error)
|
||||
go func() {
|
||||
for {
|
||||
// 读取命令
|
||||
cmd, err := reader.ReadByte()
|
||||
if _, err := utils.WarpConnErr(err); err != nil {
|
||||
errCh <- err
|
||||
readErr <- err
|
||||
return
|
||||
}
|
||||
switch cmd {
|
||||
@@ -113,7 +115,7 @@ func ctrl(ctx context.Context, id int32, host string) error {
|
||||
case 5:
|
||||
err := onConn(reader, dataAddr)
|
||||
if err != nil {
|
||||
errCh <- fmt.Errorf("处理代理命令失败: %w", err)
|
||||
readErr <- fmt.Errorf("处理代理命令失败: %w", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -123,7 +125,7 @@ func ctrl(ctx context.Context, id int32, host string) error {
|
||||
// 等待建立数据通道
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case err = <-errCh:
|
||||
case err = <-readErr:
|
||||
}
|
||||
|
||||
// 发送关闭连接
|
||||
|
||||
Reference in New Issue
Block a user