重构连接监听与处理代码逻辑,连接信息存储于全局通过接口展示

This commit is contained in:
2025-05-19 09:41:41 +08:00
parent aa967fbd6a
commit 24351e1c56
8 changed files with 126 additions and 67 deletions

View File

@@ -2,6 +2,7 @@ package app
import ( import (
"proxy-server/gateway/core" "proxy-server/gateway/core"
"proxy-server/utils"
) )
type Stoppable interface { type Stoppable interface {
@@ -16,6 +17,12 @@ var (
Assigns = core.SyncMap[uint16, int32]{} // 转发端口 -> 节点 ID Assigns = core.SyncMap[uint16, int32]{} // 转发端口 -> 节点 ID
Edges = core.SyncMap[int32, uint16]{} // 节点 ID -> 转发端口 Edges = core.SyncMap[int32, uint16]{} // 节点 ID -> 转发端口
Permits = core.SyncMap[int32, *core.Permit]{} // 转发端口 -> 权限配置 Permits = core.SyncMap[int32, *core.Permit]{} // 转发端口 -> 权限配置
CtrlConnWg utils.CountWaitGroup // 控制通道计数器
DataConnWg utils.CountWaitGroup // 数据通道计数器
FwdLesWg utils.CountWaitGroup // 转发监听端口计数器
UserConnWg utils.CountWaitGroup // 用户连接计数器
UserConnMap core.SyncMap[string, *core.Conn] // 用户连接暂存
) )
func AddEdge(id int32, port uint16) { func AddEdge(id int32, port uint16) {

View File

@@ -27,7 +27,7 @@ const (
CtrlCmdProxy CtrlCmdProxy
) )
func (s *Service) listenCtrl() error { func ListenCtrl(ctx context.Context) error {
ctrlPort := env.AppCtrlPort ctrlPort := env.AppCtrlPort
// 监听端口 // 监听端口
@@ -53,7 +53,7 @@ func (s *Service) listenCtrl() error {
} }
select { select {
case connCh <- conn: case connCh <- conn:
case <-s.ctx.Done(): case <-ctx.Done():
utils.Close(conn) utils.Close(conn)
return return
} }
@@ -63,14 +63,14 @@ func (s *Service) listenCtrl() error {
err = nil err = nil
for { for {
select { select {
case <-s.ctx.Done(): case <-ctx.Done():
return nil return nil
case conn := <-connCh: case conn := <-connCh:
s.ctrlConnWg.Add(1) app.CtrlConnWg.Add(1)
go func() { go func() {
defer s.ctrlConnWg.Done() defer app.CtrlConnWg.Done()
defer utils.Close(conn) defer utils.Close(conn)
err := s.processCtrlConn(s.ctx, conn) err := processCtrlConn(ctx, conn)
if err != nil { if err != nil {
slog.Error("处理控制通道连接失败", "err", err) slog.Error("处理控制通道连接失败", "err", err)
} }
@@ -79,7 +79,7 @@ func (s *Service) listenCtrl() error {
} }
} }
func (s *Service) processCtrlConn(_ctx context.Context, conn net.Conn) (err error) { func processCtrlConn(_ctx context.Context, conn net.Conn) (err error) {
// 通道上下文 // 通道上下文
ctx, cancel := context.WithCancel(_ctx) ctx, cancel := context.WithCancel(_ctx)
@@ -126,21 +126,21 @@ func (s *Service) processCtrlConn(_ctx context.Context, conn net.Conn) (err erro
return fmt.Errorf("读取节点 ID 失败: %w", err) return fmt.Errorf("读取节点 ID 失败: %w", err)
} }
var client = int32(binary.BigEndian.Uint32(recv)) var client = int32(binary.BigEndian.Uint32(recv))
fwdPort, err = s.onOpen(ctx, conn, client) fwdPort, err = onOpen(ctx, conn, client)
if err != nil { if err != nil {
return fmt.Errorf("处理连接建立命令失败: %w", err) return fmt.Errorf("处理连接建立命令失败: %w", err)
} }
// 心跳命令 // 心跳命令
case CtrlCmdPing: case CtrlCmdPing:
err = s.onPing(conn) err = onPing(conn)
if err != nil { if err != nil {
return fmt.Errorf("处理心跳命令失败: %w", err) return fmt.Errorf("处理心跳命令失败: %w", err)
} }
// 连接关闭命令 // 连接关闭命令
case CtrlCmdClose: case CtrlCmdClose:
err = s.onClose(conn) err = onClose(conn)
if err != nil { if err != nil {
return fmt.Errorf("处理关闭命令失败: %w", err) return fmt.Errorf("处理关闭命令失败: %w", err)
} }
@@ -153,7 +153,7 @@ func (s *Service) processCtrlConn(_ctx context.Context, conn net.Conn) (err erro
} }
} }
func (s *Service) onOpen(ctx context.Context, writer io.Writer, edge int32) (port uint16, err error) { func onOpen(ctx context.Context, writer io.Writer, edge int32) (port uint16, err error) {
// open 命令全局只执行一次 // open 命令全局只执行一次
_, ok := app.Edges.Load(edge) _, ok := app.Edges.Load(edge)
if ok { if ok {
@@ -181,16 +181,16 @@ func (s *Service) onOpen(ctx context.Context, writer io.Writer, edge int32) (por
} }
// 响应节点 // 响应节点
if err = s.sendPong(writer); err != nil { if err = sendPong(writer); err != nil {
return 0, fmt.Errorf("响应节点失败: %w", err) return 0, fmt.Errorf("响应节点失败: %w", err)
} }
// 启动转发服务 // 启动转发服务
s.fwdLesWg.Add(1) app.FwdLesWg.Add(1)
go func() { go func() {
defer s.fwdLesWg.Done() defer app.FwdLesWg.Done()
slog.Info("监听转发端口", "port", port, "edge", edge) slog.Info("监听转发端口", "port", port, "edge", edge)
err = s.listenUser(ctx, port, writer) err = ListenUser(ctx, port, writer)
if err != nil { if err != nil {
slog.Error("监听转发端口失败", "port", port, "edge", edge, "err", err) slog.Error("监听转发端口失败", "port", port, "edge", edge, "err", err)
} }
@@ -199,15 +199,15 @@ func (s *Service) onOpen(ctx context.Context, writer io.Writer, edge int32) (por
return port, nil return port, nil
} }
func (s *Service) onPing(writer io.Writer) (err error) { func onPing(writer io.Writer) (err error) {
return s.sendPong(writer) return sendPong(writer)
} }
func (s *Service) onClose(writer io.Writer) (err error) { func onClose(writer io.Writer) (err error) {
return s.sendPong(writer) return sendPong(writer)
} }
func (s *Service) sendPong(writer io.Writer) (err error) { func sendPong(writer io.Writer) (err error) {
_, err = writer.Write([]byte{byte(CtrlCmdPong)}) _, err = writer.Write([]byte{byte(CtrlCmdPong)})
if err != nil { if err != nil {
return fmt.Errorf("响应节点失败: %w", err) return fmt.Errorf("响应节点失败: %w", err)
@@ -215,7 +215,7 @@ func (s *Service) sendPong(writer io.Writer) (err error) {
return nil return nil
} }
func (s *Service) sendProxy(writer io.Writer, tag [16]byte, addr string) (err error) { func sendProxy(writer io.Writer, tag [16]byte, addr string) (err error) {
if len(addr) > 65535 { if len(addr) > 65535 {
return fmt.Errorf("代理地址过长: %s", addr) return fmt.Errorf("代理地址过长: %s", addr)
} }

View File

@@ -2,22 +2,24 @@ package fwd
import ( import (
"bufio" "bufio"
"context"
"errors" "errors"
"fmt" "fmt"
"github.com/google/uuid" "github.com/google/uuid"
"io" "io"
"log/slog" "log/slog"
"net" "net"
"proxy-server/gateway/app"
"proxy-server/gateway/debug" "proxy-server/gateway/debug"
"proxy-server/gateway/env" "proxy-server/gateway/env"
"proxy-server/gateway/fwd/metrics" "proxy-server/gateway/fwd/metrics"
utils2 "proxy-server/utils" "proxy-server/utils"
"strconv" "strconv"
"sync" "sync"
"time" "time"
) )
func (s *Service) listenData() error { func ListenData(ctx context.Context) error {
dataPort := env.AppDataPort dataPort := env.AppDataPort
// 监听端口 // 监听端口
@@ -25,7 +27,7 @@ func (s *Service) listenData() error {
if err != nil { if err != nil {
return fmt.Errorf("监听数据通道失败: %w", err) return fmt.Errorf("监听数据通道失败: %w", err)
} }
defer utils2.Close(ls) defer utils.Close(ls)
// 异步等待连接 // 异步等待连接
var connCh = make(chan net.Conn) var connCh = make(chan net.Conn)
@@ -42,8 +44,8 @@ func (s *Service) listenData() error {
} }
select { select {
case connCh <- conn: case connCh <- conn:
case <-s.ctx.Done(): case <-ctx.Done():
utils2.Close(conn) utils.Close(conn)
return return
} }
} }
@@ -52,14 +54,14 @@ func (s *Service) listenData() error {
// 处理连接 // 处理连接
for { for {
select { select {
case <-s.ctx.Done(): case <-ctx.Done():
return nil return nil
case conn := <-connCh: case conn := <-connCh:
s.dataConnWg.Add(1) app.DataConnWg.Add(1)
go func() { go func() {
defer s.dataConnWg.Done() defer app.DataConnWg.Done()
defer utils2.Close(conn) defer utils.Close(conn)
err := s.processDataConn(conn) err := processDataConn(ctx, conn)
if err != nil { if err != nil {
slog.Error("处理数据通道连接失败", "err", err) slog.Error("处理数据通道连接失败", "err", err)
} }
@@ -68,7 +70,7 @@ func (s *Service) listenData() error {
} }
} }
func (s *Service) processDataConn(client net.Conn) error { func processDataConn(ctx context.Context, client net.Conn) error {
var reader = bufio.NewReader(client) var reader = bufio.NewReader(client)
// 接收连接结果 // 接收连接结果
@@ -83,11 +85,11 @@ func (s *Service) processDataConn(client net.Conn) error {
// 加载用户连接 // 加载用户连接
var tagStr = uuid.UUID(tag).String() var tagStr = uuid.UUID(tag).String()
user, ok := s.userConnMap.LoadAndDelete(tagStr) user, ok := app.UserConnMap.LoadAndDelete(tagStr)
if !ok { if !ok {
return fmt.Errorf("用户连接已关闭tag%s", tagStr) return fmt.Errorf("用户连接已关闭tag%s", tagStr)
} }
defer utils2.Close(user) defer utils.Close(user)
// 检查状态 // 检查状态
if status != 1 { if status != 1 {
@@ -98,7 +100,7 @@ func (s *Service) processDataConn(client net.Conn) error {
data := time.Now() data := time.Now()
userPipeReader, userPipeWriter := io.Pipe() userPipeReader, userPipeWriter := io.Pipe()
defer utils2.Close(userPipeWriter) defer utils.Close(userPipeWriter)
teeUser := io.TeeReader(user, userPipeWriter) teeUser := io.TeeReader(user, userPipeWriter)
go func() { go func() {
@@ -127,10 +129,10 @@ func (s *Service) processDataConn(client net.Conn) error {
select { select {
case <-s.ctx.Done(): case <-ctx.Done():
return nil return nil
case <-utils2.WgWait(&wg): case <-utils.WgWait(&wg):
proxy := time.Now() proxy := time.Now()
start, startOk := metrics.TimerStart.Load(user.Conn) start, startOk := metrics.TimerStart.Load(user.Conn)

View File

@@ -3,22 +3,14 @@ package fwd
import ( import (
"context" "context"
"log/slog" "log/slog"
"proxy-server/gateway/core" "proxy-server/gateway/app"
"proxy-server/gateway/env" "proxy-server/gateway/env"
"proxy-server/utils"
"sync" "sync"
) )
type Service struct { type Service struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
userConnMap core.SyncMap[string, *core.Conn]
fwdLesWg utils.CountWaitGroup
ctrlConnWg utils.CountWaitGroup
dataConnWg utils.CountWaitGroup
userConnWg utils.CountWaitGroup
} }
func New() *Service { func New() *Service {
@@ -41,7 +33,7 @@ func (s *Service) Run() error {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
err := s.listenCtrl() err := ListenCtrl(s.ctx)
if err != nil { if err != nil {
slog.Error("fwd 控制通道监听发生错误", "err", err) slog.Error("fwd 控制通道监听发生错误", "err", err)
errQuit <- struct{}{} errQuit <- struct{}{}
@@ -53,7 +45,7 @@ func (s *Service) Run() error {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
err := s.listenData() err := ListenData(s.ctx)
if err != nil { if err != nil {
slog.Error("fwd 数据通道监听发生错误", "err", err) slog.Error("fwd 数据通道监听发生错误", "err", err)
errQuit <- struct{}{} errQuit <- struct{}{}
@@ -70,10 +62,10 @@ func (s *Service) Run() error {
} }
wg.Wait() wg.Wait()
s.fwdLesWg.Wait() app.FwdLesWg.Wait()
s.ctrlConnWg.Wait() app.CtrlConnWg.Wait()
s.userConnWg.Wait() app.UserConnWg.Wait()
s.dataConnWg.Wait() app.DataConnWg.Wait()
return nil return nil
} }

View File

@@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"io" "io"
"log/slog" "log/slog"
"proxy-server/gateway/app"
"proxy-server/gateway/core" "proxy-server/gateway/core"
"proxy-server/gateway/env" "proxy-server/gateway/env"
"proxy-server/gateway/fwd/dispatcher" "proxy-server/gateway/fwd/dispatcher"
@@ -15,7 +16,7 @@ import (
"time" "time"
) )
func (s *Service) listenUser(ctx context.Context, port uint16, ctrl io.Writer) error { func ListenUser(ctx context.Context, port uint16, ctrl io.Writer) error {
dspt, err := dispatcher.New(port, time.Duration(env.AppUserTimeout)*time.Second) dspt, err := dispatcher.New(port, time.Duration(env.AppUserTimeout)*time.Second)
if err != nil { if err != nil {
return err return err
@@ -44,10 +45,10 @@ func (s *Service) listenUser(ctx context.Context, port uint16, ctrl io.Writer) e
return err return err
case user := <-dspt.Conn: case user := <-dspt.Conn:
metrics.TimerAuth.Store(user.Conn, time.Now()) metrics.TimerAuth.Store(user.Conn, time.Now())
s.userConnWg.Add(1) app.UserConnWg.Add(1)
go func() { go func() {
defer s.userConnWg.Done() defer app.UserConnWg.Done()
err := s.processUserConn(user, ctrl) err := processUserConn(ctx, user, ctrl)
if err != nil { if err != nil {
slog.Error("处理用户连接失败", "err", err) slog.Error("处理用户连接失败", "err", err)
utils.Close(user) utils.Close(user)
@@ -57,16 +58,16 @@ func (s *Service) listenUser(ctx context.Context, port uint16, ctrl io.Writer) e
} }
} }
func (s *Service) processUserConn(user *core.Conn, ctrl io.Writer) (err error) { func processUserConn(ctx context.Context, user *core.Conn, ctrl io.Writer) (err error) {
// 发送代理命令 // 发送代理命令
err = s.sendProxy(ctrl, user.Tag, user.Dest.String()) err = sendProxy(ctrl, user.Tag, user.Dest.String())
if err != nil { if err != nil {
return err return err
} }
// 保存用户连接 // 保存用户连接
s.userConnMap.Store(hex.EncodeToString(user.Tag[:]), user) app.UserConnMap.Store(hex.EncodeToString(user.Tag[:]), user)
// 如果限定时间内没有建立数据通道,则关闭连接 // 如果限定时间内没有建立数据通道,则关闭连接
var timeout, cancel = context.WithTimeout(context.Background(), time.Duration(env.AppDataTimeout)*time.Second) var timeout, cancel = context.WithTimeout(context.Background(), time.Duration(env.AppDataTimeout)*time.Second)
@@ -75,11 +76,11 @@ func (s *Service) processUserConn(user *core.Conn, ctrl io.Writer) (err error) {
select { select {
case <-timeout.Done(): case <-timeout.Done():
err = timeout.Err() err = timeout.Err()
case <-s.ctx.Done(): case <-ctx.Done():
err = s.ctx.Err() err = ctx.Err()
} }
_, ok := s.userConnMap.LoadAndDelete(hex.EncodeToString(user.Tag[:])) _, ok := app.UserConnMap.LoadAndDelete(hex.EncodeToString(user.Tag[:]))
if ok { if ok {
utils.Close(user) utils.Close(user)
if errors.Is(err, context.DeadlineExceeded) { if errors.Is(err, context.DeadlineExceeded) {

View File

@@ -6,12 +6,12 @@ import (
"proxy-server/gateway/core" "proxy-server/gateway/core"
) )
type AuthReq struct { type PermitReq struct {
Id int32 `json:"id"` Id int32 `json:"id"`
core.Permit core.Permit
} }
func Auth(ctx *fiber.Ctx) (err error) { func Permit(ctx *fiber.Ctx) (err error) {
// 安全验证 // 安全验证
var sec core.SecuredReq var sec core.SecuredReq
@@ -20,7 +20,7 @@ func Auth(ctx *fiber.Ctx) (err error) {
} }
// 获取请求参数 // 获取请求参数
req, err := core.Decrypt[AuthReq](&sec, app.PlatformSecret) req, err := core.Decrypt[PermitReq](&sec, app.PlatformSecret)
if err != nil { if err != nil {
return err return err
} }

View File

@@ -0,0 +1,53 @@
package handlers
import (
"github.com/gofiber/fiber/v2"
"proxy-server/gateway/app"
"proxy-server/gateway/core"
)
type InfoResp struct {
Id int32 `json:"id"`
Name string `json:"name"`
FwdListeners int `json:"fwd_listeners"`
UserConnections int `json:"user_connections"`
CtrlConnections int `json:"ctrl_connections"`
DataConnections int `json:"data_connections"`
Edges []EdgeResp `json:"edges"`
}
type EdgeResp struct {
Id int32 `json:"id"`
Port uint16 `json:"port"`
Permit *core.Permit `json:"permit"`
}
func Info(c *fiber.Ctx) error {
var edges = make([]EdgeResp, 0)
app.Edges.Range(func(id int32, port uint16) bool {
permit, ok := app.Permits.Load(id)
if !ok {
return true
}
edges = append(edges, EdgeResp{
Id: id,
Port: port,
Permit: permit,
})
return true
})
return c.JSON(InfoResp{
Id: app.Id,
Name: app.Name,
FwdListeners: int(app.FwdLesWg.Count()),
UserConnections: int(app.UserConnWg.Count()),
CtrlConnections: int(app.CtrlConnWg.Count()),
DataConnections: int(app.DataConnWg.Count()),
Edges: edges,
})
}

View File

@@ -7,6 +7,10 @@ import (
func Router(r *fiber.App) { func Router(r *fiber.App) {
var debug = r.Group("/debug") var debug = r.Group("/debug")
debug.Get("/debug/consuming/list", handlers.GetConsuming) debug.Get("/info", handlers.Info)
debug.Get("/debug/consuming/reset", handlers.RestConsuming) debug.Get("/consuming/list", handlers.GetConsuming)
debug.Get("/consuming/reset", handlers.RestConsuming)
var api = r.Group("/api")
api.Post("/permit", handlers.Permit)
} }