分析优化 channel 接口用时
This commit is contained in:
@@ -19,6 +19,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gofiber/fiber/v2/middleware/requestid"
|
||||
"github.com/google/uuid"
|
||||
"github.com/jxskiss/base62"
|
||||
"github.com/redis/go-redis/v9"
|
||||
@@ -62,8 +63,11 @@ type ResourceInfo struct {
|
||||
// region RemoveChannel
|
||||
|
||||
func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext, id ...int32) error {
|
||||
// 删除通道
|
||||
var step = time.Now()
|
||||
var rid = ctx.Value(requestid.ConfigDefault.ContextKey).(string)
|
||||
|
||||
err := q.Q.Transaction(func(tx *q.Query) error {
|
||||
|
||||
// 查找通道
|
||||
channels, err := tx.Channel.Where(
|
||||
q.Channel.ID.In(id...),
|
||||
@@ -72,6 +76,8 @@ func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext,
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Debug("查找通道", "rid", rid, "step", time.Since(step))
|
||||
|
||||
// 检查权限,如果为用户操作的话,则只能删除自己的通道
|
||||
for _, channel := range channels {
|
||||
if auth.Payload.Type == PayloadUser && auth.Payload.Id != channel.UserID {
|
||||
@@ -80,6 +86,8 @@ func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext,
|
||||
}
|
||||
|
||||
// 查找代理
|
||||
step = time.Now()
|
||||
|
||||
proxySet := make(map[int32]struct{})
|
||||
proxyIds := make([]int32, 0)
|
||||
for _, channel := range channels {
|
||||
@@ -92,6 +100,8 @@ func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext,
|
||||
q.Proxy.ID.In(proxyIds...),
|
||||
).Find()
|
||||
|
||||
slog.Debug("查找代理", "rid", rid, "step", time.Since(step))
|
||||
|
||||
// 删除指定的通道
|
||||
result, err := tx.Channel.
|
||||
Where(q.Channel.ID.In(id...)).
|
||||
@@ -104,13 +114,20 @@ func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext,
|
||||
}
|
||||
|
||||
// 删除缓存,异步任务直接在消费端处理删除
|
||||
step = time.Now()
|
||||
|
||||
err = deleteCache(ctx, channels)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Debug("删除缓存", "rid", rid, "step", time.Since(step))
|
||||
|
||||
// 禁用代理端口并下线用过的节点
|
||||
if env.DebugExternalChange {
|
||||
step = time.Now()
|
||||
|
||||
// 组织数据
|
||||
var configMap = make(map[int32][]remote.PortConfigsReq, len(proxies))
|
||||
var proxyMap = make(map[int32]*models.Proxy, len(proxies))
|
||||
for _, proxy := range proxies {
|
||||
@@ -133,6 +150,9 @@ func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext,
|
||||
portMap[key] = struct{}{}
|
||||
}
|
||||
|
||||
slog.Debug("组织数据", "rid", rid, "step", time.Since(step))
|
||||
|
||||
// 更新配置
|
||||
for proxyId, configs := range configMap {
|
||||
if len(configs) == 0 {
|
||||
continue
|
||||
@@ -149,19 +169,29 @@ func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext,
|
||||
secret[1],
|
||||
)
|
||||
|
||||
// 查询配置的节点
|
||||
// 查询节点配置
|
||||
step = time.Now()
|
||||
|
||||
actives, err := gateway.GatewayPortActive()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 取消配置
|
||||
slog.Debug("查询节点配置", "rid", rid, "step", time.Since(step))
|
||||
|
||||
// 更新节点配置
|
||||
step = time.Now()
|
||||
|
||||
err = gateway.GatewayPortConfigs(configs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Debug("更新节点配置", "rid", rid, "step", time.Since(step))
|
||||
|
||||
// 下线对应节点
|
||||
step = time.Now()
|
||||
|
||||
var edges []string
|
||||
for portStr, active := range actives {
|
||||
port, err := strconv.Atoi(portStr)
|
||||
@@ -182,6 +212,8 @@ func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext,
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
slog.Debug("下线对应节点", "rid", rid, "step", time.Since(step))
|
||||
}
|
||||
|
||||
}
|
||||
@@ -208,6 +240,8 @@ func (s *channelService) CreateChannel(
|
||||
count int,
|
||||
nodeFilter ...NodeFilterConfig,
|
||||
) ([]string, error) {
|
||||
var step = time.Now()
|
||||
var rid = ctx.Value(requestid.ConfigDefault.ContextKey).(string)
|
||||
|
||||
filter := NodeFilterConfig{}
|
||||
if len(nodeFilter) > 0 {
|
||||
@@ -218,6 +252,8 @@ func (s *channelService) CreateChannel(
|
||||
err := q.Q.Transaction(func(tx *q.Query) error {
|
||||
|
||||
// 查找套餐
|
||||
step = time.Now()
|
||||
|
||||
var resource = new(ResourceInfo)
|
||||
data := q.Resource.As("data")
|
||||
pss := q.ResourcePss.As("pss")
|
||||
@@ -236,6 +272,8 @@ func (s *channelService) CreateChannel(
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Debug("查找套餐", "rid", rid, "step", time.Since(step))
|
||||
|
||||
// 检查用户权限
|
||||
err = checkUser(auth, resource, count)
|
||||
if err != nil {
|
||||
@@ -243,12 +281,18 @@ func (s *channelService) CreateChannel(
|
||||
}
|
||||
|
||||
// 申请节点
|
||||
step = time.Now()
|
||||
|
||||
edgeAssigns, err := assignEdge(count, filter)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Debug("申请节点", "rid", rid, "total", time.Since(step))
|
||||
|
||||
// 分配端口
|
||||
step = time.Now()
|
||||
|
||||
now := time.Now()
|
||||
expiration := now.Add(time.Duration(resource.Live) * time.Second)
|
||||
_addr, channels, err := assignPort(edgeAssigns, auth.Payload.Id, protocol, authType, expiration, filter)
|
||||
@@ -257,7 +301,11 @@ func (s *channelService) CreateChannel(
|
||||
}
|
||||
addr = _addr
|
||||
|
||||
slog.Debug("分配端口", "rid", rid, "total", time.Since(step))
|
||||
|
||||
// 更新套餐使用记录
|
||||
step = time.Now()
|
||||
|
||||
_, err = q.ResourcePss.
|
||||
Where(q.ResourcePss.ResourceID.Eq(resourceId)).
|
||||
Select(
|
||||
@@ -274,12 +322,18 @@ func (s *channelService) CreateChannel(
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Debug("更新套餐使用记录", "rid", rid, "step", time.Since(step))
|
||||
|
||||
// 缓存通道数据
|
||||
step = time.Now()
|
||||
|
||||
err = cache(ctx, channels)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Debug("缓存通道数据", "rid", rid, "step", time.Since(step))
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
@@ -327,6 +381,8 @@ func checkUser(auth *AuthContext, resource *ResourceInfo, count int) error {
|
||||
func assignEdge(count int, filter NodeFilterConfig) (*AssignEdgeResult, error) {
|
||||
|
||||
// 查询可以使用的网关
|
||||
var step = time.Now()
|
||||
|
||||
proxies, err := q.Proxy.
|
||||
Where(q.Proxy.Type.Eq(1)).
|
||||
Find()
|
||||
@@ -334,13 +390,21 @@ func assignEdge(count int, filter NodeFilterConfig) (*AssignEdgeResult, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
slog.Debug("查找网关", "step", time.Since(step))
|
||||
|
||||
// 查询已配置的节点
|
||||
step = time.Now()
|
||||
|
||||
rProxyConfigs, err := remote.Client.CloudAutoQuery()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
slog.Debug("查询已配置节点 (remote)", "step", time.Since(step))
|
||||
|
||||
// 查询已使用的节点
|
||||
step = time.Now()
|
||||
|
||||
var proxyIds = make([]int32, len(proxies))
|
||||
for i, proxy := range proxies {
|
||||
proxyIds[i] = proxy.ID
|
||||
@@ -364,6 +428,8 @@ func assignEdge(count int, filter NodeFilterConfig) (*AssignEdgeResult, error) {
|
||||
proxyUses[channel.ProxyID]++
|
||||
}
|
||||
|
||||
slog.Debug("查找已使用节点", "step", time.Since(step))
|
||||
|
||||
// 组织数据
|
||||
var infos = make([]*ProxyInfo, len(proxies))
|
||||
for i, proxy := range proxies {
|
||||
@@ -397,6 +463,8 @@ func assignEdge(count int, filter NodeFilterConfig) (*AssignEdgeResult, error) {
|
||||
needed -= info.used
|
||||
|
||||
if env.DebugExternalChange && info.used > info.count {
|
||||
step = time.Now()
|
||||
|
||||
slog.Debug("新增新节点", "proxy", info.proxy.Name, "used", info.used, "count", info.count)
|
||||
err := remote.Client.CloudConnect(remote.CloudConnectReq{
|
||||
Uuid: info.proxy.Name,
|
||||
@@ -405,12 +473,14 @@ func assignEdge(count int, filter NodeFilterConfig) (*AssignEdgeResult, error) {
|
||||
Province: filter.Prov,
|
||||
City: filter.City,
|
||||
Isp: filter.Isp,
|
||||
Count: int(math.Ceil(float64(info.used) * 1.1)),
|
||||
Count: int(math.Ceil(float64(info.used) * 11 / 10)),
|
||||
}},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
slog.Debug("分配新增的节点", "step", time.Since(step))
|
||||
}
|
||||
|
||||
configs[i] = &ProxyConfig{
|
||||
@@ -450,36 +520,44 @@ func assignPort(
|
||||
expiration time.Time,
|
||||
filter NodeFilterConfig,
|
||||
) ([]string, []*models.Channel, error) {
|
||||
var assigns = proxies.configs
|
||||
var step = time.Now()
|
||||
|
||||
var configs = proxies.configs
|
||||
var exists = proxies.channels
|
||||
|
||||
// 查询代理已配置端口
|
||||
var proxyIds = make([]int32, 0, len(assigns))
|
||||
for _, assigned := range assigns {
|
||||
proxyIds = append(proxyIds, assigned.proxy.ID)
|
||||
}
|
||||
|
||||
// 端口查找表
|
||||
var proxyPorts = make(map[uint64]struct{})
|
||||
var portsMap = make(map[uint64]struct{})
|
||||
for _, channel := range exists {
|
||||
key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort)
|
||||
proxyPorts[key] = struct{}{}
|
||||
portsMap[key] = struct{}{}
|
||||
}
|
||||
|
||||
// 查找用户白名单
|
||||
var whitelist []string
|
||||
if authType == ChannelAuthTypeIp {
|
||||
err := q.Whitelist.
|
||||
Where(q.Whitelist.UserID.Eq(userId)).
|
||||
Select(q.Whitelist.Host).
|
||||
Scan(&whitelist)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 配置启用代理
|
||||
var result []string
|
||||
var channels []*models.Channel
|
||||
for _, assign := range assigns {
|
||||
for _, config := range configs {
|
||||
var err error
|
||||
var proxy = assign.proxy
|
||||
var count = assign.count
|
||||
var proxy = config.proxy
|
||||
var count = config.count
|
||||
|
||||
// 筛选可用端口
|
||||
var configs = make([]remote.PortConfigsReq, 0, count)
|
||||
for port := 10000; port < 20000 && len(configs) < count; port++ {
|
||||
// 跳过存在的端口
|
||||
key := uint64(proxy.ID)<<32 | uint64(port)
|
||||
_, ok := proxyPorts[key]
|
||||
_, ok := portsMap[key]
|
||||
if ok {
|
||||
continue
|
||||
}
|
||||
@@ -500,14 +578,6 @@ func assignPort(
|
||||
|
||||
switch authType {
|
||||
case ChannelAuthTypeIp:
|
||||
var whitelist []string
|
||||
err := q.Whitelist.
|
||||
Where(q.Whitelist.UserID.Eq(userId)).
|
||||
Select(q.Whitelist.Host).
|
||||
Scan(&whitelist)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
configs[i].Whitelist = &whitelist
|
||||
configs[i].Userpass = v.P("")
|
||||
for _, item := range whitelist {
|
||||
@@ -547,6 +617,8 @@ func assignPort(
|
||||
}
|
||||
|
||||
// 保存到数据库
|
||||
step = time.Now()
|
||||
|
||||
err = q.Channel.
|
||||
Omit(
|
||||
q.Channel.NodeID,
|
||||
@@ -560,8 +632,12 @@ func assignPort(
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
slog.Debug("保存到数据库", "step", time.Since(step))
|
||||
|
||||
// 提交端口配置并更新节点列表
|
||||
if env.DebugExternalChange {
|
||||
step = time.Now()
|
||||
|
||||
var secret = strings.Split(proxy.Secret, ":")
|
||||
gateway := remote.InitGateway(
|
||||
proxy.Host,
|
||||
@@ -572,6 +648,8 @@ func assignPort(
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
slog.Debug("提交端口配置", "step", time.Since(step))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user