package fwd import ( "fmt" "io" "log/slog" "net" "proxy-server/pkg/utils" "proxy-server/server/debug" "proxy-server/server/env" "proxy-server/server/fwd/metrics" "strconv" "sync" "time" "errors" ) func (s *Service) startDataTun() error { dataPort := env.AppDataPort slog.Debug("监听数据通道", slog.Uint64("port", uint64(dataPort))) // 监听端口 ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(dataPort))) if err != nil { return fmt.Errorf("监听数据通道失败: %w", err) } defer utils.Close(ls) go func() { <-s.ctx.Done() utils.Close(ls) }() for { conn, err := ls.Accept() if err != nil { return fmt.Errorf("监听数据通道失败: %w", err) } select { case <-s.ctx.Done(): utils.Close(conn) return nil default: s.dataConnWg.Add(1) go func() { defer s.dataConnWg.Done() defer utils.Close(conn) err := s.processDataConn(conn) if err != nil { slog.Error("建立数据通道失败失败", "err", err) } }() } } } func (s *Service) processDataConn(client net.Conn) error { // 接收 status status, err := utils.ReadByte(client) if err != nil { return fmt.Errorf("从客户端获取 status 失败: %w", err) } // 接收 tag tagLen, err := utils.ReadByte(client) if err != nil { return fmt.Errorf("从客户端获取 tag 失败: %w", err) } tagBuf, err := utils.ReadBuffer(client, int(tagLen)) if err != nil { return fmt.Errorf("从客户端获取 tag 失败: %w", err) } tag := string(tagBuf) // 找到用户连接 user, ok := s.userConnMap.LoadAndDelete(tag) if !ok { return errors.New("用户连接已关闭,tag:" + tag) } defer utils.Close(user) data := time.Now() // 检查状态 if status != 1 { return errors.New("目标地址建立连接失败") } // 数据转发 userPipeReader, userPipeWriter := io.Pipe() defer utils.Close(userPipeWriter) teeUser := io.TeeReader(user, userPipeWriter) go func() { err := analysisAndLog(user, userPipeReader) if err != nil { slog.Error("数据解析失败", "err", err) } }() wg := sync.WaitGroup{} wg.Add(2) go func() { defer wg.Done() _, err := io.Copy(client, teeUser) if err != nil { slog.Error("数据转发失败 user->client", "err", err) } }() go func() { defer wg.Done() _, err := io.Copy(user, client) if err != nil { slog.Error("数据转发失败 client->user", "err", err) } }() select { case <-s.ctx.Done(): case <-utils.ChanWgWait(s.ctx, &wg): } proxy := time.Now() start, startOk := metrics.TimerStart.Load(user.Conn) auth, authOk := metrics.TimerAuth.Load(user.Conn) var authDuration time.Duration if startOk && authOk { authDuration = auth.(time.Time).Sub(start.(time.Time)) } var dataDuration time.Duration if authOk { dataDuration = data.Sub(auth.(time.Time)) } proxyDuration := proxy.Sub(data) var totalDuration time.Duration if startOk { totalDuration = proxy.Sub(start.(time.Time)) } debug.ConsumingCh <- debug.Consuming{ Auth: authDuration, Data: dataDuration, Proxy: proxyDuration, Total: totalDuration, } return nil }