diff --git a/cmd/wrapper/main.go b/cmd/wrapper/main.go deleted file mode 100644 index 490bf07..0000000 --- a/cmd/wrapper/main.go +++ /dev/null @@ -1,85 +0,0 @@ -package main - -import ( - "context" - "log/slog" - "os" - "os/signal" - "platform/pkg/env" - "platform/pkg/logs" - "time" -) - -func main() { - - // 初始化环境 - env.Init() - logs.Init() - - // 上下文 - ctx, cancel := context.WithCancel(context.Background()) - - // 监听退出 - exit := make(chan os.Signal, 1) - defer close(exit) - signal.Notify(exit, os.Interrupt, os.Kill) - defer signal.Stop(exit) - - // 启动管理子线程 - errCh := make(chan error, 1) - go func() { - defer close(errCh) - err := start(ctx) - if err != nil { - errCh <- err - } - }() - - select { - case <-exit: - slog.Debug("exit by signal") - cancel() - case err := <-errCh: - slog.Error("exit by error", "error", err) - } -} - -// 连接池,硬编码提供 10000 的容量 -var idle = 100 -var maximum = 10000 - -var pool = make(map[string]*Node, 10000) - -var tick = 1 * time.Minute - -type Node struct { - Ip string -} - -var last time.Time - -func start(ctx context.Context) error { - ticker := time.NewTicker(tick) - go func() { - <-ctx.Done() - ticker.Stop() - }() - - for curr := range ticker.C { - last = curr - go func() { - process(ctx, curr) - }() - } - - return nil -} - -func process(ctx context.Context, curr time.Time) { - - // 查询节点状态 - - // 筛选在线节点添加到节点池 - - // -} diff --git a/pkg/remote/remote.go b/pkg/remote/remote.go index b1725d5..4b70d18 100644 --- a/pkg/remote/remote.go +++ b/pkg/remote/remote.go @@ -21,7 +21,7 @@ func Init() { // todo 从环境变量中获取参数 Client = client{ url: "http://103.139.212.110:9989", - token: "tHDarLc1ct6M9NMAxeO98lN2YsEadYSx.anVpcA==.MTc0MzA4MTAwMQ==", + token: "PhdnRF3z6VF2sPgygTSl1Xx6QJN0yFIK.anVpcA==.MTc0MzE2ODAwMQ==", } } @@ -279,11 +279,11 @@ type PortConfigsReq struct { } type AutoEdgeConfig struct { - Province string `json:"province"` - City string `json:"city"` - Isp string `json:"isp"` - Count int `json:"count"` - PacketLoss int `json:"packet_loss"` + Province string `json:"province,omitempty"` + City string `json:"city,omitempty"` + Isp string `json:"isp,omitempty"` + Count int `json:"count,omitempty"` + PacketLoss int `json:"packet_loss,omitempty"` } func (c *Gateway) GatewayPortConfigs(params []PortConfigsReq) error { @@ -319,7 +319,7 @@ func (c *Gateway) GatewayPortConfigs(params []PortConfigsReq) error { return err } - if result["code"] != 0 { + if result["code"].(float64) != 0 { return errors.New("failed to configure port") } @@ -395,14 +395,9 @@ func (c *Gateway) GatewayPortActive(param PortActiveReq) (*PortActiveResp, error // endregion -func (c *Gateway) requestGateway(method string, url string, data any) (*http.Response, error) { - jsonData, err := json.Marshal(data) - if err != nil { - return nil, err - } - +func (c *Gateway) requestGateway(method string, url string, data string) (*http.Response, error) { url = fmt.Sprintf("http://%s:%s@%s:9990%s", c.username, c.password, c.url, url) - req, err := http.NewRequest(method, url, strings.NewReader(string(jsonData))) + req, err := http.NewRequest(method, url, strings.NewReader(data)) if err != nil { return nil, err } diff --git a/test/test-api.http b/test/test-api.http index fbaa8b7..a1acdef 100644 --- a/test/test-api.http +++ b/test/test-api.http @@ -3,28 +3,65 @@ GET http://110.40.82.250:18702/server/index/getToken/key/juipbyjdapiverify ### remote 配置信息 GET http://103.139.212.110:9989/api/auto_query -token: tHDarLc1ct6M9NMAxeO98lN2YsEadYSx.anVpcA==.MTc0MzA4MTAwMQ== - -### gateway 端口信息 -GET http://api:123456@110.40.82.248:9990/port/active/ +token: PhdnRF3z6VF2sPgygTSl1Xx6QJN0yFIK.anVpcA==.MTc0MzE2ODAwMQ== ### remote 配置重置 POST http://103.139.212.110:9989/api/connect -token: tHDarLc1ct6M9NMAxeO98lN2YsEadYSx.anVpcA==.MTc0MzA4MTAwMQ== +token: PhdnRF3z6VF2sPgygTSl1Xx6QJN0yFIK.anVpcA==.MTc0MzE2ODAwMQ== Content-Type: application/json { "uuid": "7a17e8b4-cdc3-4500-bf16-4a665991a7f6", "auto_config": [ { - "count": 200 + "count": 1 } ] } +### gateway 端口信息 +GET http://api:123456@110.40.82.248:9990/port/active/ + +### gateway 配置端口代理 +POST http://api:123456@110.40.82.248:9990/port/configs +Content-Type: application/json + +//[ +// { +// "port": 10000, +// "status": true, +// "userpass": "mIfSlXIBwVUrKqObNdTzvB:cyDaGtfeBlhRfojTYJP2tR", +// "auto_edge_config": { +// "count": 1 +// } +// } +//] +[ + { + "port": 10000, + "status": false, + "edge": [] + } +] + +### 设备令牌 +POST http://localhost:8080/api/auth/token +Content-Type: application/json + +{ + "client_id": "test", + "client_secret": "test", + "grant_type": "client_credentials" +} + +> {% + client.global.set("access_token", response.body.access_token); + client.global.set("refresh_token", response.body.refresh_token); +%} + ### 密码模式代理 POST http://localhost:8080/api/channel/create -Authorization: Bearer b21568ed-09a9-4f1c-add6-d6b24bde7473 +Authorization: Bearer {{access_token}} Content-Type: application/json Accept: application/json @@ -32,7 +69,7 @@ Accept: application/json "resource_id": 1, "protocol": "http", "auth_type": 1, - "count": 200, + "count": 1, "prov": "", "city": "", "isp": "", diff --git a/web/auth.go b/web/auth.go index c1bbc24..e16941b 100644 --- a/web/auth.go +++ b/web/auth.go @@ -2,6 +2,7 @@ package web import ( "platform/web/common" + "slices" "strings" "platform/web/services" @@ -9,6 +10,66 @@ import ( "github.com/gofiber/fiber/v2" ) +func Permit(types []services.PayloadType, permissions ...string) fiber.Handler { + return func(c *fiber.Ctx) error { + // 获取令牌 + var header = c.Get("Authorization") + var token = strings.TrimPrefix(header, "Bearer ") + if token == "" { + return c.Status(fiber.StatusUnauthorized).JSON(common.ErrResp{ + Error: true, + Message: "没有权限", + }) + } + + // 验证令牌 + auth, err := services.Session.Find(c.Context(), token) + if err != nil { + return c.Status(fiber.StatusUnauthorized).JSON(common.ErrResp{ + Error: true, + Message: "没有权限", + }) + } + + // 检查权限 + // switch auth.Payload.Type { + // case services.PayloadAdmin: + // // 管理员不需要权限检查 + // case services.PayloadUser: + // if len(permissions) > 0 && !auth.AnyPermission(permissions...) { + // return c.Status(fiber.StatusForbidden).JSON(common.ErrResp{ + // Error: true, + // Message: "拒绝访问", + // }) + // } + // default: + // return c.Status(fiber.StatusForbidden).JSON(common.ErrResp{ + // Error: true, + // Message: "拒绝访问", + // }) + // } + if !slices.Contains(types, auth.Payload.Type) { + return c.Status(fiber.StatusForbidden).JSON(common.ErrResp{ + Error: true, + Message: "拒绝访问", + }) + } + if len(permissions) > 0 && !auth.AnyPermission(permissions...) { + return c.Status(fiber.StatusForbidden).JSON(common.ErrResp{ + Error: true, + Message: "拒绝访问", + }) + } + + // 将认证信息存储在上下文中 + c.Locals("auth", auth) + c.Locals("access_token", token) // 存储原始令牌,便于后续操作 + + return c.Next() + } + +} + // PermitUser 创建针对单个路由的鉴权中间件 func PermitUser(permissions ...string) fiber.Handler { return func(c *fiber.Ctx) error { diff --git a/web/router.go b/web/router.go index f3dfca5..df6c655 100644 --- a/web/router.go +++ b/web/router.go @@ -2,6 +2,7 @@ package web import ( "platform/web/handlers" + "platform/web/services" "github.com/gofiber/fiber/v2" ) @@ -17,5 +18,10 @@ func ApplyRouters(app *fiber.App) { // 通道 channel := api.Group("/channel") - channel.Post("/create", PermitUser(), handlers.CreateChannel) + channel.Post("/create", Permit([]services.PayloadType{ + services.PayloadClientConfidential, + services.PayloadClientPublic, + services.PayloadUser, + services.PayloadAdmin, + }), handlers.CreateChannel) } diff --git a/web/services/channel.go b/web/services/channel.go index da3c888..7248598 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -286,30 +286,38 @@ func (s *channelService) RemoteCreateChannel( } slog.Debug("检查用户权限完成") - // 申请节点 - edgeAssigns, err := assignEdge(count, filter) - if err != nil { - return nil, err - } - debugAssigned := fmt.Sprintf("%+v", edgeAssigns) - slog.Debug("申请节点完成", "edgeAssigns", debugAssigned) + var postAssigns []AssignPortResult + err = q.Q.Transaction(func(tx *q.Query) error { + // 申请节点 + edgeAssigns, err := assignEdge(count, filter) + if err != nil { + return err + } + debugAssigned := fmt.Sprintf("%+v", edgeAssigns) + slog.Debug("申请节点完成", "edgeAssigns", debugAssigned) - // 分配端口 - expiration := time.Now().Add(time.Duration(resource.Live) * time.Second) - portAssigns, err := assignPort(edgeAssigns, auth.Payload.Id, protocol, authType, expiration, filter) + // 分配端口 + expiration := time.Now().Add(time.Duration(resource.Live) * time.Second) + postAssigns, err = assignPort(edgeAssigns, auth.Payload.Id, protocol, authType, expiration, filter) + if err != nil { + return err + } + debugChannels := fmt.Sprintf("%+v", postAssigns) + slog.Debug("分配端口完成", "portAssigns", debugChannels) + + return nil + }) if err != nil { return nil, err } - debugChannels := fmt.Sprintf("%+v", portAssigns) - slog.Debug("分配端口完成", "portAssigns", debugChannels) // 缓存并关闭代理 - err = cache(ctx, portAssigns) + err = cache(ctx, postAssigns) if err != nil { return nil, err } - return portAssigns, nil + return postAssigns, nil } // endregion @@ -349,13 +357,9 @@ func checkUser(auth *AuthContext, resource *ResourceInfo, count int) error { } // assignEdge 分配边缘节点数量 -func assignEdge(count int, filter NodeFilterConfig) ([]*AssignEdgeResult, error) { - // 查询现有节点连接情况 - edgeConfigs, err := remote.Client.CloudAutoQuery() - if err != nil { - return nil, err - } +func assignEdge(count int, filter NodeFilterConfig) (*AssignEdgeResult, error) { + // 查询可以使用的网关 proxies, err := q.Proxy. Where(q.Proxy.Type.Eq(1)). Find() @@ -363,81 +367,18 @@ func assignEdge(count int, filter NodeFilterConfig) ([]*AssignEdgeResult, error) return nil, err } - // 过滤需要变动的连接配置 - type ConfigInfo struct { - proxy *models.Proxy - config *remote.AutoConfig + // 查询已配置的节点 + allConfigs, err := remote.Client.CloudAutoQuery() + if err != nil { + return nil, err } - var total = count - var assigns = make([]*AssignEdgeResult, len(proxies), len(proxies)) + + // 查询已分配的节点 + var proxyIds = make([]int32, len(proxies)) for i, proxy := range proxies { - remoteConfigs := edgeConfigs[proxy.Name] - for _, config := range remoteConfigs { - if config.Isp == filter.Isp && config.City == filter.City && config.Province == filter.Prov { - total += config.Count - assigns[i] = &AssignEdgeResult{ - proxy: proxy, - count: config.Count, - } - } - } - if assigns[i] == nil { - assigns[i] = &AssignEdgeResult{ - proxy: proxy, - count: 0, - } - } + proxyIds[i] = proxy.ID } - avg := int(math.Ceil(float64(total) / float64(len(proxies)))) - - for i, assign := range assigns { - var prev = assign.count - var next = assign.count - if prev < avg && prev < total { - next = int(math.Min(float64(avg), float64(total))) - assigns[i].count = next - prev - total -= next - } else { - continue - } - // err := remote.Client.CloudConnect(remote.CloudConnectReq{ - // Uuid: assign.Proxy.Name, - // Edge: nil, - // AutoConfig: []remote.AutoConfig{{ - // Province: filter.Prov, - // City: filter.City, - // Isp: filter.Isp, - // Count: next, - // }}, - // }) - // if err != nil { - // return nil, err - // } - } - - return assigns, nil -} - -type AssignEdgeResult struct { - proxy *models.Proxy - count int -} - -// assignPort 分配指定数量的端口 -func assignPort( - assigns []*AssignEdgeResult, - userId int32, - protocol ChannelProtocol, - authType ChannelAuthType, - expiration time.Time, - filter NodeFilterConfig, -) ([]AssignPortResult, error) { - // 查询代理已配置端口 - var proxyIds = make([]int32, 0, len(assigns)) - for _, assigned := range assigns { - proxyIds = append(proxyIds, assigned.proxy.ID) - } - channels, err := q.Channel. + assigns, err := q.Channel. Select( q.Channel.ProxyID, q.Channel.ProxyPort). @@ -452,6 +393,99 @@ func assignPort( return nil, err } + // 过滤需要变动的连接配置 + var current = 0 + var result = make([]ProxyConfig, len(proxies)) + for i, proxy := range proxies { + remoteConfigs, ok := allConfigs[proxy.Name] + if !ok { + result[i] = ProxyConfig{ + proxy: proxy, + config: &remote.AutoConfig{ + Province: filter.Prov, + City: filter.City, + Isp: filter.Isp, + Count: 0, + }, + } + continue + } + for _, config := range remoteConfigs { + if config.Isp == filter.Isp && config.City == filter.City && config.Province == filter.Prov { + current += config.Count + result[i] = ProxyConfig{ + proxy: proxy, + config: &config, + } + } + } + } + + // 如果需要新增节点 + var needed = len(assigns) + count + if needed-current > 0 { + slog.Debug("新增新节点", "needed", needed, "current", current) + avg := int(math.Ceil(float64(needed) / float64(len(proxies)))) + for i, assign := range result { + var prev = assign.config.Count + var next = assign.config.Count + if prev >= avg || prev >= needed { + continue + } + + next = int(math.Min(float64(avg), float64(needed))) + result[i].config.Count = next - prev + needed -= next + err := remote.Client.CloudConnect(remote.CloudConnectReq{ + Uuid: assign.proxy.Name, + Edge: nil, + AutoConfig: []remote.AutoConfig{{ + Province: filter.Prov, + City: filter.City, + Isp: filter.Isp, + Count: next, + }}, + }) + if err != nil { + return nil, err + } + } + } + + return &AssignEdgeResult{ + configs: result, + channels: assigns, + }, nil +} + +type AssignEdgeResult struct { + configs []ProxyConfig + channels []*models.Channel +} + +type ProxyConfig struct { + proxy *models.Proxy + config *remote.AutoConfig +} + +// assignPort 分配指定数量的端口 +func assignPort( + proxies *AssignEdgeResult, + userId int32, + protocol ChannelProtocol, + authType ChannelAuthType, + expiration time.Time, + filter NodeFilterConfig, +) ([]AssignPortResult, error) { + var assigns = proxies.configs + var channels = proxies.channels + + // 查询代理已配置端口 + var proxyIds = make([]int32, 0, len(assigns)) + for _, assigned := range assigns { + proxyIds = append(proxyIds, assigned.proxy.ID) + } + // 端口查找表 var proxyPorts = make(map[uint64]struct{}) for _, channel := range channels { @@ -462,8 +496,9 @@ func assignPort( // 配置启用代理 var result = make([]AssignPortResult, len(assigns)) for i, assign := range assigns { + var err error var proxy = assign.proxy - var count = assign.count + var count = assign.config.Count result[i] = AssignPortResult{ Proxy: proxy, @@ -540,15 +575,15 @@ func assignPort( } // 提交端口配置 - // gateway := remote.InitGateway( - // proxy.Host, - // "api", - // "123456", - // ) - // err = gateway.GatewayPortConfigs(configs) - // if err != nil { - // return nil, err - // } + gateway := remote.InitGateway( + proxy.Host, + "api", + "123456", + ) + err = gateway.GatewayPortConfigs(configs) + if err != nil { + return nil, err + } // 保存到数据库 err = q.Channel.