diff --git a/README.md b/README.md index 8d5a1cc..412689d 100644 --- a/README.md +++ b/README.md @@ -3,9 +3,9 @@ 核心流程: - [x] 注册与登录 - - [ ] 对接短信接口 - - [ ] 人机风险分级验证 - - [ ] jwt 签发 + - [ ] 对接短信接口 + - [ ] 人机风险分级验证 + - [ ] jwt 签发 - [x] 鉴权 - [ ] 实名认证 - [ ] 充值余额 @@ -19,6 +19,8 @@ - [ ] Limiter - [ ] Compress +现在的节点分配逻辑是,每个 user_host:node_port 组算一个分配数,考虑是否改成每个用户算一个分配数 + 考虑将鉴权逻辑放到 handler 里,统一动静态鉴权以及解耦服务层 有些地方在用手动事务,有时间改成自动事务 @@ -62,4 +64,13 @@ captcha_id 关联用户本机信息,实现验证码设备绑定(或者其他 ### 外部服务 服务器ip 110.40.82.248 -api账密:api:123456 \ No newline at end of file +api账密:api:123456 + +## 业务逻辑 + +### 动态 ip 淘汰策略 + +1. 用户请求节点 +2. 查询对应条件下的节点池 +3. 选择对应数量的节点,发起连接 +4. diff --git a/cmd/fill/main.go b/cmd/fill/main.go new file mode 100644 index 0000000..ceed466 --- /dev/null +++ b/cmd/fill/main.go @@ -0,0 +1,54 @@ +package main + +import ( + "log/slog" + "platform/pkg/env" + "platform/pkg/logs" + "platform/pkg/orm" + "platform/web/models" + q "platform/web/queries" +) + +func main() { + env.Init() + logs.Init() + orm.Init() + + q.User.Select( + q.User.Phone, + ).Create(&models.User{ + Phone: "12312341234", + }) + + q.Proxy.Select( + q.Proxy.Version, + q.Proxy.Name, + q.Proxy.Host, + q.Proxy.Type, + ).Create(&models.Proxy{ + Version: 1, + Name: "7a17e8b4-cdc3-4500-bf16-4a665991a7f6", + Host: "110.40.82.248", + Type: 1, + }) + + q.Node.Select( + q.Node.Version, + q.Node.Name, + q.Node.Host, + q.Node.Isp, + q.Node.Prov, + q.Node.City, + q.Node.Status, + ).Create(&models.Node{ + Version: 1, + Name: "test-node", + Host: "123", + Isp: "test-isp", + Prov: "test-prov", + City: "test-city", + Status: 1, + }) + + slog.Info("✔ Data inserted successfully") +} diff --git a/cmd/playground/main.go b/cmd/playground/main.go index 4c637bc..c4c8328 100644 --- a/cmd/playground/main.go +++ b/cmd/playground/main.go @@ -1,7 +1,5 @@ package main -import "encoding/base64" - func main() { - println(base64.URLEncoding.EncodeToString([]byte("app:123456"))) + println(string(rune(122))) } diff --git a/pkg/orm/orm.go b/pkg/orm/orm.go index ccdd8fc..04dbc91 100644 --- a/pkg/orm/orm.go +++ b/pkg/orm/orm.go @@ -6,6 +6,8 @@ import ( "platform/pkg/env" "platform/web/queries" + "gorm.io/gen" + "gorm.io/gen/field" "gorm.io/gorm" "gorm.io/gorm/schema" ) @@ -44,3 +46,13 @@ func Init() { DB = db } + +type WithAlias interface { + Alias() string +} + +func Alias(model WithAlias) func(db gen.Dao) gen.Dao { + return func(db gen.Dao) gen.Dao { + return db.Unscoped().Where(field.NewBool(model.Alias(), "deleted_at").IsNull()) + } +} diff --git a/pkg/remote/remote.go b/pkg/remote/remote.go index 2b32db0..0647482 100644 --- a/pkg/remote/remote.go +++ b/pkg/remote/remote.go @@ -5,15 +5,13 @@ import ( "errors" "io" "net/http" + "strconv" "strings" ) type client struct { - gatewayUrl string - username string - password string - cloudUrl string - token string + url string + token string } var Client client @@ -21,23 +19,260 @@ var Client client func Init() { // todo 从环境变量中获取参数 Client = client{ - gatewayUrl: "http://110.40.82.248:9990", - username: "api", - password: "123456", - cloudUrl: "http://103.139.212.110", + url: "http://103.139.212.110", } } -type PortConfig struct { +type AutoConfig struct { + Province string `json:"province"` + City string `json:"city"` + Isp string `json:"isp"` + Count int `json:"count"` +} + +// region cloud:/edges + +type CloudEdgesReq struct { + Province string + City string + Isp string + Offset int + Limit int +} + +type CloudEdgesResp struct { + Edges []Edge `json:"edges"` + Total int `json:"total"` + Offset int `json:"offset"` + Limit int `json:"limit"` +} + +type Edge struct { + EdgesId int `json:"edges_id"` + Province string `json:"province"` + City string `json:"city"` + Isp string `json:"isp"` + Ip string `json:"ip"` + Rtt int `json:"rtt"` + PacketLoss int `json:"packet_loss"` +} + +func (c *client) CloudEdges(param CloudEdgesReq) (*CloudEdgesResp, error) { + data := strings.Builder{} + data.WriteString("province=") + data.WriteString(param.Province) + data.WriteString("&city=") + data.WriteString(param.City) + data.WriteString("&isp=") + data.WriteString(param.Isp) + data.WriteString("&offset=") + data.WriteString(strconv.Itoa(param.Offset)) + data.WriteString("&limit=") + data.WriteString(strconv.Itoa(param.Limit)) + + resp, err := c.requestCloud("GET", "/edges?"+data.String(), "") + if err != nil { + return nil, err + } + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + + if resp.StatusCode != http.StatusOK { + return nil, errors.New("failed to get edges") + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var result CloudEdgesResp + err = json.Unmarshal(body, &result) + if err != nil { + return nil, err + } + + return &result, nil +} + +// endregion + +// region cloud:/connect + +type CloudConnectReq struct { + Uuid string `json:"uuid"` + Edge []string `json:"edge,omitempty"` + AutoConfig []AutoConfig `json:"auto_config,omitempty"` +} + +func (c *client) CloudConnect(param CloudConnectReq) (int, error) { + data, err := json.Marshal(param) + if err != nil { + return 0, err + } + + resp, err := c.requestCloud("POST", "/connect", string(data)) + if err != nil { + return 0, err + } + + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + + if resp.StatusCode != http.StatusOK { + return 0, errors.New("failed to connect") + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, err + } + + var result map[string]any + err = json.Unmarshal(body, &result) + if err != nil { + return 0, err + } + + if result["status"] == "error" { + return 0, errors.New(result["details"].(string)) + } + + return int(result["connected_edges"].(float64)), nil +} + +// endregion + +// region cloud:/disconnect + +type CloudDisconnectReq struct { + Uuid string `json:"uuid"` + Edge []string `json:"edge"` + Config []Config `json:"auto_config"` +} + +type Config struct { + Province string `json:"province"` + City string `json:"city"` + Isp string `json:"isp"` + Count int `json:"count"` + Online bool `json:"online"` +} + +func (c *client) CloudDisconnect(param CloudDisconnectReq) (int, error) { + data, err := json.Marshal(param) + if err != nil { + return 0, err + } + + resp, err := c.requestCloud("POST", "/disconnect", string(data)) + if err != nil { + return 0, err + } + + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + + if resp.StatusCode != http.StatusOK { + return 0, errors.New("failed to disconnect") + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return 0, err + } + + var result map[string]any + err = json.Unmarshal(body, &result) + if err != nil { + return 0, err + } + + if result["status"] == "error" { + return 0, errors.New(result["details"].(string)) + } + + return int(result["disconnected_edges"].(float64)), nil +} + +// endregion + +// region cloud:/auto_query + +type CloudConnectResp map[string]AutoConfig + +func (c *client) CloudAutoQuery() (CloudConnectResp, error) { + resp, err := c.requestCloud("GET", "/auto_query", "") + if err != nil { + return nil, err + } + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + + if resp.StatusCode != http.StatusOK { + return nil, errors.New("failed to get auto_query") + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var result CloudConnectResp + err = json.Unmarshal(body, &result) + if err != nil { + return nil, err + } + + return result, nil +} + +// endregion + +func (c *client) requestCloud(method string, url string, data string) (*http.Response, error) { + + req, err := http.NewRequest(method, c.url+url, strings.NewReader(data)) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("token", c.token) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + + return resp, nil +} + +type Gateway struct { + url string + username string + password string +} + +func InitGateway(url, username, password string) *Gateway { + return &Gateway{url, username, password} +} + +// region gateway:/port/configs + +type PortConfigsReq struct { Port string `json:"port"` - Edge []string `json:"edge"` - Type string `json:"type"` - Time int `json:"time"` - Status bool `json:"status"` - Rate int `json:"rate"` - Whitelist []string `json:"whitelist"` - Userpass string `json:"userpass"` - AutoEdgeConfig AutoEdgeConfig `json:"auto_edge_config"` + Edge []string `json:"edge,omitempty"` + Type string `json:"type,omitempty"` + Time int `json:"time,omitempty"` + Status bool `json:"status,omitempty"` + Rate int `json:"rate,omitempty"` + Whitelist []string `json:"whitelist,omitempty"` + Userpass string `json:"userpass,omitempty"` + AutoEdgeConfig AutoEdgeConfig `json:"auto_edge_config,omitempty"` } type AutoEdgeConfig struct { @@ -48,8 +283,17 @@ type AutoEdgeConfig struct { PacketLoss int `json:"packet_loss"` } -func (c *client) PortConfigs(params ...PortConfig) error { - resp, err := c.requestCloud("/port/configs", params) +func (c *Gateway) GatewayPortConfigs(params []PortConfigsReq) error { + if len(params) == 0 { + return errors.New("params is empty") + } + + data, err := json.Marshal(params) + if err != nil { + return err + } + + resp, err := c.requestGateway("POST", "/port/configs", string(data)) if err != nil { return err } @@ -79,41 +323,87 @@ func (c *client) PortConfigs(params ...PortConfig) error { return nil } -func (c *client) requestGateway(url string, data any) (*http.Response, error) { +// endregion + +// region gateway:/port/active + +type PortActiveReq struct { + Port string `json:"port"` + Active bool `json:"active"` + Status bool `json:"status"` +} + +type PortActiveResp struct { + Code int `json:"code"` + Msg string `json:"msg"` + Data map[string]PortData `json:"data"` +} + +type PortData struct { + Edge []string `json:"edge"` + Type string `json:"type"` + Status bool `json:"status"` + Active bool `json:"active"` + Time int `json:"time"` + Whitelist []string `json:"whitelist"` + Userpass string `json:"userpass"` +} + +func (c *Gateway) GatewayPortActive(param PortActiveReq) (*PortActiveResp, error) { + + url := strings.Builder{} + url.WriteString("/port/active") + + if param.Port != "" { + url.WriteString("/") + url.WriteString(param.Port) + } + + url.WriteString("?active=") + url.WriteString(strconv.FormatBool(param.Active)) + url.WriteString("&status=") + url.WriteString(strconv.FormatBool(param.Status)) + + resp, err := c.requestGateway("POST", url.String(), "") + if err != nil { + return nil, err + } + defer func(Body io.ReadCloser) { + _ = Body.Close() + }(resp.Body) + + if resp.StatusCode != http.StatusOK { + return nil, errors.New("failed to get port active") + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + var result PortActiveResp + err = json.Unmarshal(body, &result) + if err != nil { + return nil, err + } + + return &result, nil +} + +// endregion + +func (c *Gateway) requestGateway(method string, url string, data any) (*http.Response, error) { jsonData, err := json.Marshal(data) if err != nil { return nil, err } - req, err := http.NewRequest("POST", c.gatewayUrl+url, strings.NewReader(string(jsonData))) + req, err := http.NewRequest(method, c.username+":"+c.password+"@"+c.url+url, strings.NewReader(string(jsonData))) if err != nil { return nil, err } req.Header.Set("Content-Type", "application/json") - req.SetBasicAuth(c.username, c.password) - - resp, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - - return resp, nil -} - -func (c *client) requestCloud(url string, data any) (*http.Response, error) { - jsonData, err := json.Marshal(data) - if err != nil { - return nil, err - } - - req, err := http.NewRequest("POST", c.cloudUrl+url, strings.NewReader(string(jsonData))) - if err != nil { - return nil, err - } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("token", c.token) resp, err := http.DefaultClient.Do(req) if err != nil { diff --git a/scripts/sql/init.sql b/scripts/sql/init.sql index e5ee514..08bebe2 100644 --- a/scripts/sql/init.sql +++ b/scripts/sql/init.sql @@ -380,34 +380,72 @@ comment on column client_permission_link.deleted_at is '删除时间'; -- region 节点信息 -- ==================== +-- proxy +drop table if exists proxy cascade; +create table proxy ( + id serial primary key, + version int not null, + name varchar(255) not null unique, + host varchar(255) not null, + type int not null default 0, + created_at timestamp default current_timestamp, + updated_at timestamp default current_timestamp, + deleted_at timestamp +); +create index proxy_name_index on proxy (name); +create index proxy_host_index on proxy (host); + +-- proxy表字段注释 +comment on table proxy is '代理服务表'; +comment on column proxy.id is '代理服务ID'; +comment on column proxy.version is '代理服务版本'; +comment on column proxy.name is '代理服务名称'; +comment on column proxy.host is '代理服务地址'; +comment on column proxy.type is '代理服务类型:0-自有,1-三方'; +comment on column proxy.created_at is '创建时间'; +comment on column proxy.updated_at is '更新时间'; +comment on column proxy.deleted_at is '删除时间'; + -- node drop table if exists node cascade; create table node ( id serial primary key, - name varchar(255) not null unique, version int not null, - fwd_port int not null, - isp varchar(255) not null, - prov varchar(255) not null, - city varchar(255) not null, - - created_at timestamp default current_timestamp, - updated_at timestamp default current_timestamp, + name varchar(255) not null unique, + host varchar(255) not null, + isp varchar(255) not null, + prov varchar(255) not null, + city varchar(255) not null, + proxy_id int references proxy (id) + on update cascade + on delete cascade, + proxy_port int, + status int not null default 0, + rtt int default 0, + loss int default 0, + created_at timestamp default current_timestamp, + updated_at timestamp default current_timestamp, deleted_at timestamp ); create index node_isp_index on node (isp); create index node_prov_index on node (prov); create index node_city_index on node (city); +create index node_proxy_id_index on node (proxy_id); -- node表字段注释 comment on table node is '节点表'; comment on column node.id is '节点ID'; -comment on column node.name is '节点名称'; comment on column node.version is '节点版本'; -comment on column node.fwd_port is '转发端口'; +comment on column node.name is '节点名称'; +comment on column node.host is '节点地址'; comment on column node.isp is '运营商'; comment on column node.prov is '省份'; comment on column node.city is '城市'; +comment on column node.proxy_id is '代理ID'; +comment on column node.proxy_port is '代理端口'; +comment on column node.status is '节点状态:1-正常,0-离线'; +comment on column node.rtt is '延迟'; +comment on column node.loss is '丢包率'; comment on column node.created_at is '创建时间'; comment on column node.updated_at is '更新时间'; comment on column node.deleted_at is '删除时间'; @@ -440,37 +478,45 @@ comment on column whitelist.deleted_at is '删除时间'; drop table if exists channel cascade; create table channel ( id serial primary key, - user_id int not null references "user" (id) + user_id int not null references "user" (id) on update cascade on delete cascade, - node_id int references node (id) -- - on update cascade -- - on delete set null, - user_host varchar(255) not null, - node_port int, - auth_ip bool not null default false, - auth_pass bool not null default false, + proxy_id int not null references proxy (id) -- + on update cascade -- + on delete set null, + node_id int references node (id) -- + on update cascade -- + on delete set null, + proxy_port int not null, protocol varchar(255), + user_host varchar(255), + node_host varchar(255), + auth_ip bool not null default false, + auth_pass bool not null default false, username varchar(255) unique, password varchar(255), - expiration timestamp not null, - created_at timestamp default current_timestamp, - updated_at timestamp default current_timestamp, + expiration timestamp not null, + created_at timestamp default current_timestamp, + updated_at timestamp default current_timestamp, deleted_at timestamp ); create index channel_user_id_index on channel (user_id); +create index channel_proxy_id_index on channel (proxy_id); create index channel_node_id_index on channel (node_id); create index channel_user_host_index on channel (user_host); -create index channel_node_port_index on channel (node_port); +create index channel_proxy_port_index on channel (proxy_port); +create index channel_node_host_index on channel (node_host); create index channel_expiration_index on channel (expiration); -- channel表字段注释 comment on table channel is '通道表'; comment on column channel.id is '通道ID'; comment on column channel.user_id is '用户ID'; +comment on column channel.proxy_id is '代理ID'; comment on column channel.node_id is '节点ID'; comment on column channel.user_host is '用户地址'; -comment on column channel.node_port is '节点端口'; +comment on column channel.proxy_port is '转发端口'; +comment on column channel.node_host is '节点地址'; comment on column channel.auth_ip is 'IP认证'; comment on column channel.auth_pass is '密码认证'; comment on column channel.protocol is '协议'; diff --git a/web/handlers/channel.go b/web/handlers/channel.go index 3bac3d7..29a467b 100644 --- a/web/handlers/channel.go +++ b/web/handlers/channel.go @@ -12,11 +12,13 @@ import ( // region CreateChannel type CreateChannelReq struct { - Region string `json:"region" validate:"required"` - Provider string `json:"provider" validate:"required"` + ResourceId int32 `json:"resource_id" validate:"required"` Protocol services.ChannelProtocol `json:"protocol" validate:"required,oneof=socks5 http https"` - ResourceId int `json:"resource_id" validate:"required"` + AuthType services.ChannelAuthType `json:"auth_type" validate:"required,oneof=0 1"` Count int `json:"count" validate:"required"` + Prov string `json:"prov" validate:"required"` + City string `json:"city" validate:"required"` + Isp string `json:"isp" validate:"required"` ResultType CreateChannelResultType `json:"return_type" validate:"required,oneof=json text"` ResultSeparator CreateChannelResultSeparator `json:"return_separator" validate:"required,oneof=enter line both tab"` } @@ -33,13 +35,18 @@ func CreateChannel(c *fiber.Ctx) error { return errors.New("user not found") } - channels, err := services.Channel.CreateChannel( - auth.Payload.Id, - req.Region, - req.Provider, - req.Protocol, + channels, err := services.Channel.RemoteCreateChannel( + c.Context(), + auth, req.ResourceId, + req.Protocol, + req.AuthType, req.Count, + services.NodeFilterConfig{ + Isp: req.Isp, + Prov: req.Prov, + City: req.City, + }, ) if err != nil { return err @@ -48,7 +55,7 @@ func CreateChannel(c *fiber.Ctx) error { // 返回连接通道列表 var result []string for _, channel := range channels { - url := fmt.Sprintf("%s://%s:%d", channel.Protocol, channel.UserHost, channel.NodePort) + url := fmt.Sprintf("%s://%s:%d", channel.Protocol, channel.UserHost, channel.ProxyPort) result = append(result, url) } diff --git a/web/models/channel.gen.go b/web/models/channel.gen.go index f8fc279..829fe93 100644 --- a/web/models/channel.gen.go +++ b/web/models/channel.gen.go @@ -16,12 +16,14 @@ const TableNameChannel = "channel" type Channel struct { ID int32 `gorm:"column:id;primaryKey;autoIncrement:true;comment:通道ID" json:"id"` // 通道ID UserID int32 `gorm:"column:user_id;not null;comment:用户ID" json:"user_id"` // 用户ID + ProxyID int32 `gorm:"column:proxy_id;not null;comment:代理ID" json:"proxy_id"` // 代理ID NodeID int32 `gorm:"column:node_id;comment:节点ID" json:"node_id"` // 节点ID - UserHost string `gorm:"column:user_host;not null;comment:用户地址" json:"user_host"` // 用户地址 - NodePort int32 `gorm:"column:node_port;comment:节点端口" json:"node_port"` // 节点端口 + ProxyPort int32 `gorm:"column:proxy_port;not null;comment:转发端口" json:"proxy_port"` // 转发端口 + Protocol string `gorm:"column:protocol;comment:协议" json:"protocol"` // 协议 + UserHost string `gorm:"column:user_host;comment:用户地址" json:"user_host"` // 用户地址 + NodeHost string `gorm:"column:node_host;comment:节点地址" json:"node_host"` // 节点地址 AuthIP bool `gorm:"column:auth_ip;not null;comment:IP认证" json:"auth_ip"` // IP认证 AuthPass bool `gorm:"column:auth_pass;not null;comment:密码认证" json:"auth_pass"` // 密码认证 - Protocol string `gorm:"column:protocol;comment:协议" json:"protocol"` // 协议 Username string `gorm:"column:username;comment:用户名" json:"username"` // 用户名 Password string `gorm:"column:password;comment:密码" json:"password"` // 密码 Expiration time.Time `gorm:"column:expiration;not null;comment:过期时间" json:"expiration"` // 过期时间 diff --git a/web/models/node.gen.go b/web/models/node.gen.go index c075ba4..3b073bc 100644 --- a/web/models/node.gen.go +++ b/web/models/node.gen.go @@ -15,12 +15,17 @@ const TableNameNode = "node" // Node mapped from table type Node struct { ID int32 `gorm:"column:id;primaryKey;autoIncrement:true;comment:节点ID" json:"id"` // 节点ID - Name string `gorm:"column:name;not null;comment:节点名称" json:"name"` // 节点名称 Version int32 `gorm:"column:version;not null;comment:节点版本" json:"version"` // 节点版本 - FwdPort int32 `gorm:"column:fwd_port;not null;comment:转发端口" json:"fwd_port"` // 转发端口 + Name string `gorm:"column:name;not null;comment:节点名称" json:"name"` // 节点名称 + Host string `gorm:"column:host;not null;comment:节点地址" json:"host"` // 节点地址 Isp string `gorm:"column:isp;not null;comment:运营商" json:"isp"` // 运营商 Prov string `gorm:"column:prov;not null;comment:省份" json:"prov"` // 省份 City string `gorm:"column:city;not null;comment:城市" json:"city"` // 城市 + ProxyID int32 `gorm:"column:proxy_id;comment:代理ID" json:"proxy_id"` // 代理ID + ProxyPort int32 `gorm:"column:proxy_port;comment:代理端口" json:"proxy_port"` // 代理端口 + Status int32 `gorm:"column:status;not null;comment:节点状态:1-正常,0-离线" json:"status"` // 节点状态:1-正常,0-离线 + Rtt int32 `gorm:"column:rtt;comment:延迟" json:"rtt"` // 延迟 + Loss int32 `gorm:"column:loss;comment:丢包率" json:"loss"` // 丢包率 CreatedAt time.Time `gorm:"column:created_at;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间 UpdatedAt time.Time `gorm:"column:updated_at;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;comment:删除时间" json:"deleted_at"` // 删除时间 diff --git a/web/models/proxy.gen.go b/web/models/proxy.gen.go new file mode 100644 index 0000000..be0e452 --- /dev/null +++ b/web/models/proxy.gen.go @@ -0,0 +1,30 @@ +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. + +package models + +import ( + "time" + + "gorm.io/gorm" +) + +const TableNameProxy = "proxy" + +// Proxy mapped from table +type Proxy struct { + ID int32 `gorm:"column:id;primaryKey;autoIncrement:true;comment:代理服务ID" json:"id"` // 代理服务ID + Version int32 `gorm:"column:version;not null;comment:代理服务版本" json:"version"` // 代理服务版本 + Name string `gorm:"column:name;not null;comment:代理服务名称" json:"name"` // 代理服务名称 + Host string `gorm:"column:host;not null;comment:代理服务地址" json:"host"` // 代理服务地址 + Type int32 `gorm:"column:type;not null;comment:代理服务类型:0-自有,1-三方" json:"type"` // 代理服务类型:0-自有,1-三方 + CreatedAt time.Time `gorm:"column:created_at;default:CURRENT_TIMESTAMP;comment:创建时间" json:"created_at"` // 创建时间 + UpdatedAt time.Time `gorm:"column:updated_at;default:CURRENT_TIMESTAMP;comment:更新时间" json:"updated_at"` // 更新时间 + DeletedAt gorm.DeletedAt `gorm:"column:deleted_at;comment:删除时间" json:"deleted_at"` // 删除时间 +} + +// TableName Proxy's table name +func (*Proxy) TableName() string { + return TableNameProxy +} diff --git a/web/queries/channel.gen.go b/web/queries/channel.gen.go index 537edf3..5ceccad 100644 --- a/web/queries/channel.gen.go +++ b/web/queries/channel.gen.go @@ -29,12 +29,14 @@ func newChannel(db *gorm.DB, opts ...gen.DOOption) channel { _channel.ALL = field.NewAsterisk(tableName) _channel.ID = field.NewInt32(tableName, "id") _channel.UserID = field.NewInt32(tableName, "user_id") + _channel.ProxyID = field.NewInt32(tableName, "proxy_id") _channel.NodeID = field.NewInt32(tableName, "node_id") + _channel.ProxyPort = field.NewInt32(tableName, "proxy_port") + _channel.Protocol = field.NewString(tableName, "protocol") _channel.UserHost = field.NewString(tableName, "user_host") - _channel.NodePort = field.NewInt32(tableName, "node_port") + _channel.NodeHost = field.NewString(tableName, "node_host") _channel.AuthIP = field.NewBool(tableName, "auth_ip") _channel.AuthPass = field.NewBool(tableName, "auth_pass") - _channel.Protocol = field.NewString(tableName, "protocol") _channel.Username = field.NewString(tableName, "username") _channel.Password = field.NewString(tableName, "password") _channel.Expiration = field.NewTime(tableName, "expiration") @@ -53,12 +55,14 @@ type channel struct { ALL field.Asterisk ID field.Int32 // 通道ID UserID field.Int32 // 用户ID + ProxyID field.Int32 // 代理ID NodeID field.Int32 // 节点ID + ProxyPort field.Int32 // 转发端口 + Protocol field.String // 协议 UserHost field.String // 用户地址 - NodePort field.Int32 // 节点端口 + NodeHost field.String // 节点地址 AuthIP field.Bool // IP认证 AuthPass field.Bool // 密码认证 - Protocol field.String // 协议 Username field.String // 用户名 Password field.String // 密码 Expiration field.Time // 过期时间 @@ -83,12 +87,14 @@ func (c *channel) updateTableName(table string) *channel { c.ALL = field.NewAsterisk(table) c.ID = field.NewInt32(table, "id") c.UserID = field.NewInt32(table, "user_id") + c.ProxyID = field.NewInt32(table, "proxy_id") c.NodeID = field.NewInt32(table, "node_id") + c.ProxyPort = field.NewInt32(table, "proxy_port") + c.Protocol = field.NewString(table, "protocol") c.UserHost = field.NewString(table, "user_host") - c.NodePort = field.NewInt32(table, "node_port") + c.NodeHost = field.NewString(table, "node_host") c.AuthIP = field.NewBool(table, "auth_ip") c.AuthPass = field.NewBool(table, "auth_pass") - c.Protocol = field.NewString(table, "protocol") c.Username = field.NewString(table, "username") c.Password = field.NewString(table, "password") c.Expiration = field.NewTime(table, "expiration") @@ -111,15 +117,17 @@ func (c *channel) GetFieldByName(fieldName string) (field.OrderExpr, bool) { } func (c *channel) fillFieldMap() { - c.fieldMap = make(map[string]field.Expr, 14) + c.fieldMap = make(map[string]field.Expr, 16) c.fieldMap["id"] = c.ID c.fieldMap["user_id"] = c.UserID + c.fieldMap["proxy_id"] = c.ProxyID c.fieldMap["node_id"] = c.NodeID + c.fieldMap["proxy_port"] = c.ProxyPort + c.fieldMap["protocol"] = c.Protocol c.fieldMap["user_host"] = c.UserHost - c.fieldMap["node_port"] = c.NodePort + c.fieldMap["node_host"] = c.NodeHost c.fieldMap["auth_ip"] = c.AuthIP c.fieldMap["auth_pass"] = c.AuthPass - c.fieldMap["protocol"] = c.Protocol c.fieldMap["username"] = c.Username c.fieldMap["password"] = c.Password c.fieldMap["expiration"] = c.Expiration diff --git a/web/queries/gen.go b/web/queries/gen.go index eace030..34ee86e 100644 --- a/web/queries/gen.go +++ b/web/queries/gen.go @@ -28,6 +28,7 @@ var ( Node *node Permission *permission Product *product + Proxy *proxy Refund *refund Resource *resource ResourcePps *resourcePps @@ -54,6 +55,7 @@ func SetDefault(db *gorm.DB, opts ...gen.DOOption) { Node = &Q.Node Permission = &Q.Permission Product = &Q.Product + Proxy = &Q.Proxy Refund = &Q.Refund Resource = &Q.Resource ResourcePps = &Q.ResourcePps @@ -81,6 +83,7 @@ func Use(db *gorm.DB, opts ...gen.DOOption) *Query { Node: newNode(db, opts...), Permission: newPermission(db, opts...), Product: newProduct(db, opts...), + Proxy: newProxy(db, opts...), Refund: newRefund(db, opts...), Resource: newResource(db, opts...), ResourcePps: newResourcePps(db, opts...), @@ -109,6 +112,7 @@ type Query struct { Node node Permission permission Product product + Proxy proxy Refund refund Resource resource ResourcePps resourcePps @@ -138,6 +142,7 @@ func (q *Query) clone(db *gorm.DB) *Query { Node: q.Node.clone(db), Permission: q.Permission.clone(db), Product: q.Product.clone(db), + Proxy: q.Proxy.clone(db), Refund: q.Refund.clone(db), Resource: q.Resource.clone(db), ResourcePps: q.ResourcePps.clone(db), @@ -174,6 +179,7 @@ func (q *Query) ReplaceDB(db *gorm.DB) *Query { Node: q.Node.replaceDB(db), Permission: q.Permission.replaceDB(db), Product: q.Product.replaceDB(db), + Proxy: q.Proxy.replaceDB(db), Refund: q.Refund.replaceDB(db), Resource: q.Resource.replaceDB(db), ResourcePps: q.ResourcePps.replaceDB(db), @@ -200,6 +206,7 @@ type queryCtx struct { Node *nodeDo Permission *permissionDo Product *productDo + Proxy *proxyDo Refund *refundDo Resource *resourceDo ResourcePps *resourcePpsDo @@ -226,6 +233,7 @@ func (q *Query) WithContext(ctx context.Context) *queryCtx { Node: q.Node.WithContext(ctx), Permission: q.Permission.WithContext(ctx), Product: q.Product.WithContext(ctx), + Proxy: q.Proxy.WithContext(ctx), Refund: q.Refund.WithContext(ctx), Resource: q.Resource.WithContext(ctx), ResourcePps: q.ResourcePps.WithContext(ctx), diff --git a/web/queries/node.gen.go b/web/queries/node.gen.go index 23f7e00..4243751 100644 --- a/web/queries/node.gen.go +++ b/web/queries/node.gen.go @@ -28,12 +28,17 @@ func newNode(db *gorm.DB, opts ...gen.DOOption) node { tableName := _node.nodeDo.TableName() _node.ALL = field.NewAsterisk(tableName) _node.ID = field.NewInt32(tableName, "id") - _node.Name = field.NewString(tableName, "name") _node.Version = field.NewInt32(tableName, "version") - _node.FwdPort = field.NewInt32(tableName, "fwd_port") + _node.Name = field.NewString(tableName, "name") + _node.Host = field.NewString(tableName, "host") _node.Isp = field.NewString(tableName, "isp") _node.Prov = field.NewString(tableName, "prov") _node.City = field.NewString(tableName, "city") + _node.ProxyID = field.NewInt32(tableName, "proxy_id") + _node.ProxyPort = field.NewInt32(tableName, "proxy_port") + _node.Status = field.NewInt32(tableName, "status") + _node.Rtt = field.NewInt32(tableName, "rtt") + _node.Loss = field.NewInt32(tableName, "loss") _node.CreatedAt = field.NewTime(tableName, "created_at") _node.UpdatedAt = field.NewTime(tableName, "updated_at") _node.DeletedAt = field.NewField(tableName, "deleted_at") @@ -48,12 +53,17 @@ type node struct { ALL field.Asterisk ID field.Int32 // 节点ID - Name field.String // 节点名称 Version field.Int32 // 节点版本 - FwdPort field.Int32 // 转发端口 + Name field.String // 节点名称 + Host field.String // 节点地址 Isp field.String // 运营商 Prov field.String // 省份 City field.String // 城市 + ProxyID field.Int32 // 代理ID + ProxyPort field.Int32 // 代理端口 + Status field.Int32 // 节点状态:1-正常,0-离线 + Rtt field.Int32 // 延迟 + Loss field.Int32 // 丢包率 CreatedAt field.Time // 创建时间 UpdatedAt field.Time // 更新时间 DeletedAt field.Field // 删除时间 @@ -74,12 +84,17 @@ func (n node) As(alias string) *node { func (n *node) updateTableName(table string) *node { n.ALL = field.NewAsterisk(table) n.ID = field.NewInt32(table, "id") - n.Name = field.NewString(table, "name") n.Version = field.NewInt32(table, "version") - n.FwdPort = field.NewInt32(table, "fwd_port") + n.Name = field.NewString(table, "name") + n.Host = field.NewString(table, "host") n.Isp = field.NewString(table, "isp") n.Prov = field.NewString(table, "prov") n.City = field.NewString(table, "city") + n.ProxyID = field.NewInt32(table, "proxy_id") + n.ProxyPort = field.NewInt32(table, "proxy_port") + n.Status = field.NewInt32(table, "status") + n.Rtt = field.NewInt32(table, "rtt") + n.Loss = field.NewInt32(table, "loss") n.CreatedAt = field.NewTime(table, "created_at") n.UpdatedAt = field.NewTime(table, "updated_at") n.DeletedAt = field.NewField(table, "deleted_at") @@ -99,14 +114,19 @@ func (n *node) GetFieldByName(fieldName string) (field.OrderExpr, bool) { } func (n *node) fillFieldMap() { - n.fieldMap = make(map[string]field.Expr, 10) + n.fieldMap = make(map[string]field.Expr, 15) n.fieldMap["id"] = n.ID - n.fieldMap["name"] = n.Name n.fieldMap["version"] = n.Version - n.fieldMap["fwd_port"] = n.FwdPort + n.fieldMap["name"] = n.Name + n.fieldMap["host"] = n.Host n.fieldMap["isp"] = n.Isp n.fieldMap["prov"] = n.Prov n.fieldMap["city"] = n.City + n.fieldMap["proxy_id"] = n.ProxyID + n.fieldMap["proxy_port"] = n.ProxyPort + n.fieldMap["status"] = n.Status + n.fieldMap["rtt"] = n.Rtt + n.fieldMap["loss"] = n.Loss n.fieldMap["created_at"] = n.CreatedAt n.fieldMap["updated_at"] = n.UpdatedAt n.fieldMap["deleted_at"] = n.DeletedAt diff --git a/web/queries/proxy.gen.go b/web/queries/proxy.gen.go new file mode 100644 index 0000000..866ea9c --- /dev/null +++ b/web/queries/proxy.gen.go @@ -0,0 +1,347 @@ +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. +// Code generated by gorm.io/gen. DO NOT EDIT. + +package queries + +import ( + "context" + + "gorm.io/gorm" + "gorm.io/gorm/clause" + "gorm.io/gorm/schema" + + "gorm.io/gen" + "gorm.io/gen/field" + + "gorm.io/plugin/dbresolver" + + "platform/web/models" +) + +func newProxy(db *gorm.DB, opts ...gen.DOOption) proxy { + _proxy := proxy{} + + _proxy.proxyDo.UseDB(db, opts...) + _proxy.proxyDo.UseModel(&models.Proxy{}) + + tableName := _proxy.proxyDo.TableName() + _proxy.ALL = field.NewAsterisk(tableName) + _proxy.ID = field.NewInt32(tableName, "id") + _proxy.Version = field.NewInt32(tableName, "version") + _proxy.Name = field.NewString(tableName, "name") + _proxy.Host = field.NewString(tableName, "host") + _proxy.Type = field.NewInt32(tableName, "type") + _proxy.CreatedAt = field.NewTime(tableName, "created_at") + _proxy.UpdatedAt = field.NewTime(tableName, "updated_at") + _proxy.DeletedAt = field.NewField(tableName, "deleted_at") + + _proxy.fillFieldMap() + + return _proxy +} + +type proxy struct { + proxyDo + + ALL field.Asterisk + ID field.Int32 // 代理服务ID + Version field.Int32 // 代理服务版本 + Name field.String // 代理服务名称 + Host field.String // 代理服务地址 + Type field.Int32 // 代理服务类型:0-自有,1-三方 + CreatedAt field.Time // 创建时间 + UpdatedAt field.Time // 更新时间 + DeletedAt field.Field // 删除时间 + + fieldMap map[string]field.Expr +} + +func (p proxy) Table(newTableName string) *proxy { + p.proxyDo.UseTable(newTableName) + return p.updateTableName(newTableName) +} + +func (p proxy) As(alias string) *proxy { + p.proxyDo.DO = *(p.proxyDo.As(alias).(*gen.DO)) + return p.updateTableName(alias) +} + +func (p *proxy) updateTableName(table string) *proxy { + p.ALL = field.NewAsterisk(table) + p.ID = field.NewInt32(table, "id") + p.Version = field.NewInt32(table, "version") + p.Name = field.NewString(table, "name") + p.Host = field.NewString(table, "host") + p.Type = field.NewInt32(table, "type") + p.CreatedAt = field.NewTime(table, "created_at") + p.UpdatedAt = field.NewTime(table, "updated_at") + p.DeletedAt = field.NewField(table, "deleted_at") + + p.fillFieldMap() + + return p +} + +func (p *proxy) GetFieldByName(fieldName string) (field.OrderExpr, bool) { + _f, ok := p.fieldMap[fieldName] + if !ok || _f == nil { + return nil, false + } + _oe, ok := _f.(field.OrderExpr) + return _oe, ok +} + +func (p *proxy) fillFieldMap() { + p.fieldMap = make(map[string]field.Expr, 8) + p.fieldMap["id"] = p.ID + p.fieldMap["version"] = p.Version + p.fieldMap["name"] = p.Name + p.fieldMap["host"] = p.Host + p.fieldMap["type"] = p.Type + p.fieldMap["created_at"] = p.CreatedAt + p.fieldMap["updated_at"] = p.UpdatedAt + p.fieldMap["deleted_at"] = p.DeletedAt +} + +func (p proxy) clone(db *gorm.DB) proxy { + p.proxyDo.ReplaceConnPool(db.Statement.ConnPool) + return p +} + +func (p proxy) replaceDB(db *gorm.DB) proxy { + p.proxyDo.ReplaceDB(db) + return p +} + +type proxyDo struct{ gen.DO } + +func (p proxyDo) Debug() *proxyDo { + return p.withDO(p.DO.Debug()) +} + +func (p proxyDo) WithContext(ctx context.Context) *proxyDo { + return p.withDO(p.DO.WithContext(ctx)) +} + +func (p proxyDo) ReadDB() *proxyDo { + return p.Clauses(dbresolver.Read) +} + +func (p proxyDo) WriteDB() *proxyDo { + return p.Clauses(dbresolver.Write) +} + +func (p proxyDo) Session(config *gorm.Session) *proxyDo { + return p.withDO(p.DO.Session(config)) +} + +func (p proxyDo) Clauses(conds ...clause.Expression) *proxyDo { + return p.withDO(p.DO.Clauses(conds...)) +} + +func (p proxyDo) Returning(value interface{}, columns ...string) *proxyDo { + return p.withDO(p.DO.Returning(value, columns...)) +} + +func (p proxyDo) Not(conds ...gen.Condition) *proxyDo { + return p.withDO(p.DO.Not(conds...)) +} + +func (p proxyDo) Or(conds ...gen.Condition) *proxyDo { + return p.withDO(p.DO.Or(conds...)) +} + +func (p proxyDo) Select(conds ...field.Expr) *proxyDo { + return p.withDO(p.DO.Select(conds...)) +} + +func (p proxyDo) Where(conds ...gen.Condition) *proxyDo { + return p.withDO(p.DO.Where(conds...)) +} + +func (p proxyDo) Order(conds ...field.Expr) *proxyDo { + return p.withDO(p.DO.Order(conds...)) +} + +func (p proxyDo) Distinct(cols ...field.Expr) *proxyDo { + return p.withDO(p.DO.Distinct(cols...)) +} + +func (p proxyDo) Omit(cols ...field.Expr) *proxyDo { + return p.withDO(p.DO.Omit(cols...)) +} + +func (p proxyDo) Join(table schema.Tabler, on ...field.Expr) *proxyDo { + return p.withDO(p.DO.Join(table, on...)) +} + +func (p proxyDo) LeftJoin(table schema.Tabler, on ...field.Expr) *proxyDo { + return p.withDO(p.DO.LeftJoin(table, on...)) +} + +func (p proxyDo) RightJoin(table schema.Tabler, on ...field.Expr) *proxyDo { + return p.withDO(p.DO.RightJoin(table, on...)) +} + +func (p proxyDo) Group(cols ...field.Expr) *proxyDo { + return p.withDO(p.DO.Group(cols...)) +} + +func (p proxyDo) Having(conds ...gen.Condition) *proxyDo { + return p.withDO(p.DO.Having(conds...)) +} + +func (p proxyDo) Limit(limit int) *proxyDo { + return p.withDO(p.DO.Limit(limit)) +} + +func (p proxyDo) Offset(offset int) *proxyDo { + return p.withDO(p.DO.Offset(offset)) +} + +func (p proxyDo) Scopes(funcs ...func(gen.Dao) gen.Dao) *proxyDo { + return p.withDO(p.DO.Scopes(funcs...)) +} + +func (p proxyDo) Unscoped() *proxyDo { + return p.withDO(p.DO.Unscoped()) +} + +func (p proxyDo) Create(values ...*models.Proxy) error { + if len(values) == 0 { + return nil + } + return p.DO.Create(values) +} + +func (p proxyDo) CreateInBatches(values []*models.Proxy, batchSize int) error { + return p.DO.CreateInBatches(values, batchSize) +} + +// Save : !!! underlying implementation is different with GORM +// The method is equivalent to executing the statement: db.Clauses(clause.OnConflict{UpdateAll: true}).Create(values) +func (p proxyDo) Save(values ...*models.Proxy) error { + if len(values) == 0 { + return nil + } + return p.DO.Save(values) +} + +func (p proxyDo) First() (*models.Proxy, error) { + if result, err := p.DO.First(); err != nil { + return nil, err + } else { + return result.(*models.Proxy), nil + } +} + +func (p proxyDo) Take() (*models.Proxy, error) { + if result, err := p.DO.Take(); err != nil { + return nil, err + } else { + return result.(*models.Proxy), nil + } +} + +func (p proxyDo) Last() (*models.Proxy, error) { + if result, err := p.DO.Last(); err != nil { + return nil, err + } else { + return result.(*models.Proxy), nil + } +} + +func (p proxyDo) Find() ([]*models.Proxy, error) { + result, err := p.DO.Find() + return result.([]*models.Proxy), err +} + +func (p proxyDo) FindInBatch(batchSize int, fc func(tx gen.Dao, batch int) error) (results []*models.Proxy, err error) { + buf := make([]*models.Proxy, 0, batchSize) + err = p.DO.FindInBatches(&buf, batchSize, func(tx gen.Dao, batch int) error { + defer func() { results = append(results, buf...) }() + return fc(tx, batch) + }) + return results, err +} + +func (p proxyDo) FindInBatches(result *[]*models.Proxy, batchSize int, fc func(tx gen.Dao, batch int) error) error { + return p.DO.FindInBatches(result, batchSize, fc) +} + +func (p proxyDo) Attrs(attrs ...field.AssignExpr) *proxyDo { + return p.withDO(p.DO.Attrs(attrs...)) +} + +func (p proxyDo) Assign(attrs ...field.AssignExpr) *proxyDo { + return p.withDO(p.DO.Assign(attrs...)) +} + +func (p proxyDo) Joins(fields ...field.RelationField) *proxyDo { + for _, _f := range fields { + p = *p.withDO(p.DO.Joins(_f)) + } + return &p +} + +func (p proxyDo) Preload(fields ...field.RelationField) *proxyDo { + for _, _f := range fields { + p = *p.withDO(p.DO.Preload(_f)) + } + return &p +} + +func (p proxyDo) FirstOrInit() (*models.Proxy, error) { + if result, err := p.DO.FirstOrInit(); err != nil { + return nil, err + } else { + return result.(*models.Proxy), nil + } +} + +func (p proxyDo) FirstOrCreate() (*models.Proxy, error) { + if result, err := p.DO.FirstOrCreate(); err != nil { + return nil, err + } else { + return result.(*models.Proxy), nil + } +} + +func (p proxyDo) FindByPage(offset int, limit int) (result []*models.Proxy, count int64, err error) { + result, err = p.Offset(offset).Limit(limit).Find() + if err != nil { + return + } + + if size := len(result); 0 < limit && 0 < size && size < limit { + count = int64(size + offset) + return + } + + count, err = p.Offset(-1).Limit(-1).Count() + return +} + +func (p proxyDo) ScanByPage(result interface{}, offset int, limit int) (count int64, err error) { + count, err = p.Count() + if err != nil { + return + } + + err = p.Offset(offset).Limit(limit).Scan(result) + return +} + +func (p proxyDo) Scan(result interface{}) (err error) { + return p.DO.Scan(result) +} + +func (p proxyDo) Delete(models ...*models.Proxy) (result gen.ResultInfo, err error) { + return p.DO.Delete(models) +} + +func (p *proxyDo) withDO(do gen.Dao) *proxyDo { + p.DO = *do.(*gen.DO) + return p +} diff --git a/web/services/channel.go b/web/services/channel.go index 9a9bb6e..69acf83 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -4,11 +4,15 @@ import ( "context" "errors" "fmt" + "log/slog" "math" + "platform/pkg/orm" "platform/pkg/rds" + "platform/pkg/remote" "platform/web/common" "platform/web/models" q "platform/web/queries" + "strconv" "time" "github.com/google/uuid" @@ -82,7 +86,7 @@ func (s *channelService) CreateChannel( } // 筛选可用节点 - nodes, err := Node.Filter(auth.Payload.Id, protocol, count, nodeFilter...) + nodes, err := Node.Filter(ctx, auth.Payload.Id, count, nodeFilter...) if err != nil { return err } @@ -103,10 +107,11 @@ func (s *channelService) CreateChannel( channels = append(channels, &models.Channel{ UserID: auth.Payload.Id, NodeID: node.ID, - NodePort: node.FwdPort, + UserHost: allowed.Host, + NodeHost: node.Host, + ProxyPort: node.ProxyPort, Protocol: string(protocol), AuthIP: authType == ChannelAuthTypeIp, - UserHost: allowed.Host, AuthPass: authType == ChannelAuthTypePass, Username: username, Password: password, @@ -146,17 +151,7 @@ func (s *channelService) CreateChannel( } // 缓存通道信息与异步删除任务 - pipe := rds.Client.TxPipeline() - zList := make([]redis.Z, 0, len(channels)) - for _, channel := range channels { - pipe.Set(ctx, chKey(channel), channel, channel.Expiration.Sub(time.Now())) - zList = append(zList, redis.Z{ - Score: float64(channel.Expiration.Unix()), - Member: channel.ID, - }) - } - pipe.ZAdd(ctx, "tasks:channel", zList...) - _, err = pipe.Exec(ctx) + err = cache(ctx, channels) if err != nil { return nil, err } @@ -194,7 +189,7 @@ func genPassPair() (string, string) { return username, password } -func (s *channelService) RemoveChannels(auth *AuthContext, id ...int32) error { +func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext, id ...int32) error { var channels []*models.Channel @@ -231,16 +226,16 @@ func (s *channelService) RemoveChannels(auth *AuthContext, id ...int32) error { } // 删除缓存,异步任务直接在消费端处理删除 - pipe := rds.Client.TxPipeline() - for _, channel := range channels { - pipe.Del(context.Background(), chKey(channel)) + err = deleteCache(ctx, channels) + if err != nil { + return err } return nil } func chKey(channel *models.Channel) string { - return fmt.Sprintf("channel:%s:%d", channel.UserHost, channel.NodePort) + return fmt.Sprintf("channel:%s:%s", channel.UserHost, channel.NodeHost) } type ChannelServiceErr string @@ -248,3 +243,327 @@ type ChannelServiceErr string func (c ChannelServiceErr) Error() string { return string(c) } + +// region channel by remote + +func (s *channelService) RemoteCreateChannel( + ctx context.Context, + auth *AuthContext, + resourceId int32, + protocol ChannelProtocol, + authType ChannelAuthType, + count int, + nodeFilter ...NodeFilterConfig, +) ([]*models.Channel, error) { + + filter := NodeFilterConfig{} + if len(nodeFilter) > 0 { + filter = nodeFilter[0] + } + + // 查找套餐 + var resource = new(ResourceInfo) + data := q.Resource.As("data") + pss := q.ResourcePss.As("pss") + err := data.Scopes(orm.Alias(data)). + Select(data.ALL, pss.ALL). + LeftJoin(q.ResourcePss.As("pss"), pss.ResourceID.EqCol(data.ID)). + Where(data.ID.Eq(resourceId)). + Scan(&resource) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, ChannelServiceErr("套餐不存在") + } + return nil, err + } + + // 检查用户权限 + err = checkUser(auth, resource, count) + if err != nil { + return nil, err + } + + // 申请节点 + assigned, err := assignEdge(count, filter) + if err != nil { + return nil, err + } + + // 分配端口 + expiration := time.Now().Add(time.Duration(resource.pss.Live) * time.Second) + channels, err := assignPort(assigned, auth.Payload.Id, protocol, authType, expiration, filter) + if err != nil { + return nil, err + } + + // 缓存并关闭代理 + err = cache(ctx, channels) + if err != nil { + return nil, err + } + + return channels, nil +} + +// endregion + +func checkUser(auth *AuthContext, resource *ResourceInfo, count int) error { + + // 检查使用人 + if auth.Payload.Type == PayloadUser && auth.Payload.Id != resource.data.UserID { + return common.AuthForbiddenErr("无权限访问") + } + + // 检查套餐状态 + if !resource.data.Active { + return ChannelServiceErr("套餐已失效") + } + + // 检查每日限额 + today := time.Now().Format("2006-01-02") == resource.pss.DailyLast.Format("2006-01-02") + dailyRemain := int(math.Max(float64(resource.pss.DailyLimit-resource.pss.DailyUsed), 0)) + if today && dailyRemain < count { + return ChannelServiceErr("套餐每日配额不足") + } + + // 检查时间或配额 + if resource.pss.Type == 1 { // 包时 + if resource.pss.Expire.Before(time.Now()) { + return ChannelServiceErr("套餐已过期") + } + } else { // 包量 + remain := int(math.Max(float64(resource.pss.Quota-resource.pss.Used), 0)) + if remain < count { + return ChannelServiceErr("套餐配额不足") + } + } + + return nil +} + +// assignEdge 分配边缘节点数量 +func assignEdge(count int, filter NodeFilterConfig) ([]AssignEdgeResult, error) { + // 查询现有节点连接情况 + edgeConfigs, err := remote.Client.CloudAutoQuery() + if err != nil { + return nil, err + } + + proxies, err := q.Proxy. + Where(q.Proxy.Type.Eq(1)). + Find() + if err != nil { + return nil, err + } + + // 尽量平均分配节点用量 + var total = count + for _, v := range edgeConfigs { + total += v.Count + } + avg := int(math.Ceil(float64(total) / float64(len(edgeConfigs)))) + + var result []AssignEdgeResult + var rCount = 0 + for _, proxy := range proxies { + prev, ok := edgeConfigs[proxy.Name] + var nextCount = 0 + if !ok || (prev.Count < avg && prev.Count < total) { + nextCount = int(math.Min(float64(avg), float64(total))) + result = append(result, AssignEdgeResult{ + proxy: proxy, + count: nextCount - prev.Count, + }) + total -= nextCount + } else { + continue + } + _rCount, err := remote.Client.CloudConnect(remote.CloudConnectReq{ + Uuid: proxy.Name, + Edge: nil, + AutoConfig: []remote.AutoConfig{{ + Province: filter.Prov, + City: filter.City, + Isp: filter.Isp, + Count: nextCount, + }}, + }) + if err != nil { + return nil, err + } + rCount += _rCount + } + slog.Debug("cloud connect", "count", rCount) + + return result, nil +} + +type AssignEdgeResult struct { + proxy *models.Proxy + count int +} + +// assignPort 分配指定数量的端口 +func assignPort( + assigns []AssignEdgeResult, + userId int32, + protocol ChannelProtocol, + authType ChannelAuthType, + expiration time.Time, + filter NodeFilterConfig, +) ([]*models.Channel, error) { + // 查询代理已配置端口 + var proxyIds = make([]int32, 0, len(assigns)) + for _, assigned := range assigns { + proxyIds = append(proxyIds, assigned.proxy.ID) + } + channels, err := q.Channel. + Select( + q.Channel.ProxyID, + q.Channel.ProxyPort). + Where( + q.Channel.ProxyID.In(proxyIds...), + q.Channel.Expiration.Gt(time.Now())). + Group( + q.Channel.ProxyPort). + Find() + if err != nil { + return nil, err + } + + // 端口查找表 + var proxyPorts = make(map[uint64]struct{}) + for _, channel := range channels { + key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort) + proxyPorts[key] = struct{}{} + } + + // 配置启用代理 + var result []*models.Channel + for i := 0; i < len(assigns); i++ { + proxy := assigns[i].proxy + count := assigns[i].count + + // 筛选可用端口 + var portConfigs = make([]remote.PortConfigsReq, count) + for port := 10000; port < 20000 || len(portConfigs) < count; port++ { + // 跳过存在的端口 + key := uint64(proxy.ID)<<32 | uint64(port) + _, ok := proxyPorts[key] + if ok { + continue + } + + // 配置新端口 + portConfigs[port] = remote.PortConfigsReq{ + Port: strconv.Itoa(port), + Edge: nil, + Status: true, + AutoEdgeConfig: remote.AutoEdgeConfig{ + Province: filter.Prov, + City: filter.City, + Isp: filter.Isp, + Count: 1, + }, + } + switch authType { + case ChannelAuthTypeIp: + var whitelist []string + err := q.Whitelist. + Where(q.Whitelist.UserID.Eq(userId)). + Select(q.Whitelist.Host). + Scan(&whitelist) + if err != nil { + return nil, err + } + portConfigs[port].Whitelist = whitelist + for _, item := range whitelist { + result = append(result, &models.Channel{ + UserID: userId, + ProxyID: proxy.ID, + UserHost: item, + ProxyPort: int32(port), + AuthIP: true, + AuthPass: false, + Protocol: string(protocol), + Expiration: expiration, + }) + } + case ChannelAuthTypePass: + username, password := genPassPair() + portConfigs[port].Userpass = fmt.Sprintf("%s:%s", username, password) + result = append(result, &models.Channel{ + UserID: userId, + ProxyID: proxy.ID, + ProxyPort: int32(port), + AuthIP: false, + AuthPass: true, + Username: username, + Password: password, + Protocol: string(protocol), + Expiration: expiration, + }) + } + } + + // 提交端口配置 + gateway := remote.InitGateway( + proxy.Host, + "api", + "123456", + ) + err = gateway.GatewayPortConfigs(portConfigs) + if err != nil { + return nil, err + } + } + + err = q.Channel.Save(result...) + if err != nil { + return nil, err + } + + return result, nil +} + +func cache(ctx context.Context, channels []*models.Channel) error { + pipe := rds.Client.TxPipeline() + + zList := make([]redis.Z, 0, len(channels)) + for _, channel := range channels { + pipe.Set(ctx, chKey(channel), channel, channel.Expiration.Sub(time.Now())) + zList = append(zList, redis.Z{ + Score: float64(channel.Expiration.Unix()), + Member: channel.ID, + }) + } + pipe.ZAdd(ctx, "tasks:channel", zList...) + + _, err := pipe.Exec(ctx) + if err != nil { + return err + } + + return nil +} + +func deleteCache(ctx context.Context, channels []*models.Channel) error { + pipe := rds.Client.TxPipeline() + keys := make([]string, len(channels)) + for i := range keys { + keys[i] = chKey(channels[i]) + } + pipe.Del(ctx, keys...) + // 忽略异步任务,zrem 效率较低,在使用时再删除 + _, err := pipe.Exec(ctx) + if err != nil { + return err + } + + return nil +} + +type ResourceInfo struct { + data models.Resource + pss models.ResourcePss +} diff --git a/web/services/node.go b/web/services/node.go index b8279fa..3501abe 100644 --- a/web/services/node.go +++ b/web/services/node.go @@ -1,16 +1,22 @@ package services import ( - "encoding/json" - "fmt" + "context" "platform/pkg/orm" + "platform/web/models" ) +type NodeServiceErr string + +func (e NodeServiceErr) Error() string { + return string(e) +} + var Node = &nodeService{} type nodeService struct{} -func (s *nodeService) Filter(userId int32, count int, config ...NodeFilterConfig) ([]*FilteredNode, error) { +func (s *nodeService) Filter(ctx context.Context, userId int32, count int, config ...NodeFilterConfig) ([]*models.Node, error) { _config := NodeFilterConfig{} if len(config) > 0 { _config = config[0] @@ -24,24 +30,35 @@ func (s *nodeService) Filter(userId int32, count int, config ...NodeFilterConfig Limit(count). Find(&nodes) - rs, _ := json.Marshal(nodes) - fmt.Printf(string(rs)) + // todo 异步任务关闭代理 - // 返回节点列表 - return nodes, nil + // todo 异步任务缩容 + + return nil, nil } type NodeFilterConfig struct { - Isp string - Prov string - City string + Isp string `json:"isp"` + Prov string `json:"prov"` + City string `json:"city"` } +type NodeFilterAsyncTask struct { + Config NodeFilterConfig `json:"config"` + Count int `json:"count"` +} + +// 筛选节点的SQL语句,暂时用不到 +// 筛选已连接的符合条件且未分配给用户过的节点 +// +// 静态条件:省,市,运营商 +// 排序方式,1.分配给该用户的次数 2.分配给全部用户的次数 const filterSqlRaw = ` select n.id as id, n.name as name, - n.fwd_port as port, + n.host as host, + n.fwd_port as fwd_port, count(c.*) as total, count(c.*) filter ( where c.user_id = ? ) as assigned from @@ -61,7 +78,8 @@ order by type FilteredNode struct { Id int32 `json:"id"` Name string `json:"name"` - Port int32 `json:"port"` + Host string `json:"host"` + FwdPort int32 `json:"fwd_port"` Total int32 `json:"total"` Assigned int32 `json:"assigned"` }