package fwd import ( "bufio" "context" "encoding/binary" "io" "log/slog" "net" "proxy-server/pkg/utils" "proxy-server/server/fwd/core" "proxy-server/server/fwd/dispatcher" "proxy-server/server/pkg/env" "strconv" "strings" "sync" "github.com/pkg/errors" ) type Config struct { } type Service struct { Config *Config ctx context.Context cancel context.CancelFunc userConnMap sync.Map fwdLesWg utils.CountWaitGroup ctrlConnWg utils.CountWaitGroup dataConnWg utils.CountWaitGroup userConnWg utils.CountWaitGroup } func New(config *Config) *Service { if config == nil { config = &Config{} } ctx, cancel := context.WithCancel(context.Background()) return &Service{ Config: config, ctx: ctx, cancel: cancel, userConnMap: sync.Map{}, fwdLesWg: utils.CountWaitGroup{}, ctrlConnWg: utils.CountWaitGroup{}, dataConnWg: utils.CountWaitGroup{}, userConnWg: utils.CountWaitGroup{}, } } func (s *Service) Close() { s.cancel() } func (s *Service) Run() { slog.Info("启动 fwd 服务") errQuit := make(chan struct{}) defer close(errQuit) wg := sync.WaitGroup{} // 控制通道监听 wg.Add(1) go func() { defer wg.Done() err := s.startCtrlTun() if err != nil { slog.Error("fwd 控制通道监听发生错误", "err", err) errQuit <- struct{}{} return } }() // 数据通道监听 wg.Add(1) go func() { defer wg.Done() err := s.startDataTun() if err != nil { slog.Error("fwd 数据通道监听发生错误", "err", err) errQuit <- struct{}{} return } }() // 等待退出 select { case <-s.ctx.Done(): slog.Info("fwd 服务主动退出") case <-errQuit: slog.Warn("fwd 服务异常退出") s.Close() } wg.Wait() s.dataConnWg.Wait() s.ctrlConnWg.Wait() s.fwdLesWg.Wait() s.userConnWg.Wait() // 清理资源 s.userConnMap.Range(func(key, value any) bool { conn := value.(core.Conn) utils.Close(conn) return true }) s.userConnMap.Clear() s.ctrlConnWg.Wait() slog.Debug("控制通道连接已关闭") s.dataConnWg.Wait() slog.Debug("数据通道连接已关闭") s.fwdLesWg.Wait() slog.Debug("转发服务已关闭") wg.Wait() slog.Info("fwd 服务已退出") } func (s *Service) startCtrlTun() error { ctrlPort := env.AppCtrlPort slog.Debug("监听控制通道", slog.Uint64("port", uint64(ctrlPort))) // 监听端口 ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(ctrlPort))) if err != nil { return errors.Wrap(err, "监听控制通道失败") } defer utils.Close(ls) // 处理连接 connCh := utils.ChanConnAccept(s.ctx, ls) for { select { case <-s.ctx.Done(): return nil case conn, ok := <-connCh: if !ok { return errors.New("获取连接失败") } s.ctrlConnWg.Add(1) go func() { defer s.ctrlConnWg.Done() err := s.processCtrlConn(conn) if err != nil { slog.Error("处理控制通道连接失败", "err", err) utils.Close(conn) } }() } } } func (s *Service) processCtrlConn(conn net.Conn) error { slog.Debug("客户端连入", "addr", conn.RemoteAddr().String()) reader := bufio.NewReader(conn) // 获取转发端口 portBuf, err := utils.ReadBuffer(reader, 2) if err != nil { return errors.Wrap(err, "获取转发端口失败") } port := binary.BigEndian.Uint16(portBuf) // 启动转发服务 proxy, err := dispatcher.New(port) if err != nil { return errors.Wrap(err, "创建 socks 转发服务失败") } defer proxy.Close() s.fwdLesWg.Add(1) go func() { defer s.fwdLesWg.Done() err := proxy.Run() if err != nil { slog.Error("代理服务运行失败", "err", err) return } }() // 监听客户端连接 errCh := make(chan error) defer close(errCh) go func() { _, err := reader.ReadByte() errCh <- err }() // 处理连接 for { select { case <-s.ctx.Done(): return nil case err := <-errCh: switch { case strings.Contains(err.Error(), "An existing connection was forcibly closed by the remote host."): slog.Debug("客户端主动断开连接") return nil case err == nil: return errors.New("客户端握手失败") default: return errors.Wrap(err, "客户端意外断开连接") } case user := <-proxy.Conn: s.userConnWg.Add(1) go func() { defer s.userConnWg.Done() err := s.processUserConn(user, conn) if err != nil { slog.Error("处理用户连接失败", "err", err) utils.Close(user) } }() } } } func (s *Service) startDataTun() error { dataPort := env.AppDataPort slog.Debug("监听数据通道", slog.Uint64("port", uint64(dataPort))) // 监听端口 ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(dataPort))) if err != nil { return errors.Wrap(err, "监听数据通道失败") } defer utils.Close(ls) // 处理连接 connCh := utils.ChanConnAccept(s.ctx, ls) for { select { case <-s.ctx.Done(): return nil case conn, ok := <-connCh: if !ok { return errors.New("获取连接失败") } s.dataConnWg.Add(1) go func() { defer s.dataConnWg.Done() defer utils.Close(conn) err := s.processDataConn(conn) if err != nil { slog.Error("处理数据通道失败", "err", err) } }() } } } func (s *Service) processDataConn(client net.Conn) error { slog.Info("客户端准备接收数据 " + client.RemoteAddr().String()) // 读取 tag var tag string select { case <-s.ctx.Done(): return nil default: tagLen, err := utils.ReadByte(client) if err != nil { return errors.Wrap(err, "从客户端获取 tag 失败") } tagBuf, err := utils.ReadBuffer(client, int(tagLen)) if err != nil { return errors.Wrap(err, "从客户端获取 tag 失败") } tag = string(tagBuf) } // 找到用户连接 userAny, ok := s.userConnMap.Load(tag) if !ok { return errors.New("查找用户连接失败") } user := userAny.(*core.Conn) defer utils.Close(user) defer s.userConnMap.Delete(tag) // 发送目标地址 select { case <-s.ctx.Done(): return nil default: dest := user.Dest.String() destLen := len(dest) destBuf := make([]byte, 1+destLen) destBuf[0] = byte(destLen) copy(destBuf[1:], dest) _, err := client.Write(destBuf) if err != nil { return errors.Wrap(err, "向客户端发送目标地址失败") } } // 数据转发 userPipeReader, userPipeWriter := io.Pipe() defer utils.Close(userPipeWriter) teeUser := io.TeeReader(user, userPipeWriter) go func() { err := analysisAndLog(user, userPipeReader) if err != nil { slog.Error("数据解析失败", "err", err) } }() wg := sync.WaitGroup{} wg.Add(2) go func() { defer wg.Done() _, err := io.Copy(client, teeUser) if err != nil { slog.Error("数据转发失败 user->client", "err", err) } }() go func() { defer wg.Done() _, err := io.Copy(user, client) if err != nil { if errors.Is(err, net.ErrClosed) { } else { // slog.Error("数据转发失败 client->user", "err", err, "errType", reflect.TypeOf(err)) } } }() select { case <-s.ctx.Done(): case <-utils.ChanWgWait(s.ctx, &wg): } return nil } func (s *Service) processUserConn(user *core.Conn, ctrl net.Conn) error { // 发送 tag tag := user.Tag tagLen := len(tag) tagBuf := make([]byte, 1+tagLen) tagBuf[0] = byte(tagLen) copy(tagBuf[1:], tag) _, err := ctrl.Write(tagBuf) if err != nil { return errors.Wrap(err, "向控制通道发送 tag 失败") } // 记录用户连接 s.userConnMap.Store(user.Tag, user) return nil }