修复连接超时问题

This commit is contained in:
2025-03-06 14:35:21 +08:00
parent 9d8bdaec7e
commit 2a50237af2
4 changed files with 38 additions and 56 deletions

View File

@@ -56,6 +56,8 @@ func control() error {
} }
defer utils.Close(conn) defer utils.Close(conn)
reader := bufio.NewReader(conn)
// 请求转发端口 // 请求转发端口
slog.Info("注册转发端口", "port", cfg.FwdPort) slog.Info("注册转发端口", "port", cfg.FwdPort)
portBuf := make([]byte, 2) portBuf := make([]byte, 2)
@@ -67,10 +69,8 @@ func control() error {
// 等待用户连接 // 等待用户连接
// 读写失败后退出重连,防止后续数据读写顺序错位导致卡死控制通道 // 读写失败后退出重连,防止后续数据读写顺序错位导致卡死控制通道
for {
slog.Info("等待用户连接") slog.Info("等待用户连接")
reader := bufio.NewReader(conn) for {
tagLen, err := utils.ReadByte(reader) tagLen, err := utils.ReadByte(reader)
if err != nil { if err != nil {
return errors.Wrap(err, "接收 tagLen 失败") return errors.Wrap(err, "接收 tagLen 失败")
@@ -81,8 +81,8 @@ func control() error {
} }
// 建立数据通道 // 建立数据通道
slog.Info("收到用户连接,建立数据通道", "tag", string(tagBuf))
go func() { go func() {
slog.Info("收到用户连接,建立数据通道")
err := data(tagLen, tagBuf) err := data(tagLen, tagBuf)
if err != nil { if err != nil {
slog.Error("建立数据通道失败", err) slog.Error("建立数据通道失败", err)
@@ -139,9 +139,9 @@ func data(tagLen byte, tagBuf []byte) error {
errCh <- err errCh <- err
return return
} else { } else {
slog.Info("上行流量代理结束") slog.Debug("上行流量代理结束")
} }
slog.Info("上行流量", "bytes", written) slog.Debug("上行流量", "bytes", written)
errCh <- nil errCh <- nil
}() }()
go func() { go func() {
@@ -151,14 +151,14 @@ func data(tagLen byte, tagBuf []byte) error {
errCh <- err errCh <- err
return return
} else { } else {
slog.Info("下行流量代理结束") slog.Debug("下行流量代理结束")
} }
slog.Info("下行流量", "bytes", written) slog.Debug("下行流量", "bytes", written)
errCh <- nil errCh <- nil
}() }()
<-errCh <-errCh
slog.Info("代理流量结束", "time", time.Since(timer)) slog.Debug("代理流量结束", "time", time.Since(timer))
slog.Info("数据通道结束", "time", time.Since(timerAll)) slog.Debug("数据通道结束", "time", time.Since(timerAll))
return nil return nil
} }
@@ -207,5 +207,5 @@ func initEnv() {
} }
func initLog() { func initLog() {
slog.SetLogLoggerLevel(slog.LevelDebug) slog.SetLogLoggerLevel(slog.LevelInfo)
} }

View File

@@ -3,10 +3,7 @@ package main
import ( import (
"math/rand" "math/rand"
"net/http" "net/http"
"net/url"
"time" "time"
vegeta "github.com/tsenart/vegeta/lib"
) )
func main() { func main() {
@@ -15,39 +12,20 @@ func main() {
} }
func mock() { func mock() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
waiting := rand.Intn(450) + 50 waiting := rand.Intn(450) + 50
time.Sleep(time.Duration(waiting) * time.Millisecond) time.Sleep(time.Duration(waiting) * time.Millisecond)
w.Write([]byte("Hello World")) w.Write([]byte("Hello World"))
}) })
err := http.ListenAndServe(":8080", nil) serv := &http.Server{
Addr: ":8080",
Handler: nil,
}
serv.SetKeepAlivesEnabled(false)
err := serv.ListenAndServe()
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
func attack() {
targeter := vegeta.NewStaticTargeter(vegeta.Target{
Method: "GET",
URL: "http://localhost:8080",
})
rate := vegeta.Rate{Freq: 500, Per: time.Second}
duration := 10 * time.Second
attacker := vegeta.NewAttacker()
vegeta.Proxy(func(request *http.Request) (*url.URL, error) {
return url.Parse("http://test-api.imfree.site:20001")
})(attacker)
result := attacker.Attack(targeter, rate, duration, "test")
maxNum := 0
for res := range result {
println(res.Latency)
}
println(maxNum)
}

View File

@@ -86,12 +86,15 @@ func (s *Server) Run() error {
errCh <- err errCh <- err
}() }()
err = nil
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
return nil case err = <-errCh:
case err := <-errCh:
return err
} }
close(s.Conn)
return err
} }
func (s *Server) acceptHttp(ls net.Listener) error { func (s *Server) acceptHttp(ls net.Listener) error {

View File

@@ -61,7 +61,7 @@ func (s *Service) Close() {
func (s *Service) Run() { func (s *Service) Run() {
slog.Info("启动 fwd 服务") slog.Info("启动 fwd 服务")
errQuit := make(chan struct{}) errQuit := make(chan struct{}, 2)
defer close(errQuit) defer close(errQuit)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
@@ -234,16 +234,22 @@ func (s *Service) startDataTun() error {
} }
defer utils.Close(ls) defer utils.Close(ls)
// 处理连接 go func() {
connCh := utils.ChanConnAccept(s.ctx, ls) <-s.ctx.Done()
utils.Close(ls)
}()
for { for {
conn, err := ls.Accept()
if err != nil {
return errors.Wrap(err, "监听数据通道失败")
}
select { select {
case <-s.ctx.Done(): case <-s.ctx.Done():
utils.Close(conn)
return nil return nil
case conn, ok := <-connCh: default:
if !ok {
return errors.New("获取连接失败")
}
s.dataConnWg.Add(1) s.dataConnWg.Add(1)
go func() { go func() {
defer s.dataConnWg.Done() defer s.dataConnWg.Done()
@@ -258,7 +264,6 @@ func (s *Service) startDataTun() error {
} }
func (s *Service) processDataConn(client net.Conn) error { func (s *Service) processDataConn(client net.Conn) error {
slog.Info("客户端准备接收数据 " + client.RemoteAddr().String())
// 读取 tag // 读取 tag
var tag string var tag string
@@ -326,11 +331,7 @@ func (s *Service) processDataConn(client net.Conn) error {
defer wg.Done() defer wg.Done()
_, err := io.Copy(user, client) _, err := io.Copy(user, client)
if err != nil { if err != nil {
if errors.Is(err, net.ErrClosed) { slog.Error("数据转发失败 client->user", "err", err, "errType")
} else {
// slog.Error("数据转发失败 client->user", "err", err, "errType", reflect.TypeOf(err))
}
} }
}() }()