现在已注册的节点会检查并更新 host;修复一个导致断开的连接没有正常退出的问题
This commit is contained in:
@@ -1,10 +1,10 @@
|
|||||||
## TODO
|
## TODO
|
||||||
|
|
||||||
节点心跳传输 geo 信息,geo 查询接口:`https://opendata.baidu.com/api.php?co=&resource_id=6006&oe=utf8&query=123.160.207.85`
|
节点发送 open 请求后,如果是新节点则访问 geo 接口获取 geo 信息,并连带 host 和 port 信息提交到更新队列
|
||||||
|
|
||||||
网关维护节点数据,节点发送心跳后,网关对比维护的数据,更新维护数据并将更新的部分追加到更新列表中。
|
节点每次发送心跳后提供或计算 loss 和 rtt 信息,并对比 host 和 status,将需要更新的数据提交到更新队列
|
||||||
|
|
||||||
网关定时发送更新配置
|
网关监听并读取更新队列,定时发送更新数据到平台
|
||||||
|
|
||||||
### 长期
|
### 长期
|
||||||
|
|
||||||
|
|||||||
@@ -79,7 +79,6 @@ func ctrl(ctx context.Context, id int32, host string) error {
|
|||||||
var reader = bufio.NewReader(conn)
|
var reader = bufio.NewReader(conn)
|
||||||
|
|
||||||
// 发送开启连接
|
// 发送开启连接
|
||||||
slog.Debug("发送节点连接命令")
|
|
||||||
err = sendOpen(conn, id)
|
err = sendOpen(conn, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("发送节点信息失败: %w", err)
|
return fmt.Errorf("发送节点信息失败: %w", err)
|
||||||
@@ -108,7 +107,7 @@ func ctrl(ctx context.Context, id int32, host string) error {
|
|||||||
for {
|
for {
|
||||||
// 读取命令
|
// 读取命令
|
||||||
cmd, err := reader.ReadByte()
|
cmd, err := reader.ReadByte()
|
||||||
if err := utils.WarpConnErr(err); err != nil {
|
if ok, err := utils.WarpConnErr(err); !ok {
|
||||||
errCh <- err
|
errCh <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
8
edge/env/env.go
vendored
8
edge/env/env.go
vendored
@@ -9,13 +9,13 @@ import (
|
|||||||
var Mode = "dev"
|
var Mode = "dev"
|
||||||
var Name = "dev-edge"
|
var Name = "dev-edge"
|
||||||
|
|
||||||
var EndpointOnline = "https://api.lanhuip.com/api/edge/assign"
|
var EndpointAssign = "https://api.lanhuip.com/api/edge/assign"
|
||||||
|
|
||||||
func Init() error {
|
func Init() error {
|
||||||
|
|
||||||
var env = flag.String("e", "dev", "环境变量,可选值 dev 或 prod")
|
var env = flag.String("e", "dev", "环境变量,可选值 dev 或 prod")
|
||||||
var name = flag.String("n", "", "节点唯一标识")
|
var name = flag.String("n", "", "节点唯一标识")
|
||||||
var online = flag.String("online", "", "服务发现地址")
|
var assign = flag.String("assign", "", "服务发现地址")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
@@ -33,8 +33,8 @@ func Init() error {
|
|||||||
return errors.New("节点唯一标识不能为空")
|
return errors.New("节点唯一标识不能为空")
|
||||||
}
|
}
|
||||||
|
|
||||||
if online != nil && *online != "" {
|
if assign != nil && *assign != "" {
|
||||||
EndpointOnline = *online
|
EndpointAssign = *assign
|
||||||
}
|
}
|
||||||
|
|
||||||
if Mode == "dev" {
|
if Mode == "dev" {
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ func Online() (id int32, host string, err error) {
|
|||||||
}
|
}
|
||||||
var body = strings.NewReader(string(bytes))
|
var body = strings.NewReader(string(bytes))
|
||||||
|
|
||||||
resp, err := http.Post(env.EndpointOnline, "application/json", body)
|
resp, err := http.Post(env.EndpointAssign, "application/json", body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, "", fmt.Errorf("执行请求失败: %w", err)
|
return 0, "", fmt.Errorf("执行请求失败: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -47,6 +47,28 @@ func NewEdge(id int32, port uint16, addr *net.TCPAddr) {
|
|||||||
EdgeUpdates <- edge
|
EdgeUpdates <- edge
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TryUpdateEdge(id int32, addr *net.TCPAddr) error {
|
||||||
|
if addr == nil {
|
||||||
|
return fmt.Errorf("边缘节点 %d 地址无效", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
edge, ok := Edges.Load(id)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("边缘节点 %d 不存在", id)
|
||||||
|
}
|
||||||
|
|
||||||
|
host := addr.IP.String()
|
||||||
|
if edge.Host == nil || *edge.Host != host {
|
||||||
|
edge.Host = &host
|
||||||
|
EdgeUpdates <- &core.Edge{
|
||||||
|
Id: edge.Id,
|
||||||
|
Host: &host,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func StoreEdge(edge *core.Edge) error {
|
func StoreEdge(edge *core.Edge) error {
|
||||||
if edge == nil || edge.Id == 0 || edge.Port == nil {
|
if edge == nil || edge.Id == 0 || edge.Port == nil {
|
||||||
return fmt.Errorf("无效的边缘节点: %+v", edge)
|
return fmt.Errorf("无效的边缘节点: %+v", edge)
|
||||||
|
|||||||
@@ -107,7 +107,7 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
cmd, err := reader.ReadByte()
|
cmd, err := reader.ReadByte()
|
||||||
if err := utils.WarpConnErr(err); err != nil {
|
if ok, err := utils.WarpConnErr(err); !ok {
|
||||||
errCh <- err
|
errCh <- err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -169,12 +169,14 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) (err error) {
|
func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) (err error) {
|
||||||
var port uint16
|
tcpAddr, ok := addr.(*net.TCPAddr)
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("无效的地址类型: %T", addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
var port uint16
|
||||||
edge, ok := app.Edges.Load(edgeId)
|
edge, ok := app.Edges.Load(edgeId)
|
||||||
if ok && edge.Port != nil {
|
if !ok || edge.Port == nil {
|
||||||
port = *edge.Port
|
|
||||||
} else {
|
|
||||||
// 分配端口
|
// 分配端口
|
||||||
app.LockPortAssign.Lock()
|
app.LockPortAssign.Lock()
|
||||||
|
|
||||||
@@ -191,13 +193,16 @@ func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr)
|
|||||||
return errors.New("没有可用的端口")
|
return errors.New("没有可用的端口")
|
||||||
}
|
}
|
||||||
|
|
||||||
if tcpAddr, ok := addr.(*net.TCPAddr); ok {
|
app.NewEdge(edgeId, port, tcpAddr)
|
||||||
app.NewEdge(edgeId, port, tcpAddr)
|
|
||||||
} else {
|
|
||||||
return fmt.Errorf("无效的地址类型: %T", addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
app.LockPortAssign.Unlock()
|
app.LockPortAssign.Unlock()
|
||||||
|
} else {
|
||||||
|
// 更新边缘节点地址
|
||||||
|
port = *edge.Port
|
||||||
|
err := app.TryUpdateEdge(edgeId, tcpAddr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("尝试更新边缘节点地址失败: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 启动转发服务
|
// 启动转发服务
|
||||||
|
|||||||
@@ -9,9 +9,9 @@ import (
|
|||||||
"syscall"
|
"syscall"
|
||||||
)
|
)
|
||||||
|
|
||||||
func WarpConnErr(err error) error {
|
func WarpConnErr(err error) (bool, error) {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var opErr *net.OpError
|
var opErr *net.OpError
|
||||||
@@ -19,11 +19,11 @@ func WarpConnErr(err error) error {
|
|||||||
|
|
||||||
case errors.Is(err, net.ErrClosed):
|
case errors.Is(err, net.ErrClosed):
|
||||||
slog.Debug("连接已关闭")
|
slog.Debug("连接已关闭")
|
||||||
return nil
|
return false, nil
|
||||||
|
|
||||||
case errors.Is(err, io.EOF):
|
case errors.Is(err, io.EOF):
|
||||||
slog.Debug("连接被对端关闭")
|
slog.Debug("连接被对端关闭")
|
||||||
return nil
|
return false, nil
|
||||||
|
|
||||||
case errors.As(err, &opErr):
|
case errors.As(err, &opErr):
|
||||||
switch {
|
switch {
|
||||||
@@ -31,11 +31,11 @@ func WarpConnErr(err error) error {
|
|||||||
errors.Is(opErr.Err, syscall.WSAECONNRESET), errors.Is(opErr.Err, syscall.WSAECONNABORTED),
|
errors.Is(opErr.Err, syscall.WSAECONNRESET), errors.Is(opErr.Err, syscall.WSAECONNABORTED),
|
||||||
errors.Is(opErr.Err, syscall.ECONNRESET), errors.Is(opErr.Err, syscall.ECONNABORTED):
|
errors.Is(opErr.Err, syscall.ECONNRESET), errors.Is(opErr.Err, syscall.ECONNABORTED):
|
||||||
slog.Debug("连接被对端重置")
|
slog.Debug("连接被对端重置")
|
||||||
return nil
|
return false, nil
|
||||||
case opErr.Timeout():
|
case opErr.Timeout():
|
||||||
slog.Debug("连接已超时")
|
slog.Debug("连接已超时")
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fmt.Errorf("连接发生未处理的错误: %w", err)
|
return false, fmt.Errorf("连接发生未处理的错误: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user