新增节点下线更新函数,节点断联后提交节点状态更新;重命名边缘节点相关函数以提高可读性;调整节点更新通知通道大小
This commit is contained in:
@@ -2,12 +2,10 @@
|
||||
|
||||
- [x] 节点发送 open 请求后,如果是新节点则访问 geo 接口获取 geo 信息,并连带 host 和 port 信息提交到更新队列
|
||||
- [x] open 命令更新 status
|
||||
- [x] app 需要提供节点下线更新函数,节点断联后调用下线函数更新 status
|
||||
|
||||
网关监听并读取更新队列,定时发送更新数据到平台
|
||||
|
||||
|
||||
app 需要提供节点下线更新函数,节点断联后调用下线函数更新 status
|
||||
|
||||
节点每次发送心跳后提供或计算 loss 和 rtt 信息,并对比 host 和 status,将需要更新的数据提交到更新队列
|
||||
|
||||
### 长期
|
||||
|
||||
@@ -28,11 +28,11 @@ var (
|
||||
UserConnWg utils.CountWaitGroup // 用户连接计数器
|
||||
UserConnMap core.SyncMap[string, *core.Conn] // 用户连接暂存
|
||||
|
||||
LockPortAssign = sync.Mutex{} // 锁定端口分配,防止并发冲突
|
||||
EdgeUpdates = make(chan *core.Edge, 1000) // 节点更新通知通道
|
||||
LockPortAssign = sync.Mutex{} // 锁定端口分配,防止并发冲突
|
||||
EdgeUpdates = make(chan *core.Edge, 100) // 节点更新通知通道
|
||||
)
|
||||
|
||||
func NewEdge(id int32, port uint16, addr *net.TCPAddr) error {
|
||||
func OnlineEdgeNew(id int32, port uint16, addr *net.TCPAddr) error {
|
||||
if addr == nil {
|
||||
return fmt.Errorf("边缘节点 %d 地址无效", id)
|
||||
}
|
||||
@@ -59,7 +59,7 @@ func NewEdge(id int32, port uint16, addr *net.TCPAddr) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TryUpdateEdge(id int32, addr *net.TCPAddr) error {
|
||||
func OnlineEdgeUpdate(id int32, addr *net.TCPAddr) error {
|
||||
if addr == nil {
|
||||
return fmt.Errorf("边缘节点 %d 地址无效", id)
|
||||
}
|
||||
@@ -85,6 +85,21 @@ func TryUpdateEdge(id int32, addr *net.TCPAddr) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func OfflineEdge(id int32) error {
|
||||
edge, ok := Edges.Load(id)
|
||||
if !ok || edge == nil {
|
||||
return fmt.Errorf("边缘节点 %d 不存在", id)
|
||||
}
|
||||
|
||||
*edge.Status = core.EdgeOffline
|
||||
EdgeUpdates <- &core.Edge{
|
||||
Id: id,
|
||||
Status: &core.EdgeOffline,
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func StoreEdge(edge *core.Edge) error {
|
||||
if edge == nil || edge.Id == 0 || edge.Port == nil {
|
||||
return fmt.Errorf("无效的边缘节点: %+v", edge)
|
||||
|
||||
@@ -87,9 +87,9 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) {
|
||||
// 结束时清理
|
||||
var edgeId int32
|
||||
defer func() {
|
||||
var edge, ok = app.Edges.Load(edgeId)
|
||||
if ok {
|
||||
*edge.Status = 0
|
||||
err := app.OfflineEdge(edgeId)
|
||||
if err != nil {
|
||||
slog.Error("管理节点下线失败", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -193,7 +193,7 @@ func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr)
|
||||
return errors.New("没有可用的端口")
|
||||
}
|
||||
|
||||
err := app.NewEdge(edgeId, port, tcpAddr)
|
||||
err := app.OnlineEdgeNew(edgeId, port, tcpAddr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("新增边缘节点失败:%w", err)
|
||||
}
|
||||
@@ -202,7 +202,7 @@ func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr)
|
||||
} else {
|
||||
// 更新边缘节点地址
|
||||
port = *edge.Port
|
||||
err := app.TryUpdateEdge(edgeId, tcpAddr)
|
||||
err := app.OnlineEdgeUpdate(edgeId, tcpAddr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("尝试更新边缘节点失败: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user