package client import ( "bufio" "flag" "fmt" "io" "log/slog" "net" "net/http" "os" "proxy-server/pkg/utils" "runtime" "strconv" "time" "github.com/joho/godotenv" "github.com/pkg/errors" _ "net/http/pprof" ) const Version byte = 1 type Config struct { Name string FwdHost string FwdCtrlPort uint FwdDataPort uint RetryInterval uint } var cfg Config var frpCtrlAddr string var frpDataAddr string func Start() { initLog() initCmd() initDevEnv() frpCtrlAddr = net.JoinHostPort(cfg.FwdHost, strconv.Itoa(int(cfg.FwdCtrlPort))) frpDataAddr = net.JoinHostPort(cfg.FwdHost, strconv.Itoa(int(cfg.FwdDataPort))) // 性能监控 go func() { runtime.SetBlockProfileRate(1) err := http.ListenAndServe(":7070", nil) if err != nil { slog.Error("性能监控服务启动失败", "err", err) } }() // 建立控制通道 for { err := ctrl() if err != nil { slog.Error("建立控制通道失败", err) slog.Info(fmt.Sprintf("%d 秒后重试", cfg.RetryInterval)) time.Sleep(time.Duration(cfg.RetryInterval) * time.Second) } } } func ctrl() error { slog.Info("建立控制通道", "addr", frpCtrlAddr) conn, err := net.Dial("tcp", frpCtrlAddr) if err != nil { return errors.Wrap(err, "连接失败") } defer utils.Close(conn) reader := bufio.NewReader(conn) // 请求转发端口 _, err = conn.Write([]byte{Version}) if err != nil { return errors.Wrap(err, "发送版本号失败") } // 发送客户端名称 nameLen := byte(len(cfg.Name)) nameBuf := make([]byte, 1+nameLen) nameBuf[0] = nameLen copy(nameBuf[1:], cfg.Name) _, err = conn.Write(nameBuf) if err != nil { return errors.Wrap(err, "发送 name 失败") } // 等待服务端响应 respBuf, err := reader.ReadByte() if err != nil { return errors.Wrap(err, "接收响应失败") } if respBuf != 1 { return errors.New("服务端响应失败") } else { slog.Info("成功建立连接") } // 等待用户连接 // 读写失败后退出重连,防止后续数据读写顺序错位导致卡死控制通道 slog.Info("等待用户连接") for { // 接收 dst dstLen, err := reader.ReadByte() if err != nil { return errors.Wrap(err, "接收 dstLen 失败") } dstBuf, err := utils.ReadBuffer(reader, int(dstLen)) if err != nil { return errors.Wrap(err, "接收 dstBuf 失败") } addr := string(dstBuf) // 接收 tag tagLen, err := reader.ReadByte() if err != nil { return errors.Wrap(err, "接收 tagLen 失败") } tagBuf, err := utils.ReadBuffer(reader, int(tagLen)) if err != nil { return errors.Wrap(err, "接收 tagBuf 失败") } // 建立数据通道 go func() { err := data(addr, tagBuf) if err != nil { slog.Error("建立数据通道失败", err) } }() } } func data(addr string, tag []byte) error { // 向服务端建立连接 src, err := net.Dial("tcp", frpDataAddr) if err != nil { return errors.Wrap(err, "连接服务端失败") } tagLen := byte(len(tag)) tagBuf := make([]byte, 2+tagLen) tagBuf[1] = tagLen copy(tagBuf[2:], tag) // 向目标地址建立连接 dst, dstErr := net.Dial("tcp", addr) if dstErr != nil { tagBuf[0] = 0 } else { tagBuf[0] = 1 } // 发送连接状态 _, err = src.Write(tagBuf) if err != nil { utils.Close(src) if dst != nil { utils.Close(dst) } return errors.Wrap(err, "发送连接状态失败") } if tagBuf[0] == 0 { utils.Close(src) if dst != nil { utils.Close(dst) } return errors.Wrap(dstErr, "连接目标地址失败") } go func() { defer utils.Close(dst) _, err := io.Copy(dst, src) if err != nil && !errors.Is(err, net.ErrClosed) { slog.Error("上行流量代理失败", "err", err) } }() go func() { defer utils.Close(src) _, err := io.Copy(src, dst) if err != nil && !errors.Is(err, net.ErrClosed) { slog.Error("下行流量代理失败", "err", err) } }() return nil } func initLog() { slog.SetLogLoggerLevel(slog.LevelDebug) } func initCmd() { flag.StringVar(&cfg.Name, "n", "", "客户端名称") flag.StringVar(&cfg.FwdHost, "h", "", "转发服务器地址") flag.UintVar(&cfg.FwdCtrlPort, "c", 18080, "转发服务器控制通道端口") flag.UintVar(&cfg.FwdDataPort, "d", 18081, "转发服务器数据通道端口") flag.UintVar(&cfg.RetryInterval, "r", 5, "重试间隔时间") flag.Parse() if cfg.Name == "" { slog.Error("客户端名称不能为空") flag.Usage() os.Exit(1) } } func initDevEnv() { err := godotenv.Load() if err != nil { slog.Debug("没有本地环境变量文件") } cfg.FwdHost = os.Getenv("FWD_HOST") }