diff --git a/README.md b/README.md index 2c088ae..dfaf39d 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,10 @@ - [x] 节点发送 open 请求后,如果是新节点则访问 geo 接口获取 geo 信息,并连带 host 和 port 信息提交到更新队列 - [x] open 命令更新 status +- [x] app 需要提供节点下线更新函数,节点断联后调用下线函数更新 status 网关监听并读取更新队列,定时发送更新数据到平台 - -app 需要提供节点下线更新函数,节点断联后调用下线函数更新 status - 节点每次发送心跳后提供或计算 loss 和 rtt 信息,并对比 host 和 status,将需要更新的数据提交到更新队列 ### 长期 diff --git a/gateway/app/app.go b/gateway/app/app.go index bb2f864..738c0e1 100644 --- a/gateway/app/app.go +++ b/gateway/app/app.go @@ -28,11 +28,11 @@ var ( UserConnWg utils.CountWaitGroup // 用户连接计数器 UserConnMap core.SyncMap[string, *core.Conn] // 用户连接暂存 - LockPortAssign = sync.Mutex{} // 锁定端口分配,防止并发冲突 - EdgeUpdates = make(chan *core.Edge, 1000) // 节点更新通知通道 + LockPortAssign = sync.Mutex{} // 锁定端口分配,防止并发冲突 + EdgeUpdates = make(chan *core.Edge, 100) // 节点更新通知通道 ) -func NewEdge(id int32, port uint16, addr *net.TCPAddr) error { +func OnlineEdgeNew(id int32, port uint16, addr *net.TCPAddr) error { if addr == nil { return fmt.Errorf("边缘节点 %d 地址无效", id) } @@ -59,7 +59,7 @@ func NewEdge(id int32, port uint16, addr *net.TCPAddr) error { return nil } -func TryUpdateEdge(id int32, addr *net.TCPAddr) error { +func OnlineEdgeUpdate(id int32, addr *net.TCPAddr) error { if addr == nil { return fmt.Errorf("边缘节点 %d 地址无效", id) } @@ -85,6 +85,21 @@ func TryUpdateEdge(id int32, addr *net.TCPAddr) error { return nil } +func OfflineEdge(id int32) error { + edge, ok := Edges.Load(id) + if !ok || edge == nil { + return fmt.Errorf("边缘节点 %d 不存在", id) + } + + *edge.Status = core.EdgeOffline + EdgeUpdates <- &core.Edge{ + Id: id, + Status: &core.EdgeOffline, + } + + return nil +} + func StoreEdge(edge *core.Edge) error { if edge == nil || edge.Id == 0 || edge.Port == nil { return fmt.Errorf("无效的边缘节点: %+v", edge) diff --git a/gateway/fwd/ctrl.go b/gateway/fwd/ctrl.go index 12dca69..32449cd 100644 --- a/gateway/fwd/ctrl.go +++ b/gateway/fwd/ctrl.go @@ -87,9 +87,9 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) { // 结束时清理 var edgeId int32 defer func() { - var edge, ok = app.Edges.Load(edgeId) - if ok { - *edge.Status = 0 + err := app.OfflineEdge(edgeId) + if err != nil { + slog.Error("管理节点下线失败", "err", err) } }() @@ -193,7 +193,7 @@ func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) return errors.New("没有可用的端口") } - err := app.NewEdge(edgeId, port, tcpAddr) + err := app.OnlineEdgeNew(edgeId, port, tcpAddr) if err != nil { return fmt.Errorf("新增边缘节点失败:%w", err) } @@ -202,7 +202,7 @@ func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) } else { // 更新边缘节点地址 port = *edge.Port - err := app.TryUpdateEdge(edgeId, tcpAddr) + err := app.OnlineEdgeUpdate(edgeId, tcpAddr) if err != nil { return fmt.Errorf("尝试更新边缘节点失败: %w", err) }