package fwd

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net"
	"strconv"
	"sync"
	"time"

	"github.com/google/uuid"

	"proxy-server/gateway/debug"
	"proxy-server/gateway/env"
	"proxy-server/gateway/fwd/metrics"
	"proxy-server/pkg/utils"
)

// listenData listens on the TCP port given by env.AppDataPort and hands every
// accepted connection to processDataConn in its own goroutine, tracked by
// s.dataConnWg.
//
// It blocks until s.ctx is cancelled (returning nil), and returns an error if
// the listener cannot be created or if Accept fails for any reason other than
// the listener having been closed.
func (s *Service) listenData() error {
	dataPort := env.AppDataPort
	slog.Debug("监听数据通道", slog.Uint64("port", uint64(dataPort)))

	// Open the listening socket.
	ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(dataPort)))
	if err != nil {
		return fmt.Errorf("监听数据通道失败: %w", err)
	}
	defer utils.Close(ls)

	// Accept in the background so the loop below can also watch s.ctx.
	connCh := make(chan net.Conn)
	// Buffered so the accept goroutine can report a fatal error and exit even
	// if this function has already returned.
	acceptErrCh := make(chan error, 1)
	go func() {
		for {
			conn, err := ls.Accept()
			if errors.Is(err, net.ErrClosed) {
				slog.Debug("数据通道监听关闭")
				return
			}
			if err != nil {
				// Surface the failure to the caller instead of silently
				// stopping: otherwise the service would keep running without
				// ever accepting another data connection.
				acceptErrCh <- err
				return
			}

			select {
			case connCh <- conn:
			case <-s.ctx.Done():
				utils.Close(conn)
				return
			}
		}
	}()

	// Dispatch accepted connections until shutdown or a fatal accept error.
	for {
		select {
		case <-s.ctx.Done():
			return nil
		case err := <-acceptErrCh:
			return fmt.Errorf("接受数据通道连接失败: %w", err)
		case conn := <-connCh:
			s.dataConnWg.Add(1)
			go func() {
				defer s.dataConnWg.Done()
				defer utils.Close(conn)
				if err := s.processDataConn(conn); err != nil {
					slog.Error("处理数据通道连接失败", "err", err)
				}
			}()
		}
	}
}

// processDataConn pairs a freshly accepted data-channel connection with the
// waiting user connection it belongs to and relays traffic between the two.
//
// The client first sends a 17-byte header: a 16-byte tag followed by one
// status byte. The tag, rendered as a UUID string, is the key under which the
// user connection is stored in s.userConnMap; the entry is removed when it is
// looked up. Any status other than 1 is reported as a failed connection to the
// target address.
//
// While relaying, everything read from the user side is also fed through a
// pipe to analysisAndLog. When both directions have finished, timing figures
// are sent to debug.ConsumingCh. The function returns nil early if s.ctx is
// cancelled; the deferred closes then tear down the user connection and pipe.
func (s *Service) processDataConn(client net.Conn) error {
	const tagLen = 16

	reader := bufio.NewReader(client)

	// Read the connection result header (tag + status byte).
	header := make([]byte, tagLen+1)
	if _, err := io.ReadFull(reader, header); err != nil {
		return fmt.Errorf("从客户端获取连接结果失败: %w", err)
	}
	tag := header[:tagLen]
	status := header[tagLen]

	// Claim the user connection registered under this tag.
	tagStr := uuid.UUID(tag).String()
	user, ok := s.userConnMap.LoadAndDelete(tagStr)
	if !ok {
		return fmt.Errorf("用户连接已关闭,tag:%s", tagStr)
	}
	defer utils.Close(user)

	// Check the status reported by the client.
	if status != 1 {
		return errors.New("目标地址建立连接失败")
	}

	// Relay data in both directions.
	relayStart := time.Now()

	userPipeReader, userPipeWriter := io.Pipe()
	defer utils.Close(userPipeWriter)
	teeUser := io.TeeReader(user, userPipeWriter)

	go func() {
		if err := analysisAndLog(user, userPipeReader); err != nil {
			slog.Error("数据解析失败", "err", err)
		}
		// io.Pipe is unbuffered: if analysis returns before the stream ends,
		// the next tee write would block and stall user->client forwarding.
		// Keep draining until the writer is closed. The result is ignored on
		// purpose; this copy only exists to unblock the writer.
		_, _ = io.Copy(io.Discard, userPipeReader)
	}()

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		if _, err := io.Copy(client, teeUser); err != nil {
			slog.Error("数据转发失败 user->client", "err", err)
		}
	}()
	go func() {
		defer wg.Done()
		// Read through the bufio.Reader so bytes buffered while reading the
		// header are not lost.
		if _, err := io.Copy(user, reader); err != nil {
			slog.Error("数据转发失败 client->user", "err", err)
		}
	}()

	select {
	case <-s.ctx.Done():
		return nil
	case <-utils.WgWait(&wg):
	}

	relayEnd := time.Now()

	start, startOk := metrics.TimerStart.Load(user.Conn)
	auth, authOk := metrics.TimerAuth.Load(user.Conn)

	// Each duration stays zero when a timestamp it depends on is missing.
	var authDuration, dataDuration, totalDuration time.Duration
	if startOk && authOk {
		authDuration = auth.(time.Time).Sub(start.(time.Time))
	}
	if authOk {
		dataDuration = relayStart.Sub(auth.(time.Time))
	}
	if startOk {
		totalDuration = relayEnd.Sub(start.(time.Time))
	}

	consuming := debug.Consuming{
		Auth:  authDuration,
		Data:  dataDuration,
		Proxy: relayEnd.Sub(relayStart),
		Total: totalDuration,
	}

	// Do not let a slow or absent metrics consumer keep this goroutine (and
	// the connections it holds) alive past shutdown.
	select {
	case debug.ConsumingCh <- consuming:
	case <-s.ctx.Done():
	}
	return nil
}