From 75569d2d6dd39b5e09f0d0265be6b0e2d972dc06 Mon Sep 17 00:00:00 2001 From: luorijun Date: Wed, 14 May 2025 17:46:34 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E5=AE=A2=E6=88=B7=E7=AB=AF?= =?UTF-8?q?=E4=B8=8E=E6=9C=8D=E5=8A=A1=E7=AB=AF=E6=B3=A8=E5=86=8C=EF=BC=8C?= =?UTF-8?q?=E7=AB=AF=E5=8F=A3=E5=88=86=E9=85=8D=E5=92=8C=E5=8D=8F=E8=AE=AE?= =?UTF-8?q?=E4=BA=A4=E4=BA=92=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 +- client/client.go | 76 ++++++++------------ client/core/env.go | 10 +-- client/env/env.go | 53 ++++++++++++++ client/geo/cip.go | 57 --------------- client/geo/geo.go | 156 +++++++++++++++++++++++++++++++++++++++- client/geo/ipapi.go | 83 --------------------- client/report/online.go | 25 ++++--- cmd/client/main.go | 2 +- cmd/server/main.go | 4 +- server/fwd/ctrl.go | 99 +++++++++++-------------- server/fwd/fwd.go | 10 ++- server/report/report.go | 4 +- server/server.go | 5 +- 14 files changed, 313 insertions(+), 277 deletions(-) create mode 100644 client/env/env.go delete mode 100644 client/geo/cip.go delete mode 100644 client/geo/ipapi.go diff --git a/README.md b/README.md index 909af2c..13c2f33 100644 --- a/README.md +++ b/README.md @@ -57,9 +57,9 @@ ERR: 除非有必要,否则全部 error 都使用 `errors.Wrap()` 包裹(如 客户端: -| version(1) | name_len(1) | name_buf(n) | -|------------|-------------|-------------| -| 版本号 | 名称长度 | 名称 | +| id(4) | +|--------| +| 客户端 ID | 服务端: diff --git a/client/client.go b/client/client.go index d62059f..b2e1ab2 100644 --- a/client/client.go +++ b/client/client.go @@ -2,42 +2,43 @@ package client import ( "bufio" + "encoding/binary" "fmt" "io" "log/slog" "net" - "os" "proxy-server/client/core" + "proxy-server/client/env" "proxy-server/client/geo" "proxy-server/client/report" "proxy-server/pkg/utils" "time" "errors" - "github.com/joho/godotenv" - _ "net/http/pprof" ) -var Geo geo.Func = geo.Ipapi - func Start() error { // 初始化环境变量 - slog.SetLogLoggerLevel(slog.LevelDebug) - - err := godotenv.Load() + slog.Debug("初始化环境变量...") + err := env.Init() if err != nil { - slog.Debug("没有本地环境变量文件") - } else { - online := os.Getenv("ENDPOINT_ONLINE") - if online != "" { - core.EndpointOnline = online - } - offline := os.Getenv("ENDPOINT_OFFLINE") - if offline != "" { - core.EndpointOffline = offline - } + return fmt.Errorf("初始化环境变量失败: %w", err) + } + + // 获取归属地 + slog.Debug("获取节点归属地...") + err = geo.Query() + if err != nil { + slog.Error("获取归属地失败", "err", err) + } + + // 注册节点 + slog.Debug("注册节点...") + id, host, err := report.Online(geo.Prov, geo.City, geo.Isp) + if err != nil { + return fmt.Errorf("注册节点失败: %w", err) } // 性能监控 @@ -49,24 +50,9 @@ func Start() error { // } // }() - // 获取归属地 - slog.Debug("获取节点归属地...") - prov, city, isp, err := Geo() - if err != nil { - slog.Error("获取归属地失败", "err", err) - } - - // 注册节点 - slog.Debug("注册节点...") - host, err := report.Online(prov, city, isp) - if err != nil { - slog.Error("节点注册失败", "err", err) - return err - } - // 建立控制通道 for { - err := ctrl(host) + err := ctrl(id, host) if err != nil { slog.Error("建立控制通道失败", "err", err) slog.Info(fmt.Sprintf("%d 秒后重试", core.RetryInterval)) @@ -75,7 +61,7 @@ func Start() error { } } -func ctrl(host string) error { +func ctrl(id int32, host string) error { ctrlAddr := net.JoinHostPort(host, fmt.Sprintf("%d", core.FwdCtrlPort)) dataAddr := net.JoinHostPort(host, fmt.Sprintf("%d", core.FwdDataPort)) @@ -86,25 +72,19 @@ func ctrl(host string) error { } defer utils.Close(conn) - reader := bufio.NewReader(conn) - - // 请求转发端口 - _, err = conn.Write([]byte{core.Version}) + // 发送客户端信息 + var buf = make([]byte, 4) + _, err = binary.Encode(buf, binary.BigEndian, id) if err != nil { - return errors.New("发送版本号失败") + return fmt.Errorf("编码客户端 ID 失败: %w", err) } - - // 发送客户端名称 - nameLen := byte(len(core.Name)) - nameBuf := make([]byte, 1+nameLen) - nameBuf[0] = nameLen - copy(nameBuf[1:], core.Name) - _, err = conn.Write(nameBuf) + _, err = conn.Write(buf) if err != nil { - return errors.New("发送 name 失败") + return fmt.Errorf("发送客户端 ID 失败: %w", err) } // 等待服务端响应 + reader := bufio.NewReader(conn) respBuf, err := reader.ReadByte() if err != nil { return errors.New("接收响应失败") diff --git a/client/core/env.go b/client/core/env.go index 8d8a615..403fa3c 100644 --- a/client/core/env.go +++ b/client/core/env.go @@ -1,12 +1,8 @@ package core const Version byte = 1 -const Name = "test-edge" -var FwdCtrlPort uint = 18080 -var FwdDataPort uint = 18081 -var RetryInterval uint = 5 +const FwdCtrlPort uint = 18080 +const FwdDataPort uint = 18081 -var EndpointOnline = "https://api.lanhuip.com/api/edge/online" -var EndpointOffline = "https://api.lanhuip.com/api/edge/offline" -var EndpointGeo = "http://cip.cc" +const RetryInterval uint = 5 diff --git a/client/env/env.go b/client/env/env.go new file mode 100644 index 0000000..548de09 --- /dev/null +++ b/client/env/env.go @@ -0,0 +1,53 @@ +package env + +import ( + "errors" + "flag" + "log/slog" +) + +var Mode = "dev" +var Name = "dev-edge" + +var EndpointOnline = "https://api.lanhuip.com/api/edge/online" +var EndpointOffline = "https://api.lanhuip.com/api/edge/offline" + +func Init() error { + + var env = flag.String("e", "dev", "环境变量,可选值 dev 或 prod") + var name = flag.String("n", "", "客户端唯一标识") + var online = flag.String("online", "", "服务注册地址") + var offline = flag.String("offline", "", "服务注销地址") + + flag.Parse() + + if env != nil && *env != "" { + if *env == "dev" || *env == "prod" { + Mode = *env + } else { + return errors.New("环境变量只能为 dev 或 prod") + } + } + + if name != nil && *name != "" { + Name = *name + } else { + return errors.New("客户端唯一标识不能为空") + } + + if online != nil && *online != "" { + EndpointOnline = *online + } + + if offline != nil && *offline != "" { + EndpointOffline = *offline + } + + if Mode == "dev" { + slog.SetLogLoggerLevel(slog.LevelDebug) + } else { + slog.SetLogLoggerLevel(slog.LevelWarn) + } + + return nil +} diff --git a/client/geo/cip.go b/client/geo/cip.go deleted file mode 100644 index bffe221..0000000 --- a/client/geo/cip.go +++ /dev/null @@ -1,57 +0,0 @@ -package geo - -import ( - "bufio" - "github.com/pkg/errors" - "log/slog" - "net/http" - "net/textproto" - "strings" -) - -func Cip() (prov, city, isp string, err error) { - const endpoint = "http://cip.cc" - - req, err := http.NewRequest("GET", endpoint, nil) - if err != nil { - return "", "", "", errors.Wrap(err, "创建请求失败") - } - req.Header.Set("User-Agent", "curl/8.9.1") - req.Header.Set("Accept", "*/*") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return "", "", "", errors.Wrap(err, "请求失败") - } - defer resp.Body.Close() - if resp.StatusCode != http.StatusOK { - return "", "", "", errors.New("请求失败,状态码: " + resp.Status) - } - - reader := textproto.NewReader(bufio.NewReader(resp.Body)) - _, err = reader.ReadLine() - if err != nil { - return "", "", "", errors.Wrap(err, "读取响应失败") - } - - addrLine, err := reader.ReadLine() - if err != nil { - return "", "", "", errors.Wrap(err, "读取响应失败") - } - addr := strings.Split(strings.Split(addrLine, ":")[1], " ") - prov = strings.TrimSpace(addr[1]) - city = strings.TrimSpace(addr[2]) - - ispLine, err := reader.ReadLine() - if err != nil { - return "", "", "", errors.Wrap(err, "读取响应失败") - } - isp = strings.TrimSpace(strings.Split(ispLine, ":")[1]) - - if prov == "" || city == "" || isp == "" { - return "", "", "", errors.New("解析数据为空") - } - - slog.Debug("获取归属地", "prov", prov, "city", city, "isp", isp) - return prov, city, isp, nil -} diff --git a/client/geo/geo.go b/client/geo/geo.go index 81a6ed1..fa5639e 100644 --- a/client/geo/geo.go +++ b/client/geo/geo.go @@ -1,3 +1,157 @@ package geo -type Func func() (prov, city, isp string, err error) +import ( + "bufio" + "encoding/json" + "fmt" + "io" + "net/http" + "net/textproto" + "proxy-server/client/env" + "strings" +) + +var ( + Ip string + Prov string + City string + Isp string +) + +func Query() (err error) { + + switch env.Mode { + case "dev": + err = dev() + default: + err = ipapi() + } + return err +} + +func dev() (err error) { + Prov = "河南省" + City = "郑州市" + Isp = "电信" + return nil +} + +func cip() (err error) { + const endpoint = "http://cip.cc" + + req, err := http.NewRequest("GET", endpoint, nil) + if err != nil { + return fmt.Errorf("创建请求失败: %w", err) + } + req.Header.Set("User-Agent", "curl/8.9.1") + req.Header.Set("Accept", "*/*") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("执行请求失败: %w", err) + } + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("状态码: %s", resp.Status) + } + + reader := textproto.NewReader(bufio.NewReader(resp.Body)) + ipLine, err := reader.ReadLine() + if err != nil { + return fmt.Errorf("读取响应失败: %w", err) + } + Ip = strings.TrimSpace(strings.Split(ipLine, ":")[1]) + + addrLine, err := reader.ReadLine() + if err != nil { + return fmt.Errorf("读取响应失败: %w", err) + } + addr := strings.Split(strings.Split(addrLine, ":")[1], " ") + Prov = strings.TrimSpace(addr[1]) + City = strings.TrimSpace(addr[2]) + + ispLine, err := reader.ReadLine() + if err != nil { + return fmt.Errorf("读取响应失败: %w", err) + } + Isp = strings.TrimSpace(strings.Split(ispLine, ":")[1]) + + return nil +} + +func ipapi() (err error) { + const endpoint = "http://ip-api.com/json/?fields=regionName,city,as,query&lang=zh-CN" + + resp, err := http.Get(endpoint) + if err != nil { + return fmt.Errorf("执行请求失败: %w", err) + } + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("状态码: %s", resp.Status) + } + + var data struct { + RegionName string `json:"regionName"` + City string `json:"city"` + As string `json:"as"` + Query string `json:"query"` + } + err = json.NewDecoder(resp.Body).Decode(&data) + if err != nil { + return fmt.Errorf("解析响应失败: %w", err) + } + + Ip = data.Query + Prov = data.RegionName + City = data.City + + var telecom = []string{"AS4134", "AS4812", "AS134419", "AS140292"} + var unicom = []string{"AS4837", "AS17621", "AS17816"} + var mobile = []string{ + "AS9808", "AS24444", "AS24445", "AS24547", "AS38019", + "AS56040", "AS56041", "AS56042", "AS56044", "AS56046", "AS56047", + "AS132525", "AS134810", + } + var foreign = []string{ + "AS9299", + } + + for _, telecomAsn := range telecom { + if strings.HasPrefix(data.As, telecomAsn) { + Isp = "电信" + break + } + } + if Isp == "" { + for _, unicomAsn := range unicom { + if strings.HasPrefix(data.As, unicomAsn) { + Isp = "联通" + break + } + } + } + if Isp == "" { + for _, mobileAsn := range mobile { + if strings.HasPrefix(data.As, mobileAsn) { + Isp = "移动" + break + } + } + } + if Isp == "" { + for _, foreignAsn := range foreign { + if strings.HasPrefix(data.As, foreignAsn) { + Isp = "国外" + break + } + } + } + + return nil +} diff --git a/client/geo/ipapi.go b/client/geo/ipapi.go deleted file mode 100644 index f26efa1..0000000 --- a/client/geo/ipapi.go +++ /dev/null @@ -1,83 +0,0 @@ -package geo - -import ( - "encoding/json" - "github.com/pkg/errors" - "net/http" - "strings" -) - -func Ipapi() (prov, city, isp string, err error) { - const endpoint = "http://ip-api.com/json/?fields=regionName,city,as&lang=zh-CN" - - resp, err := http.Get(endpoint) - if err != nil { - return "", "", "", err - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return "", "", "", err - } - - var data struct { - RegionName string `json:"regionName"` - City string `json:"city"` - As string `json:"as"` - } - err = json.NewDecoder(resp.Body).Decode(&data) - if err != nil { - return "", "", "", err - } - - prov = data.RegionName - city = data.City - - var telecom = []string{"AS4134", "AS4812", "AS134419", "AS140292"} - var unicom = []string{"AS4837", "AS17621", "AS17816"} - var mobile = []string{ - "AS9808", "AS24444", "AS24445", "AS24547", "AS38019", - "AS56040", "AS56041", "AS56042", "AS56044", "AS56046", "AS56047", - "AS132525", "AS134810", - } - var foreign = []string{ - "AS9299", - } - - for _, telecomAsn := range telecom { - if strings.HasPrefix(data.As, telecomAsn) { - isp = "电信" - break - } - } - if isp == "" { - for _, unicomAsn := range unicom { - if strings.HasPrefix(data.As, unicomAsn) { - isp = "联通" - break - } - } - } - if isp == "" { - for _, mobileAsn := range mobile { - if strings.HasPrefix(data.As, mobileAsn) { - isp = "移动" - break - } - } - } - if isp == "" { - for _, foreignAsn := range foreign { - if strings.HasPrefix(data.As, foreignAsn) { - isp = "国外" - break - } - } - } - - if prov == "" || city == "" || isp == "" { - return "", "", "", errors.New("解析数据为空") - } - - return prov, city, isp, nil -} diff --git a/client/report/online.go b/client/report/online.go index dd8d54c..e9a14bc 100644 --- a/client/report/online.go +++ b/client/report/online.go @@ -3,13 +3,15 @@ package report import ( "encoding/json" "errors" + "fmt" "io" "net/http" "proxy-server/client/core" + "proxy-server/client/env" "strings" ) -func Online(prov, city, isp string) (host string, err error) { +func Online(prov, city, isp string) (id int32, host string, err error) { var ispInt = 0 switch isp { @@ -25,42 +27,43 @@ func Online(prov, city, isp string) (host string, err error) { "prov": prov, "city": city, "isp": ispInt, - "name": core.Name, + "name": env.Name, "version": core.Version, }) if err != nil { - return "", err + return 0, "", err } - req, err := http.NewRequest("POST", core.EndpointOnline, strings.NewReader(string(body))) + req, err := http.NewRequest("POST", env.EndpointOnline, strings.NewReader(string(body))) if err != nil { - return "", errors.New("创建节点注册请求失败") + return 0, "", fmt.Errorf("创建请求失败: %w", err) } req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) if err != nil { - return "", errors.New("节点注册失败") + return 0, "", fmt.Errorf("执行请求失败: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - return "", errors.New("节点注册失败,状态码: " + resp.Status) + return 0, "", errors.New("状态码: " + resp.Status) } bytes, err := io.ReadAll(resp.Body) if err != nil { - return "", errors.New("读取节点注册响应失败") + return 0, "", fmt.Errorf("读取响应失败: %w", err) } var respBody struct { + Id int32 `json:"id"` Host string `json:"host"` } err = json.Unmarshal(bytes, &respBody) if err != nil { - return "", errors.New("解析节点注册响应失败") + return 0, "", fmt.Errorf("解析响应失败: %w", err) } if respBody.Host == "" { - return "", errors.New("节点注册失败,响应体为空") + return 0, "", errors.New("响应体为空") } - return respBody.Host, nil + return respBody.Id, respBody.Host, nil } diff --git a/cmd/client/main.go b/cmd/client/main.go index d6e574a..ab4a0d9 100644 --- a/cmd/client/main.go +++ b/cmd/client/main.go @@ -5,6 +5,6 @@ import "proxy-server/client" func main() { err := client.Start() if err != nil { - println(err) + println(err.Error()) } } diff --git a/cmd/server/main.go b/cmd/server/main.go index 3b826bd..3f3d746 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -7,5 +7,7 @@ import ( func main() { var app = server.New() var err = app.Run() - println(err) + if err != nil { + println(err.Error()) + } } diff --git a/server/fwd/ctrl.go b/server/fwd/ctrl.go index 53af34d..bd9ae73 100644 --- a/server/fwd/ctrl.go +++ b/server/fwd/ctrl.go @@ -3,20 +3,22 @@ package fwd import ( "bufio" "context" + "encoding/binary" + "fmt" + "io" "log/slog" "net" "proxy-server/pkg/utils" "proxy-server/server/fwd/core" "proxy-server/server/fwd/dispatcher" "proxy-server/server/fwd/metrics" - "proxy-server/server/fwd/repo" "proxy-server/server/pkg/env" - "proxy-server/server/pkg/orm" + "proxy-server/server/report" "strconv" "strings" "time" - "github.com/pkg/errors" + "errors" ) type CtrlCmd struct { @@ -33,7 +35,7 @@ func (s *Service) startCtrlTun() error { // 监听端口 ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(ctrlPort))) if err != nil { - return errors.Wrap(err, "监听控制通道失败") + return fmt.Errorf("监听控制通道失败: %w", err) } defer utils.Close(ls) @@ -67,58 +69,49 @@ func (s *Service) startCtrlTun() error { func (s *Service) processCtrlConn(conn net.Conn) error { reader := bufio.NewReader(conn) - // version - version, err := reader.ReadByte() + var recv = make([]byte, 4) + _, err := io.ReadFull(reader, recv) if err != nil { - _ = ctrlResp(conn, CtrlFail) - return errors.Wrap(err, "获取版本号失败") + return fmt.Errorf("读取客户端 ID 失败: %w", err) + } + var clientId = int32(binary.BigEndian.Uint32(recv)) + + // 分配端口 + var minim uint16 = 20000 + var maxim uint16 = 60000 + var fwdPort uint16 + for i := minim; i < maxim; i++ { + var _, ok = s.fwdPortMap[i] + if !ok { + fwdPort = i + s.fwdPortMap[i] = clientId + break + } + } + if fwdPort == 0 { + return errors.New("没有可用的端口") } - // name - nameLen, err := reader.ReadByte() + // 报告端口分配 + if s.Config.Id == nil || *s.Config.Id == 0 { + return errors.New("转发服务未成功注册,无法提供服务") + } + err = report.Assigned(s.ctx, *s.Config.Id, clientId, fwdPort) if err != nil { - _ = ctrlResp(conn, CtrlFail) - return errors.Wrap(err, "获取 name 失败") + return fmt.Errorf("报告端口分配失败: %w", err) } - nameBuf, err := utils.ReadBuffer(reader, int(nameLen)) + + // 响应客户端 + _, err = conn.Write([]byte{1}) if err != nil { - _ = ctrlResp(conn, CtrlFail) - return errors.Wrap(err, "获取 name 失败") + return fmt.Errorf("响应客户端失败: %w", err) } - name := string(nameBuf) - - if name == "" { - _ = ctrlResp(conn, CtrlFail) - return errors.New("客户端名称不能为空") - } - - // 检查客户端 - var node repo.Node - err = orm.DB.Take(&node, &repo.Node{ - Name: name, - }).Error - if err != nil { - _ = ctrlResp(conn, CtrlFail) - return errors.Wrap(err, "查询客户端失败") - } - - if version != node.Version { - _ = ctrlResp(conn, CtrlFail) - return errors.New("客户端版本不匹配") - } - - err = ctrlResp(conn, CtrlDone) - if err != nil { - return errors.Wrap(err, "向客户端发送响应失败") - } - - port := node.FwdPort - slog.Info("监听转发端口", "port", port, "client", name) // 启动转发服务 - proxy, err := dispatcher.New(port) + slog.Info("监听转发端口", "port", fwdPort, "client", clientId) + proxy, err := dispatcher.New(fwdPort) if err != nil { - return errors.Wrap(err, "创建 socks 转发服务失败") + return err } defer proxy.Close() @@ -168,7 +161,7 @@ func (s *Service) processCtrlConn(conn net.Conn) error { case err == nil: return errors.New("客户端握手失败") default: - return errors.Wrap(err, "客户端意外断开连接") + return fmt.Errorf("客户端意外断开连接: %w", err) } case user := <-proxy.Conn: metrics.TimerAuth.Store(user.Conn, time.Now()) @@ -226,15 +219,3 @@ func (s *Service) processUserConn(user *core.Conn, ctrl net.Conn) error { return nil } - -type CtrlResult byte - -const ( - CtrlFail CtrlResult = iota - CtrlDone -) - -func ctrlResp(conn net.Conn, result CtrlResult) error { - _, err := conn.Write([]byte{byte(result)}) - return err -} diff --git a/server/fwd/fwd.go b/server/fwd/fwd.go index 6c5a4be..9a63755 100644 --- a/server/fwd/fwd.go +++ b/server/fwd/fwd.go @@ -9,6 +9,7 @@ import ( ) type Config struct { + Id *int32 } type Service struct { @@ -22,6 +23,8 @@ type Service struct { ctrlConnWg utils.CountWaitGroup dataConnWg utils.CountWaitGroup userConnWg utils.CountWaitGroup + + fwdPortMap map[uint16]int32 // 转发端口映射,key 为端口号,value 为边缘节点 ID } func New(config *Config) *Service { @@ -31,9 +34,10 @@ func New(config *Config) *Service { ctx, cancel := context.WithCancel(context.Background()) return &Service{ - Config: config, - ctx: ctx, - cancel: cancel, + Config: config, + ctx: ctx, + cancel: cancel, + fwdPortMap: make(map[uint16]int32), } } diff --git a/server/report/report.go b/server/report/report.go index f95ec79..cc296d7 100644 --- a/server/report/report.go +++ b/server/report/report.go @@ -47,7 +47,7 @@ func Offline(ctx context.Context, name string) (err error) { return err } -func Assigned(ctx context.Context, id int32, edgeId int32, port int16) (err error) { +func Assigned(ctx context.Context, id int32, edgeId int32, port uint16) (err error) { _, err = repeat(ctx, env.EndpointAssigned, map[string]any{ "proxy": id, "edge": edgeId, @@ -85,7 +85,7 @@ func repeat(ctx context.Context, endpoint string, body any) (string, error) { default: } - slog.Warn("服务注册失败,五秒后重试", "err", err) + slog.Warn("服务调用失败,五秒后重试", "err", err) time.Sleep(5 * time.Second) } } diff --git a/server/server.go b/server/server.go index ebfafa9..e09f214 100644 --- a/server/server.go +++ b/server/server.go @@ -100,6 +100,7 @@ func (s *server) Run() (err error) { id, err := report.Online(ctx, s.name) if err != nil { reportErrCh <- err + return } s.id = id }() @@ -190,7 +191,9 @@ func (s *server) restore() error { } func (s *server) startFwd(ctx context.Context) error { - server := fwd.New(nil) + server := fwd.New(&fwd.Config{ + Id: &s.id, + }) go func() { <-ctx.Done() server.Stop()