diff --git a/client/service.go b/client/client.go similarity index 90% rename from client/service.go rename to client/client.go index 214c3c9..5f0fede 100644 --- a/client/service.go +++ b/client/client.go @@ -56,6 +56,8 @@ func control() error { } defer utils.Close(conn) + reader := bufio.NewReader(conn) + // 请求转发端口 slog.Info("注册转发端口", "port", cfg.FwdPort) portBuf := make([]byte, 2) @@ -67,10 +69,8 @@ func control() error { // 等待用户连接 // 读写失败后退出重连,防止后续数据读写顺序错位导致卡死控制通道 + slog.Info("等待用户连接") for { - slog.Info("等待用户连接") - reader := bufio.NewReader(conn) - tagLen, err := utils.ReadByte(reader) if err != nil { return errors.Wrap(err, "接收 tagLen 失败") @@ -81,8 +81,8 @@ func control() error { } // 建立数据通道 + slog.Info("收到用户连接,建立数据通道", "tag", string(tagBuf)) go func() { - slog.Info("收到用户连接,建立数据通道") err := data(tagLen, tagBuf) if err != nil { slog.Error("建立数据通道失败", err) @@ -139,9 +139,9 @@ func data(tagLen byte, tagBuf []byte) error { errCh <- err return } else { - slog.Info("上行流量代理结束") + slog.Debug("上行流量代理结束") } - slog.Info("上行流量", "bytes", written) + slog.Debug("上行流量", "bytes", written) errCh <- nil }() go func() { @@ -151,14 +151,14 @@ func data(tagLen byte, tagBuf []byte) error { errCh <- err return } else { - slog.Info("下行流量代理结束") + slog.Debug("下行流量代理结束") } - slog.Info("下行流量", "bytes", written) + slog.Debug("下行流量", "bytes", written) errCh <- nil }() <-errCh - slog.Info("代理流量结束", "time", time.Since(timer)) - slog.Info("数据通道结束", "time", time.Since(timerAll)) + slog.Debug("代理流量结束", "time", time.Since(timer)) + slog.Debug("数据通道结束", "time", time.Since(timerAll)) return nil } @@ -207,5 +207,5 @@ func initEnv() { } func initLog() { - slog.SetLogLoggerLevel(slog.LevelDebug) + slog.SetLogLoggerLevel(slog.LevelInfo) } diff --git a/cmd/mock/main.go b/cmd/mock/main.go index 93cdb67..c7ea927 100644 --- a/cmd/mock/main.go +++ b/cmd/mock/main.go @@ -3,10 +3,7 @@ package main import ( "math/rand" "net/http" - "net/url" "time" - - vegeta "github.com/tsenart/vegeta/lib" ) func main() { @@ -15,39 +12,20 @@ func main() { } func mock() { + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { waiting := rand.Intn(450) + 50 time.Sleep(time.Duration(waiting) * time.Millisecond) w.Write([]byte("Hello World")) }) - err := http.ListenAndServe(":8080", nil) + serv := &http.Server{ + Addr: ":8080", + Handler: nil, + } + serv.SetKeepAlivesEnabled(false) + err := serv.ListenAndServe() if err != nil { panic(err) } } - -func attack() { - targeter := vegeta.NewStaticTargeter(vegeta.Target{ - Method: "GET", - URL: "http://localhost:8080", - }) - - rate := vegeta.Rate{Freq: 500, Per: time.Second} - - duration := 10 * time.Second - - attacker := vegeta.NewAttacker() - - vegeta.Proxy(func(request *http.Request) (*url.URL, error) { - return url.Parse("http://test-api.imfree.site:20001") - })(attacker) - - result := attacker.Attack(targeter, rate, duration, "test") - - maxNum := 0 - for res := range result { - println(res.Latency) - } - println(maxNum) -} diff --git a/server/fwd/dispatcher/dispatch.go b/server/fwd/dispatcher/dispatch.go index b01469b..181a3b4 100644 --- a/server/fwd/dispatcher/dispatch.go +++ b/server/fwd/dispatcher/dispatch.go @@ -86,12 +86,15 @@ func (s *Server) Run() error { errCh <- err }() + err = nil select { case <-s.ctx.Done(): - return nil - case err := <-errCh: - return err + case err = <-errCh: } + + close(s.Conn) + + return err } func (s *Server) acceptHttp(ls net.Listener) error { diff --git a/server/fwd/fwd.go b/server/fwd/fwd.go index ad5c13d..36eeaba 100644 --- a/server/fwd/fwd.go +++ b/server/fwd/fwd.go @@ -61,7 +61,7 @@ func (s *Service) Close() { func (s *Service) Run() { slog.Info("启动 fwd 服务") - errQuit := make(chan struct{}) + errQuit := make(chan struct{}, 2) defer close(errQuit) wg := sync.WaitGroup{} @@ -234,16 +234,22 @@ func (s *Service) startDataTun() error { } defer utils.Close(ls) - // 处理连接 - connCh := utils.ChanConnAccept(s.ctx, ls) + go func() { + <-s.ctx.Done() + utils.Close(ls) + }() + for { + conn, err := ls.Accept() + if err != nil { + return errors.Wrap(err, "监听数据通道失败") + } + select { case <-s.ctx.Done(): + utils.Close(conn) return nil - case conn, ok := <-connCh: - if !ok { - return errors.New("获取连接失败") - } + default: s.dataConnWg.Add(1) go func() { defer s.dataConnWg.Done() @@ -258,7 +264,6 @@ func (s *Service) startDataTun() error { } func (s *Service) processDataConn(client net.Conn) error { - slog.Info("客户端准备接收数据 " + client.RemoteAddr().String()) // 读取 tag var tag string @@ -326,11 +331,7 @@ func (s *Service) processDataConn(client net.Conn) error { defer wg.Done() _, err := io.Copy(user, client) if err != nil { - if errors.Is(err, net.ErrClosed) { - - } else { - // slog.Error("数据转发失败 client->user", "err", err, "errType", reflect.TypeOf(err)) - } + slog.Error("数据转发失败 client->user", "err", err, "errType") } }()