From 1831c792ad563a4a6503a6046e59c7f0cbfdce04 Mon Sep 17 00:00:00 2001 From: luorijun Date: Wed, 28 May 2025 16:12:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E7=BD=91=E5=85=B3=E7=9B=91?= =?UTF-8?q?=E5=90=AC=E5=B9=B6=E8=AF=BB=E5=8F=96=E6=9B=B4=E6=96=B0=E9=98=9F?= =?UTF-8?q?=E5=88=97=EF=BC=8C=E5=AE=9A=E6=97=B6=E5=8F=91=E9=80=81=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E6=95=B0=E6=8D=AE=E5=88=B0=E5=B9=B3=E5=8F=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 8 +-- cmd/gateway/main.go | 3 +- gateway/app/app.go | 8 ++- gateway/app/geo.go | 56 +++++------------- gateway/{report => app}/report.go | 24 +++++--- gateway/env/env.go | 13 +++-- gateway/gateway.go | 95 +++++++++++++++++++------------ 7 files changed, 106 insertions(+), 101 deletions(-) rename gateway/{report => app}/report.go (83%) diff --git a/README.md b/README.md index dfaf39d..d9f8286 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,11 @@ ## TODO -- [x] 节点发送 open 请求后,如果是新节点则访问 geo 接口获取 geo 信息,并连带 host 和 port 信息提交到更新队列 -- [x] open 命令更新 status -- [x] app 需要提供节点下线更新函数,节点断联后调用下线函数更新 status - -网关监听并读取更新队列,定时发送更新数据到平台 - 节点每次发送心跳后提供或计算 loss 和 rtt 信息,并对比 host 和 status,将需要更新的数据提交到更新队列 ### 长期 +实现一个接口以手动全量同步数据库中的节点信息,防止网关节点数据与数据库出现偏差 + 将协议内容抽离出公共包,gateway 和 edge 节点共同调用 ## 开发相关 diff --git a/cmd/gateway/main.go b/cmd/gateway/main.go index 9e35e44..47942ba 100644 --- a/cmd/gateway/main.go +++ b/cmd/gateway/main.go @@ -5,8 +5,7 @@ import ( ) func main() { - var app = gateway.New() - var err = app.Run() + var err = gateway.Start() if err != nil { println(err.Error()) } diff --git a/gateway/app/app.go b/gateway/app/app.go index 738c0e1..443cf73 100644 --- a/gateway/app/app.go +++ b/gateway/app/app.go @@ -91,7 +91,7 @@ func OfflineEdge(id int32) error { return fmt.Errorf("边缘节点 %d 不存在", id) } - *edge.Status = core.EdgeOffline + edge.Status = &core.EdgeOffline EdgeUpdates <- &core.Edge{ Id: id, Status: &core.EdgeOffline, @@ -101,12 +101,14 @@ func OfflineEdge(id int32) error { } func StoreEdge(edge *core.Edge) error { - if edge == nil || edge.Id == 0 || edge.Port == nil { + if edge == nil || edge.Id == 0 { return fmt.Errorf("无效的边缘节点: %+v", edge) } Edges.Store(edge.Id, edge) - Assigns.Store(*edge.Port, edge.Id) + if edge.Port != nil { + Assigns.Store(*edge.Port, edge.Id) + } return nil } diff --git a/gateway/app/geo.go b/gateway/app/geo.go index 34a5d18..8b9a3f0 100644 --- a/gateway/app/geo.go +++ b/gateway/app/geo.go @@ -1,18 +1,13 @@ package app import ( - "encoding/json" "fmt" - "io" "log/slog" "net" - "net/http" g "proxy-server/gateway/globals" "strings" ) -const base = "https://whois.pconline.com.cn/ipJson.jsp?json=true&ip=" - func IpGeo(ip net.IP) (info *IpGeoInfo, err error) { defer func() { var rs = recover() @@ -26,50 +21,29 @@ func IpGeo(ip net.IP) (info *IpGeoInfo, err error) { if err != nil { return nil, err } + if str != "" { slog.Debug("本地解析归属地结果", "str", str) values := strings.Split(str, "|") if len(values) != 5 { return nil, fmt.Errorf("本地归属地查询解析失败") } - return &IpGeoInfo{ - Prov: values[2], - City: values[3], - Isp: values[4], - }, nil + + var info = &IpGeoInfo{} + + if values[2] != "0" { + info.Prov = values[2] + } + if values[3] != "0" { + info.City = values[3] + } + if values[4] != "0" { + info.Isp = values[4] + } + return info, nil } - // 归属地查询 - var url = base + ip.String() - resp, err := http.Get(url) - if err != nil { - return nil, fmt.Errorf("查询归属地失败: %w", err) - } - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("查询归属地失败: %s", resp.Status) - } - defer func(Body io.ReadCloser) { - _ = Body.Close() - }(resp.Body) - - // 返回节点信息 - var bytes []byte - _, err = io.ReadFull(resp.Body, bytes) - if err != nil { - return nil, fmt.Errorf("读取归属地响应失败: %w", err) - } - - var body = make(map[string]any) - err = json.Unmarshal(bytes, &body) - if err != nil { - return nil, fmt.Errorf("解析归属地响应失败: %w", err) - } - - return &IpGeoInfo{ - Prov: body["pro"].(string), - City: body["city"].(string), - Isp: strings.Split(body["addr"].(string), " ")[1], - }, nil + return nil, fmt.Errorf("本地归属地查询为空") } type IpGeoInfo struct { diff --git a/gateway/report/report.go b/gateway/app/report.go similarity index 83% rename from gateway/report/report.go rename to gateway/app/report.go index bc00cf9..034f958 100644 --- a/gateway/report/report.go +++ b/gateway/app/report.go @@ -1,4 +1,4 @@ -package report +package app import ( "encoding/base64" @@ -7,7 +7,6 @@ import ( "io" "log/slog" "net/http" - "proxy-server/gateway/app" "proxy-server/gateway/core" "proxy-server/gateway/env" "strings" @@ -34,13 +33,13 @@ func Online(name string) (err error) { return err } - app.Id = body.Id - app.PlatformSecret = body.Secret + Id = body.Id + PlatformSecret = body.Secret for _, def := range body.Permits { - app.StorePermit(def) + StorePermit(def) } for _, edge := range body.Edges { - err := app.StoreEdge(edge) + err := StoreEdge(edge) if err != nil { slog.Error("存储边缘节点失败", "err", err, "edge", edge) } @@ -50,11 +49,22 @@ func Online(name string) (err error) { func Offline() (err error) { _, err = call(env.EndpointOffline, map[string]any{ - "id": app.Id, + "id": Id, }) return err } +func Update(data []*core.Edge) (err error) { + _, err = call(env.EndpointUpdate, map[string]any{ + "id": Id, + "edges": data, + }) + if err != nil { + err = fmt.Errorf("更新节点数据失败:%w", err) + } + return err +} + func call(endpoint string, body any) (string, error) { bodyStr, err := json.Marshal(body) if err != nil { diff --git a/gateway/env/env.go b/gateway/env/env.go index 62042dd..4cd3ab0 100644 --- a/gateway/env/env.go +++ b/gateway/env/env.go @@ -35,9 +35,9 @@ var ( RedisDb = 0 RedisPass = "" - EndpointOnline string - EndpointOffline string - EndpointAssigned string + EndpointOnline string + EndpointOffline string + EndpointUpdate string ) func Init() { @@ -209,10 +209,11 @@ func Init() { } else { panic("环境变量 ENDPOINT_OFFLINE 未设置") } - value = os.Getenv("ENDPOINT_ASSIGNED") + + value = os.Getenv("ENDPOINT_UPDATE") if value != "" { - EndpointAssigned = value + EndpointUpdate = value } else { - panic("环境变量 ENDPOINT_ASSIGNED 未设置") + panic("环境变量 ENDPOINT_UPDATE 未设置") } } diff --git a/gateway/gateway.go b/gateway/gateway.go index ce53e87..aff9b67 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -13,7 +13,6 @@ import ( "proxy-server/gateway/fwd" g "proxy-server/gateway/globals" "proxy-server/gateway/log" - "proxy-server/gateway/report" "proxy-server/gateway/web" "proxy-server/utils" "sync" @@ -21,28 +20,16 @@ import ( "github.com/google/uuid" - "github.com/joho/godotenv" - _ "net/http/pprof" ) -type server struct { -} - -func New() *server { - return &server{} -} - -func (s *server) Run() (err error) { +func Start() (err error) { // 初始化 - err = s.init() - if err != nil { - return err - } + setup() // 恢复服务状态 - err = s.restore() + err = restore() if err != nil { return err } @@ -59,18 +46,28 @@ func (s *server) Run() (err error) { defer close(fwdQuit) go func() { defer wg.Done() - err = s.startFwd(ctx) + err = startFwd(ctx) fwdQuit <- err }() // 接口服务 wg.Add(1) - apiQuit := make(chan error, 1) - defer close(apiQuit) + webQuit := make(chan error, 1) + defer close(webQuit) go func() { defer wg.Done() - err := s.startWeb(ctx) - apiQuit <- err + err := startWeb(ctx) + webQuit <- err + }() + + // 调度任务服务 + wg.Add(1) + go func() { + defer wg.Done() + err := startTask(ctx) + if err != nil { + slog.Error("调度任务服务异常退出", "err", err) + } }() // debug @@ -89,7 +86,7 @@ func (s *server) Run() (err error) { // 报告上线 slog.Info("报告服务上线") - err = report.Online(app.Name) + err = app.Online(app.Name) if err != nil { return fmt.Errorf("服务上线失败: %w", err) } @@ -100,7 +97,7 @@ func (s *server) Run() (err error) { if err != nil { slog.Warn("fwd 服务异常退出", "err", err) } - case err := <-apiQuit: + case err := <-webQuit: if err != nil { slog.Warn("web 服务异常退出", "err", err) } @@ -113,7 +110,7 @@ func (s *server) Run() (err error) { defer wg.Done() // 报告下线 slog.Debug("报告服务下线") - err = report.Offline() + err = app.Offline() if err != nil { slog.Error("服务下线失败", "err", err) } @@ -134,22 +131,14 @@ func (s *server) Run() (err error) { return nil } -func (s *server) init() error { - - err := godotenv.Load() - if err != nil { - println("没有本地环境变量文件") - } - +func setup() { log.Init() env.Init() g.InitRedis() g.InitGeo() - - return nil } -func (s *server) restore() error { +func restore() error { var file = "proxy.lock" bytes, err := os.ReadFile(file) @@ -178,7 +167,7 @@ func (s *server) restore() error { return nil } -func (s *server) startFwd(ctx context.Context) error { +func startFwd(ctx context.Context) error { server := fwd.New() go func() { <-ctx.Done() @@ -187,7 +176,7 @@ func (s *server) startFwd(ctx context.Context) error { return server.Run() } -func (s *server) startWeb(ctx context.Context) error { +func startWeb(ctx context.Context) error { server := web.New() go func() { <-ctx.Done() @@ -198,3 +187,37 @@ func (s *server) startWeb(ctx context.Context) error { }() return server.Run() } + +func startTask(ctx context.Context) error { + + var lock = sync.Mutex{} + var updates = make([]*core.Edge, 0) + go func() { + for data := range app.EdgeUpdates { + lock.Lock() + updates = append(updates, data) + lock.Unlock() + } + }() + + // 每 30 秒批量提交一次更新 + var scheduler = time.Tick(30 * time.Second) + for { + select { + case <-ctx.Done(): + return nil + case <-scheduler: + if len(updates) == 0 { + continue + } + err := app.Update(updates) + lock.Lock() + clear(updates) + updates = updates[:0] + lock.Unlock() + if err != nil { + slog.Error("调度更新任务失败", "err", err) + } + } + } +}