diff --git a/Dockerfile b/Dockerfile index 3152709..62b093e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -24,6 +24,7 @@ RUN apt-get update && apt-get install -y ca-certificates # 从构建阶段复制编译好的二进制文件 COPY --from=builder /build/bin/proxy_linux_amd64 /app/proxy +COPY cmd/gateway/ip2region.xdb /app/id2region.xdb # 设置可执行权限 RUN chmod +x /app/proxy diff --git a/README.md b/README.md index 83b8ab0..2c088ae 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,15 @@ ## TODO -节点发送 open 请求后,如果是新节点则访问 geo 接口获取 geo 信息,并连带 host 和 port 信息提交到更新队列 - -节点每次发送心跳后提供或计算 loss 和 rtt 信息,并对比 host 和 status,将需要更新的数据提交到更新队列 +- [x] 节点发送 open 请求后,如果是新节点则访问 geo 接口获取 geo 信息,并连带 host 和 port 信息提交到更新队列 +- [x] open 命令更新 status 网关监听并读取更新队列,定时发送更新数据到平台 + +app 需要提供节点下线更新函数,节点断联后调用下线函数更新 status + +节点每次发送心跳后提供或计算 loss 和 rtt 信息,并对比 host 和 status,将需要更新的数据提交到更新队列 + ### 长期 将协议内容抽离出公共包,gateway 和 edge 节点共同调用 diff --git a/cmd/gateway/ip2region.xdb b/cmd/gateway/ip2region.xdb new file mode 100644 index 0000000..0788d0a Binary files /dev/null and b/cmd/gateway/ip2region.xdb differ diff --git a/edge/edge.go b/edge/edge.go index 96b2dcd..9c5de84 100644 --- a/edge/edge.go +++ b/edge/edge.go @@ -14,7 +14,6 @@ import ( "os/signal" "proxy-server/edge/core" "proxy-server/edge/env" - "proxy-server/edge/geo" "proxy-server/edge/report" "proxy-server/utils" "time" @@ -31,13 +30,6 @@ func Start() error { return fmt.Errorf("初始化环境变量失败: %w", err) } - // 获取归属地 - slog.Debug("获取节点归属地...") - err = geo.Query() - if err != nil { - return fmt.Errorf("获取节点归属地失败: %w", err) - } - // 注册节点 slog.Debug("注册节点...") id, host, err := report.Online() diff --git a/edge/geo/geo.go b/edge/geo/geo.go deleted file mode 100644 index 9eaa6e8..0000000 --- a/edge/geo/geo.go +++ /dev/null @@ -1,159 +0,0 @@ -package geo - -import ( - "bufio" - "encoding/json" - "fmt" - "io" - "net/http" - "net/textproto" - "proxy-server/edge/env" - "strings" -) - -var ( - Ip string - Prov string - City string - Isp string -) - -func Query() (err error) { - - switch env.Mode { - case "dev": - err = dev() - default: - err = ipapi() - } - return err -} - -func dev() (err error) { - Prov = "河南省" - City = "郑州市" - Isp = "电信" - return nil -} - -func cip() (err error) { - //goland:noinspection HttpUrlsUsage - const endpoint = "http://cip.cc" - - req, err := http.NewRequest("GET", endpoint, nil) - if err != nil { - return fmt.Errorf("创建请求失败: %w", err) - } - req.Header.Set("User-Agent", "curl/8.9.1") - req.Header.Set("Accept", "*/*") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("执行请求失败: %w", err) - } - defer func(Body io.ReadCloser) { - _ = Body.Close() - }(resp.Body) - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("状态码: %s", resp.Status) - } - - reader := textproto.NewReader(bufio.NewReader(resp.Body)) - ipLine, err := reader.ReadLine() - if err != nil { - return fmt.Errorf("读取响应失败: %w", err) - } - Ip = strings.TrimSpace(strings.Split(ipLine, ":")[1]) - - addrLine, err := reader.ReadLine() - if err != nil { - return fmt.Errorf("读取响应失败: %w", err) - } - addr := strings.Split(strings.Split(addrLine, ":")[1], " ") - Prov = strings.TrimSpace(addr[1]) - City = strings.TrimSpace(addr[2]) - - ispLine, err := reader.ReadLine() - if err != nil { - return fmt.Errorf("读取响应失败: %w", err) - } - Isp = strings.TrimSpace(strings.Split(ispLine, ":")[1]) - - return nil -} - -func ipapi() (err error) { - //goland:noinspection HttpUrlsUsage - const endpoint = "http://ip-api.com/json/?fields=regionName,city,as,query&lang=zh-CN" - - resp, err := http.Get(endpoint) - if err != nil { - return fmt.Errorf("执行请求失败: %w", err) - } - defer func(Body io.ReadCloser) { - _ = Body.Close() - }(resp.Body) - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("状态码: %s", resp.Status) - } - - var data struct { - RegionName string `json:"regionName"` - City string `json:"city"` - As string `json:"as"` - Query string `json:"query"` - } - err = json.NewDecoder(resp.Body).Decode(&data) - if err != nil { - return fmt.Errorf("解析响应失败: %w", err) - } - - Ip = data.Query - Prov = data.RegionName - City = data.City - - var telecom = []string{"AS4134", "AS4812", "AS134419", "AS140292"} - var unicom = []string{"AS4837", "AS17621", "AS17816"} - var mobile = []string{ - "AS9808", "AS24444", "AS24445", "AS24547", "AS38019", - "AS56040", "AS56041", "AS56042", "AS56044", "AS56046", "AS56047", - "AS132525", "AS134810", - } - var foreign = []string{ - "AS9299", - } - - for _, telecomAsn := range telecom { - if strings.HasPrefix(data.As, telecomAsn) { - Isp = "电信" - break - } - } - if Isp == "" { - for _, unicomAsn := range unicom { - if strings.HasPrefix(data.As, unicomAsn) { - Isp = "联通" - break - } - } - } - if Isp == "" { - for _, mobileAsn := range mobile { - if strings.HasPrefix(data.As, mobileAsn) { - Isp = "移动" - break - } - } - } - if Isp == "" { - for _, foreignAsn := range foreign { - if strings.HasPrefix(data.As, foreignAsn) { - Isp = "国外" - break - } - } - } - - return nil -} diff --git a/gateway/app/app.go b/gateway/app/app.go index 0c59abd..bb2f864 100644 --- a/gateway/app/app.go +++ b/gateway/app/app.go @@ -32,40 +32,56 @@ var ( EdgeUpdates = make(chan *core.Edge, 1000) // 节点更新通知通道 ) -func NewEdge(id int32, port uint16, addr *net.TCPAddr) { - var host = addr.IP.String() - var edge = &core.Edge{ - Id: id, - Host: &host, - Port: &port, +func NewEdge(id int32, port uint16, addr *net.TCPAddr) error { + if addr == nil { + return fmt.Errorf("边缘节点 %d 地址无效", id) + } + info, err := IpGeo(addr.IP) + if err != nil { + return fmt.Errorf("查询 geo 信息失败:%w", err) } - // todo 查询 geo 信息 + var host = addr.IP.String() + var edge = &core.Edge{ + Id: id, + Host: &host, + Port: &port, + Prov: &info.Prov, + City: &info.City, + Isp: &info.Isp, + Status: &core.EdgeOnline, + } Edges.Store(id, edge) Assigns.Store(port, id) EdgeUpdates <- edge + + return nil } func TryUpdateEdge(id int32, addr *net.TCPAddr) error { if addr == nil { return fmt.Errorf("边缘节点 %d 地址无效", id) } - edge, ok := Edges.Load(id) if !ok { return fmt.Errorf("边缘节点 %d 不存在", id) } + edge.Status = &core.EdgeOnline + toUpdate := &core.Edge{ + Id: edge.Id, + Status: &core.EdgeOnline, + } + host := addr.IP.String() if edge.Host == nil || *edge.Host != host { edge.Host = &host - EdgeUpdates <- &core.Edge{ - Id: edge.Id, - Host: &host, - } + toUpdate.Host = &host } + EdgeUpdates <- toUpdate + return nil } diff --git a/gateway/app/geo.go b/gateway/app/geo.go new file mode 100644 index 0000000..34a5d18 --- /dev/null +++ b/gateway/app/geo.go @@ -0,0 +1,79 @@ +package app + +import ( + "encoding/json" + "fmt" + "io" + "log/slog" + "net" + "net/http" + g "proxy-server/gateway/globals" + "strings" +) + +const base = "https://whois.pconline.com.cn/ipJson.jsp?json=true&ip=" + +func IpGeo(ip net.IP) (info *IpGeoInfo, err error) { + defer func() { + var rs = recover() + if reErr, ok := rs.(error); ok { + err = fmt.Errorf("执行归属地查询异常 %w", reErr) + } + }() + + // 本地归属地查询 + str, err := g.Geo.SearchByStr(ip.String()) + if err != nil { + return nil, err + } + if str != "" { + slog.Debug("本地解析归属地结果", "str", str) + values := strings.Split(str, "|") + if len(values) != 5 { + return nil, fmt.Errorf("本地归属地查询解析失败") + } + return &IpGeoInfo{ + Prov: values[2], + City: values[3], + Isp: values[4], + }, nil + } + + // 归属地查询 + var url = base + ip.String() + resp, err := http.Get(url) + if err != nil { + return nil, fmt.Errorf("查询归属地失败: %w", err) + } + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("查询归属地失败: %s", resp.Status) + } + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + + // 返回节点信息 + var bytes []byte + _, err = io.ReadFull(resp.Body, bytes) + if err != nil { + return nil, fmt.Errorf("读取归属地响应失败: %w", err) + } + + var body = make(map[string]any) + err = json.Unmarshal(bytes, &body) + if err != nil { + return nil, fmt.Errorf("解析归属地响应失败: %w", err) + } + + return &IpGeoInfo{ + Prov: body["pro"].(string), + City: body["city"].(string), + Isp: strings.Split(body["addr"].(string), " ")[1], + }, nil +} + +type IpGeoInfo struct { + Prov string + City string + Isp string +} diff --git a/gateway/fwd/ctrl.go b/gateway/fwd/ctrl.go index eea76f9..12dca69 100644 --- a/gateway/fwd/ctrl.go +++ b/gateway/fwd/ctrl.go @@ -193,7 +193,10 @@ func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) return errors.New("没有可用的端口") } - app.NewEdge(edgeId, port, tcpAddr) + err := app.NewEdge(edgeId, port, tcpAddr) + if err != nil { + return fmt.Errorf("新增边缘节点失败:%w", err) + } app.LockPortAssign.Unlock() } else { @@ -201,7 +204,7 @@ func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) port = *edge.Port err := app.TryUpdateEdge(edgeId, tcpAddr) if err != nil { - return fmt.Errorf("尝试更新边缘节点地址失败: %w", err) + return fmt.Errorf("尝试更新边缘节点失败: %w", err) } } diff --git a/gateway/gateway.go b/gateway/gateway.go index e1f4ef1..ce53e87 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -118,8 +118,9 @@ func (s *server) Run() (err error) { slog.Error("服务下线失败", "err", err) } - // 关闭 redis + // 关闭服务 g.ExitRedis() + g.ExitGeo() }() // 等待其它服务关闭 @@ -143,6 +144,7 @@ func (s *server) init() error { log.Init() env.Init() g.InitRedis() + g.InitGeo() return nil } diff --git a/gateway/globals/geo.go b/gateway/globals/geo.go new file mode 100644 index 0000000..adec7d3 --- /dev/null +++ b/gateway/globals/geo.go @@ -0,0 +1,23 @@ +package globals + +import "github.com/lionsoul2014/ip2region/binding/golang/xdb" + +var Geo *xdb.Searcher + +func InitGeo() { + var err error + + buff, err := xdb.LoadContentFromFile("ip2region.xdb") + if err != nil { + panic("读取 geo 数据库文件失败") + } + + Geo, err = xdb.NewWithBuffer(buff) + if err != nil { + panic("初始化 geo 查询工具失败") + } +} + +func ExitGeo() { + Geo.Close() +} diff --git a/gateway/web/handlers/info.go b/gateway/web/handlers/info.go index 29c4dba..9f6b1a5 100644 --- a/gateway/web/handlers/info.go +++ b/gateway/web/handlers/info.go @@ -16,7 +16,6 @@ type InfoResp struct { DataConnections int `json:"data_connections"` // Edges []EdgeResp `json:"edges"` - Assigns map[uint16]int32 `json:"assigns"` Edges map[int32]*core.Edge `json:"edges"` Permits map[int32]*core.Permit `json:"permits"` } @@ -29,12 +28,6 @@ type EdgeResp struct { func Info(c *fiber.Ctx) error { - var assigns = make(map[uint16]int32) - app.Assigns.Range(func(port uint16, id int32) bool { - assigns[port] = id - return true - }) - var edges = make(map[int32]*core.Edge) app.Edges.Range(func(id int32, edge *core.Edge) bool { edges[id] = edge @@ -54,7 +47,6 @@ func Info(c *fiber.Ctx) error { UserConnections: int(app.UserConnWg.Count()), CtrlConnections: int(app.CtrlConnWg.Count()), DataConnections: int(app.DataConnWg.Count()), - Assigns: assigns, Edges: edges, Permits: permits, }) diff --git a/go.mod b/go.mod index 464a8b2..baf425e 100644 --- a/go.mod +++ b/go.mod @@ -17,8 +17,10 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/klauspost/compress v1.18.0 // indirect + github.com/lionsoul2014/ip2region/binding/golang v0.0.0-20250508043914-ed57fa5c5274 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect github.com/valyala/fasthttp v1.59.0 // indirect diff --git a/go.sum b/go.sum index 790b990..82e9ac5 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,5 @@ github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -14,6 +15,9 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/lionsoul2014/ip2region/binding/golang v0.0.0-20250508043914-ed57fa5c5274 h1:Vslec/nYvO2TdLdhwex8/1x64OZoQNsUzG79WABQaWg= +github.com/lionsoul2014/ip2region/binding/golang v0.0.0-20250508043914-ed57fa5c5274/go.mod h1:C5LA5UO2ZXJrLaPLYtE1wUJMiyd/nwWaCO5cw/2pSHs= github.com/lmittmann/tint v1.0.7 h1:D/0OqWZ0YOGZ6AyC+5Y2kD8PBEzBk6rFHVSfOqCkF9Y= github.com/lmittmann/tint v1.0.7/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= @@ -22,27 +26,35 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= github.com/valyala/fasthttp v1.59.0 h1:Qu0qYHfXvPk1mSLNqcFtEk6DpxgA26hy6bmydotDpRI= +github.com/valyala/fasthttp v1.59.0/go.mod h1:GTxNb9Bc6r2a9D0TWNSPwDz78UxnTGBViY3xZNEqyYU= github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= +golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= +golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= +golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=