重构代码结构,优化连接管理和日志记录

This commit is contained in:
2025-02-28 09:45:31 +08:00
parent 037c2c53c6
commit 06bcaf8bc7
15 changed files with 82 additions and 102 deletions

View File

@@ -6,8 +6,6 @@
ProxyConn 直接实现 Conn 相同的接口,不再取出 Conn 使用 ProxyConn 直接实现 Conn 相同的接口,不再取出 Conn 使用
web 服务目录结构,不要 app 那层了
配置退出等待时间 配置退出等待时间
log 控制台颜色,输出错误堆栈 log 控制台颜色,输出错误堆栈
@@ -24,6 +22,8 @@ log 控制台颜色,输出错误堆栈
### 长期 ### 长期
退出顺序好像有问题,需要检查
实现一个 socks context 以在子组件中获取 socks 相关信息 实现一个 socks context 以在子组件中获取 socks 相关信息
代理端口支持混合端口转发 代理端口支持混合端口转发

View File

@@ -13,7 +13,10 @@ func ChanConnAccept(ctx context.Context, ls net.Listener) chan net.Conn {
go func() { go func() {
for { for {
conn, err := ls.Accept() conn, err := ls.Accept()
if err != nil && !errors.Is(err, net.ErrClosed) { if err != nil {
if errors.Is(err, net.ErrClosed) {
return
}
slog.Error("接受连接失败", err) slog.Error("接受连接失败", err)
// 临时错误重试连接 // 临时错误重试连接
var ne net.Error var ne net.Error

View File

@@ -8,7 +8,7 @@ import (
"proxy-server/pkg/utils" "proxy-server/pkg/utils"
"proxy-server/server/fwd/socks" "proxy-server/server/fwd/socks"
"proxy-server/server/pkg/orm" "proxy-server/server/pkg/orm"
"proxy-server/server/web/app/models" "proxy-server/server/web/models"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"

View File

@@ -28,6 +28,7 @@ type Service struct {
ctrlConnWg utils.CountWaitGroup ctrlConnWg utils.CountWaitGroup
dataConnWg utils.CountWaitGroup dataConnWg utils.CountWaitGroup
fwdLesWg utils.CountWaitGroup
} }
func New(config *Config) *Service { func New(config *Config) *Service {
@@ -43,70 +44,71 @@ func New(config *Config) *Service {
userConnMap: make(map[string]socks.ProxyConn), userConnMap: make(map[string]socks.ProxyConn),
ctrlConnWg: utils.CountWaitGroup{}, ctrlConnWg: utils.CountWaitGroup{},
dataConnWg: utils.CountWaitGroup{}, dataConnWg: utils.CountWaitGroup{},
fwdLesWg: utils.CountWaitGroup{},
} }
} }
func (s *Service) Close() { func (s *Service) Close() {
start := time.Now()
s.cancel() s.cancel()
for _, conn := range s.userConnMap { slog.Debug("退出服务", "duration", time.Since(start))
utils.Close(conn)
}
clear(s.userConnMap)
} }
func (s *Service) Run() { func (s *Service) Run() {
slog.Debug("启动 fwd 服务") slog.Info("启动 fwd 服务")
errQuit := make(chan struct{}) errQuit := make(chan struct{})
defer close(errQuit) defer close(errQuit)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
// 启动工作协程 // 控制通道监听
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
err := s.startCtrlTun() err := s.startCtrlTun()
if err != nil { if err != nil {
slog.Error("控制通道发生错误", "err", err) slog.Error("fwd 控制通道监听发生错误", "err", err)
errQuit <- struct{}{} errQuit <- struct{}{}
return return
} }
}() }()
// 数据通道监听
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
err := s.startDataTun() err := s.startDataTun()
if err != nil { if err != nil {
slog.Error("数据通道发生错误", "err", err) slog.Error("fwd 数据通道监听发生错误", "err", err)
errQuit <- struct{}{} errQuit <- struct{}{}
return return
} }
}() }()
// 等待结束 // 等待退出
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
slog.Debug("服务关闭") slog.Info("fwd 服务主动退出")
case <-errQuit: case <-errQuit:
slog.Debug("服务异常退出") slog.Warn("fwd 服务异常退出")
s.Close()
} }
// 退出 // 清理资源
s.Close() for _, conn := range s.userConnMap {
utils.Close(conn)
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
wgCh := utils.ChanWgWait(timeout, &wg)
defer close(wgCh)
select {
case <-timeout.Done():
slog.Warn("关闭超时,强制关闭")
case <-wgCh:
slog.Debug("服务已退出")
} }
clear(s.userConnMap)
s.ctrlConnWg.Wait()
slog.Debug("控制通道连接已关闭")
s.dataConnWg.Wait()
slog.Debug("数据通道连接已关闭")
s.fwdLesWg.Wait()
slog.Debug("转发服务已关闭")
wg.Wait()
slog.Info("fwd 服务已退出")
} }
func (s *Service) startCtrlTun() error { func (s *Service) startCtrlTun() error {
@@ -125,15 +127,15 @@ func (s *Service) startCtrlTun() error {
defer close(connCh) defer close(connCh)
// 处理连接 // 处理连接
for loop := true; loop; { for {
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
slog.Debug("结束处理连接,由于上下文取消") slog.Debug("服务关闭 startCtrlTun")
loop = false return nil
case conn, ok := <-connCh: case conn, ok := <-connCh:
if !ok { if !ok {
slog.Debug("结束处理连接,由于获取连接失败") slog.Debug("结束处理连接,由于获取连接失败")
loop = false return errors.New("获取连接失败")
} }
s.ctrlConnWg.Add(1) s.ctrlConnWg.Add(1)
go func() { go func() {
@@ -146,26 +148,10 @@ func (s *Service) startCtrlTun() error {
}() }()
} }
} }
// 等待子协程结束
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
procCh := utils.ChanWgWait(timeout, &s.ctrlConnWg)
defer close(procCh)
select {
case <-timeout.Done():
slog.Warn("等待控制通道子协程结束超时")
case <-procCh:
slog.Debug("控制通道子协程结束")
}
slog.Debug("关闭控制通道")
return nil
} }
func (s *Service) processCtrlConn(controller net.Conn) error { func (s *Service) processCtrlConn(controller net.Conn) error {
slog.Info("客户端连入", "addr", controller.RemoteAddr().String()) slog.Debug("客户端连入", "addr", controller.RemoteAddr().String())
reader := bufio.NewReader(controller) reader := bufio.NewReader(controller)
@@ -177,7 +163,7 @@ func (s *Service) processCtrlConn(controller net.Conn) error {
port := binary.BigEndian.Uint16(portBuf) port := binary.BigEndian.Uint16(portBuf)
// 开放转发端口 todo 混合转发 // 开放转发端口 todo 混合转发
slog.Info("开放转发端口", "port", port) slog.Debug("开放转发端口", "port", port)
proxy, err := socks.New(&socks.Config{ proxy, err := socks.New(&socks.Config{
Name: strconv.Itoa(int(port)), Name: strconv.Itoa(int(port)),
Port: port, Port: port,
@@ -191,7 +177,9 @@ func (s *Service) processCtrlConn(controller net.Conn) error {
} }
defer proxy.Close() defer proxy.Close()
s.fwdLesWg.Add(1)
go func() { go func() {
defer s.fwdLesWg.Done()
err := proxy.Run() err := proxy.Run()
if err != nil { if err != nil {
slog.Error("代理服务启动失败", "err", err) slog.Error("代理服务启动失败", "err", err)
@@ -250,15 +238,15 @@ func (s *Service) startDataTun() error {
defer close(connCh) defer close(connCh)
// 处理连接 // 处理连接
for loop := true; loop; { for {
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
slog.Debug("结束处理连接,由于上下文取消") slog.Debug("服务关闭 startDataTun")
loop = false return nil
case conn, ok := <-connCh: case conn, ok := <-connCh:
if !ok { if !ok {
slog.Debug("结束处理连接,由于获取连接失败") slog.Debug("结束处理连接,由于获取连接失败")
loop = false return errors.New("获取连接失败")
} }
s.dataConnWg.Add(1) s.dataConnWg.Add(1)
go func() { go func() {
@@ -271,22 +259,6 @@ func (s *Service) startDataTun() error {
}() }()
} }
} }
// 等待子协程结束 todo 可配置等待时间
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
procCh := utils.ChanWgWait(timeout, &s.dataConnWg)
defer close(procCh)
select {
case <-timeout.Done():
slog.Warn("等待数据通道子协程结束超时")
case <-procCh:
slog.Debug("数据通道子协程结束")
}
slog.Debug("关闭数据通道")
return nil
} }
func (s *Service) processDataConn(client net.Conn) error { func (s *Service) processDataConn(client net.Conn) error {
@@ -303,22 +275,23 @@ func (s *Service) processDataConn(client net.Conn) error {
} }
tag := string(tagBuf) tag := string(tagBuf)
// 找到用户连接
var data socks.ProxyConn
var ok bool
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
return nil return nil
default: default:
data, ok = s.userConnMap[tag]
if !ok {
return errors.New("查找用户连接失败")
}
defer func() {
delete(s.userConnMap, tag)
utils.Close(data)
}()
} }
// 找到用户连接
data, ok := s.userConnMap[tag]
if !ok {
return errors.New("查找用户连接失败")
}
defer func() {
delete(s.userConnMap, tag)
utils.Close(data)
}()
// 响应用户 // 响应用户
user := data.Conn user := data.Conn
err = socks.SendSuccess(user, client) err = socks.SendSuccess(user, client)

View File

@@ -253,7 +253,13 @@ func (s *Server) handleConnect(ctx context.Context, conn net.Conn, req *Request)
} }
slog.Info("需要向 " + req.DestAddr.Address() + " 建立连接") slog.Info("需要向 " + req.DestAddr.Address() + " 建立连接")
s.Conn <- ProxyConn{conn, req.realDestAddr.Address()} select {
case <-s.ctx.Done():
if conn != nil {
utils.Close(conn)
}
case s.Conn <- ProxyConn{conn, req.realDestAddr.Address()}:
}
return nil return nil
} }

View File

@@ -122,12 +122,13 @@ func (s *Server) Run() error {
for loop := true; loop; { for loop := true; loop; {
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
slog.Debug("服务主动停止") slog.Debug("socks 服务主动停止")
loop = false loop = false
case conn, ok := <-connCh: case conn, ok := <-connCh:
if !ok { if !ok {
err = errors.New("意外错误,无法获取连接") err = errors.New("意外错误,无法获取连接")
loop = false loop = false
s.Close()
break break
} }
s.wg.Add(1) s.wg.Add(1)
@@ -141,9 +142,6 @@ func (s *Server) Run() error {
}() }()
} }
} }
if err != nil {
s.Close()
}
// 关闭服务 // 关闭服务
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -157,10 +155,7 @@ func (s *Server) Run() error {
case <-wgCh: case <-wgCh:
} }
if s.Conn != nil { close(s.Conn)
close(s.Conn)
}
return err return err
} }

