package fwd

import (
	"bufio"
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net"
	"strconv"
	"time"

	"proxy-server/gateway/app"
	"proxy-server/gateway/core"
	"proxy-server/gateway/debug"
	"proxy-server/gateway/env"
	"proxy-server/gateway/fwd/metrics"
	"proxy-server/utils"
)

// ListenData listens on the TCP port given by env.AppDataPort and hands every
// accepted connection to processDataConn in its own goroutine. Each handler
// goroutine is registered on app.DataConnWg and closes its connection when it
// finishes.
//
// It blocks until ctx is cancelled (returning nil), until listening fails, or
// until Accept fails with an error other than net.ErrClosed (returning that
// error wrapped). The listener is closed when the function returns.
func ListenData(ctx context.Context) error {
	dataPort := env.AppDataPort

	// Open the listening port.
	ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(dataPort)))
	if err != nil {
		return fmt.Errorf("监听数据通道失败: %w", err)
	}
	defer utils.Close(ls)

	// Accept connections asynchronously so the main loop can also watch ctx.
	connCh := make(chan net.Conn)
	// Buffered so the accept goroutine can report a failure and exit even if
	// the main loop has already returned because ctx was cancelled.
	acceptErrCh := make(chan error, 1)
	go func() {
		for {
			conn, err := ls.Accept()
			if errors.Is(err, net.ErrClosed) {
				slog.Debug("数据通道监听关闭")
				return
			}
			if err != nil {
				slog.Error("接受数据通道连接失败", "err", err)
				// Without this hand-off the main loop would keep waiting on
				// a connection channel that no longer has a producer.
				acceptErrCh <- err
				return
			}
			select {
			case connCh <- conn:
			case <-ctx.Done():
				utils.Close(conn)
				return
			}
		}
	}()

	// Dispatch accepted connections.
	for {
		select {
		case <-ctx.Done():
			return nil
		case err := <-acceptErrCh:
			return fmt.Errorf("接受数据通道连接失败: %w", err)
		case conn := <-connCh:
			app.DataConnWg.Add(1)
			go func() {
				defer app.DataConnWg.Done()
				defer func() {
					utils.Close(conn)
					slog.Debug("关闭数据通道连接")
				}()
				if err := processDataConn(ctx, conn); err != nil {
					slog.Error("处理数据通道连接失败", "err", err)
				}
			}()
		}
	}
}

// processDataConn pairs one data-channel connection (client) with the waiting
// user connection and relays traffic between them in both directions.
//
// The connection must start with a 17-byte header: bytes 0-15 are hex-encoded
// into the tag used to look up (and remove) the user connection in
// app.UserConnMap, and byte 16 is a status that must equal 1. An error is
// returned when the header cannot be read, when no user connection is
// registered under the tag, or when the status is not 1.
//
// Once relaying starts the function returns nil as soon as ctx is cancelled
// or either direction finishes; the user connection is closed on return and
// client is left for the caller to close.
func processDataConn(ctx context.Context, client net.Conn) error {
	reader := bufio.NewReader(client)

	// Read the connect result sent by the node.
	buf := make([]byte, 17)
	if _, err := io.ReadFull(reader, buf); err != nil {
		return fmt.Errorf("从节点获取连接结果失败: %w", err)
	}
	tag := hex.EncodeToString(buf[0:16])
	status := buf[16]

	// Load the user connection waiting under this tag.
	user, ok := app.UserConnMap.LoadAndDelete(tag)
	if !ok {
		return fmt.Errorf("用户连接已关闭,tag:%s", tag)
	}
	defer func() {
		utils.Close(user)
		slog.Debug("关闭用户连接")
	}()

	// Check the status reported in the header.
	if status != 1 {
		return errors.New("目标地址建立连接失败")
	}

	// Timestamp taken just before relaying starts; used by storeConnMatrics.
	data := time.Now()

	// Duplicate the user's traffic into a pipe so it can be analysed for the
	// accessed target while it is being forwarded. Closing the write side on
	// return releases any reader or writer still blocked on the pipe.
	userCopyFrom, userCopyTo := io.Pipe()
	defer utils.Close(userCopyTo)
	teeUser := io.TeeReader(user, userCopyTo)
	go func() {
		if err := analysisAndLog(user, userCopyFrom); err != nil {
			slog.Error("数据解析失败", "err", err)
		}
	}()

	// Both result channels have capacity 1: the select below consumes at most
	// one of them, so the other goroutine must be able to deliver its result
	// and exit without a receiver instead of blocking forever.

	// Copy node data to the user. The bufio reader is used as the source so
	// bytes already buffered past the header are not lost.
	waitEdge := make(chan error, 1)
	go func() {
		_, err := io.Copy(user, reader)
		switch {
		case errors.Is(err, net.ErrClosed):
			slog.Debug("节点连接意外关闭")
		case err != nil:
			slog.Error("读取节点数据失败", "err", err)
		default:
			slog.Debug("节点数据读取完成")
		}
		waitEdge <- err
	}()

	// Copy user data to the node, teeing it into the analysis pipe.
	waitUser := make(chan error, 1)
	go func() {
		_, err := io.Copy(client, teeUser)
		switch {
		case errors.Is(err, net.ErrClosed):
			slog.Debug("用户连接意外关闭")
		case err != nil:
			slog.Error("读取用户数据失败", "err", err)
		default:
			slog.Debug("用户数据读取完成")
		}
		waitUser <- err
	}()

	// Wait for forwarding to finish; these are the moments at which the data
	// channel gets closed. Timing metrics are recorded only when the user
	// side finishes first.
	select {
	case <-ctx.Done():
		slog.Debug("服务关闭")
	case <-waitEdge:
	case <-waitUser:
		storeConnMatrics(user, data)
	}
	return nil
}

// storeConnMatrics computes the timing breakdown for one user connection and
// sends it on debug.ConsumingCh.
//
// data is the instant relaying started. The start and auth instants are
// looked up in metrics.TimerStart and metrics.TimerAuth keyed by user.Conn; a
// duration whose required instants are missing is left at zero.
//
// NOTE(review): the looked-up values are asserted to be time.Time and would
// panic otherwise — confirm the writers always store time.Time.
// NOTE(review): the send on debug.ConsumingCh blocks if nothing is receiving
// and the channel has no free buffer — confirm against its declaration.
func storeConnMatrics(user *core.Conn, data time.Time) {
	proxy := time.Now()

	start, startOk := metrics.TimerStart.Load(user.Conn)
	auth, authOk := metrics.TimerAuth.Load(user.Conn)

	// Start -> auth.
	var authDuration time.Duration
	if startOk && authOk {
		authDuration = auth.(time.Time).Sub(start.(time.Time))
	}

	// Auth -> relaying started.
	var dataDuration time.Duration
	if authOk {
		dataDuration = data.Sub(auth.(time.Time))
	}

	// Relaying started -> now.
	proxyDuration := proxy.Sub(data)

	// Start -> now.
	var totalDuration time.Duration
	if startOk {
		totalDuration = proxy.Sub(start.(time.Time))
	}

	debug.ConsumingCh <- debug.Consuming{
		Auth:  authDuration,
		Data:  dataDuration,
		Proxy: proxyDuration,
		Total: totalDuration,
	}
}