优化数据分析和日志记录,重构连接管理,添加对空缓冲区的处理

This commit is contained in:
2025-02-28 17:50:48 +08:00
parent 06bcaf8bc7
commit b8a3dd93dc
8 changed files with 324 additions and 151 deletions

View File

@@ -24,7 +24,7 @@ type Service struct {
Config *Config
ctx context.Context
cancel context.CancelFunc
userConnMap map[string]socks.ProxyConn
userConnMap sync.Map
ctrlConnWg utils.CountWaitGroup
dataConnWg utils.CountWaitGroup
@@ -41,7 +41,7 @@ func New(config *Config) *Service {
Config: config,
ctx: ctx,
cancel: cancel,
userConnMap: make(map[string]socks.ProxyConn),
userConnMap: sync.Map{},
ctrlConnWg: utils.CountWaitGroup{},
dataConnWg: utils.CountWaitGroup{},
fwdLesWg: utils.CountWaitGroup{},
@@ -96,10 +96,13 @@ func (s *Service) Run() {
}
// 清理资源
for _, conn := range s.userConnMap {
s.userConnMap.Range(func(key, value any) bool {
conn := value.(socks.ProxyConn)
utils.Close(conn)
}
clear(s.userConnMap)
s.userConnMap.Delete(key)
return true
})
s.userConnMap.Clear()
s.ctrlConnWg.Wait()
slog.Debug("控制通道连接已关闭")
@@ -143,7 +146,7 @@ func (s *Service) startCtrlTun() error {
defer utils.Close(conn)
err := s.processCtrlConn(conn)
if err != nil {
slog.Error("处理控制通道连接失败", err)
slog.Error("处理控制通道连接失败", "err", err)
}
}()
}
@@ -213,7 +216,7 @@ func (s *Service) processCtrlConn(controller net.Conn) error {
slog.Error("向客户端发送 tag 失败", "err", err)
return
}
s.userConnMap[tag] = user
s.userConnMap.Store(tag, user)
}()
}
}
@@ -254,7 +257,7 @@ func (s *Service) startDataTun() error {
defer utils.Close(conn)
err := s.processDataConn(conn)
if err != nil {
slog.Error("处理数据通道失败", err)
slog.Error("处理数据通道失败", "err", err)
}
}()
}
@@ -277,17 +280,17 @@ func (s *Service) processDataConn(client net.Conn) error {
// 找到用户连接
var data socks.ProxyConn
var ok bool
select {
case <-s.ctx.Done():
return nil
default:
data, ok = s.userConnMap[tag]
dataAny, ok := s.userConnMap.Load(tag)
if !ok {
return errors.New("查找用户连接失败")
}
data = dataAny.(socks.ProxyConn)
defer func() {
delete(s.userConnMap, tag)
s.userConnMap.Delete(tag)
utils.Close(data)
}()
}
@@ -314,15 +317,21 @@ func (s *Service) processDataConn(client net.Conn) error {
// 数据转发
slog.Info("开始数据转发 " + client.RemoteAddr().String() + " <-> " + data.Dest)
// userPipeReader, userPipeWriter := io.Pipe()
// defer utils.Close(userPipeWriter)
// teeUser := io.TeeReader(user, userPipeWriter)
userPipeReader, userPipeWriter := io.Pipe()
defer utils.Close(userPipeWriter)
teeUser := io.TeeReader(user, userPipeWriter)
go func() {
err := analysisAndLog(data, userPipeReader)
if err != nil {
slog.Error("数据解析失败", "err", err)
}
}()
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
_, err := io.Copy(client, user)
_, err := io.Copy(client, teeUser)
if err != nil {
slog.Error("数据转发失败 user->client", "err", err)
}
@@ -332,7 +341,11 @@ func (s *Service) processDataConn(client net.Conn) error {
defer wg.Done()
_, err := io.Copy(user, client)
if err != nil {
slog.Error("数据转发失败 client->user", "err", err)
if errors.Is(err, net.ErrClosed) {
} else {
// slog.Error("数据转发失败 client->user", "err", err, "errType", reflect.TypeOf(err))
}
}
}()
wg.Wait()