From 5b5b67429305809df4954872454566a9a60c7a85 Mon Sep 17 00:00:00 2001 From: luorijun Date: Wed, 28 May 2025 14:41:11 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E8=8A=82=E7=82=B9=E4=B8=8B?= =?UTF-8?q?=E7=BA=BF=E6=9B=B4=E6=96=B0=E5=87=BD=E6=95=B0=EF=BC=8C=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E6=96=AD=E8=81=94=E5=90=8E=E6=8F=90=E4=BA=A4=E8=8A=82?= =?UTF-8?q?=E7=82=B9=E7=8A=B6=E6=80=81=E6=9B=B4=E6=96=B0=EF=BC=9B=E9=87=8D?= =?UTF-8?q?=E5=91=BD=E5=90=8D=E8=BE=B9=E7=BC=98=E8=8A=82=E7=82=B9=E7=9B=B8?= =?UTF-8?q?=E5=85=B3=E5=87=BD=E6=95=B0=E4=BB=A5=E6=8F=90=E9=AB=98=E5=8F=AF?= =?UTF-8?q?=E8=AF=BB=E6=80=A7=EF=BC=9B=E8=B0=83=E6=95=B4=E8=8A=82=E7=82=B9?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E9=80=9A=E7=9F=A5=E9=80=9A=E9=81=93=E5=A4=A7?= =?UTF-8?q?=E5=B0=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 4 +--- gateway/app/app.go | 23 +++++++++++++++++++---- gateway/fwd/ctrl.go | 10 +++++----- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 2c088ae..dfaf39d 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,10 @@ - [x] 节点发送 open 请求后,如果是新节点则访问 geo 接口获取 geo 信息,并连带 host 和 port 信息提交到更新队列 - [x] open 命令更新 status +- [x] app 需要提供节点下线更新函数,节点断联后调用下线函数更新 status 网关监听并读取更新队列,定时发送更新数据到平台 - -app 需要提供节点下线更新函数,节点断联后调用下线函数更新 status - 节点每次发送心跳后提供或计算 loss 和 rtt 信息,并对比 host 和 status,将需要更新的数据提交到更新队列 ### 长期 diff --git a/gateway/app/app.go b/gateway/app/app.go index bb2f864..738c0e1 100644 --- a/gateway/app/app.go +++ b/gateway/app/app.go @@ -28,11 +28,11 @@ var ( UserConnWg utils.CountWaitGroup // 用户连接计数器 UserConnMap core.SyncMap[string, *core.Conn] // 用户连接暂存 - LockPortAssign = sync.Mutex{} // 锁定端口分配,防止并发冲突 - EdgeUpdates = make(chan *core.Edge, 1000) // 节点更新通知通道 + LockPortAssign = sync.Mutex{} // 锁定端口分配,防止并发冲突 + EdgeUpdates = make(chan *core.Edge, 100) // 节点更新通知通道 ) -func NewEdge(id int32, port uint16, addr *net.TCPAddr) error { +func OnlineEdgeNew(id int32, port uint16, addr *net.TCPAddr) error { if addr == nil { return fmt.Errorf("边缘节点 %d 地址无效", id) } @@ -59,7 +59,7 @@ func NewEdge(id int32, port uint16, addr *net.TCPAddr) error { return nil } -func TryUpdateEdge(id int32, addr *net.TCPAddr) error { +func OnlineEdgeUpdate(id int32, addr *net.TCPAddr) error { if addr == nil { return fmt.Errorf("边缘节点 %d 地址无效", id) } @@ -85,6 +85,21 @@ func TryUpdateEdge(id int32, addr *net.TCPAddr) error { return nil } +func OfflineEdge(id int32) error { + edge, ok := Edges.Load(id) + if !ok || edge == nil { + return fmt.Errorf("边缘节点 %d 不存在", id) + } + + *edge.Status = core.EdgeOffline + EdgeUpdates <- &core.Edge{ + Id: id, + Status: &core.EdgeOffline, + } + + return nil +} + func StoreEdge(edge *core.Edge) error { if edge == nil || edge.Id == 0 || edge.Port == nil { return fmt.Errorf("无效的边缘节点: %+v", edge) diff --git a/gateway/fwd/ctrl.go b/gateway/fwd/ctrl.go index 12dca69..32449cd 100644 --- a/gateway/fwd/ctrl.go +++ b/gateway/fwd/ctrl.go @@ -87,9 +87,9 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) { // 结束时清理 var edgeId int32 defer func() { - var edge, ok = app.Edges.Load(edgeId) - if ok { - *edge.Status = 0 + err := app.OfflineEdge(edgeId) + if err != nil { + slog.Error("管理节点下线失败", "err", err) } }() @@ -193,7 +193,7 @@ func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) return errors.New("没有可用的端口") } - err := app.NewEdge(edgeId, port, tcpAddr) + err := app.OnlineEdgeNew(edgeId, port, tcpAddr) if err != nil { return fmt.Errorf("新增边缘节点失败:%w", err) } @@ -202,7 +202,7 @@ func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) } else { // 更新边缘节点地址 port = *edge.Port - err := app.TryUpdateEdge(edgeId, tcpAddr) + err := app.OnlineEdgeUpdate(edgeId, tcpAddr) if err != nil { return fmt.Errorf("尝试更新边缘节点失败: %w", err) }