diff --git a/README.md b/README.md index 219699f..309d7e4 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,14 @@ ## TODO -- 将协议内容抽离出公共包,gateway 和 edge 节点共同调用 +节点心跳传输 geo 信息,geo 查询接口:`https://opendata.baidu.com/api.php?co=&resource_id=6006&oe=utf8&query=123.160.207.85` + +网关维护节点数据,节点发送心跳后,网关对比维护的数据,更新维护数据并将更新的部分追加到更新列表中。 + +网关定时发送更新配置 + +### 长期 + +将协议内容抽离出公共包,gateway 和 edge 节点共同调用 ## 开发相关 diff --git a/edge/edge.go b/edge/edge.go index f7202f3..83dc966 100644 --- a/edge/edge.go +++ b/edge/edge.go @@ -40,7 +40,7 @@ func Start() error { // 注册节点 slog.Debug("注册节点...") - id, host, err := report.Online(geo.Prov, geo.City, geo.Isp) + id, host, err := report.Online() if err != nil { return fmt.Errorf("注册节点失败: %w", err) } @@ -63,13 +63,6 @@ func Start() error { } } - // 下线节点 - slog.Debug("下线节点...") - err = report.Offline() - if err != nil { - slog.Error("下线节点失败", "err", err) - } - return ctx.Err() } diff --git a/edge/env/env.go b/edge/env/env.go index 4f95a41..75115ac 100644 --- a/edge/env/env.go +++ b/edge/env/env.go @@ -9,15 +9,13 @@ import ( var Mode = "dev" var Name = "dev-edge" -var EndpointOnline = "https://api.lanhuip.com/api/edge/online" -var EndpointOffline = "https://api.lanhuip.com/api/edge/offline" +var EndpointOnline = "https://api.lanhuip.com/api/edge/assign" func Init() error { var env = flag.String("e", "dev", "环境变量,可选值 dev 或 prod") var name = flag.String("n", "", "节点唯一标识") - var online = flag.String("online", "", "服务注册地址") - var offline = flag.String("offline", "", "服务注销地址") + var online = flag.String("online", "", "服务发现地址") flag.Parse() @@ -39,10 +37,6 @@ func Init() error { EndpointOnline = *online } - if offline != nil && *offline != "" { - EndpointOffline = *offline - } - if Mode == "dev" { slog.SetLogLoggerLevel(slog.LevelDebug) } else { diff --git a/edge/report/report.go b/edge/report/report.go index fb54829..87cab64 100644 --- a/edge/report/report.go +++ b/edge/report/report.go @@ -11,22 +11,8 @@ import ( "strings" ) -func Online(prov, city, isp string) (id int32, host string, err error) { - - var ispInt = 0 - switch isp { - case "电信": - ispInt = 1 - case "联通": - ispInt = 2 - case "移动": - ispInt = 3 - } - +func Online() (id int32, host string, err error) { bytes, err := json.Marshal(map[string]any{ - "prov": prov, - "city": city, - "isp": ispInt, "name": env.Name, "version": core.Version, }) @@ -39,7 +25,10 @@ func Online(prov, city, isp string) (id int32, host string, err error) { if err != nil { return 0, "", fmt.Errorf("执行请求失败: %w", err) } - defer resp.Body.Close() + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + if resp.StatusCode != http.StatusOK { return 0, "", errors.New("状态码: " + resp.Status) } @@ -62,24 +51,3 @@ func Online(prov, city, isp string) (id int32, host string, err error) { return respBody.Id, respBody.Host, nil } - -func Offline() error { - var bytes, err = json.Marshal(map[string]any{ - "name": env.Name, - }) - if err != nil { - return err - } - var body = strings.NewReader(string(bytes)) - - resp, err := http.Post(env.EndpointOffline, "application/json", body) - if err != nil { - return fmt.Errorf("执行请求失败: %w", err) - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return errors.New("状态码: " + resp.Status) - } - - return nil -} diff --git a/gateway/app/app.go b/gateway/app/app.go index 2a6e4e6..fe5b90d 100644 --- a/gateway/app/app.go +++ b/gateway/app/app.go @@ -1,8 +1,11 @@ package app import ( + "fmt" + "net" "proxy-server/gateway/core" "proxy-server/utils" + "sync" ) type Stoppable interface { @@ -14,25 +17,44 @@ var ( Name string PlatformSecret string // 平台密钥,验证接收的请求是否属于平台 - Assigns = core.SyncMap[uint16, int32]{} // 转发端口 -> 节点 ID - Edges = core.SyncMap[int32, uint16]{} // 节点 ID -> 转发端口 - Permits = core.SyncMap[int32, *core.Permit]{} // 转发端口 -> 权限配置 + Edges = core.SyncMap[int32, *core.Edge]{} // 节点信息表 (包外只读!数据存储有关联性,所有写入操作只在包内进行) + Assigns = core.SyncMap[uint16, int32]{} // 分配索引 (包外只读!数据存储有关联性,所有写入操作只在包内进行) + + Permits = core.SyncMap[int32, *core.Permit]{} // 节点权限表 (包外只读!数据存储有关联性,所有写入操作只在包内进行) CtrlConnWg utils.CountWaitGroup // 控制通道计数器 DataConnWg utils.CountWaitGroup // 数据通道计数器 FwdLesWg utils.CountWaitGroup // 转发监听端口计数器 UserConnWg utils.CountWaitGroup // 用户连接计数器 UserConnMap core.SyncMap[string, *core.Conn] // 用户连接暂存 + + LockPortAssign = sync.Mutex{} // 锁定端口分配,防止并发冲突 + EdgeUpdates = make(chan *core.Edge, 1000) // 节点更新通知通道 ) -func AddEdge(id int32, port uint16) { - Edges.Store(id, port) +func NewEdge(id int32, port uint16, addr *net.TCPAddr) { + var host = addr.IP.String() + var edge = &core.Edge{ + Id: id, + Host: &host, + Port: &port, + } + + // todo 查询 geo 信息 + + Edges.Store(id, edge) Assigns.Store(port, id) + EdgeUpdates <- edge } -func DelEdge(port uint16) { - id, _ := Assigns.LoadAndDelete(port) - Edges.Delete(id) +func StoreEdge(edge *core.Edge) error { + if edge == nil || edge.Id == 0 || edge.Port == nil { + return fmt.Errorf("无效的边缘节点: %+v", edge) + } + + Edges.Store(edge.Id, edge) + Assigns.Store(*edge.Port, edge.Id) + return nil } func LoadPermit(port uint16) *core.Permit { @@ -48,3 +70,7 @@ func LoadPermit(port uint16) *core.Permit { return permit } + +func StorePermit(def *core.PermitDef) { + Permits.Store(def.Id, &def.Permit) +} diff --git a/gateway/core/edge.go b/gateway/core/edge.go new file mode 100644 index 0000000..38f8d16 --- /dev/null +++ b/gateway/core/edge.go @@ -0,0 +1,18 @@ +package core + +type Edge struct { + Id int32 `json:"id"` + Host *string `json:"host,omitempty"` + Port *uint16 `json:"port,omitempty"` + Prov *string `json:"prov,omitempty"` + City *string `json:"city,omitempty"` + Isp *string `json:"isp,omitempty"` + Status *int `json:"status,omitempty"` + Rtt *int `json:"rtt,omitempty"` // 节点响应时间,单位毫秒 + Loss *int `json:"loss,omitempty"` // 节点丢包率,单位百分比 +} + +var ( + EdgeOffline = 0 + EdgeOnline = 1 +) diff --git a/gateway/fwd/ctrl.go b/gateway/fwd/ctrl.go index ab5717b..279b001 100644 --- a/gateway/fwd/ctrl.go +++ b/gateway/fwd/ctrl.go @@ -11,7 +11,6 @@ import ( "net" "proxy-server/gateway/app" "proxy-server/gateway/env" - "proxy-server/gateway/report" "proxy-server/utils" "strconv" "time" @@ -85,7 +84,14 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) { ctx, cancel := context.WithCancel(_ctx) defer cancel() - var fwdPort uint16 + // 结束时清理 + var edgeId int32 + defer func() { + var edge, ok = app.Edges.Load(edgeId) + if ok { + *edge.Status = 0 + } + }() // 处理连接命令 var errCh = make(chan error) @@ -122,8 +128,8 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) { errCh <- fmt.Errorf("读取节点 ID 失败: %w", err) return } - var client = int32(binary.BigEndian.Uint32(recv)) - fwdPort, err = onOpen(ctx, conn, client) + edgeId = int32(binary.BigEndian.Uint32(recv)) + err = onOpen(ctx, conn, edgeId, conn.RemoteAddr()) if err != nil { errCh <- fmt.Errorf("处理连接建立命令失败: %w", err) return @@ -159,54 +165,58 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) { case err = <-errCh: } - app.DelEdge(fwdPort) return } -func onOpen(ctx context.Context, writer io.Writer, edge int32) (port uint16, err error) { - // open 命令全局只执行一次 - _, ok := app.Edges.Load(edge) - if ok { - return 0, fmt.Errorf("节点 ID %d 已经连接", edge) - } +func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) (err error) { + var port uint16 - // 分配端口 - var minim uint16 = 20000 - var maxim uint16 = 60000 - for i := minim; i < maxim; i++ { - var _, ok = app.Assigns.Load(i) - if !ok { - port = i - app.AddEdge(edge, port) - break + edge, ok := app.Edges.Load(edgeId) + if ok && edge.Port != nil { + port = *edge.Port + } else { + // 分配端口 + app.LockPortAssign.Lock() + + var minim uint16 = 20000 + var maxim uint16 = 60000 + for i := minim; i < maxim; i++ { + var _, ok = app.Assigns.Load(i) + if !ok { + port = i + break + } + } + if port == 0 { + return errors.New("没有可用的端口") } - } - if port == 0 { - return 0, errors.New("没有可用的端口") - } - // 报告端口分配 - if err = report.Assigned(edge, port); err != nil { - return 0, fmt.Errorf("报告端口分配失败: %w", err) - } + if tcpAddr, ok := addr.(*net.TCPAddr); ok { + app.NewEdge(edgeId, port, tcpAddr) + } else { + return fmt.Errorf("无效的地址类型: %T", addr) + } - // 响应节点 - if err = sendPong(writer); err != nil { - return 0, fmt.Errorf("响应节点失败: %w", err) + app.LockPortAssign.Unlock() } // 启动转发服务 app.FwdLesWg.Add(1) go func() { defer app.FwdLesWg.Done() - slog.Info("监听转发端口", "port", port, "edge", edge) + slog.Info("监听转发端口", "port", port, "edge", edgeId) err = ListenUser(ctx, port, writer) if err != nil { - slog.Error("监听转发端口失败", "port", port, "edge", edge, "err", err) + slog.Error("监听转发端口失败", "port", port, "edge", edgeId, "err", err) } }() - return port, nil + // 响应节点 + if err = sendPong(writer); err != nil { + return fmt.Errorf("响应节点失败: %w", err) + } + + return nil } func onPing(writer io.Writer) (err error) { diff --git a/gateway/gateway.go b/gateway/gateway.go index addbfa9..e1f4ef1 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -113,7 +113,7 @@ func (s *server) Run() (err error) { defer wg.Done() // 报告下线 slog.Debug("报告服务下线") - err = report.Offline(app.Name) + err = report.Offline() if err != nil { slog.Error("服务下线失败", "err", err) } diff --git a/gateway/report/report.go b/gateway/report/report.go index c43ecbd..bc00cf9 100644 --- a/gateway/report/report.go +++ b/gateway/report/report.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "log/slog" "net/http" "proxy-server/gateway/app" "proxy-server/gateway/core" @@ -23,9 +24,10 @@ func Online(name string) (err error) { } var body struct { - Id int32 `json:"id"` - Secret string `json:"secret"` - Permits []core.PermitDef `json:"permits"` + Id int32 `json:"id"` + Secret string `json:"secret"` + Permits []*core.PermitDef `json:"permits"` + Edges []*core.Edge `json:"edges"` } err = json.Unmarshal([]byte(resp), &body) if err != nil { @@ -35,25 +37,20 @@ func Online(name string) (err error) { app.Id = body.Id app.PlatformSecret = body.Secret for _, def := range body.Permits { - app.Permits.Store(def.Id, &def.Permit) + app.StorePermit(def) + } + for _, edge := range body.Edges { + err := app.StoreEdge(edge) + if err != nil { + slog.Error("存储边缘节点失败", "err", err, "edge", edge) + } } - return nil } -func Offline(name string) (err error) { +func Offline() (err error) { _, err = call(env.EndpointOffline, map[string]any{ - "name": name, - "version": core.Version, - }) - return err -} - -func Assigned(edgeId int32, port uint16) (err error) { - _, err = call(env.EndpointAssigned, map[string]any{ - "proxy": app.Id, - "edge": edgeId, - "port": port, + "id": app.Id, }) return err } @@ -78,7 +75,10 @@ func call(endpoint string, body any) (string, error) { if err != nil { return "", err } - defer resp.Body.Close() + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + if resp.StatusCode != http.StatusOK { return "", fmt.Errorf("请求失败,状态码:%d", resp.StatusCode) } diff --git a/gateway/web/handlers/auth.go b/gateway/web/handlers/auth.go index e552ebb..ca1dac9 100644 --- a/gateway/web/handlers/auth.go +++ b/gateway/web/handlers/auth.go @@ -24,7 +24,7 @@ func Permit(ctx *fiber.Ctx) (err error) { // 保存授权配置 for _, permit := range *req { - app.Permits.Store(permit.Id, &permit.Permit) + app.StorePermit(&permit) } return nil diff --git a/gateway/web/handlers/info.go b/gateway/web/handlers/info.go index 38bb11c..29c4dba 100644 --- a/gateway/web/handlers/info.go +++ b/gateway/web/handlers/info.go @@ -17,7 +17,7 @@ type InfoResp struct { // Edges []EdgeResp `json:"edges"` Assigns map[uint16]int32 `json:"assigns"` - Edges map[int32]uint16 `json:"edges"` + Edges map[int32]*core.Edge `json:"edges"` Permits map[int32]*core.Permit `json:"permits"` } @@ -29,30 +29,15 @@ type EdgeResp struct { func Info(c *fiber.Ctx) error { - // var edges = make([]EdgeResp, 0) - // app.Edges.Range(func(id int32, port uint16) bool { - // permit, ok := app.Permits.Load(id) - // if !ok { - // return true - // } - // - // edges = append(edges, EdgeResp{ - // Id: id, - // Port: port, - // Permit: permit, - // }) - // return true - // }) - var assigns = make(map[uint16]int32) app.Assigns.Range(func(port uint16, id int32) bool { assigns[port] = id return true }) - var edges = make(map[int32]uint16) - app.Edges.Range(func(id int32, port uint16) bool { - edges[id] = port + var edges = make(map[int32]*core.Edge) + app.Edges.Range(func(id int32, edge *core.Edge) bool { + edges[id] = edge return true })