From f86cf47e86fafb296166c16c3677fd30182b1fca Mon Sep 17 00:00:00 2001 From: luorijun Date: Wed, 14 May 2025 15:13:44 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=9C=8D=E5=8A=A1=E5=90=AF?= =?UTF-8?q?=E5=8A=A8=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/geo/ipapi.go | 45 +++++++++++- server/fwd/fwd.go | 14 ++-- server/pkg/env/env.go | 11 ++- server/pkg/log/logs.go | 10 +-- server/report/report.go | 91 +++++++++++++++++++++++ server/server.go | 159 +++++++++++++++++----------------------- server/web/web.go | 36 +++++---- 7 files changed, 243 insertions(+), 123 deletions(-) create mode 100644 server/report/report.go diff --git a/client/geo/ipapi.go b/client/geo/ipapi.go index c4c87b6..f26efa1 100644 --- a/client/geo/ipapi.go +++ b/client/geo/ipapi.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/pkg/errors" "net/http" + "strings" ) func Ipapi() (prov, city, isp string, err error) { @@ -31,7 +32,49 @@ func Ipapi() (prov, city, isp string, err error) { prov = data.RegionName city = data.City - isp = data.As + + var telecom = []string{"AS4134", "AS4812", "AS134419", "AS140292"} + var unicom = []string{"AS4837", "AS17621", "AS17816"} + var mobile = []string{ + "AS9808", "AS24444", "AS24445", "AS24547", "AS38019", + "AS56040", "AS56041", "AS56042", "AS56044", "AS56046", "AS56047", + "AS132525", "AS134810", + } + var foreign = []string{ + "AS9299", + } + + for _, telecomAsn := range telecom { + if strings.HasPrefix(data.As, telecomAsn) { + isp = "电信" + break + } + } + if isp == "" { + for _, unicomAsn := range unicom { + if strings.HasPrefix(data.As, unicomAsn) { + isp = "联通" + break + } + } + } + if isp == "" { + for _, mobileAsn := range mobile { + if strings.HasPrefix(data.As, mobileAsn) { + isp = "移动" + break + } + } + } + if isp == "" { + for _, foreignAsn := range foreign { + if strings.HasPrefix(data.As, foreignAsn) { + isp = "国外" + break + } + } + } + if prov == "" || city == "" || isp == "" { return "", "", "", errors.New("解析数据为空") } diff --git a/server/fwd/fwd.go b/server/fwd/fwd.go index 302b774..6c5a4be 100644 --- a/server/fwd/fwd.go +++ b/server/fwd/fwd.go @@ -37,11 +37,7 @@ func New(config *Config) *Service { } } -func (s *Service) Close() { - s.cancel() -} - -func (s *Service) Run() { +func (s *Service) Run() error { slog.Info("启动 fwd 服务") errQuit := make(chan struct{}, 2) @@ -79,7 +75,7 @@ func (s *Service) Run() { slog.Info("fwd 服务主动退出") case <-errQuit: slog.Warn("fwd 服务异常退出") - s.Close() + s.Stop() } wg.Wait() @@ -103,4 +99,10 @@ func (s *Service) Run() { slog.Debug("转发服务已关闭") wg.Wait() slog.Info("fwd 服务已退出") + + return nil +} + +func (s *Service) Stop() { + s.cancel() } diff --git a/server/pkg/env/env.go b/server/pkg/env/env.go index 1d09593..93b5f0d 100644 --- a/server/pkg/env/env.go +++ b/server/pkg/env/env.go @@ -25,8 +25,9 @@ var ( DbPassword string DbTimezone string = "Asia/Shanghai" - EndpointOnline string - EndpointOffline string + EndpointOnline string + EndpointOffline string + EndpointAssigned string ) func Init() { @@ -137,4 +138,10 @@ func Init() { } else { panic("环境变量 ENDPOINT_OFFLINE 未设置") } + value = os.Getenv("ENDPOINT_ASSIGNED") + if value != "" { + EndpointAssigned = value + } else { + panic("环境变量 ENDPOINT_ASSIGNED 未设置") + } } diff --git a/server/pkg/log/logs.go b/server/pkg/log/logs.go index df96203..2e0f458 100644 --- a/server/pkg/log/logs.go +++ b/server/pkg/log/logs.go @@ -3,6 +3,7 @@ package log import ( "log/slog" "os" + "proxy-server/server/pkg/env" "time" "github.com/lmittmann/tint" @@ -10,13 +11,12 @@ import ( ) func Init() { - mode := os.Getenv("APP_LOG_MODE") - if mode == "" { - mode = "dev" + var mode = env.AppLogMode + var level = slog.LevelDebug + if mode == "test" { + level = slog.LevelInfo } - level := slog.LevelInfo - switch mode { case "dev": writer := colorable.NewColorable(os.Stdout) diff --git a/server/report/report.go b/server/report/report.go new file mode 100644 index 0000000..f95ec79 --- /dev/null +++ b/server/report/report.go @@ -0,0 +1,91 @@ +package report + +import ( + "context" + "encoding/base64" + "encoding/json" + "errors" + "io" + "log/slog" + "net/http" + "proxy-server/client/core" + "proxy-server/server/pkg/env" + "strings" + "time" +) + +func Online(ctx context.Context, name string) (id int32, err error) { + var resp string + resp, err = repeat(ctx, env.EndpointOnline, map[string]any{ + "name": name, + "version": core.Version, + }) + if err != nil { + return 0, err + } + + var body struct { + Id int32 `json:"id"` + } + err = json.Unmarshal([]byte(resp), &body) + if err != nil { + return 0, err + } + + if body.Id == 0 { + return 0, errors.New("服务注册返回 ID 有误") + } else { + return body.Id, nil + } +} + +func Offline(ctx context.Context, name string) (err error) { + _, err = repeat(ctx, env.EndpointOffline, map[string]any{ + "name": name, + "version": core.Version, + }) + return err +} + +func Assigned(ctx context.Context, id int32, edgeId int32, port int16) (err error) { + _, err = repeat(ctx, env.EndpointAssigned, map[string]any{ + "proxy": id, + "edge": edgeId, + "port": port, + }) + return err +} + +func repeat(ctx context.Context, endpoint string, body any) (string, error) { + bodyStr, err := json.Marshal(body) + if err != nil { + panic(err) + } + + for { + req, err := http.NewRequest("POST", endpoint, strings.NewReader(string(bodyStr))) + if err != nil { + panic(err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Basic "+base64.RawURLEncoding.EncodeToString([]byte("proxy:proxy"))) + + resp, err := http.DefaultClient.Do(req) + if resp != nil && resp.StatusCode == http.StatusOK { + var body, err = io.ReadAll(resp.Body) + if err != nil { + return "", err + } + return string(body), nil + } + select { + case <-ctx.Done(): + return "", ctx.Err() + default: + } + + slog.Warn("服务注册失败,五秒后重试", "err", err) + time.Sleep(5 * time.Second) + } +} diff --git a/server/server.go b/server/server.go index 414315a..ebfafa9 100644 --- a/server/server.go +++ b/server/server.go @@ -2,10 +2,7 @@ package server import ( "context" - "encoding/base64" - "encoding/json" "log/slog" - "net/http" "os" "os/signal" "proxy-server/pkg/utils" @@ -14,8 +11,8 @@ import ( "proxy-server/server/pkg/env" "proxy-server/server/pkg/log" "proxy-server/server/pkg/orm" + "proxy-server/server/report" "proxy-server/server/web" - "strings" "sync" "syscall" "time" @@ -33,6 +30,7 @@ const ( ) type server struct { + id int32 name string } @@ -62,27 +60,22 @@ func (s *server) Run() (err error) { // 转发服务 wg.Add(1) - fwdQuit := make(chan struct{}, 1) + fwdQuit := make(chan error, 1) + defer close(fwdQuit) go func() { defer wg.Done() - defer close(fwdQuit) - err := startFwdServer(ctx) - if err != nil { - slog.Error("转发服务发生错误", "err", err) - } - fwdQuit <- struct{}{} + err = s.startFwd(ctx) + fwdQuit <- err }() // 接口服务 wg.Add(1) - apiQuit := make(chan struct{}, 1) + apiQuit := make(chan error, 1) + defer close(apiQuit) go func() { defer wg.Done() - err := startWebServer(ctx) - if err != nil { - slog.Error("接口服务发生错误", "err", err) - } - apiQuit <- struct{}{} + err := s.startWeb(ctx) + apiQuit <- err }() // debug @@ -101,42 +94,73 @@ func (s *server) Run() (err error) { // 报告上线 slog.Debug("报告服务上线") - go reportOnline(ctx, s.name) + var reportErrCh = make(chan error, 1) + defer close(reportErrCh) + go func() { + id, err := report.Online(ctx, s.name) + if err != nil { + reportErrCh <- err + } + s.id = id + }() // 等待退出信号 osQuit := make(chan os.Signal, 1) signal.Notify(osQuit, os.Interrupt, syscall.SIGTERM) + var reportErr error select { case <-osQuit: slog.Info("服务主动退出") - case <-fwdQuit: - slog.Warn("fwd 服务异常退出") - case <-apiQuit: - slog.Warn("web 服务异常退出") + case err := <-fwdQuit: + slog.Warn("fwd 服务异常退出", "err", err) + case err := <-apiQuit: + slog.Warn("web 服务异常退出", "err", err) + case reportErr = <-reportErrCh: + slog.Warn("报告服务上线发生错误", "err", reportErr) } - - // 报告下线 - slog.Debug("报告服务下线") - go reportOffline(ctx, s.name) - - // 退出其他服务 cancel() + // 报告下线 + if reportErr == nil { + slog.Debug("报告服务下线") + go func() { + err := report.Offline(ctx, s.name) + if err != nil { + slog.Error("报告服务下线发生错误", "err", err) + } + }() + } + + // 等待其它服务关闭 timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() select { case <-utils.ChanWgWait(timeout, &wg): - slog.Info("服务已退出") + slog.Info("服务正常关闭") case <-timeout.Done(): - slog.Warn("退出超时,强制退出") + slog.Warn("超时强制关闭") } return nil } -func (s *server) restore() (err error) { +func (s *server) init() error { + + err := godotenv.Load() + if err != nil { + println("没有本地环境变量文件") + } + + log.Init() + env.Init() + orm.Init() + + return nil +} + +func (s *server) restore() error { var file = "proxy.lock" bytes, err := os.ReadFile(file) @@ -165,74 +189,23 @@ func (s *server) restore() (err error) { return nil } -func (s *server) init() error { - - err := godotenv.Load() - if err != nil { - println("没有本地环境变量文件") - } - - log.Init() - env.Init() - orm.Init() - - return nil -} - -func startFwdServer(ctx context.Context) error { +func (s *server) startFwd(ctx context.Context) error { server := fwd.New(nil) go func() { <-ctx.Done() - server.Close() + server.Stop() }() - server.Run() - return nil + return server.Run() } -func startWebServer(ctx context.Context) error { - return web.Start(ctx) -} - -func reportOnline(ctx context.Context, name string) { - reportRepeat(ctx, env.EndpointOnline, map[string]any{ - "name": name, - "version": Version, - }) -} - -func reportOffline(ctx context.Context, name string) { - reportRepeat(ctx, env.EndpointOffline, map[string]any{ - "name": name, - "version": Version, - }) -} - -func reportRepeat(ctx context.Context, endpoint string, body any) { - bodyStr, err := json.Marshal(body) - if err != nil { - panic(err) - } - - for { - req, err := http.NewRequest("POST", endpoint, strings.NewReader(string(bodyStr))) +func (s *server) startWeb(ctx context.Context) error { + server := web.New() + go func() { + <-ctx.Done() + err := server.Stop() if err != nil { - panic(err) + slog.Error("web 服务关闭发生错误", "err", err) } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", "Basic "+base64.RawURLEncoding.EncodeToString([]byte("proxy:proxy"))) - - resp, err := http.DefaultClient.Do(req) - if resp != nil && resp.StatusCode == http.StatusOK { - return - } - select { - case <-ctx.Done(): - return - default: - } - - slog.Warn("服务注册失败,五秒后重试", "err", err) - time.Sleep(5 * time.Second) - } + }() + return server.Run() } diff --git a/server/web/web.go b/server/web/web.go index d47c365..2fc1b64 100644 --- a/server/web/web.go +++ b/server/web/web.go @@ -2,38 +2,42 @@ package web import ( "context" - "log/slog" "net/http" "proxy-server/server/pkg/env" "strconv" "github.com/gin-gonic/gin" - "github.com/pkg/errors" ) -var server *http.Server +type Server struct { + http *http.Server +} -func Start(ctx context.Context) error { +func New() *Server { + return &Server{} +} + +func (s *Server) Run() error { address := ":" + strconv.Itoa(int(env.AppWebPort)) engine := gin.Default() - server = &http.Server{Addr: address, Handler: engine} + s.http = &http.Server{Addr: address, Handler: engine} // 配置中间件和路由 Router(engine) - // 监听关闭信号 - go func() { - <-ctx.Done() - err := server.Shutdown(context.Background()) - if err != nil { - slog.Error("web 服务关闭失败", err) - } - }() - // 启动服务 - err := server.ListenAndServe() + err := s.http.ListenAndServe() if err != nil { - return errors.Wrap(err, "web 服务启动失败") + return err + } + + return nil +} + +func (s *Server) Stop() error { + err := s.http.Shutdown(context.Background()) + if err != nil { + return err } return nil