端口分配时加锁;网关上线后保存平台恢复的节点与授权数据;现在新节点连接后会按需向平台报告更新

This commit is contained in:
2025-05-27 16:03:00 +08:00
parent c2dcae7af5
commit 48dba2c0c3
11 changed files with 137 additions and 135 deletions

View File

@@ -1,8 +1,11 @@
package app
import (
"fmt"
"net"
"proxy-server/gateway/core"
"proxy-server/utils"
"sync"
)
type Stoppable interface {
@@ -14,25 +17,44 @@ var (
Name string
PlatformSecret string // 平台密钥,验证接收的请求是否属于平台
Assigns = core.SyncMap[uint16, int32]{} // 转发端口 -> 节点 ID
Edges = core.SyncMap[int32, uint16]{} // 节点 ID -> 转发端口
Permits = core.SyncMap[int32, *core.Permit]{} // 转发端口 -> 权限配置
Edges = core.SyncMap[int32, *core.Edge]{} // 节点信息表 (包外只读!数据存储有关联性,所有写入操作只在包内进行)
Assigns = core.SyncMap[uint16, int32]{} // 分配索引 (包外只读!数据存储有关联性,所有写入操作只在包内进行)
Permits = core.SyncMap[int32, *core.Permit]{} // 节点权限表 (包外只读!数据存储有关联性,所有写入操作只在包内进行)
CtrlConnWg utils.CountWaitGroup // 控制通道计数器
DataConnWg utils.CountWaitGroup // 数据通道计数器
FwdLesWg utils.CountWaitGroup // 转发监听端口计数器
UserConnWg utils.CountWaitGroup // 用户连接计数器
UserConnMap core.SyncMap[string, *core.Conn] // 用户连接暂存
LockPortAssign = sync.Mutex{} // 锁定端口分配,防止并发冲突
EdgeUpdates = make(chan *core.Edge, 1000) // 节点更新通知通道
)
func AddEdge(id int32, port uint16) {
Edges.Store(id, port)
func NewEdge(id int32, port uint16, addr *net.TCPAddr) {
var host = addr.IP.String()
var edge = &core.Edge{
Id: id,
Host: &host,
Port: &port,
}
// todo 查询 geo 信息
Edges.Store(id, edge)
Assigns.Store(port, id)
EdgeUpdates <- edge
}
func DelEdge(port uint16) {
id, _ := Assigns.LoadAndDelete(port)
Edges.Delete(id)
func StoreEdge(edge *core.Edge) error {
if edge == nil || edge.Id == 0 || edge.Port == nil {
return fmt.Errorf("无效的边缘节点: %+v", edge)
}
Edges.Store(edge.Id, edge)
Assigns.Store(*edge.Port, edge.Id)
return nil
}
func LoadPermit(port uint16) *core.Permit {
@@ -48,3 +70,7 @@ func LoadPermit(port uint16) *core.Permit {
return permit
}
func StorePermit(def *core.PermitDef) {
Permits.Store(def.Id, &def.Permit)
}

18
gateway/core/edge.go Normal file
View File

@@ -0,0 +1,18 @@
package core
type Edge struct {
Id int32 `json:"id"`
Host *string `json:"host,omitempty"`
Port *uint16 `json:"port,omitempty"`
Prov *string `json:"prov,omitempty"`
City *string `json:"city,omitempty"`
Isp *string `json:"isp,omitempty"`
Status *int `json:"status,omitempty"`
Rtt *int `json:"rtt,omitempty"` // 节点响应时间,单位毫秒
Loss *int `json:"loss,omitempty"` // 节点丢包率,单位百分比
}
var (
EdgeOffline = 0
EdgeOnline = 1
)

View File

@@ -11,7 +11,6 @@ import (
"net"
"proxy-server/gateway/app"
"proxy-server/gateway/env"
"proxy-server/gateway/report"
"proxy-server/utils"
"strconv"
"time"
@@ -85,7 +84,14 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) {
ctx, cancel := context.WithCancel(_ctx)
defer cancel()
var fwdPort uint16
// 结束时清理
var edgeId int32
defer func() {
var edge, ok = app.Edges.Load(edgeId)
if ok {
*edge.Status = 0
}
}()
// 处理连接命令
var errCh = make(chan error)
@@ -122,8 +128,8 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) {
errCh <- fmt.Errorf("读取节点 ID 失败: %w", err)
return
}
var client = int32(binary.BigEndian.Uint32(recv))
fwdPort, err = onOpen(ctx, conn, client)
edgeId = int32(binary.BigEndian.Uint32(recv))
err = onOpen(ctx, conn, edgeId, conn.RemoteAddr())
if err != nil {
errCh <- fmt.Errorf("处理连接建立命令失败: %w", err)
return
@@ -159,54 +165,58 @@ func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) {
case err = <-errCh:
}
app.DelEdge(fwdPort)
return
}
func onOpen(ctx context.Context, writer io.Writer, edge int32) (port uint16, err error) {
// open 命令全局只执行一次
_, ok := app.Edges.Load(edge)
if ok {
return 0, fmt.Errorf("节点 ID %d 已经连接", edge)
}
func onOpen(ctx context.Context, writer io.Writer, edgeId int32, addr net.Addr) (err error) {
var port uint16
// 分配端口
var minim uint16 = 20000
var maxim uint16 = 60000
for i := minim; i < maxim; i++ {
var _, ok = app.Assigns.Load(i)
if !ok {
port = i
app.AddEdge(edge, port)
break
edge, ok := app.Edges.Load(edgeId)
if ok && edge.Port != nil {
port = *edge.Port
} else {
// 分配端口
app.LockPortAssign.Lock()
var minim uint16 = 20000
var maxim uint16 = 60000
for i := minim; i < maxim; i++ {
var _, ok = app.Assigns.Load(i)
if !ok {
port = i
break
}
}
if port == 0 {
return errors.New("没有可用的端口")
}
}
if port == 0 {
return 0, errors.New("没有可用的端口")
}
// 报告端口分配
if err = report.Assigned(edge, port); err != nil {
return 0, fmt.Errorf("报告端口分配失败: %w", err)
}
if tcpAddr, ok := addr.(*net.TCPAddr); ok {
app.NewEdge(edgeId, port, tcpAddr)
} else {
return fmt.Errorf("无效的地址类型: %T", addr)
}
// 响应节点
if err = sendPong(writer); err != nil {
return 0, fmt.Errorf("响应节点失败: %w", err)
app.LockPortAssign.Unlock()
}
// 启动转发服务
app.FwdLesWg.Add(1)
go func() {
defer app.FwdLesWg.Done()
slog.Info("监听转发端口", "port", port, "edge", edge)
slog.Info("监听转发端口", "port", port, "edge", edgeId)
err = ListenUser(ctx, port, writer)
if err != nil {
slog.Error("监听转发端口失败", "port", port, "edge", edge, "err", err)
slog.Error("监听转发端口失败", "port", port, "edge", edgeId, "err", err)
}
}()
return port, nil
// 响应节点
if err = sendPong(writer); err != nil {
return fmt.Errorf("响应节点失败: %w", err)
}
return nil
}
func onPing(writer io.Writer) (err error) {

View File

@@ -113,7 +113,7 @@ func (s *server) Run() (err error) {
defer wg.Done()
// 报告下线
slog.Debug("报告服务下线")
err = report.Offline(app.Name)
err = report.Offline()
if err != nil {
slog.Error("服务下线失败", "err", err)
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"proxy-server/gateway/app"
"proxy-server/gateway/core"
@@ -23,9 +24,10 @@ func Online(name string) (err error) {
}
var body struct {
Id int32 `json:"id"`
Secret string `json:"secret"`
Permits []core.PermitDef `json:"permits"`
Id int32 `json:"id"`
Secret string `json:"secret"`
Permits []*core.PermitDef `json:"permits"`
Edges []*core.Edge `json:"edges"`
}
err = json.Unmarshal([]byte(resp), &body)
if err != nil {
@@ -35,25 +37,20 @@ func Online(name string) (err error) {
app.Id = body.Id
app.PlatformSecret = body.Secret
for _, def := range body.Permits {
app.Permits.Store(def.Id, &def.Permit)
app.StorePermit(def)
}
for _, edge := range body.Edges {
err := app.StoreEdge(edge)
if err != nil {
slog.Error("存储边缘节点失败", "err", err, "edge", edge)
}
}
return nil
}
func Offline(name string) (err error) {
func Offline() (err error) {
_, err = call(env.EndpointOffline, map[string]any{
"name": name,
"version": core.Version,
})
return err
}
func Assigned(edgeId int32, port uint16) (err error) {
_, err = call(env.EndpointAssigned, map[string]any{
"proxy": app.Id,
"edge": edgeId,
"port": port,
"id": app.Id,
})
return err
}
@@ -78,7 +75,10 @@ func call(endpoint string, body any) (string, error) {
if err != nil {
return "", err
}
defer resp.Body.Close()
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("请求失败,状态码:%d", resp.StatusCode)
}

View File

@@ -24,7 +24,7 @@ func Permit(ctx *fiber.Ctx) (err error) {
// 保存授权配置
for _, permit := range *req {
app.Permits.Store(permit.Id, &permit.Permit)
app.StorePermit(&permit)
}
return nil

View File

@@ -17,7 +17,7 @@ type InfoResp struct {
// Edges []EdgeResp `json:"edges"`
Assigns map[uint16]int32 `json:"assigns"`
Edges map[int32]uint16 `json:"edges"`
Edges map[int32]*core.Edge `json:"edges"`
Permits map[int32]*core.Permit `json:"permits"`
}
@@ -29,30 +29,15 @@ type EdgeResp struct {
func Info(c *fiber.Ctx) error {
// var edges = make([]EdgeResp, 0)
// app.Edges.Range(func(id int32, port uint16) bool {
// permit, ok := app.Permits.Load(id)
// if !ok {
// return true
// }
//
// edges = append(edges, EdgeResp{
// Id: id,
// Port: port,
// Permit: permit,
// })
// return true
// })
var assigns = make(map[uint16]int32)
app.Assigns.Range(func(port uint16, id int32) bool {
assigns[port] = id
return true
})
var edges = make(map[int32]uint16)
app.Edges.Range(func(id int32, port uint16) bool {
edges[id] = port
var edges = make(map[int32]*core.Edge)
app.Edges.Range(func(id int32, edge *core.Edge) bool {
edges[id] = edge
return true
})