package fwd import ( "bufio" "context" "encoding/binary" "fmt" "io" "log/slog" "net" "proxy-server/pkg/utils" "proxy-server/server/fwd/core" "proxy-server/server/fwd/dispatcher" "proxy-server/server/fwd/metrics" "proxy-server/server/pkg/env" "proxy-server/server/report" "strconv" "strings" "time" "errors" ) type CtrlCmd struct { conn net.Conn buf []byte } var ctrlCmdChan = make(chan CtrlCmd, 1024) func (s *Service) startCtrlTun() error { ctrlPort := env.AppCtrlPort slog.Debug("监听控制通道", slog.Uint64("port", uint64(ctrlPort))) // 监听端口 ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(ctrlPort))) if err != nil { return fmt.Errorf("监听控制通道失败: %w", err) } defer utils.Close(ls) // 处理连接 connCh := utils.ChanConnAccept(s.ctx, ls) err = nil for loop := true; loop; { select { case <-s.ctx.Done(): loop = false case conn, ok := <-connCh: if !ok { err = errors.New("获取连接失败") loop = false } s.ctrlConnWg.Add(1) go func() { defer s.ctrlConnWg.Done() defer utils.Close(conn) err := s.processCtrlConn(conn) if err != nil { slog.Error("处理控制通道连接失败", "err", err) } }() } } return err } func (s *Service) processCtrlConn(conn net.Conn) error { reader := bufio.NewReader(conn) var recv = make([]byte, 4) _, err := io.ReadFull(reader, recv) if err != nil { return fmt.Errorf("读取客户端 ID 失败: %w", err) } var clientId = int32(binary.BigEndian.Uint32(recv)) // 分配端口 var minim uint16 = 20000 var maxim uint16 = 60000 var fwdPort uint16 for i := minim; i < maxim; i++ { var _, ok = s.fwdPortMap[i] if !ok { fwdPort = i s.fwdPortMap[i] = clientId break } } if fwdPort == 0 { return errors.New("没有可用的端口") } // 报告端口分配 if s.Config.Id == nil || *s.Config.Id == 0 { return errors.New("转发服务未成功注册,无法提供服务") } err = report.Assigned(s.ctx, *s.Config.Id, clientId, fwdPort) if err != nil { return fmt.Errorf("报告端口分配失败: %w", err) } // 响应客户端 _, err = conn.Write([]byte{1}) if err != nil { return fmt.Errorf("响应客户端失败: %w", err) } // 启动转发服务 slog.Info("监听转发端口", "port", fwdPort, "client", clientId) proxy, err := dispatcher.New(fwdPort) if err != nil { return err } defer proxy.Close() s.fwdLesWg.Add(1) go func() { defer s.fwdLesWg.Done() err := proxy.Run() if err != nil { slog.Error("代理服务运行失败", "err", err) } }() // 监听控制通道连接 errCh := make(chan error, 1) go func() { defer close(errCh) _, err := reader.ReadByte() errCh <- err }() // 批量同步写入 go func() { for { select { case <-s.ctx.Done(): return case cmd := <-ctrlCmdChan: _, err := cmd.conn.Write(cmd.buf) if err != nil { slog.Error("批量写入失败", "err", err) utils.Close(cmd.conn) } } } }() // 处理连接 for { select { case <-s.ctx.Done(): return nil case err := <-errCh: switch { case strings.Contains(err.Error(), "An existing connection was forcibly closed by the remote host."): slog.Debug("客户端主动断开连接") return nil case err == nil: return errors.New("客户端握手失败") default: return fmt.Errorf("客户端意外断开连接: %w", err) } case user := <-proxy.Conn: metrics.TimerAuth.Store(user.Conn, time.Now()) s.userConnWg.Add(1) go func() { defer s.userConnWg.Done() err := s.processUserConn(user, conn) if err != nil { slog.Error("处理用户连接失败", "err", err) utils.Close(user) } }() } } } func (s *Service) processUserConn(user *core.Conn, ctrl net.Conn) error { // 组织写入信息 dst := user.DestAddr().String() dstLen := len(dst) tag := user.Tag tagLen := len(tag) writeBuf := make([]byte, 2+dstLen+tagLen) writeBuf[0] = byte(dstLen) copy(writeBuf[1:], dst) writeBuf[1+dstLen] = byte(tagLen) copy(writeBuf[2+dstLen:], tag) // 异步写入命令 ctrlCmdChan <- CtrlCmd{ conn: ctrl, buf: writeBuf, } // 记录用户连接 s.userConnMap.Store(user.Tag, user) // 如果限定时间内没有建立数据通道,则关闭连接 timeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() select { case <-s.ctx.Done(): // 服务会在退出时统一关闭未消费的连接 case <-timeout.Done(): storedUser, ok := s.userConnMap.LoadAndDelete(user.Tag) if ok { slog.Debug("建立数据通道超时", "tag", user.Tag) utils.Close(storedUser) } } return nil }