From 24351e1c56280e39127ab3011ec32170dfde1d55 Mon Sep 17 00:00:00 2001 From: luorijun Date: Mon, 19 May 2025 09:41:41 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84=E8=BF=9E=E6=8E=A5=E7=9B=91?= =?UTF-8?q?=E5=90=AC=E4=B8=8E=E5=A4=84=E7=90=86=E4=BB=A3=E7=A0=81=E9=80=BB?= =?UTF-8?q?=E8=BE=91=EF=BC=8C=E8=BF=9E=E6=8E=A5=E4=BF=A1=E6=81=AF=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E4=BA=8E=E5=85=A8=E5=B1=80=E9=80=9A=E8=BF=87=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E5=B1=95=E7=A4=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gateway/app/app.go | 7 +++++ gateway/fwd/ctrl.go | 42 ++++++++++++++-------------- gateway/fwd/data.go | 34 ++++++++++++----------- gateway/fwd/fwd.go | 22 +++++---------- gateway/fwd/user.go | 21 +++++++------- gateway/web/handlers/auth.go | 6 ++-- gateway/web/handlers/info.go | 53 ++++++++++++++++++++++++++++++++++++ gateway/web/router.go | 8 ++++-- 8 files changed, 126 insertions(+), 67 deletions(-) create mode 100644 gateway/web/handlers/info.go diff --git a/gateway/app/app.go b/gateway/app/app.go index 80b1794..9a26c17 100644 --- a/gateway/app/app.go +++ b/gateway/app/app.go @@ -2,6 +2,7 @@ package app import ( "proxy-server/gateway/core" + "proxy-server/utils" ) type Stoppable interface { @@ -16,6 +17,12 @@ var ( Assigns = core.SyncMap[uint16, int32]{} // 转发端口 -> 节点 ID Edges = core.SyncMap[int32, uint16]{} // 节点 ID -> 转发端口 Permits = core.SyncMap[int32, *core.Permit]{} // 转发端口 -> 权限配置 + + CtrlConnWg utils.CountWaitGroup // 控制通道计数器 + DataConnWg utils.CountWaitGroup // 数据通道计数器 + FwdLesWg utils.CountWaitGroup // 转发监听端口计数器 + UserConnWg utils.CountWaitGroup // 用户连接计数器 + UserConnMap core.SyncMap[string, *core.Conn] // 用户连接暂存 ) func AddEdge(id int32, port uint16) { diff --git a/gateway/fwd/ctrl.go b/gateway/fwd/ctrl.go index 7d1d808..9476989 100644 --- a/gateway/fwd/ctrl.go +++ b/gateway/fwd/ctrl.go @@ -27,7 +27,7 @@ const ( CtrlCmdProxy ) -func (s *Service) listenCtrl() error { +func ListenCtrl(ctx context.Context) error { ctrlPort := env.AppCtrlPort // 监听端口 @@ -53,7 +53,7 @@ func (s *Service) listenCtrl() error { } select { case connCh <- conn: - case <-s.ctx.Done(): + case <-ctx.Done(): utils.Close(conn) return } @@ -63,14 +63,14 @@ func (s *Service) listenCtrl() error { err = nil for { select { - case <-s.ctx.Done(): + case <-ctx.Done(): return nil case conn := <-connCh: - s.ctrlConnWg.Add(1) + app.CtrlConnWg.Add(1) go func() { - defer s.ctrlConnWg.Done() + defer app.CtrlConnWg.Done() defer utils.Close(conn) - err := s.processCtrlConn(s.ctx, conn) + err := processCtrlConn(ctx, conn) if err != nil { slog.Error("处理控制通道连接失败", "err", err) } @@ -79,7 +79,7 @@ func (s *Service) listenCtrl() error { } } -func (s *Service) processCtrlConn(_ctx context.Context, conn net.Conn) (err error) { +func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) { // 通道上下文 ctx, cancel := context.WithCancel(_ctx) @@ -126,21 +126,21 @@ func (s *Service) processCtrlConn(_ctx context.Context, conn net.Conn) (err erro return fmt.Errorf("读取节点 ID 失败: %w", err) } var client = int32(binary.BigEndian.Uint32(recv)) - fwdPort, err = s.onOpen(ctx, conn, client) + fwdPort, err = onOpen(ctx, conn, client) if err != nil { return fmt.Errorf("处理连接建立命令失败: %w", err) } // 心跳命令 case CtrlCmdPing: - err = s.onPing(conn) + err = onPing(conn) if err != nil { return fmt.Errorf("处理心跳命令失败: %w", err) } // 连接关闭命令 case CtrlCmdClose: - err = s.onClose(conn) + err = onClose(conn) if err != nil { return fmt.Errorf("处理关闭命令失败: %w", err) } @@ -153,7 +153,7 @@ func (s *Service) processCtrlConn(_ctx context.Context, conn net.Conn) (err erro } } -func (s *Service) onOpen(ctx context.Context, writer io.Writer, edge int32) (port uint16, err error) { +func onOpen(ctx context.Context, writer io.Writer, edge int32) (port uint16, err error) { // open 命令全局只执行一次 _, ok := app.Edges.Load(edge) if ok { @@ -181,16 +181,16 @@ func (s *Service) onOpen(ctx context.Context, writer io.Writer, edge int32) (por } // 响应节点 - if err = s.sendPong(writer); err != nil { + if err = sendPong(writer); err != nil { return 0, fmt.Errorf("响应节点失败: %w", err) } // 启动转发服务 - s.fwdLesWg.Add(1) + app.FwdLesWg.Add(1) go func() { - defer s.fwdLesWg.Done() + defer app.FwdLesWg.Done() slog.Info("监听转发端口", "port", port, "edge", edge) - err = s.listenUser(ctx, port, writer) + err = ListenUser(ctx, port, writer) if err != nil { slog.Error("监听转发端口失败", "port", port, "edge", edge, "err", err) } @@ -199,15 +199,15 @@ func (s *Service) onOpen(ctx context.Context, writer io.Writer, edge int32) (por return port, nil } -func (s *Service) onPing(writer io.Writer) (err error) { - return s.sendPong(writer) +func onPing(writer io.Writer) (err error) { + return sendPong(writer) } -func (s *Service) onClose(writer io.Writer) (err error) { - return s.sendPong(writer) +func onClose(writer io.Writer) (err error) { + return sendPong(writer) } -func (s *Service) sendPong(writer io.Writer) (err error) { +func sendPong(writer io.Writer) (err error) { _, err = writer.Write([]byte{byte(CtrlCmdPong)}) if err != nil { return fmt.Errorf("响应节点失败: %w", err) @@ -215,7 +215,7 @@ func (s *Service) sendPong(writer io.Writer) (err error) { return nil } -func (s *Service) sendProxy(writer io.Writer, tag [16]byte, addr string) (err error) { +func sendProxy(writer io.Writer, tag [16]byte, addr string) (err error) { if len(addr) > 65535 { return fmt.Errorf("代理地址过长: %s", addr) } diff --git a/gateway/fwd/data.go b/gateway/fwd/data.go index 9ffb9dc..d13f793 100644 --- a/gateway/fwd/data.go +++ b/gateway/fwd/data.go @@ -2,22 +2,24 @@ package fwd import ( "bufio" + "context" "errors" "fmt" "github.com/google/uuid" "io" "log/slog" "net" + "proxy-server/gateway/app" "proxy-server/gateway/debug" "proxy-server/gateway/env" "proxy-server/gateway/fwd/metrics" - utils2 "proxy-server/utils" + "proxy-server/utils" "strconv" "sync" "time" ) -func (s *Service) listenData() error { +func ListenData(ctx context.Context) error { dataPort := env.AppDataPort // 监听端口 @@ -25,7 +27,7 @@ func (s *Service) listenData() error { if err != nil { return fmt.Errorf("监听数据通道失败: %w", err) } - defer utils2.Close(ls) + defer utils.Close(ls) // 异步等待连接 var connCh = make(chan net.Conn) @@ -42,8 +44,8 @@ func (s *Service) listenData() error { } select { case connCh <- conn: - case <-s.ctx.Done(): - utils2.Close(conn) + case <-ctx.Done(): + utils.Close(conn) return } } @@ -52,14 +54,14 @@ func (s *Service) listenData() error { // 处理连接 for { select { - case <-s.ctx.Done(): + case <-ctx.Done(): return nil case conn := <-connCh: - s.dataConnWg.Add(1) + app.DataConnWg.Add(1) go func() { - defer s.dataConnWg.Done() - defer utils2.Close(conn) - err := s.processDataConn(conn) + defer app.DataConnWg.Done() + defer utils.Close(conn) + err := processDataConn(ctx, conn) if err != nil { slog.Error("处理数据通道连接失败", "err", err) } @@ -68,7 +70,7 @@ func (s *Service) listenData() error { } } -func (s *Service) processDataConn(client net.Conn) error { +func processDataConn(ctx context.Context, client net.Conn) error { var reader = bufio.NewReader(client) // 接收连接结果 @@ -83,11 +85,11 @@ func (s *Service) processDataConn(client net.Conn) error { // 加载用户连接 var tagStr = uuid.UUID(tag).String() - user, ok := s.userConnMap.LoadAndDelete(tagStr) + user, ok := app.UserConnMap.LoadAndDelete(tagStr) if !ok { return fmt.Errorf("用户连接已关闭,tag:%s", tagStr) } - defer utils2.Close(user) + defer utils.Close(user) // 检查状态 if status != 1 { @@ -98,7 +100,7 @@ func (s *Service) processDataConn(client net.Conn) error { data := time.Now() userPipeReader, userPipeWriter := io.Pipe() - defer utils2.Close(userPipeWriter) + defer utils.Close(userPipeWriter) teeUser := io.TeeReader(user, userPipeWriter) go func() { @@ -127,10 +129,10 @@ func (s *Service) processDataConn(client net.Conn) error { select { - case <-s.ctx.Done(): + case <-ctx.Done(): return nil - case <-utils2.WgWait(&wg): + case <-utils.WgWait(&wg): proxy := time.Now() start, startOk := metrics.TimerStart.Load(user.Conn) diff --git a/gateway/fwd/fwd.go b/gateway/fwd/fwd.go index e1c4d9f..b8067fb 100644 --- a/gateway/fwd/fwd.go +++ b/gateway/fwd/fwd.go @@ -3,22 +3,14 @@ package fwd import ( "context" "log/slog" - "proxy-server/gateway/core" + "proxy-server/gateway/app" "proxy-server/gateway/env" - "proxy-server/utils" "sync" ) type Service struct { ctx context.Context cancel context.CancelFunc - - userConnMap core.SyncMap[string, *core.Conn] - - fwdLesWg utils.CountWaitGroup - ctrlConnWg utils.CountWaitGroup - dataConnWg utils.CountWaitGroup - userConnWg utils.CountWaitGroup } func New() *Service { @@ -41,7 +33,7 @@ func (s *Service) Run() error { wg.Add(1) go func() { defer wg.Done() - err := s.listenCtrl() + err := ListenCtrl(s.ctx) if err != nil { slog.Error("fwd 控制通道监听发生错误", "err", err) errQuit <- struct{}{} @@ -53,7 +45,7 @@ func (s *Service) Run() error { wg.Add(1) go func() { defer wg.Done() - err := s.listenData() + err := ListenData(s.ctx) if err != nil { slog.Error("fwd 数据通道监听发生错误", "err", err) errQuit <- struct{}{} @@ -70,10 +62,10 @@ func (s *Service) Run() error { } wg.Wait() - s.fwdLesWg.Wait() - s.ctrlConnWg.Wait() - s.userConnWg.Wait() - s.dataConnWg.Wait() + app.FwdLesWg.Wait() + app.CtrlConnWg.Wait() + app.UserConnWg.Wait() + app.DataConnWg.Wait() return nil } diff --git a/gateway/fwd/user.go b/gateway/fwd/user.go index 3058a85..4256ace 100644 --- a/gateway/fwd/user.go +++ b/gateway/fwd/user.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "log/slog" + "proxy-server/gateway/app" "proxy-server/gateway/core" "proxy-server/gateway/env" "proxy-server/gateway/fwd/dispatcher" @@ -15,7 +16,7 @@ import ( "time" ) -func (s *Service) listenUser(ctx context.Context, port uint16, ctrl io.Writer) error { +func ListenUser(ctx context.Context, port uint16, ctrl io.Writer) error { dspt, err := dispatcher.New(port, time.Duration(env.AppUserTimeout)*time.Second) if err != nil { return err @@ -44,10 +45,10 @@ func (s *Service) listenUser(ctx context.Context, port uint16, ctrl io.Writer) e return err case user := <-dspt.Conn: metrics.TimerAuth.Store(user.Conn, time.Now()) - s.userConnWg.Add(1) + app.UserConnWg.Add(1) go func() { - defer s.userConnWg.Done() - err := s.processUserConn(user, ctrl) + defer app.UserConnWg.Done() + err := processUserConn(ctx, user, ctrl) if err != nil { slog.Error("处理用户连接失败", "err", err) utils.Close(user) @@ -57,16 +58,16 @@ func (s *Service) listenUser(ctx context.Context, port uint16, ctrl io.Writer) e } } -func (s *Service) processUserConn(user *core.Conn, ctrl io.Writer) (err error) { +func processUserConn(ctx context.Context, user *core.Conn, ctrl io.Writer) (err error) { // 发送代理命令 - err = s.sendProxy(ctrl, user.Tag, user.Dest.String()) + err = sendProxy(ctrl, user.Tag, user.Dest.String()) if err != nil { return err } // 保存用户连接 - s.userConnMap.Store(hex.EncodeToString(user.Tag[:]), user) + app.UserConnMap.Store(hex.EncodeToString(user.Tag[:]), user) // 如果限定时间内没有建立数据通道,则关闭连接 var timeout, cancel = context.WithTimeout(context.Background(), time.Duration(env.AppDataTimeout)*time.Second) @@ -75,11 +76,11 @@ func (s *Service) processUserConn(user *core.Conn, ctrl io.Writer) (err error) { select { case <-timeout.Done(): err = timeout.Err() - case <-s.ctx.Done(): - err = s.ctx.Err() + case <-ctx.Done(): + err = ctx.Err() } - _, ok := s.userConnMap.LoadAndDelete(hex.EncodeToString(user.Tag[:])) + _, ok := app.UserConnMap.LoadAndDelete(hex.EncodeToString(user.Tag[:])) if ok { utils.Close(user) if errors.Is(err, context.DeadlineExceeded) { diff --git a/gateway/web/handlers/auth.go b/gateway/web/handlers/auth.go index 89863a4..aa34e11 100644 --- a/gateway/web/handlers/auth.go +++ b/gateway/web/handlers/auth.go @@ -6,12 +6,12 @@ import ( "proxy-server/gateway/core" ) -type AuthReq struct { +type PermitReq struct { Id int32 `json:"id"` core.Permit } -func Auth(ctx *fiber.Ctx) (err error) { +func Permit(ctx *fiber.Ctx) (err error) { // 安全验证 var sec core.SecuredReq @@ -20,7 +20,7 @@ func Auth(ctx *fiber.Ctx) (err error) { } // 获取请求参数 - req, err := core.Decrypt[AuthReq](&sec, app.PlatformSecret) + req, err := core.Decrypt[PermitReq](&sec, app.PlatformSecret) if err != nil { return err } diff --git a/gateway/web/handlers/info.go b/gateway/web/handlers/info.go new file mode 100644 index 0000000..cd9a706 --- /dev/null +++ b/gateway/web/handlers/info.go @@ -0,0 +1,53 @@ +package handlers + +import ( + "github.com/gofiber/fiber/v2" + "proxy-server/gateway/app" + "proxy-server/gateway/core" +) + +type InfoResp struct { + Id int32 `json:"id"` + Name string `json:"name"` + + FwdListeners int `json:"fwd_listeners"` + UserConnections int `json:"user_connections"` + CtrlConnections int `json:"ctrl_connections"` + DataConnections int `json:"data_connections"` + + Edges []EdgeResp `json:"edges"` +} + +type EdgeResp struct { + Id int32 `json:"id"` + Port uint16 `json:"port"` + Permit *core.Permit `json:"permit"` +} + +func Info(c *fiber.Ctx) error { + + var edges = make([]EdgeResp, 0) + app.Edges.Range(func(id int32, port uint16) bool { + permit, ok := app.Permits.Load(id) + if !ok { + return true + } + + edges = append(edges, EdgeResp{ + Id: id, + Port: port, + Permit: permit, + }) + return true + }) + + return c.JSON(InfoResp{ + Id: app.Id, + Name: app.Name, + FwdListeners: int(app.FwdLesWg.Count()), + UserConnections: int(app.UserConnWg.Count()), + CtrlConnections: int(app.CtrlConnWg.Count()), + DataConnections: int(app.DataConnWg.Count()), + Edges: edges, + }) +} diff --git a/gateway/web/router.go b/gateway/web/router.go index ffe85fb..3f64e4c 100644 --- a/gateway/web/router.go +++ b/gateway/web/router.go @@ -7,6 +7,10 @@ import ( func Router(r *fiber.App) { var debug = r.Group("/debug") - debug.Get("/debug/consuming/list", handlers.GetConsuming) - debug.Get("/debug/consuming/reset", handlers.RestConsuming) + debug.Get("/info", handlers.Info) + debug.Get("/consuming/list", handlers.GetConsuming) + debug.Get("/consuming/reset", handlers.RestConsuming) + + var api = r.Group("/api") + api.Post("/permit", handlers.Permit) }