diff --git a/README.md b/README.md index 309d7e4..83b8ab0 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,10 @@ ## TODO -节点心跳传输 geo 信息,geo 查询接口:`https://opendata.baidu.com/api.php?co=&resource_id=6006&oe=utf8&query=123.160.207.85` +节点发送 open 请求后,如果是新节点则访问 geo 接口获取 geo 信息,并连带 host 和 port 信息提交到更新队列 -网关维护节点数据,节点发送心跳后,网关对比维护的数据,更新维护数据并将更新的部分追加到更新列表中。 +节点每次发送心跳后提供或计算 loss 和 rtt 信息,并对比 host 和 status,将需要更新的数据提交到更新队列 -网关定时发送更新配置 +网关监听并读取更新队列,定时发送更新数据到平台 ### 长期 diff --git a/edge/edge.go b/edge/edge.go index 83dc966..96b2dcd 100644 --- a/edge/edge.go +++ b/edge/edge.go @@ -79,7 +79,6 @@ func ctrl(ctx context.Context, id int32, host string) error { var reader = bufio.NewReader(conn) // 发送开启连接 - slog.Debug("发送节点连接命令") err = sendOpen(conn, id) if err != nil { return fmt.Errorf("发送节点信息失败: %w", err) @@ -108,7 +107,7 @@ func ctrl(ctx context.Context, id int32, host string) error { for { // 读取命令 cmd, err := reader.ReadByte() - if err := utils.WarpConnErr(err); err != nil { + if ok, err := utils.WarpConnErr(err); !ok { errCh <- err return } diff --git a/edge/env/env.go b/edge/env/env.go index 75115ac..bbcba58 100644 --- a/edge/env/env.go +++ b/edge/env/env.go @@ -9,13 +9,13 @@ import ( var Mode = "dev" var Name = "dev-edge" -var EndpointOnline = "https://api.lanhuip.com/api/edge/assign" +var EndpointAssign = "https://api.lanhuip.com/api/edge/assign" func Init() error { var env = flag.String("e", "dev", "环境变量,可选值 dev 或 prod") var name = flag.String("n", "", "节点唯一标识") - var online = flag.String("online", "", "服务发现地址") + var assign = flag.String("assign", "", "服务发现地址") flag.Parse() @@ -33,8 +33,8 @@ func Init() error { return errors.New("节点唯一标识不能为空") } - if online != nil && *online != "" { - EndpointOnline = *online + if assign != nil && *assign != "" { + EndpointAssign = *assign } if Mode == "dev" { diff --git a/edge/report/report.go b/edge/report/report.go index 87cab64..c81759c 100644 --- a/edge/report/report.go +++ b/edge/report/report.go @@ -21,7 +21,7 @@ func Online() (id int32, host string, err error) { } var body = strings.NewReader(string(bytes)) - resp, err := http.Post(env.EndpointOnline, "application/json", body) + resp, err := http.Post(env.EndpointAssign, "application/json", body) if err != nil { return 0, "", fmt.Errorf("执行请求失败: %w", err) } diff --git a/gateway/app/app.go b/gateway/app/app.go index fe5b90d..0c59abd 100644 --- a/gateway/app/app.go +++ b/gateway/app/app.go @@ -47,6 +47,28 @@ func NewEdge(id int32, port uint16, addr *net.TCPAddr) { EdgeUpdates <- edge } +func TryUpdateEdge(id int32, addr *net.TCPAddr) error { + if addr == nil { + return fmt.Errorf("边缘节点 %d 地址无效", id) + } + + edge, ok := Edges.Load(id) + if !ok { + return fmt.Errorf("边缘节点 %d 不存在", id) + } + + host := addr.IP.String() + if edge.Host == nil || *edge.Host != host { + edge.Host = &host + EdgeUpdates <- &core.Edge{ + Id: edge.Id, + Host: &host, + } + } + + return nil +} + func StoreEdge(edge *core.Edge) error { if edge == nil || edge.Id == 0 || edge.Port == nil { return fmt.Errorf("无效的边缘节点: %+v", edge) diff --git a/gateway/fwd/ctrl.go b/gateway/fwd/ctrl.go index 279b001..eea76f9 100644 --- a/gateway/fwd/ctrl.go +++ b/gateway/fwd/ctrl.go @@ -107,7 +107,7 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) { } cmd, err := reader.ReadByte() - if err := utils.WarpConnErr(err); err != nil { + if ok, err := utils.WarpConnErr(err); !ok { errCh <- err return } @@ -169,12 +169,14 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) { } func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) (err error) { - var port uint16 + tcpAddr, ok := addr.(*net.TCPAddr) + if !ok { + return fmt.Errorf("无效的地址类型: %T", addr) + } + var port uint16 edge, ok := app.Edges.Load(edgeId) - if ok && edge.Port != nil { - port = *edge.Port - } else { + if !ok || edge.Port == nil { // 分配端口 app.LockPortAssign.Lock() @@ -191,13 +193,16 @@ func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) return errors.New("没有可用的端口") } - if tcpAddr, ok := addr.(*net.TCPAddr); ok { - app.NewEdge(edgeId, port, tcpAddr) - } else { - return fmt.Errorf("无效的地址类型: %T", addr) - } + app.NewEdge(edgeId, port, tcpAddr) app.LockPortAssign.Unlock() + } else { + // 更新边缘节点地址 + port = *edge.Port + err := app.TryUpdateEdge(edgeId, tcpAddr) + if err != nil { + return fmt.Errorf("尝试更新边缘节点地址失败: %w", err) + } } // 启动转发服务 diff --git a/utils/conn.go b/utils/conn.go index 39ce9b2..066a575 100644 --- a/utils/conn.go +++ b/utils/conn.go @@ -9,9 +9,9 @@ import ( "syscall" ) -func WarpConnErr(err error) error { +func WarpConnErr(err error) (bool, error) { if err == nil { - return nil + return true, nil } var opErr *net.OpError @@ -19,11 +19,11 @@ func WarpConnErr(err error) error { case errors.Is(err, net.ErrClosed): slog.Debug("连接已关闭") - return nil + return false, nil case errors.Is(err, io.EOF): slog.Debug("连接被对端关闭") - return nil + return false, nil case errors.As(err, &opErr): switch { @@ -31,11 +31,11 @@ func WarpConnErr(err error) error { errors.Is(opErr.Err, syscall.WSAECONNRESET), errors.Is(opErr.Err, syscall.WSAECONNABORTED), errors.Is(opErr.Err, syscall.ECONNRESET), errors.Is(opErr.Err, syscall.ECONNABORTED): slog.Debug("连接被对端重置") - return nil + return false, nil case opErr.Timeout(): slog.Debug("连接已超时") - return nil + return false, nil } } - return fmt.Errorf("连接发生未处理的错误: %w", err) + return false, fmt.Errorf("连接发生未处理的错误: %w", err) }