package fwd

import (
	"context"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"proxy-server/gateway/core"
	"proxy-server/gateway/env"
	"proxy-server/gateway/fwd/dispatcher"
	"proxy-server/gateway/fwd/metrics"
	"proxy-server/utils"
	"time"
)

// listenUser starts a dispatcher on port and hands every connection it
// produces to processUserConn, each in its own goroutine tracked by
// s.userConnWg.
//
// ctrl is passed through to processUserConn, which uses it to send the proxy
// command for each accepted connection.
//
// It returns nil when ctx is cancelled or when the dispatcher's Run returns
// without error, and a wrapped error when the dispatcher cannot be created
// or its Run fails. The dispatcher is stopped when listenUser returns.
func (s *Service) listenUser(ctx context.Context, port uint16, ctrl io.Writer) error {
	dspt, err := dispatcher.New(port, time.Duration(env.AppUserTimeout)*time.Second)
	if err != nil {
		return err
	}
	defer dspt.Stop()

	// Buffered with capacity 1 so the Run goroutine can always deliver its
	// result and exit, even after this function has already returned because
	// ctx was cancelled. With an unbuffered channel that send would block
	// forever and leak the goroutine.
	errCh := make(chan error, 1)
	go func() {
		err := dspt.Run()
		if err != nil {
			err = fmt.Errorf("协议嗅探服务运行失败: %w", err)
		}
		errCh <- err
	}()

	// Handle incoming connections until cancelled or the dispatcher stops.
	for {
		select {
		case <-ctx.Done():
			return nil

		case err := <-errCh:
			if err != nil {
				err = fmt.Errorf("监听转发端口失败: %w", err)
			}
			return err

		case user := <-dspt.Conn:
			// Record the arrival time of this connection in the auth timer.
			metrics.TimerAuth.Store(user.Conn, time.Now())

			s.userConnWg.Add(1)
			go func() {
				defer s.userConnWg.Done()
				if err := s.processUserConn(user, ctrl); err != nil {
					slog.Error("处理用户连接失败", "err", err)
					utils.Close(user)
				}
			}()
		}
	}
}

// processUserConn sends the proxy command for user over ctrl, registers the
// connection in s.userConnMap under its hex-encoded tag, and then waits until
// either env.AppDataTimeout seconds have elapsed or s.ctx is done.
//
// If the connection is still registered after the wait, it is removed from
// the map and closed; when the wait ended because of the timeout this is
// logged as a user connection timeout. Presumably the entry is removed
// elsewhere once a data channel has been established for it -- confirm
// against the code that consumes s.userConnMap.
//
// A non-nil error is returned only when sending the proxy command fails; in
// that case the connection is neither registered nor closed here.
func (s *Service) processUserConn(user *core.Conn, ctrl io.Writer) (err error) {
	// Send the proxy command.
	err = s.sendProxy(ctrl, user.Tag, user.Dest.String())
	if err != nil {
		return err
	}

	// Register the user connection under its hex-encoded tag.
	tag := hex.EncodeToString(user.Tag[:])
	s.userConnMap.Store(tag, user)

	// Close the connection if no data channel is established in time.
	timeout, cancel := context.WithTimeout(context.Background(), time.Duration(env.AppDataTimeout)*time.Second)
	defer cancel()

	select {
	case <-timeout.Done():
		err = timeout.Err()
	case <-s.ctx.Done():
		err = s.ctx.Err()
	}

	// Still registered means nobody claimed the connection during the wait.
	if _, ok := s.userConnMap.LoadAndDelete(tag); ok {
		utils.Close(user)
		if errors.Is(err, context.DeadlineExceeded) {
			slog.Error("用户连接超时", "tag", tag, "addr", user.RemoteAddr().String())
		}
	}

	// Timeout and shutdown are handled above and are not reported as errors.
	return nil
}