基本完成远端配置的通道创建

This commit is contained in:
2025-03-28 18:15:03 +08:00
parent edec734b71
commit 444b1a7b99
9 changed files with 295 additions and 158 deletions

View File

@@ -1,5 +1,33 @@
package main package main
func main() { import (
println(string(rune(122))) "fmt"
"platform/pkg/env"
"platform/pkg/logs"
"platform/pkg/orm"
"platform/web/models"
q "platform/web/queries"
)
type ResourceInfo struct {
data models.Resource
pss models.ResourcePss
}
func main() {
env.Init()
logs.Init()
orm.Init()
var resource = new(ResourceInfo)
data := q.Resource.As("data")
pss := q.ResourcePss.As("pss")
_ = data.Debug().Scopes(orm.Alias(data)).
Select(data.ALL, pss.ALL).
LeftJoin(q.ResourcePss.As("pss"), pss.ResourceID.EqCol(data.ID)).
Where(data.ID.Eq(1)).
Scan(&resource)
fmt.Printf("%+v\n", resource)
} }

View File

@@ -3,6 +3,7 @@ package remote
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"io" "io"
"net/http" "net/http"
"strconv" "strconv"
@@ -19,7 +20,8 @@ var Client client
func Init() { func Init() {
// todo 从环境变量中获取参数 // todo 从环境变量中获取参数
Client = client{ Client = client{
url: "http://103.139.212.110", url: "http://103.139.212.110:9989",
token: "tHDarLc1ct6M9NMAxeO98lN2YsEadYSx.anVpcA==.MTc0MzA4MTAwMQ==",
} }
} }
@@ -106,15 +108,15 @@ type CloudConnectReq struct {
AutoConfig []AutoConfig `json:"auto_config,omitempty"` AutoConfig []AutoConfig `json:"auto_config,omitempty"`
} }
func (c *client) CloudConnect(param CloudConnectReq) (int, error) { func (c *client) CloudConnect(param CloudConnectReq) error {
data, err := json.Marshal(param) data, err := json.Marshal(param)
if err != nil { if err != nil {
return 0, err return err
} }
resp, err := c.requestCloud("POST", "/connect", string(data)) resp, err := c.requestCloud("POST", "/connect", string(data))
if err != nil { if err != nil {
return 0, err return err
} }
defer func(Body io.ReadCloser) { defer func(Body io.ReadCloser) {
@@ -122,25 +124,25 @@ func (c *client) CloudConnect(param CloudConnectReq) (int, error) {
}(resp.Body) }(resp.Body)
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return 0, errors.New("failed to connect") return errors.New("failed to connect")
} }
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return 0, err return err
} }
var result map[string]any var result map[string]any
err = json.Unmarshal(body, &result) err = json.Unmarshal(body, &result)
if err != nil { if err != nil {
return 0, err return err
} }
if result["status"] == "error" { if result["status"] == "error" {
return 0, errors.New(result["details"].(string)) return errors.New(result["details"].(string))
} }
return int(result["connected_edges"].(float64)), nil return nil
} }
// endregion // endregion
@@ -202,7 +204,7 @@ func (c *client) CloudDisconnect(param CloudDisconnectReq) (int, error) {
// region cloud:/auto_query // region cloud:/auto_query
type CloudConnectResp map[string]AutoConfig type CloudConnectResp map[string][]AutoConfig
func (c *client) CloudAutoQuery() (CloudConnectResp, error) { func (c *client) CloudAutoQuery() (CloudConnectResp, error) {
resp, err := c.requestCloud("GET", "/auto_query", "") resp, err := c.requestCloud("GET", "/auto_query", "")
@@ -235,7 +237,8 @@ func (c *client) CloudAutoQuery() (CloudConnectResp, error) {
func (c *client) requestCloud(method string, url string, data string) (*http.Response, error) { func (c *client) requestCloud(method string, url string, data string) (*http.Response, error) {
req, err := http.NewRequest(method, c.url+url, strings.NewReader(data)) url = fmt.Sprintf("%s/api%s", c.url, url)
req, err := http.NewRequest(method, url, strings.NewReader(data))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -264,7 +267,7 @@ func InitGateway(url, username, password string) *Gateway {
// region gateway:/port/configs // region gateway:/port/configs
type PortConfigsReq struct { type PortConfigsReq struct {
Port string `json:"port"` Port int `json:"port"`
Edge []string `json:"edge,omitempty"` Edge []string `json:"edge,omitempty"`
Type string `json:"type,omitempty"` Type string `json:"type,omitempty"`
Time int `json:"time,omitempty"` Time int `json:"time,omitempty"`
@@ -301,15 +304,15 @@ func (c *Gateway) GatewayPortConfigs(params []PortConfigsReq) error {
_ = Body.Close() _ = Body.Close()
}(resp.Body) }(resp.Body)
if resp.StatusCode != http.StatusOK {
return errors.New("failed to configure port")
}
body, err := io.ReadAll(resp.Body) body, err := io.ReadAll(resp.Body)
if err != nil { if err != nil {
return err return err
} }
if resp.StatusCode != http.StatusOK {
return errors.New("failed to get port configs: " + string(body))
}
var result map[string]any var result map[string]any
err = json.Unmarshal(body, &result) err = json.Unmarshal(body, &result)
if err != nil { if err != nil {
@@ -398,7 +401,8 @@ func (c *Gateway) requestGateway(method string, url string, data any) (*http.Res
return nil, err return nil, err
} }
req, err := http.NewRequest(method, c.username+":"+c.password+"@"+c.url+url, strings.NewReader(string(jsonData))) url = fmt.Sprintf("http://%s:%s@%s:9990%s", c.username, c.password, c.url, url)
req, err := http.NewRequest(method, url, strings.NewReader(string(jsonData)))
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -563,12 +563,12 @@ comment on column product.deleted_at is '删除时间';
drop table if exists resource cascade; drop table if exists resource cascade;
create table resource ( create table resource (
id serial primary key, id serial primary key,
user_id int not null references "user" (id) user_id int not null references "user" (id)
on update cascade on update cascade
on delete cascade, on delete cascade,
active bool default true, active bool not null default true,
created_at timestamp default current_timestamp, created_at timestamp default current_timestamp,
updated_at timestamp default current_timestamp, updated_at timestamp default current_timestamp,
deleted_at timestamp deleted_at timestamp
); );
create index resource_user_id_index on resource (user_id); create index resource_user_id_index on resource (user_id);
@@ -595,11 +595,11 @@ create table resource_pss (
quota int, quota int,
used int, used int,
expire timestamp, expire timestamp,
daily_limit int, daily_limit int not null default 0,
daily_used int, daily_used int not null default 0,
daily_last timestamp, daily_last timestamp,
created_at timestamp default current_timestamp, created_at timestamp default current_timestamp,
updated_at timestamp default current_timestamp, updated_at timestamp default current_timestamp,
deleted_at timestamp deleted_at timestamp
); );
create index resource_pss_resource_id_index on resource_pss (resource_id); create index resource_pss_resource_id_index on resource_pss (resource_id);

View File

@@ -1,5 +1,44 @@
GET http://api:123456@110.40.82.248:9990/port/active ### remote 令牌
GET http://110.40.82.250:18702/server/index/getToken/key/juipbyjdapiverify
### remote 配置信息
GET http://103.139.212.110:9989/api/auto_query
token: tHDarLc1ct6M9NMAxeO98lN2YsEadYSx.anVpcA==.MTc0MzA4MTAwMQ==
### gateway 端口信息
GET http://api:123456@110.40.82.248:9990/port/active/
### remote 配置重置
POST http://103.139.212.110:9989/api/connect
token: tHDarLc1ct6M9NMAxeO98lN2YsEadYSx.anVpcA==.MTc0MzA4MTAwMQ==
Content-Type: application/json
{
"uuid": "7a17e8b4-cdc3-4500-bf16-4a665991a7f6",
"auto_config": [
{
"count": 200
}
]
}
### 密码模式代理
POST http://localhost:8080/api/channel/create
Authorization: Bearer b21568ed-09a9-4f1c-add6-d6b24bde7473
Content-Type: application/json
Accept: application/json Accept: application/json
### {
"resource_id": 1,
"protocol": "http",
"auth_type": 1,
"count": 200,
"prov": "",
"city": "",
"isp": "",
"result_type": "text",
"result_separator": "both"
}
### 白名单模式代理

View File

@@ -19,8 +19,8 @@ type CreateChannelReq struct {
Prov string `json:"prov" validate:"required"` Prov string `json:"prov" validate:"required"`
City string `json:"city" validate:"required"` City string `json:"city" validate:"required"`
Isp string `json:"isp" validate:"required"` Isp string `json:"isp" validate:"required"`
ResultType CreateChannelResultType `json:"return_type" validate:"required,oneof=json text"` ResultType CreateChannelResultType `json:"result_type" validate:"required,oneof=json text"`
ResultSeparator CreateChannelResultSeparator `json:"return_separator" validate:"required,oneof=enter line both tab"` ResultSeparator CreateChannelResultSeparator `json:"result_separator" validate:"required,oneof=enter line both tab"`
} }
func CreateChannel(c *fiber.Ctx) error { func CreateChannel(c *fiber.Ctx) error {
@@ -30,12 +30,12 @@ func CreateChannel(c *fiber.Ctx) error {
} }
// 建立连接通道 // 建立连接通道
auth, ok := c.Locals("user").(*services.AuthContext) auth, ok := c.Locals("auth").(*services.AuthContext)
if !ok { if !ok {
return errors.New("user not found") return errors.New("user not found")
} }
channels, err := services.Channel.RemoteCreateChannel( assigns, err := services.Channel.RemoteCreateChannel(
c.Context(), c.Context(),
auth, auth,
req.ResourceId, req.ResourceId,
@@ -54,9 +54,13 @@ func CreateChannel(c *fiber.Ctx) error {
// 返回连接通道列表 // 返回连接通道列表
var result []string var result []string
for _, channel := range channels { for _, assign := range assigns {
url := fmt.Sprintf("%s://%s:%d", channel.Protocol, channel.UserHost, channel.ProxyPort) var proxy = assign.Proxy
result = append(result, url) var channels = assign.Channels
for _, channel := range channels {
url := fmt.Sprintf("%s://%s:%d", channel.Protocol, proxy.Host, channel.ProxyPort)
result = append(result, url)
}
} }
switch req.ResultType { switch req.ResultType {

View File

@@ -16,7 +16,7 @@ const TableNameResource = "resource"
type Resource struct { type Resource struct {
ID int32 `gorm:"column:id;primaryKey;autoIncrement:true;comment:套餐ID" json:"id"` // 套餐ID ID int32 `gorm:"column:id;primaryKey;autoIncrement:true;comment:套餐ID" json:"id"` // 套餐ID
UserID int32 `gorm:"column:user_id;not null;comment:用户ID" json:"user_id"` // 用户ID UserID int32 `gorm:"column:user_id;not null;comment:用户ID" json:"user_id"` // 用户ID
Active bool `gorm:"column:active;default:true;comment:套餐状态" json:"active"` // 套餐状态 Active bool `gorm:"column:active;not null;default:true;comment:套餐状态" json:"active"` // 套餐状态
CreatedAt time.Time `gorm:"column:created_at;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间 CreatedAt time.Time `gorm:"column:created_at;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间
UpdatedAt time.Time `gorm:"column:updated_at;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 UpdatedAt time.Time `gorm:"column:updated_at;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间
DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;comment:删除时间" json:"deleted_at"` // 删除时间 DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;comment:删除时间" json:"deleted_at"` // 删除时间

View File

@@ -21,8 +21,8 @@ type ResourcePss struct {
Quota int32 `gorm:"column:quota;comment:配额数量" json:"quota"` // 配额数量 Quota int32 `gorm:"column:quota;comment:配额数量" json:"quota"` // 配额数量
Used int32 `gorm:"column:used;comment:已用数量" json:"used"` // 已用数量 Used int32 `gorm:"column:used;comment:已用数量" json:"used"` // 已用数量
Expire time.Time `gorm:"column:expire;comment:过期时间" json:"expire"` // 过期时间 Expire time.Time `gorm:"column:expire;comment:过期时间" json:"expire"` // 过期时间
DailyLimit int32 `gorm:"column:daily_limit;comment:每日限制" json:"daily_limit"` // 每日限制 DailyLimit int32 `gorm:"column:daily_limit;not null;comment:每日限制" json:"daily_limit"` // 每日限制
DailyUsed int32 `gorm:"column:daily_used;comment:今日已用数量" json:"daily_used"` // 今日已用数量 DailyUsed int32 `gorm:"column:daily_used;not null;comment:今日已用数量" json:"daily_used"` // 今日已用数量
DailyLast time.Time `gorm:"column:daily_last;comment:今日最后使用时间" json:"daily_last"` // 今日最后使用时间 DailyLast time.Time `gorm:"column:daily_last;comment:今日最后使用时间" json:"daily_last"` // 今日最后使用时间
CreatedAt time.Time `gorm:"column:created_at;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间 CreatedAt time.Time `gorm:"column:created_at;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间
UpdatedAt time.Time `gorm:"column:updated_at;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 UpdatedAt time.Time `gorm:"column:updated_at;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间

View File

@@ -16,6 +16,6 @@ func ApplyRouters(app *fiber.App) {
auth.Post("/token", handlers.Token) auth.Post("/token", handlers.Token)
// 通道 // 通道
channel := api.Group("/channel", PermitUser()) channel := api.Group("/channel")
channel.Post("/create", handlers.CreateChannel) channel.Post("/create", PermitUser(), handlers.CreateChannel)
} }

View File

@@ -2,6 +2,7 @@ package services
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"log/slog" "log/slog"
@@ -12,7 +13,6 @@ import (
"platform/web/common" "platform/web/common"
"platform/web/models" "platform/web/models"
q "platform/web/queries" q "platform/web/queries"
"strconv"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@@ -41,10 +41,7 @@ func (s *channelService) CreateChannel(
var channels []*models.Channel var channels []*models.Channel
err := q.Q.Transaction(func(tx *q.Query) error { err := q.Q.Transaction(func(tx *q.Query) error {
// 查找套餐 // 查找套餐
var resource = struct { var resource = ResourceInfo{}
data models.Resource
pss models.ResourcePss
}{}
err := q.Resource.As("data"). err := q.Resource.As("data").
LeftJoin(q.ResourcePss.As("pss"), q.ResourcePss.ResourceID.EqCol(q.Resource.ID)). LeftJoin(q.ResourcePss.As("pss"), q.ResourcePss.ResourceID.EqCol(q.Resource.ID)).
Where(q.Resource.ID.Eq(resourceId)). Where(q.Resource.ID.Eq(resourceId)).
@@ -57,29 +54,29 @@ func (s *channelService) CreateChannel(
} }
// 检查使用人 // 检查使用人
if auth.Payload.Type == PayloadUser && auth.Payload.Id != resource.data.UserID { if auth.Payload.Type == PayloadUser && auth.Payload.Id != resource.UserId {
return common.AuthForbiddenErr("无权限访问") return common.AuthForbiddenErr("无权限访问")
} }
// 检查套餐状态 // 检查套餐状态
if !resource.data.Active { if !resource.Active {
return ChannelServiceErr("套餐已失效") return ChannelServiceErr("套餐已失效")
} }
// 检查每日限额 // 检查每日限额
today := time.Now().Format("2006-01-02") == resource.pss.DailyLast.Format("2006-01-02") today := time.Now().Format("2006-01-02") == resource.DailyLast.Format("2006-01-02")
dailyRemain := int(math.Max(float64(resource.pss.DailyLimit-resource.pss.DailyUsed), 0)) dailyRemain := int(math.Max(float64(resource.DailyLimit-resource.DailyUsed), 0))
if today && dailyRemain < count { if today && dailyRemain < count {
return ChannelServiceErr("套餐每日配额不足") return ChannelServiceErr("套餐每日配额不足")
} }
// 检查时间或配额 // 检查时间或配额
if resource.pss.Type == 1 { // 包时 if resource.Type == 1 { // 包时
if resource.pss.Expire.Before(time.Now()) { if resource.Expire.Before(time.Now()) {
return ChannelServiceErr("套餐已过期") return ChannelServiceErr("套餐已过期")
} }
} else { // 包量 } else { // 包量
remain := int(math.Max(float64(resource.pss.Quota-resource.pss.Used), 0)) remain := int(math.Max(float64(resource.Quota-resource.Used), 0))
if remain < count { if remain < count {
return ChannelServiceErr("套餐配额不足") return ChannelServiceErr("套餐配额不足")
} }
@@ -115,7 +112,7 @@ func (s *channelService) CreateChannel(
AuthPass: authType == ChannelAuthTypePass, AuthPass: authType == ChannelAuthTypePass,
Username: username, Username: username,
Password: password, Password: password,
Expiration: time.Now().Add(time.Duration(resource.pss.Live) * time.Second), Expiration: time.Now().Add(time.Duration(resource.Live) * time.Second),
}) })
} }
} }
@@ -128,18 +125,24 @@ func (s *channelService) CreateChannel(
// 更新套餐使用记录 // 更新套餐使用记录
if today { if today {
resource.pss.DailyUsed += int32(count) resource.DailyUsed += int32(count)
resource.pss.Used += int32(count) resource.Used += int32(count)
} else { } else {
resource.pss.DailyLast = time.Now() resource.DailyLast = time.Now()
resource.pss.DailyUsed = int32(count) resource.DailyUsed = int32(count)
resource.pss.Used += int32(count) resource.Used += int32(count)
} }
err = tx.ResourcePss. err = tx.ResourcePss.
Where(q.ResourcePss.ID.Eq(resource.pss.ID)). Where(q.ResourcePss.ID.Eq(resource.Id)).
Omit(q.ResourcePss.ResourceID). Select(
Save(&resource.pss) q.ResourcePss.Used,
q.ResourcePss.DailyUsed,
q.ResourcePss.DailyLast).
Save(&models.ResourcePss{
Used: resource.Used,
DailyUsed: resource.DailyUsed,
DailyLast: resource.DailyLast})
if err != nil { if err != nil {
return err return err
} }
@@ -151,10 +154,10 @@ func (s *channelService) CreateChannel(
} }
// 缓存通道信息与异步删除任务 // 缓存通道信息与异步删除任务
err = cache(ctx, channels) // err = cache(ctx, channels)
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
// 返回连接通道列表 // 返回连接通道列表
return channels, errors.New("not implemented") return channels, errors.New("not implemented")
@@ -234,10 +237,6 @@ func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext,
return nil return nil
} }
func chKey(channel *models.Channel) string {
return fmt.Sprintf("channel:%s:%s", channel.UserHost, channel.NodeHost)
}
type ChannelServiceErr string type ChannelServiceErr string
func (c ChannelServiceErr) Error() string { func (c ChannelServiceErr) Error() string {
@@ -254,7 +253,7 @@ func (s *channelService) RemoteCreateChannel(
authType ChannelAuthType, authType ChannelAuthType,
count int, count int,
nodeFilter ...NodeFilterConfig, nodeFilter ...NodeFilterConfig,
) ([]*models.Channel, error) { ) ([]AssignPortResult, error) {
filter := NodeFilterConfig{} filter := NodeFilterConfig{}
if len(nodeFilter) > 0 { if len(nodeFilter) > 0 {
@@ -265,8 +264,11 @@ func (s *channelService) RemoteCreateChannel(
var resource = new(ResourceInfo) var resource = new(ResourceInfo)
data := q.Resource.As("data") data := q.Resource.As("data")
pss := q.ResourcePss.As("pss") pss := q.ResourcePss.As("pss")
err := data.Scopes(orm.Alias(data)). err := data.Debug().Scopes(orm.Alias(data)).
Select(data.ALL, pss.ALL). Select(
data.ID, data.UserID, data.Active,
pss.Type, pss.Live, pss.DailyUsed, pss.DailyLimit, pss.DailyLast, pss.Quota, pss.Used, pss.Expire,
).
LeftJoin(q.ResourcePss.As("pss"), pss.ResourceID.EqCol(data.ID)). LeftJoin(q.ResourcePss.As("pss"), pss.ResourceID.EqCol(data.ID)).
Where(data.ID.Eq(resourceId)). Where(data.ID.Eq(resourceId)).
Scan(&resource) Scan(&resource)
@@ -282,27 +284,32 @@ func (s *channelService) RemoteCreateChannel(
if err != nil { if err != nil {
return nil, err return nil, err
} }
slog.Debug("检查用户权限完成")
// 申请节点 // 申请节点
assigned, err := assignEdge(count, filter) edgeAssigns, err := assignEdge(count, filter)
if err != nil { if err != nil {
return nil, err return nil, err
} }
debugAssigned := fmt.Sprintf("%+v", edgeAssigns)
slog.Debug("申请节点完成", "edgeAssigns", debugAssigned)
// 分配端口 // 分配端口
expiration := time.Now().Add(time.Duration(resource.pss.Live) * time.Second) expiration := time.Now().Add(time.Duration(resource.Live) * time.Second)
channels, err := assignPort(assigned, auth.Payload.Id, protocol, authType, expiration, filter) portAssigns, err := assignPort(edgeAssigns, auth.Payload.Id, protocol, authType, expiration, filter)
if err != nil { if err != nil {
return nil, err return nil, err
} }
debugChannels := fmt.Sprintf("%+v", portAssigns)
slog.Debug("分配端口完成", "portAssigns", debugChannels)
// 缓存并关闭代理 // 缓存并关闭代理
err = cache(ctx, channels) err = cache(ctx, portAssigns)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return channels, nil return portAssigns, nil
} }
// endregion // endregion
@@ -310,29 +317,29 @@ func (s *channelService) RemoteCreateChannel(
func checkUser(auth *AuthContext, resource *ResourceInfo, count int) error { func checkUser(auth *AuthContext, resource *ResourceInfo, count int) error {
// 检查使用人 // 检查使用人
if auth.Payload.Type == PayloadUser && auth.Payload.Id != resource.data.UserID { if auth.Payload.Type == PayloadUser && auth.Payload.Id != resource.UserId {
return common.AuthForbiddenErr("无权限访问") return common.AuthForbiddenErr("无权限访问")
} }
// 检查套餐状态 // 检查套餐状态
if !resource.data.Active { if !resource.Active {
return ChannelServiceErr("套餐已失效") return ChannelServiceErr("套餐已失效")
} }
// 检查每日限额 // 检查每日限额
today := time.Now().Format("2006-01-02") == resource.pss.DailyLast.Format("2006-01-02") today := time.Now().Format("2006-01-02") == resource.DailyLast.Format("2006-01-02")
dailyRemain := int(math.Max(float64(resource.pss.DailyLimit-resource.pss.DailyUsed), 0)) dailyRemain := int(math.Max(float64(resource.DailyLimit-resource.DailyUsed), 0))
if today && dailyRemain < count { if today && dailyRemain < count {
return ChannelServiceErr("套餐每日配额不足") return ChannelServiceErr("套餐每日配额不足")
} }
// 检查时间或配额 // 检查时间或配额
if resource.pss.Type == 1 { // 包时 if resource.Type == 1 { // 包时
if resource.pss.Expire.Before(time.Now()) { if resource.Expire.Before(time.Now()) {
return ChannelServiceErr("套餐已过期") return ChannelServiceErr("套餐已过期")
} }
} else { // 包量 } else { // 包量
remain := int(math.Max(float64(resource.pss.Quota-resource.pss.Used), 0)) remain := int(math.Max(float64(resource.Quota-resource.Used), 0))
if remain < count { if remain < count {
return ChannelServiceErr("套餐配额不足") return ChannelServiceErr("套餐配额不足")
} }
@@ -342,7 +349,7 @@ func checkUser(auth *AuthContext, resource *ResourceInfo, count int) error {
} }
// assignEdge 分配边缘节点数量 // assignEdge 分配边缘节点数量
func assignEdge(count int, filter NodeFilterConfig) ([]AssignEdgeResult, error) { func assignEdge(count int, filter NodeFilterConfig) ([]*AssignEdgeResult, error) {
// 查询现有节点连接情况 // 查询现有节点连接情况
edgeConfigs, err := remote.Client.CloudAutoQuery() edgeConfigs, err := remote.Client.CloudAutoQuery()
if err != nil { if err != nil {
@@ -356,46 +363,59 @@ func assignEdge(count int, filter NodeFilterConfig) ([]AssignEdgeResult, error)
return nil, err return nil, err
} }
// 尽量平均分配节点用量 // 过滤需要变动的连接配置
var total = count type ConfigInfo struct {
for _, v := range edgeConfigs { proxy *models.Proxy
total += v.Count config *remote.AutoConfig
} }
avg := int(math.Ceil(float64(total) / float64(len(edgeConfigs)))) var total = count
var assigns = make([]*AssignEdgeResult, len(proxies), len(proxies))
var result []AssignEdgeResult for i, proxy := range proxies {
var rCount = 0 remoteConfigs := edgeConfigs[proxy.Name]
for _, proxy := range proxies { for _, config := range remoteConfigs {
prev, ok := edgeConfigs[proxy.Name] if config.Isp == filter.Isp && config.City == filter.City && config.Province == filter.Prov {
var nextCount = 0 total += config.Count
if !ok || (prev.Count < avg && prev.Count < total) { assigns[i] = &AssignEdgeResult{
nextCount = int(math.Min(float64(avg), float64(total))) proxy: proxy,
result = append(result, AssignEdgeResult{ count: config.Count,
}
}
}
if assigns[i] == nil {
assigns[i] = &AssignEdgeResult{
proxy: proxy, proxy: proxy,
count: nextCount - prev.Count, count: 0,
}) }
total -= nextCount }
}
avg := int(math.Ceil(float64(total) / float64(len(proxies))))
for i, assign := range assigns {
var prev = assign.count
var next = assign.count
if prev < avg && prev < total {
next = int(math.Min(float64(avg), float64(total)))
assigns[i].count = next - prev
total -= next
} else { } else {
continue continue
} }
_rCount, err := remote.Client.CloudConnect(remote.CloudConnectReq{ // err := remote.Client.CloudConnect(remote.CloudConnectReq{
Uuid: proxy.Name, // Uuid: assign.Proxy.Name,
Edge: nil, // Edge: nil,
AutoConfig: []remote.AutoConfig{{ // AutoConfig: []remote.AutoConfig{{
Province: filter.Prov, // Province: filter.Prov,
City: filter.City, // City: filter.City,
Isp: filter.Isp, // Isp: filter.Isp,
Count: nextCount, // Count: next,
}}, // }},
}) // })
if err != nil { // if err != nil {
return nil, err // return nil, err
} // }
rCount += _rCount
} }
slog.Debug("cloud connect", "count", rCount)
return result, nil return assigns, nil
} }
type AssignEdgeResult struct { type AssignEdgeResult struct {
@@ -405,13 +425,13 @@ type AssignEdgeResult struct {
// assignPort 分配指定数量的端口 // assignPort 分配指定数量的端口
func assignPort( func assignPort(
assigns []AssignEdgeResult, assigns []*AssignEdgeResult,
userId int32, userId int32,
protocol ChannelProtocol, protocol ChannelProtocol,
authType ChannelAuthType, authType ChannelAuthType,
expiration time.Time, expiration time.Time,
filter NodeFilterConfig, filter NodeFilterConfig,
) ([]*models.Channel, error) { ) ([]AssignPortResult, error) {
// 查询代理已配置端口 // 查询代理已配置端口
var proxyIds = make([]int32, 0, len(assigns)) var proxyIds = make([]int32, 0, len(assigns))
for _, assigned := range assigns { for _, assigned := range assigns {
@@ -425,7 +445,8 @@ func assignPort(
q.Channel.ProxyID.In(proxyIds...), q.Channel.ProxyID.In(proxyIds...),
q.Channel.Expiration.Gt(time.Now())). q.Channel.Expiration.Gt(time.Now())).
Group( Group(
q.Channel.ProxyPort). q.Channel.ProxyPort,
q.Channel.ProxyID).
Find() Find()
if err != nil { if err != nil {
return nil, err return nil, err
@@ -439,14 +460,19 @@ func assignPort(
} }
// 配置启用代理 // 配置启用代理
var result []*models.Channel var result = make([]AssignPortResult, len(assigns))
for i := 0; i < len(assigns); i++ { for i, assign := range assigns {
proxy := assigns[i].proxy var proxy = assign.proxy
count := assigns[i].count var count = assign.count
result[i] = AssignPortResult{
Proxy: proxy,
}
// 筛选可用端口 // 筛选可用端口
var portConfigs = make([]remote.PortConfigsReq, count) var channels = result[i].Channels
for port := 10000; port < 20000 || len(portConfigs) < count; port++ { var configs = make([]remote.PortConfigsReq, 0, count)
for port := 10000; port < 20000 && len(configs) < count; port++ {
// 跳过存在的端口 // 跳过存在的端口
key := uint64(proxy.ID)<<32 | uint64(port) key := uint64(proxy.ID)<<32 | uint64(port)
_, ok := proxyPorts[key] _, ok := proxyPorts[key]
@@ -455,8 +481,9 @@ func assignPort(
} }
// 配置新端口 // 配置新端口
portConfigs[port] = remote.PortConfigsReq{ var i = len(configs)
Port: strconv.Itoa(port), configs = append(configs, remote.PortConfigsReq{
Port: port,
Edge: nil, Edge: nil,
Status: true, Status: true,
AutoEdgeConfig: remote.AutoEdgeConfig{ AutoEdgeConfig: remote.AutoEdgeConfig{
@@ -465,7 +492,7 @@ func assignPort(
Isp: filter.Isp, Isp: filter.Isp,
Count: 1, Count: 1,
}, },
} })
switch authType { switch authType {
case ChannelAuthTypeIp: case ChannelAuthTypeIp:
var whitelist []string var whitelist []string
@@ -476,9 +503,9 @@ func assignPort(
if err != nil { if err != nil {
return nil, err return nil, err
} }
portConfigs[port].Whitelist = whitelist configs[i].Whitelist = whitelist
for _, item := range whitelist { for _, item := range whitelist {
result = append(result, &models.Channel{ channels = append(channels, &models.Channel{
UserID: userId, UserID: userId,
ProxyID: proxy.ID, ProxyID: proxy.ID,
UserHost: item, UserHost: item,
@@ -491,8 +518,8 @@ func assignPort(
} }
case ChannelAuthTypePass: case ChannelAuthTypePass:
username, password := genPassPair() username, password := genPassPair()
portConfigs[port].Userpass = fmt.Sprintf("%s:%s", username, password) configs[i].Userpass = fmt.Sprintf("%s:%s", username, password)
result = append(result, &models.Channel{ channels = append(channels, &models.Channel{
UserID: userId, UserID: userId,
ProxyID: proxy.ID, ProxyID: proxy.ID,
ProxyPort: int32(port), ProxyPort: int32(port),
@@ -506,38 +533,64 @@ func assignPort(
} }
} }
result[i].Channels = channels
if len(configs) < count {
return nil, ChannelServiceErr("网关端口数量到达上限,无法分配")
}
// 提交端口配置 // 提交端口配置
gateway := remote.InitGateway( // gateway := remote.InitGateway(
proxy.Host, // proxy.Host,
"api", // "api",
"123456", // "123456",
) // )
err = gateway.GatewayPortConfigs(portConfigs) // err = gateway.GatewayPortConfigs(configs)
// if err != nil {
// return nil, err
// }
// 保存到数据库
err = q.Channel.
Omit(q.Channel.NodeID).
Save(channels...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
err = q.Channel.Save(result...)
if err != nil {
return nil, err
}
return result, nil return result, nil
} }
func cache(ctx context.Context, channels []*models.Channel) error { type AssignPortResult struct {
Proxy *models.Proxy
Channels []*models.Channel
}
func chKey(channel *models.Channel) string {
return fmt.Sprintf("channel:%s:%s", channel.UserHost, channel.NodeHost)
}
func cache(ctx context.Context, assigns []AssignPortResult) error {
pipe := rds.Client.TxPipeline() pipe := rds.Client.TxPipeline()
zList := make([]redis.Z, 0, len(channels)) zList := make([]redis.Z, 0, len(assigns))
for _, channel := range channels { for _, assign := range assigns {
pipe.Set(ctx, chKey(channel), channel, channel.Expiration.Sub(time.Now())) var channels = assign.Channels
zList = append(zList, redis.Z{ for _, channel := range channels {
Score: float64(channel.Expiration.Unix()), marshal, err := json.Marshal(assign)
Member: channel.ID, if err != nil {
}) return err
}
pipe.Set(ctx, chKey(channel), string(marshal), channel.Expiration.Sub(time.Now()))
zList = append(zList, redis.Z{
Score: float64(channel.Expiration.Unix()),
Member: channel.ID,
})
}
} }
pipe.ZAdd(ctx, "tasks:channel", zList...) pipe.ZAdd(ctx, "tasks:assign", zList...)
_, err := pipe.Exec(ctx) _, err := pipe.Exec(ctx)
if err != nil { if err != nil {
@@ -549,7 +602,7 @@ func cache(ctx context.Context, channels []*models.Channel) error {
func deleteCache(ctx context.Context, channels []*models.Channel) error { func deleteCache(ctx context.Context, channels []*models.Channel) error {
pipe := rds.Client.TxPipeline() pipe := rds.Client.TxPipeline()
keys := make([]string, len(channels)) keys := make([]string, 0, len(channels))
for i := range keys { for i := range keys {
keys[i] = chKey(channels[i]) keys[i] = chKey(channels[i])
} }
@@ -564,6 +617,15 @@ func deleteCache(ctx context.Context, channels []*models.Channel) error {
} }
type ResourceInfo struct { type ResourceInfo struct {
data models.Resource Id int32
pss models.ResourcePss UserId int32
Active bool
Type int32
Live int32
DailyLimit int32
DailyUsed int32
DailyLast time.Time
Quota int32
Used int32
Expire time.Time
} }