重命名客户端相关术语为节点;移动 utils 包到根路径;优化网关对节点各种连接状态的处理,并在节点断联后统一清理资源

This commit is contained in:
2025-05-17 10:00:28 +08:00
parent 20ac7dbd91
commit 84e01d3b50
14 changed files with 150 additions and 129 deletions

View File

@@ -9,7 +9,22 @@ var (
Name string
PlatformSecret string // 平台密钥,验证接收的请求是否属于平台
Clients = core.SyncMap[int32, uint16]{} // 节点 ID -> 转发端口
Edges = core.SyncMap[int32, uint16]{} // 节点 ID -> 转发端口
Assigns = core.SyncMap[uint16, int32]{} // 转发端口 -> 节点 ID
Permits = core.SyncMap[uint16, *core.Permit]{} // 转发端口 -> 权限配置
)
func AddEdge(id int32, port uint16) {
Edges.Store(id, port)
Assigns.Store(port, id)
}
func DelEdge(port uint16) {
id, _ := Assigns.LoadAndDelete(port)
Edges.Delete(id)
Permits.Delete(port)
}
func PermitEdge(port uint16, permit *core.Permit) {
Permits.Store(port, permit)
}

View File

@@ -7,7 +7,7 @@ import (
"io"
"log/slog"
"proxy-server/gateway/core"
"proxy-server/pkg/utils"
"proxy-server/utils"
"strings"
"errors"

View File

@@ -12,8 +12,9 @@ import (
"proxy-server/gateway/app"
"proxy-server/gateway/env"
"proxy-server/gateway/report"
"proxy-server/pkg/utils"
"proxy-server/utils"
"strconv"
"syscall"
)
type CtrlCmdType int
@@ -80,6 +81,21 @@ func (s *Service) listenCtrl() error {
}
func (s *Service) processCtrlConn(ctx context.Context, conn net.Conn) (err error) {
defer func() {
_, portStr, err := net.SplitHostPort(conn.LocalAddr().String())
if err != nil {
slog.Error("获取控制通道端口失败", "err", err)
return
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
slog.Error("解析控制通道端口失败", "err", err)
return
}
app.DelEdge(uint16(port))
}()
reader := bufio.NewReader(conn)
for {
// 循环等待直到服务关闭
@@ -90,12 +106,21 @@ func (s *Service) processCtrlConn(ctx context.Context, conn net.Conn) (err error
}
// 读取命令
cmdByte, err := reader.ReadByte()
cmd, err := reader.ReadByte()
if errors.Is(err, syscall.WSAECONNRESET) {
slog.Debug("节点重置了控制通道连接(WSAECONNRESET)")
return nil
}
if errors.Is(err, io.EOF) {
slog.Debug("节点关闭了控制通道")
return nil
}
if err != nil {
return fmt.Errorf("读取节点命令失败: %w", err)
}
var cmd = CtrlCmdType(cmdByte)
switch cmd {
// 处理节点命令
switch CtrlCmdType(cmd) {
// 连接建立命令
case CtrlCmdOpen:
@@ -132,15 +157,11 @@ func (s *Service) processCtrlConn(ctx context.Context, conn net.Conn) (err error
}
}
func (s *Service) onPing(conn net.Conn) (err error) {
return s.sendPong(conn)
}
func (s *Service) onOpen(conn net.Conn, client int32) (err error) {
func (s *Service) onOpen(writer io.Writer, edge int32) (err error) {
// open 命令全局只执行一次
_, ok := app.Clients.Load(client)
_, ok := app.Edges.Load(edge)
if ok {
return fmt.Errorf("节点 ID %d 已经连接", client)
return fmt.Errorf("节点 ID %d 已经连接", edge)
}
// 分配端口
@@ -151,8 +172,7 @@ func (s *Service) onOpen(conn net.Conn, client int32) (err error) {
var _, ok = app.Assigns.Load(i)
if !ok {
port = i
app.Assigns.Store(i, client)
app.Clients.Store(client, i)
app.AddEdge(edge, port)
break
}
}
@@ -161,62 +181,46 @@ func (s *Service) onOpen(conn net.Conn, client int32) (err error) {
}
// 报告端口分配
if err = report.Assigned(client, port); err != nil {
if err = report.Assigned(edge, port); err != nil {
return fmt.Errorf("报告端口分配失败: %w", err)
}
// 响应客户端
if err = s.sendPong(conn); err != nil {
return fmt.Errorf("响应客户端失败: %w", err)
// 响应节点
if err = s.sendPong(writer); err != nil {
return fmt.Errorf("响应节点失败: %w", err)
}
// 启动转发服务
s.fwdLesWg.Add(1)
go func() {
defer s.fwdLesWg.Done()
slog.Info("监听转发端口", "port", port, "client", client)
err = s.listenUser(port, conn)
slog.Info("监听转发端口", "port", port, "edge", edge)
err = s.listenUser(port, writer)
if err != nil {
slog.Error("监听转发端口失败", "port", port, "client", client, "err", err)
slog.Error("监听转发端口失败", "port", port, "edge", edge, "err", err)
}
}()
return nil
}
func (s *Service) onClose(conn net.Conn) (err error) {
_, portStr, err := net.SplitHostPort(conn.LocalAddr().String())
if err != nil {
return err
}
port, err := strconv.ParseUint(portStr, 10, 16)
if err != nil {
return err
}
id, _ := app.Assigns.LoadAndDelete(uint16(port))
app.Clients.Delete(id)
app.Assigns.Delete(uint16(port))
app.Permits.Delete(uint16(port))
err = s.sendPong(conn)
if err != nil {
return err
}
return nil
func (s *Service) onPing(writer io.Writer) (err error) {
return s.sendPong(writer)
}
func (s *Service) sendPong(conn net.Conn) (err error) {
_, err = conn.Write([]byte{byte(CtrlCmdPong)})
func (s *Service) onClose(writer io.Writer) (err error) {
return s.sendPong(writer)
}
func (s *Service) sendPong(writer io.Writer) (err error) {
_, err = writer.Write([]byte{byte(CtrlCmdPong)})
if err != nil {
return fmt.Errorf("响应客户端失败: %w", err)
return fmt.Errorf("响应节点失败: %w", err)
}
return nil
}
func (s *Service) sendProxy(conn net.Conn, tag [16]byte, addr string) (err error) {
func (s *Service) sendProxy(writer io.Writer, tag [16]byte, addr string) (err error) {
if len(addr) > 65535 {
return fmt.Errorf("代理地址过长: %s", addr)
}
@@ -227,7 +231,7 @@ func (s *Service) sendProxy(conn net.Conn, tag [16]byte, addr string) (err error
binary.BigEndian.PutUint16(buf[17:], uint16(len(addr)))
copy(buf[19:], addr)
_, err = conn.Write(buf)
_, err = writer.Write(buf)
if err != nil {
return fmt.Errorf("发送代理命令失败: %w", err)
}

View File

@@ -11,7 +11,7 @@ import (
"proxy-server/gateway/debug"
"proxy-server/gateway/env"
"proxy-server/gateway/fwd/metrics"
"proxy-server/pkg/utils"
utils2 "proxy-server/utils"
"strconv"
"sync"
"time"
@@ -26,7 +26,7 @@ func (s *Service) listenData() error {
if err != nil {
return fmt.Errorf("监听数据通道失败: %w", err)
}
defer utils.Close(ls)
defer utils2.Close(ls)
// 异步等待连接
var connCh = make(chan net.Conn)
@@ -44,7 +44,7 @@ func (s *Service) listenData() error {
select {
case connCh <- conn:
case <-s.ctx.Done():
utils.Close(conn)
utils2.Close(conn)
return
}
}
@@ -59,7 +59,7 @@ func (s *Service) listenData() error {
s.dataConnWg.Add(1)
go func() {
defer s.dataConnWg.Done()
defer utils.Close(conn)
defer utils2.Close(conn)
err := s.processDataConn(conn)
if err != nil {
slog.Error("处理数据通道连接失败", "err", err)
@@ -76,7 +76,7 @@ func (s *Service) processDataConn(client net.Conn) error {
var buf = make([]byte, 17)
_, err := io.ReadFull(reader, buf)
if err != nil {
return fmt.Errorf("从客户端获取连接结果失败: %w", err)
return fmt.Errorf("从节点获取连接结果失败: %w", err)
}
tag := buf[0:16]
@@ -88,7 +88,7 @@ func (s *Service) processDataConn(client net.Conn) error {
if !ok {
return fmt.Errorf("用户连接已关闭tag%s", tagStr)
}
defer utils.Close(user)
defer utils2.Close(user)
// 检查状态
if status != 1 {
@@ -99,7 +99,7 @@ func (s *Service) processDataConn(client net.Conn) error {
data := time.Now()
userPipeReader, userPipeWriter := io.Pipe()
defer utils.Close(userPipeWriter)
defer utils2.Close(userPipeWriter)
teeUser := io.TeeReader(user, userPipeWriter)
go func() {
@@ -131,7 +131,7 @@ func (s *Service) processDataConn(client net.Conn) error {
case <-s.ctx.Done():
return nil
case <-utils.WgWait(&wg):
case <-utils2.WgWait(&wg):
proxy := time.Now()
start, startOk := metrics.TimerStart.Load(user.Conn)

View File

@@ -9,7 +9,7 @@ import (
"proxy-server/gateway/fwd/http"
"proxy-server/gateway/fwd/metrics"
"proxy-server/gateway/fwd/socks"
"proxy-server/pkg/utils"
"proxy-server/utils"
"strconv"
"strings"
"time"

View File

@@ -4,7 +4,7 @@ import (
"context"
"log/slog"
"proxy-server/gateway/core"
"proxy-server/pkg/utils"
"proxy-server/utils"
"sync"
)
@@ -29,7 +29,7 @@ func New() *Service {
}
func (s *Service) Run() error {
slog.Debug("启动转发服务")
slog.Info("启动转发服务")
errQuit := make(chan struct{}, 2)
defer close(errQuit)

View File

@@ -12,7 +12,7 @@ import (
"net"
"proxy-server/gateway/core"
"proxy-server/gateway/fwd/auth"
"proxy-server/pkg/utils"
"proxy-server/utils"
"slices"
)
@@ -90,7 +90,7 @@ func Process(ctx context.Context, conn net.Conn) (*core.Conn, error) {
}, nil
}
// checkVersion 检查客户端版本
// checkVersion 检查节点版本
func checkVersion(reader io.Reader) error {
version, err := utils.ReadByte(reader)
if err != nil {
@@ -98,7 +98,7 @@ func checkVersion(reader io.Reader) error {
}
if version != Version {
return errors.New("客户端版本不兼容")
return errors.New("节点版本不兼容")
}
return nil
@@ -113,7 +113,7 @@ func authenticate(ctx context.Context, reader *bufio.Reader, conn net.Conn) (aut
return nil, err
}
// 获取客户端认证方式
// 获取节点认证方式
nAuth, err := utils.ReadByte(reader)
if err != nil {
return nil, err

View File

@@ -4,17 +4,17 @@ import (
"context"
"encoding/hex"
"errors"
"io"
"log/slog"
"net"
"proxy-server/gateway/core"
"proxy-server/gateway/env"
"proxy-server/gateway/fwd/dispatcher"
"proxy-server/gateway/fwd/metrics"
"proxy-server/pkg/utils"
"proxy-server/utils"
"time"
)
func (s *Service) listenUser(port uint16, ctrl net.Conn) error {
func (s *Service) listenUser(port uint16, ctrl io.Writer) error {
dspt, err := dispatcher.New(port, time.Duration(env.AppUserTimeout)*time.Second)
if err != nil {
return err
@@ -48,7 +48,7 @@ func (s *Service) listenUser(port uint16, ctrl net.Conn) error {
}
}
func (s *Service) processUserConn(user *core.Conn, ctrl net.Conn) (err error) {
func (s *Service) processUserConn(user *core.Conn, ctrl io.Writer) (err error) {
// 发送代理命令
err = s.sendProxy(ctrl, user.Tag, user.Dest.String())

View File

@@ -15,7 +15,7 @@ import (
"proxy-server/gateway/log"
"proxy-server/gateway/report"
"proxy-server/gateway/web"
"proxy-server/pkg/utils"
"proxy-server/utils"
"sync"
"time"
@@ -88,7 +88,7 @@ func (s *server) Run() (err error) {
// }()
// 报告上线
slog.Debug("报告服务上线")
slog.Info("报告服务上线")
err = report.Online(app.Name)
if err != nil {
return fmt.Errorf("服务上线失败: %w", err)
@@ -125,11 +125,11 @@ func (s *server) Run() (err error) {
// 等待其它服务关闭
select {
case <-utils.WgWait(&wg):
slog.Info("服务正常关闭")
case <-time.After(time.Duration(env.AppExitTimeout) * time.Second):
slog.Warn("超时强制关闭")
}
slog.Info("服务已退出")
return nil
}

View File

@@ -26,7 +26,7 @@ func Auth(ctx *fiber.Ctx) (err error) {
}
// 保存授权配置
app.Permits.Store(req.Port, &req.Permit)
app.PermitEdge(req.Port, &req.Permit)
return nil
}