From 38d5341e8481cca9fe3e47bd778e8854cc79bf8e Mon Sep 17 00:00:00 2001 From: luorijun Date: Thu, 27 Feb 2025 18:07:00 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=A1=B9=E7=9B=AE=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 14 ++++---- cmd/server/.env.example | 2 ++ pkg/utils/sync.go | 6 ++++ server/fwd/analysis.go | 57 +++++++++++++++++++++++++++++++ server/fwd/fwd.go | 41 +++++++++++----------- server/fwd/logs/logs.go | 5 --- server/fwd/socks/socks.go | 4 +++ server/mnt/mnt.go | 11 +++--- server/server.go | 72 +++++++++++++++++++++++++++++++++++++-- 9 files changed, 176 insertions(+), 36 deletions(-) create mode 100644 server/fwd/analysis.go delete mode 100644 server/fwd/logs/logs.go diff --git a/README.md b/README.md index 2c932e8..8ee0c72 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,24 @@ ## todo -监听进程信号,优雅关闭服务 +配置退出等待时间 + +log 控制台颜色,输出错误堆栈 读取 conn 时加上超时机制 -检查 ip 时需要判断同一 ip 的不同写法 - 代理节点超时控制 -实现一个 socks context 以在子组件中获取 socks 相关信息 - 网关根据代理节点对目标服务连接的反馈,决定向用户返回的 socks 响应 数据通道池化 +在控制通道直接传输目标地址,客户端可以同时开始数据通道和目标地址的连接建立 + ### 长期 -代理端口支持混合端口转发(支持 tcp_mux) +实现一个 socks context 以在子组件中获取 socks 相关信息 + +代理端口支持混合端口转发 数据通道支持 tcp 多路复用(分离逻辑流) diff --git a/cmd/server/.env.example b/cmd/server/.env.example index e3fd97f..3055361 100644 --- a/cmd/server/.env.example +++ b/cmd/server/.env.example @@ -1,6 +1,8 @@ +# 应用配置 APP_CTRL_PORT=18080 APP_DATA_PORT=18081 +# 数据库配置 DB_HOST=localhost DB_PORT=5432 DB_DATABASE=app diff --git a/pkg/utils/sync.go b/pkg/utils/sync.go index 5c43733..b388690 100644 --- a/pkg/utils/sync.go +++ b/pkg/utils/sync.go @@ -5,6 +5,12 @@ import ( "sync/atomic" ) +type WaitGroup interface { + Add(delta uint) + Done() + Wait() +} + type CountWaitGroup struct { wg sync.WaitGroup num atomic.Int64 diff --git a/server/fwd/analysis.go b/server/fwd/analysis.go new file mode 100644 index 0000000..6bcc96e --- /dev/null +++ b/server/fwd/analysis.go @@ -0,0 +1,57 @@ +package fwd + +import ( + "bufio" + "io" + "log/slog" + "proxy-server/pkg/utils" +) + +func analysis(reader io.Reader) { + buf := bufio.NewReader(reader) + first, err := buf.Peek(8) + if err != nil { + slog.Error("analysis peek error", "err", err) + } else { + + switch { + case first[0] == 0x16: + analysisHttps(reader) + case + string(first[:4]) == "GET ", + // string(first[:4]) == "PUT ", + string(first[:5]) == "POST ": + // string(first[:4]) == "HEAD ", + // string(first[:4]) == "TRACE ", + // string(first[:4]) == "PATCH ", + // string(first[:4]) == "DELETE ", + // string(first[:4]) == "CONNECT ", + // string(first[:4]) == "OPTIONS ": + analysisHttp(reader) + } + } + discord(reader) +} + +func analysisHttp(reader io.Reader) { +} + +func analysisHttps(reader io.Reader) { + + head, err := utils.ReadBuffer(reader, 5) + if err != nil { + slog.Error("analysis https err", "err", err) + return + } + + if head[1] == 0x03 && head[2] == 0x03 { + // tls1.2 + } +} + +func discord(reader io.Reader) { + _, err := io.Copy(io.Discard, reader) + if err != nil { + slog.Error("analysis discord err", "err", err) + } +} diff --git a/server/fwd/fwd.go b/server/fwd/fwd.go index 084376b..cca96fb 100644 --- a/server/fwd/fwd.go +++ b/server/fwd/fwd.go @@ -30,27 +30,19 @@ type Service struct { } func New(config *Config) *Service { - _config := config - if _config == nil { - _config = &Config{} + if config == nil { + config = &Config{} } return &Service{ - Config: _config, + Config: config, connMap: make(map[string]socks.ProxyConn), ctrlConnWg: utils.CountWaitGroup{}, dataConnWg: utils.CountWaitGroup{}, } } -func (s *Service) Run(ctx context.Context, errCh chan error) { - defer func() { - err := recover() - if err != nil { - slog.Error("服务由于意外的 panic 导致退出", err) - } - }() - +func (s *Service) Run(ctx context.Context) { slog.Info("启动 fwd 服务") // 启动工作协程 @@ -61,7 +53,7 @@ func (s *Service) Run(ctx context.Context, errCh chan error) { subErrCh := make(chan error, goNum) defer close(subErrCh) - go s.startCtrlTun(subCtx, subErrCh) + go s.startCtrlTun(subCtx) go s.startDataTun(subCtx, subErrCh) // 等待结束 @@ -80,18 +72,20 @@ func (s *Service) Run(ctx context.Context, errCh chan error) { } slog.Info("fwd 服务已结束") - errCh <- firstSubErr } -func (s *Service) startCtrlTun(ctx context.Context, errCh chan error) { +func (s *Service) Close() { + +} + +func (s *Service) startCtrlTun(ctx context.Context) error { ctrlPort := env.AppCtrlPort slog.Debug("监听控制通道", slog.Uint64("port", uint64(ctrlPort))) // 监听端口 ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(ctrlPort))) if err != nil { - slog.Error("监听控制通道失败", "err", err) - return + return errors.Wrap(err, "监听控制通道失败") } defer utils.Close(ls) @@ -120,9 +114,10 @@ func (s *Service) startCtrlTun(ctx context.Context, errCh chan error) { } // 等待子协程结束 todo 可配置等待时间 + s.Close() + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - procCh := utils.ChanWgWait(timeout, &s.ctrlConnWg) defer close(procCh) @@ -134,7 +129,7 @@ func (s *Service) startCtrlTun(ctx context.Context, errCh chan error) { } slog.Debug("关闭控制通道") - errCh <- nil + return nil } func (s *Service) processCtrlConn(controller net.Conn) { @@ -283,6 +278,11 @@ func (s *Service) processDataConn(client net.Conn) { // 数据转发 slog.Info("开始数据转发 " + client.RemoteAddr().String() + " <-> " + data.Dest) + + // userPipeReader, userPipeWriter := io.Pipe() + // defer utils.Close(userPipeWriter) + // teeUser := io.TeeReader(user, userPipeWriter) + errCh := make(chan error) go func() { _, err := io.Copy(client, user) @@ -291,6 +291,8 @@ func (s *Service) processDataConn(client net.Conn) { } errCh <- err }() + // go analysis(userPipeReader) + go func() { _, err := io.Copy(user, client) if err != nil { @@ -298,6 +300,7 @@ func (s *Service) processDataConn(client net.Conn) { } errCh <- err }() + <-errCh slog.Info("数据转发结束 " + client.RemoteAddr().String() + " <-> " + data.Dest) } diff --git a/server/fwd/logs/logs.go b/server/fwd/logs/logs.go deleted file mode 100644 index 453f5db..0000000 --- a/server/fwd/logs/logs.go +++ /dev/null @@ -1,5 +0,0 @@ -package logs - -func Write(str ...string) { - -} diff --git a/server/fwd/socks/socks.go b/server/fwd/socks/socks.go index 9c04fd8..4abcaa8 100644 --- a/server/fwd/socks/socks.go +++ b/server/fwd/socks/socks.go @@ -61,6 +61,10 @@ type Server struct { // New 创建服务器 func New(conf *Config) (*Server, error) { + if conf == nil { + conf = &Config{} + } + if len(conf.AuthMethods) == 0 { return nil, ConfigError("认证方法不能为空") } diff --git a/server/mnt/mnt.go b/server/mnt/mnt.go index 58337f9..d671bce 100644 --- a/server/mnt/mnt.go +++ b/server/mnt/mnt.go @@ -2,12 +2,12 @@ package mnt import ( "context" - "encoding/hex" "log/slog" "github.com/google/gopacket" "github.com/google/gopacket/pcap" "github.com/pkg/errors" + "golang.org/x/text/encoding/simplifiedchinese" ) func Start(ctx context.Context, errCh chan error) { @@ -15,9 +15,12 @@ func Start(ctx context.Context, errCh chan error) { // 打开一个网络接口 device, err := pcap.OpenLive("WLAN", 1600, true, pcap.BlockForever) if err != nil { - b, er := hex.DecodeString("\\xbb") - slog.Debug("b", b, er) - errCh <- errors.Wrap(err, "打开网络接口失败") + gbk := simplifiedchinese.GBK.NewDecoder() + errMsg, err := gbk.String(err.Error()) + if err != nil { + errMsg = err.Error() + } + errCh <- errors.Wrap(err, "打开网络接口失败, "+errMsg) return } defer device.Close() diff --git a/server/server.go b/server/server.go index 2ee0c03..5dcfcd1 100644 --- a/server/server.go +++ b/server/server.go @@ -4,16 +4,25 @@ import ( "context" "log/slog" "os" + "os/signal" + "proxy-server/pkg/utils" "proxy-server/server/fwd" "proxy-server/server/pkg/env" "proxy-server/server/pkg/orm" "proxy-server/server/web" + "syscall" + "time" "github.com/joho/godotenv" "github.com/lmittmann/tint" "github.com/mattn/go-colorable" ) +type Context struct { + context.Context + log *slog.Logger +} + func Start() { // 初始化 @@ -21,14 +30,73 @@ func Start() { env.Init() orm.Init() - // 启动代理服务 - fwd.New(nil).Run(context.Background(), make(chan error)) + // 启动服务 + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + errQuit := make(chan error) + defer close(errQuit) + wg := utils.CountWaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + err := startFwdServer(ctx) + if err != nil { + slog.Error("代理服务发生错误", "err", err) + } + errQuit <- err + }() + + // 等待退出信号 + osQuit := make(chan os.Signal) + signal.Notify(osQuit, os.Interrupt, syscall.SIGTERM) + + select { + case <-osQuit: + slog.Info("服务关闭") + case <-errQuit: + slog.Error("服务异常退出") + } + + // 等待子服务退出 + cancel() + + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + wgCh := utils.ChanWgWait(timeout, &wg) + + select { + case <-timeout.Done(): + slog.Error("关闭超时,强制关闭") + case <-wgCh: + slog.Info("服务已退出") + } } func initLog() { slog.SetLogLoggerLevel(slog.LevelDebug) } +func startFwdServer(ctx context.Context) error { + server := fwd.New(nil) + + go func() { + <-ctx.Done() + server.Close() + }() + + server.Run(ctx) + return nil +} + +func startMntServer(ctx context.Context) { + +} + +func startWebServer(ctx context.Context) { + +} + func Start2() { defer func() { err := recover()