Files
proxy/server/fwd/data.go

155 lines
3.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package fwd
import (
"fmt"
"io"
"log/slog"
"net"
"proxy-server/pkg/utils"
"proxy-server/server/debug"
"proxy-server/server/fwd/metrics"
"proxy-server/server/pkg/env"
"strconv"
"sync"
"time"
"errors"
)
func (s *Service) startDataTun() error {
dataPort := env.AppDataPort
slog.Debug("监听数据通道", slog.Uint64("port", uint64(dataPort)))
// 监听端口
ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(dataPort)))
if err != nil {
return fmt.Errorf("监听数据通道失败: %w", err)
}
defer utils.Close(ls)
go func() {
<-s.ctx.Done()
utils.Close(ls)
}()
for {
conn, err := ls.Accept()
if err != nil {
return fmt.Errorf("监听数据通道失败: %w", err)
}
select {
case <-s.ctx.Done():
utils.Close(conn)
return nil
default:
s.dataConnWg.Add(1)
go func() {
defer s.dataConnWg.Done()
defer utils.Close(conn)
err := s.processDataConn(conn)
if err != nil {
slog.Error("建立数据通道失败失败", "err", err)
}
}()
}
}
}
func (s *Service) processDataConn(client net.Conn) error {
// 接收 status
status, err := utils.ReadByte(client)
if err != nil {
return fmt.Errorf("从客户端获取 status 失败: %w", err)
}
// 接收 tag
tagLen, err := utils.ReadByte(client)
if err != nil {
return fmt.Errorf("从客户端获取 tag 失败: %w", err)
}
tagBuf, err := utils.ReadBuffer(client, int(tagLen))
if err != nil {
return fmt.Errorf("从客户端获取 tag 失败: %w", err)
}
tag := string(tagBuf)
// 找到用户连接
user, ok := s.userConnMap.LoadAndDelete(tag)
if !ok {
return errors.New("用户连接已关闭tag" + tag)
}
defer utils.Close(user)
data := time.Now()
// 检查状态
if status != 1 {
return errors.New("目标地址建立连接失败")
}
// 数据转发
userPipeReader, userPipeWriter := io.Pipe()
defer utils.Close(userPipeWriter)
teeUser := io.TeeReader(user, userPipeWriter)
go func() {
err := analysisAndLog(user, userPipeReader)
if err != nil {
slog.Error("数据解析失败", "err", err)
}
}()
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
_, err := io.Copy(client, teeUser)
if err != nil {
slog.Error("数据转发失败 user->client", "err", err)
}
}()
go func() {
defer wg.Done()
_, err := io.Copy(user, client)
if err != nil {
slog.Error("数据转发失败 client->user", "err", err)
}
}()
select {
case <-s.ctx.Done():
case <-utils.ChanWgWait(s.ctx, &wg):
}
proxy := time.Now()
start, startOk := metrics.TimerStart.Load(user.Conn)
auth, authOk := metrics.TimerAuth.Load(user.Conn)
var authDuration time.Duration
if startOk && authOk {
authDuration = auth.(time.Time).Sub(start.(time.Time))
}
var dataDuration time.Duration
if authOk {
dataDuration = data.Sub(auth.(time.Time))
}
proxyDuration := proxy.Sub(data)
var totalDuration time.Duration
if startOk {
totalDuration = proxy.Sub(start.(time.Time))
}
debug.ConsumingCh <- debug.Consuming{
Auth: authDuration,
Data: dataDuration,
Proxy: proxyDuration,
Total: totalDuration,
}
return nil
}