diff --git a/edge/edge.go b/edge/edge.go index c8d5032..73dde7f 100644 --- a/edge/edge.go +++ b/edge/edge.go @@ -118,7 +118,6 @@ func ctrl(ctx context.Context, id int32, host string) error { case <-ctx.Done(): return case tick := <-ticker.C: - slog.Debug("发送心跳", "time", tick) err := sendPing(conn) if err != nil { slog.Error("发送心跳失败", "time", tick, "err", err) @@ -225,20 +224,40 @@ func data(proxy string, dest string, tag [16]byte) error { return dstErr } + var waitSrc = make(chan error) go func() { - defer utils.Close(dst) _, err := io.Copy(dst, src) - if err != nil && !errors.Is(err, net.ErrClosed) { - slog.Error("上行流量代理失败", "err", err) + switch { + case errors.Is(err, net.ErrClosed): + slog.Debug("网关连接意外关闭") + case err != nil: + slog.Error("读取网关数据失败", "err", err) + default: + slog.Debug("网关数据读取完成") } + waitSrc <- err }() + + var waitDst = make(chan error) go func() { - defer utils.Close(src) _, err := io.Copy(src, dst) - if err != nil && !errors.Is(err, net.ErrClosed) { - slog.Error("下行流量代理失败", "err", err) + switch { + case errors.Is(err, net.ErrClosed): + slog.Debug("目标连接意外关闭") + case err != nil && !errors.Is(err, net.ErrClosed): + slog.Error("读取目标数据失败", "err", err) + default: + slog.Debug("目标数据读取完成") } + waitDst <- err }() + + // 等待任意一方关闭数据连接 + select { + case <-waitSrc: + case <-waitDst: + } + return nil } diff --git a/gateway/app/app.go b/gateway/app/app.go index 9a26c17..2a6e4e6 100644 --- a/gateway/app/app.go +++ b/gateway/app/app.go @@ -33,7 +33,6 @@ func AddEdge(id int32, port uint16) { func DelEdge(port uint16) { id, _ := Assigns.LoadAndDelete(port) Edges.Delete(id) - Permits.Delete(id) } func LoadPermit(port uint16) *core.Permit { diff --git a/gateway/core/conn.go b/gateway/core/conn.go index 90fc288..f4921a6 100644 --- a/gateway/core/conn.go +++ b/gateway/core/conn.go @@ -63,7 +63,6 @@ func (a FwdAddr) String() string { } type AuthContext struct { - Timeout float64 Payload Payload Meta map[string]any } diff --git a/gateway/env/env.go b/gateway/env/env.go index b19bb6b..d4abea6 100644 --- a/gateway/env/env.go +++ b/gateway/env/env.go @@ -3,8 +3,10 @@ package env import ( "fmt" "log/slog" + "net" "os" "strconv" + "strings" "github.com/joho/godotenv" ) @@ -20,6 +22,8 @@ var ( AppDataTimeout = 10 // 等待数据通道连接的超时时间 AppUserTimeout = 10 // 等待用户发送数据的超时时间(端口复用需要分析协议,如果用户长期不发送数据,将会阻塞分析协程) + AuthWhitelist []net.IP // 全局白名单,可以将白名单 IP 视为一个可信任代理 + ClientId string ClientSecret string @@ -99,6 +103,31 @@ func Init() { AppDataTimeout = appDataTimeout } + value = os.Getenv("APP_USER_TIMEOUT") + if value != "" { + appUserTimeout, err := strconv.Atoi(value) + if err != nil { + panic(fmt.Sprintf("环境变量 APP_USER_TIMEOUT 格式错误: %v", err)) + } + AppUserTimeout = appUserTimeout + } + + value = os.Getenv("AUTH_WHITELIST") + if value != "" { + ips := strings.Split(value, ",") + for _, ip := range ips { + ip = strings.TrimSpace(ip) + if ip == "" { + continue + } + parsedIP := net.ParseIP(ip) + if parsedIP == nil { + panic(fmt.Sprintf("环境变量 AUTH_WHITELIST 格式错误: %s", ip)) + } + AuthWhitelist = append(AuthWhitelist, parsedIP) + } + } + value = os.Getenv("CLIENT_ID") if value != "" { ClientId = value diff --git a/gateway/fwd/auth/auth.go b/gateway/fwd/auth/auth.go index 07a7bd2..816cba3 100644 --- a/gateway/fwd/auth/auth.go +++ b/gateway/fwd/auth/auth.go @@ -5,6 +5,8 @@ import ( "net" "proxy-server/gateway/app" "proxy-server/gateway/core" + "proxy-server/gateway/env" + "slices" "strconv" "time" @@ -35,6 +37,21 @@ func Protect(conn net.Conn, proto Protocol, username, password *string) (*core.A return nil, fmt.Errorf("noAuth 认证失败: %w", err) } + var id, _ = app.Assigns.Load(uint16(localPort)) + + // 检查全局白名单 + var remoteIp = net.ParseIP(remoteHost) + if remoteIp == nil { + return nil, fmt.Errorf("无法解析 IP 地址: %s", remoteHost) + } + if slices.ContainsFunc(env.AuthWhitelist, func(ip net.IP) bool { return ip.Equal(remoteIp) }) { + return &core.AuthContext{ + Payload: core.Payload{ + ID: id, + }, + }, nil + } + // 查找权限配置 var permit = app.LoadPermit(uint16(localPort)) if permit == nil { @@ -68,9 +85,7 @@ func Protect(conn net.Conn, proto Protocol, username, password *string) (*core.A } } - var id, _ = app.Assigns.Load(uint16(localPort)) return &core.AuthContext{ - Timeout: time.Since(permit.Expire).Seconds(), Payload: core.Payload{ ID: id, }, diff --git a/gateway/fwd/data.go b/gateway/fwd/data.go index d13f793..bca7dfe 100644 --- a/gateway/fwd/data.go +++ b/gateway/fwd/data.go @@ -3,19 +3,19 @@ package fwd import ( "bufio" "context" + "encoding/hex" "errors" "fmt" - "github.com/google/uuid" "io" "log/slog" "net" "proxy-server/gateway/app" + "proxy-server/gateway/core" "proxy-server/gateway/debug" "proxy-server/gateway/env" "proxy-server/gateway/fwd/metrics" "proxy-server/utils" "strconv" - "sync" "time" ) @@ -60,7 +60,10 @@ func ListenData(ctx context.Context) error { app.DataConnWg.Add(1) go func() { defer app.DataConnWg.Done() - defer utils.Close(conn) + defer func() { + utils.Close(conn) + slog.Debug("关闭数据通道连接") + }() err := processDataConn(ctx, conn) if err != nil { slog.Error("处理数据通道连接失败", "err", err) @@ -80,88 +83,107 @@ func processDataConn(ctx context.Context, client net.Conn) error { return fmt.Errorf("从节点获取连接结果失败: %w", err) } - tag := buf[0:16] + tag := hex.EncodeToString(buf[0:16]) status := buf[16] // 加载用户连接 - var tagStr = uuid.UUID(tag).String() - user, ok := app.UserConnMap.LoadAndDelete(tagStr) + user, ok := app.UserConnMap.LoadAndDelete(tag) if !ok { - return fmt.Errorf("用户连接已关闭,tag:%s", tagStr) + return fmt.Errorf("用户连接已关闭,tag:%s", tag) } - defer utils.Close(user) + defer func() { + utils.Close(user) + slog.Debug("关闭用户连接") + }() // 检查状态 if status != 1 { return errors.New("目标地址建立连接失败") } - // 转发数据 data := time.Now() - userPipeReader, userPipeWriter := io.Pipe() - defer utils.Close(userPipeWriter) + // 复制用户流量进行访问目标分析 + userCopyFrom, userCopyTo := io.Pipe() + defer utils.Close(userCopyTo) - teeUser := io.TeeReader(user, userPipeWriter) + teeUser := io.TeeReader(user, userCopyTo) go func() { - err := analysisAndLog(user, userPipeReader) + err := analysisAndLog(user, userCopyFrom) if err != nil { slog.Error("数据解析失败", "err", err) } }() - wg := sync.WaitGroup{} - wg.Add(2) + // 复制节点数据到用户 + var waitEdge = make(chan error) go func() { - defer wg.Done() - _, err := io.Copy(client, teeUser) - if err != nil { - slog.Error("数据转发失败 user->client", "err", err) - } - }() - go func() { - defer wg.Done() _, err := io.Copy(user, reader) - if err != nil { - slog.Error("数据转发失败 client->user", "err", err) + switch { + case errors.Is(err, net.ErrClosed): + slog.Debug("节点连接意外关闭") + case err != nil: + slog.Error("读取节点数据失败", "err", err) + default: + slog.Debug("节点数据读取完成") } + waitEdge <- err }() + // 复制用户数据到节点 + var waitUser = make(chan error) + go func() { + _, err := io.Copy(client, teeUser) + switch { + case errors.Is(err, net.ErrClosed): + slog.Debug("用户连接意外关闭") + case err != nil: + slog.Error("读取用户数据失败", "err", err) + default: + slog.Debug("用户数据读取完成") + } + waitUser <- err + }() + + // 等待数据转发完成,关闭数据通道的时机: select { - case <-ctx.Done(): - return nil + slog.Debug("服务关闭") + case <-waitEdge: + case <-waitUser: + storeConnMatrics(user, data) + } - case <-utils.WgWait(&wg): - proxy := time.Now() + return nil +} - start, startOk := metrics.TimerStart.Load(user.Conn) - auth, authOk := metrics.TimerAuth.Load(user.Conn) +func storeConnMatrics(user *core.Conn, data time.Time) { + proxy := time.Now() - var authDuration time.Duration - if startOk && authOk { - authDuration = auth.(time.Time).Sub(start.(time.Time)) - } + start, startOk := metrics.TimerStart.Load(user.Conn) + auth, authOk := metrics.TimerAuth.Load(user.Conn) - var dataDuration time.Duration - if authOk { - dataDuration = data.Sub(auth.(time.Time)) - } + var authDuration time.Duration + if startOk && authOk { + authDuration = auth.(time.Time).Sub(start.(time.Time)) + } - proxyDuration := proxy.Sub(data) + var dataDuration time.Duration + if authOk { + dataDuration = data.Sub(auth.(time.Time)) + } - var totalDuration time.Duration - if startOk { - totalDuration = proxy.Sub(start.(time.Time)) - } + proxyDuration := proxy.Sub(data) - debug.ConsumingCh <- debug.Consuming{ - Auth: authDuration, - Data: dataDuration, - Proxy: proxyDuration, - Total: totalDuration, - } + var totalDuration time.Duration + if startOk { + totalDuration = proxy.Sub(start.(time.Time)) + } - return nil + debug.ConsumingCh <- debug.Consuming{ + Auth: authDuration, + Data: dataDuration, + Proxy: proxyDuration, + Total: totalDuration, } } diff --git a/gateway/fwd/user.go b/gateway/fwd/user.go index 4256ace..bf62035 100644 --- a/gateway/fwd/user.go +++ b/gateway/fwd/user.go @@ -67,7 +67,8 @@ func processUserConn(ctx context.Context, user *core.Conn, ctrl io.Writer) (err } // 保存用户连接 - app.UserConnMap.Store(hex.EncodeToString(user.Tag[:]), user) + var tag = hex.EncodeToString(user.Tag[:]) + app.UserConnMap.Store(tag, user) // 如果限定时间内没有建立数据通道,则关闭连接 var timeout, cancel = context.WithTimeout(context.Background(), time.Duration(env.AppDataTimeout)*time.Second) @@ -80,11 +81,11 @@ func processUserConn(ctx context.Context, user *core.Conn, ctrl io.Writer) (err err = ctx.Err() } - _, ok := app.UserConnMap.LoadAndDelete(hex.EncodeToString(user.Tag[:])) + _, ok := app.UserConnMap.LoadAndDelete(tag) if ok { utils.Close(user) if errors.Is(err, context.DeadlineExceeded) { - slog.Error("用户连接超时", "tag", hex.EncodeToString(user.Tag[:]), "addr", user.RemoteAddr().String()) + slog.Error("用户连接超时", "tag", tag, "addr", user.RemoteAddr().String()) } } diff --git a/go.mod b/go.mod index 50e7e47..464a8b2 100644 --- a/go.mod +++ b/go.mod @@ -13,17 +13,16 @@ require ( ) require ( - github.com/andybalholm/brotli v1.1.0 // indirect + github.com/andybalholm/brotli v1.1.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect - github.com/rivo/uniseg v0.2.0 // indirect + github.com/rivo/uniseg v0.4.7 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.51.0 // indirect - github.com/valyala/tcplisten v1.0.0 // indirect - golang.org/x/net v0.35.0 // indirect - golang.org/x/sys v0.30.0 // indirect - golang.org/x/text v0.22.0 // indirect + github.com/valyala/fasthttp v1.59.0 // indirect + golang.org/x/net v0.37.0 // indirect + golang.org/x/sys v0.31.0 // indirect + golang.org/x/text v0.23.0 // indirect ) diff --git a/go.sum b/go.sum index 857e3ed..790b990 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,4 @@ -github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= -github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -14,8 +13,7 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= github.com/lmittmann/tint v1.0.7 h1:D/0OqWZ0YOGZ6AyC+5Y2kD8PBEzBk6rFHVSfOqCkF9Y= github.com/lmittmann/tint v1.0.7/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= @@ -26,30 +24,25 @@ github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6T github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= -github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.51.0 h1:8b30A5JlZ6C7AS81RsWjYMQmrZG6feChmgAolCl1SqA= -github.com/valyala/fasthttp v1.51.0/go.mod h1:oI2XroL+lI7vdXyYoQk03bXBThfFl2cVdIA3Xl7cH8g= -github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= -github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= +github.com/valyala/fasthttp v1.59.0 h1:Qu0qYHfXvPk1mSLNqcFtEk6DpxgA26hy6bmydotDpRI= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= -golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= +golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= +golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/utils/utils.go b/utils/utils.go index 4dc6242..217e9a9 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -28,9 +28,11 @@ func ReadBuffer(reader io.Reader, size int) ([]byte, error) { } // Close 关闭对象,传入值绝对不能为 nil -func Close[T io.Closer](v T) { - err := v.Close() - if err != nil { - slog.Warn("对象关闭失败", "err", err) +func Close(v any) { + if v, ok := v.(io.Closer); ok { + err := v.Close() + if err != nil { + slog.Warn("对象关闭失败", "err", err) + } } }