Files
proxy/gateway/fwd/data.go

190 lines
3.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package fwd
import (
"bufio"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"log/slog"
"net"
"proxy-server/gateway/app"
"proxy-server/gateway/core"
"proxy-server/gateway/debug"
"proxy-server/gateway/env"
"proxy-server/gateway/fwd/metrics"
"proxy-server/utils"
"strconv"
"time"
)
// ListenData listens on the TCP data-channel port (env.AppDataPort) and hands
// every accepted connection to processDataConn in its own goroutine. Each
// handler goroutine is registered in app.DataConnWg for its whole lifetime.
//
// The call blocks. It returns nil once ctx is cancelled, and a non-nil error
// when the listener cannot be opened or when Accept fails with anything other
// than net.ErrClosed. The listener is closed on return.
func ListenData(ctx context.Context) error {
	dataPort := env.AppDataPort

	// Open the listening socket.
	ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(dataPort)))
	if err != nil {
		return fmt.Errorf("监听数据通道失败: %w", err)
	}
	defer utils.Close(ls)

	// Accept connections in the background so the loop below can also watch
	// ctx. Closing ls on return makes Accept fail with net.ErrClosed, which
	// is how this goroutine is told to stop.
	connCh := make(chan net.Conn)
	// Capacity 1 lets the accept goroutine report a fatal error and exit even
	// if the dispatch loop has already returned.
	acceptErrCh := make(chan error, 1)
	go func() {
		for {
			conn, err := ls.Accept()
			if errors.Is(err, net.ErrClosed) {
				slog.Debug("数据通道监听关闭")
				return
			}
			if err != nil {
				slog.Error("接受数据通道连接失败", "err", err)
				// Surface the failure: without this the accept loop would
				// stop while ListenData kept blocking as if it were healthy.
				acceptErrCh <- err
				return
			}
			select {
			case connCh <- conn:
			case <-ctx.Done():
				// Nobody will pick this connection up any more.
				utils.Close(conn)
				return
			}
		}
	}()

	// Dispatch accepted connections until shutdown or listener failure.
	for {
		select {
		case <-ctx.Done():
			return nil
		case err := <-acceptErrCh:
			return fmt.Errorf("接受数据通道连接失败: %w", err)
		case conn := <-connCh:
			app.DataConnWg.Add(1)
			go func() {
				defer app.DataConnWg.Done()
				defer func() {
					utils.Close(conn)
					slog.Debug("关闭数据通道连接")
				}()
				err := processDataConn(ctx, conn)
				if err != nil {
					slog.Error("处理数据通道连接失败", "err", err)
				}
			}()
		}
	}
}
// processDataConn serves a single data-channel connection opened by a node.
//
// The node first sends a 17-byte header: 16 bytes that are hex-encoded into
// the tag used to look up (and remove) the waiting user connection in
// app.UserConnMap, followed by one status byte, where 1 means the node
// connected to the target address. After a valid header, bytes are relayed in
// both directions between the user connection and the node until one
// direction finishes or ctx is cancelled. The user connection is closed on
// return; client is left for the caller to close.
//
// An error is returned only for header, lookup or status failures. Relay
// errors are logged by the relay goroutines and the function returns nil.
func processDataConn(ctx context.Context, client net.Conn) error {
	reader := bufio.NewReader(client)

	// Read the connection result header sent by the node.
	buf := make([]byte, 17)
	if _, err := io.ReadFull(reader, buf); err != nil {
		return fmt.Errorf("从节点获取连接结果失败: %w", err)
	}
	tag := hex.EncodeToString(buf[0:16])
	status := buf[16]

	// Claim the user connection registered under this tag.
	user, ok := app.UserConnMap.LoadAndDelete(tag)
	if !ok {
		return fmt.Errorf("用户连接已关闭tag%s", tag)
	}
	defer func() {
		utils.Close(user)
		slog.Debug("关闭用户连接")
	}()

	// Check the status reported by the node.
	if status != 1 {
		return errors.New("目标地址建立连接失败")
	}
	dataStart := time.Now()

	// Mirror the user's outgoing traffic into a pipe so analysisAndLog can
	// inspect it. The pipe writer is closed on return so the reader side sees
	// the end of the stream.
	userCopyFrom, userCopyTo := io.Pipe()
	defer utils.Close(userCopyTo)
	teeUser := io.TeeReader(user, userCopyTo)
	go func() {
		err := analysisAndLog(user, userCopyFrom)
		if err != nil {
			slog.Error("数据解析失败", "err", err)
		}
	}()

	// Relay node -> user. Read through reader, not client, so bytes already
	// buffered behind the header are not lost.
	//
	// Capacity 1: the select below receives from at most one of the two
	// result channels, so the other relay goroutine must be able to deliver
	// its result and exit instead of blocking on the send forever.
	waitEdge := make(chan error, 1)
	go func() {
		_, err := io.Copy(user, reader)
		switch {
		case errors.Is(err, net.ErrClosed):
			slog.Debug("节点连接意外关闭")
		case err != nil:
			slog.Error("读取节点数据失败", "err", err)
		default:
			slog.Debug("节点数据读取完成")
		}
		waitEdge <- err
	}()

	// Relay user -> node (through the tee feeding the analysis pipe).
	waitUser := make(chan error, 1)
	go func() {
		_, err := io.Copy(client, teeUser)
		switch {
		case errors.Is(err, net.ErrClosed):
			slog.Debug("用户连接意外关闭")
		case err != nil:
			slog.Error("读取用户数据失败", "err", err)
		default:
			slog.Debug("用户数据读取完成")
		}
		waitUser <- err
	}()

	// Wait for the first of: shutdown, node side finished, user side
	// finished. Returning closes the user connection via the defer above.
	select {
	case <-ctx.Done():
		slog.Debug("服务关闭")
	case <-waitEdge:
		// NOTE(review): timings are recorded only when the user side finishes
		// first; confirm whether this branch should also call
		// storeConnMatrics (Go select cases do not fall through).
	case <-waitUser:
		storeConnMatrics(user, dataStart)
	}
	return nil
}
// storeConnMatrics computes the phase timings of a finished user connection
// and sends them on debug.ConsumingCh.
//
// data is the instant the data phase began. The start and auth instants are
// looked up in metrics.TimerStart and metrics.TimerAuth, keyed by user.Conn;
// any duration that depends on a missing instant is reported as zero.
func storeConnMatrics(user *core.Conn, data time.Time) {
	// Capture the end instant first so the lookups below are not counted.
	finished := time.Now()

	startVal, hasStart := metrics.TimerStart.Load(user.Conn)
	authVal, hasAuth := metrics.TimerAuth.Load(user.Conn)

	var (
		authSpan  time.Duration // start -> auth
		dataSpan  time.Duration // auth -> data
		totalSpan time.Duration // start -> finished
	)
	if hasAuth {
		authAt := authVal.(time.Time)
		if hasStart {
			authSpan = authAt.Sub(startVal.(time.Time))
		}
		dataSpan = data.Sub(authAt)
	}
	if hasStart {
		totalSpan = finished.Sub(startVal.(time.Time))
	}

	debug.ConsumingCh <- debug.Consuming{
		Auth:  authSpan,
		Data:  dataSpan,
		Proxy: finished.Sub(data),
		Total: totalSpan,
	}
}