View File

@@ -37,6 +37,7 @@ func Start() {
defer close(errQuit) defer close(errQuit)
// 启动服务 // 启动服务
slog.Info("启动服务")
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
@@ -55,9 +56,9 @@ func Start() {
// 等待退出信号 // 等待退出信号
select { select {
case <-osQuit: case <-osQuit:
slog.Info("服务关闭") slog.Info("服务主动退出")
case <-errQuit: case <-errQuit:
slog.Error("服务异常退出") slog.Warn("服务异常退出")
} }
// 退出服务 // 退出服务
@@ -69,11 +70,13 @@ func Start() {
close(wgCh) close(wgCh)
select { select {
case <-timeout.Done():
slog.Warn("关闭超时,强制关闭")
case <-wgCh: case <-wgCh:
slog.Debug("服务已退出") slog.Info("服务已退出")
case <-timeout.Done():
slog.Warn("退出超时,强制退出")
} }
time.Sleep(3 * time.Second)
} }
func initLog() { func initLog() {

View File

@@ -4,7 +4,7 @@ import (
"log/slog" "log/slog"
"proxy-server/server/pkg/orm" "proxy-server/server/pkg/orm"
"proxy-server/server/pkg/resp" "proxy-server/server/pkg/resp"
"proxy-server/server/web/app/models" "proxy-server/server/web/models"
"strings" "strings"
"time" "time"

View File

@@ -3,7 +3,7 @@ package handlers
import ( import (
"os" "os"
"proxy-server/server/pkg/orm" "proxy-server/server/pkg/orm"
"proxy-server/server/web/app/models" "proxy-server/server/web/models"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/pkg/errors" "github.com/pkg/errors"

View File

@@ -1,17 +1,17 @@
package router package router
import ( import (
"proxy-server/server/web/app/handlers" handlers2 "proxy-server/server/web/handlers"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
func Apply(r *gin.Engine) { func Apply(r *gin.Engine) {
r.POST("/node/register", handlers.NodeRegister) r.POST("/node/register", handlers2.NodeRegister)
r.POST("/node/report", handlers.NodeReport) r.POST("/node/report", handlers2.NodeReport)
r.POST("/chan/request", handlers.ChanRequest) r.POST("/chan/request", handlers2.ChanRequest)
r.POST("/chan/auth", handlers.ChanAuth) r.POST("/chan/auth", handlers2.ChanAuth)
r.POST("/chan/test", handlers.ChanTest) r.POST("/chan/test", handlers2.ChanTest)
} }