优化通道资源取用逻辑

This commit is contained in:
2025-12-05 16:52:40 +08:00
parent 2d053ddf49
commit 3f24fba1ae
4 changed files with 96 additions and 35 deletions

View File

@@ -1,20 +1,12 @@
## TODO ## TODO
重新梳理提取逻辑:
- 注册代理 & 注销代理
- 实现两个 set 池,分别保存可用端口和全部端口
- 添加代理时,同时将端口加入全部池和可用池
- 注销代理时,同时将端口从全部池和可用池中移除
- 调整通道分配策略,提供一个 all set 和一个 free set提取时取交集再取出归还时取交集再归还。
- redis channel lease 加一个 zset定时处理没有成功释放的端口
### 低优先
trade/create 性能问题,缩短事务时间,考虑其他方式实现可靠分布式事务 trade/create 性能问题,缩短事务时间,考虑其他方式实现可靠分布式事务
需要确认以下 ID.GenSerial 的分布式并发安全性 需要确认以下 ID.GenSerial 的分布式并发安全性
jsonb 类型转换问题 jsonb 类型转换问题,考虑一个高效的 any 到 struct 转换工具
端口资源池的 gc 实现
标准化生产环境 cors 配置 标准化生产环境 cors 配置

View File

@@ -29,6 +29,7 @@ const (
ChannelAuthTypePass ChannelAuthTypePass
) )
// 生成用户名和密码对
func genPassPair() (string, string) { func genPassPair() (string, string) {
var alphabet = []rune("abcdefghjkmnpqrstuvwxyz") var alphabet = []rune("abcdefghjkmnpqrstuvwxyz")
var numbers = []rune("23456789") var numbers = []rune("23456789")
@@ -47,7 +48,8 @@ func genPassPair() (string, string) {
return string(username), string(password) return string(username), string(password)
} }
func findResource(q *q.Query, resourceId int32, count int, now time.Time) (*ResourceView, error) { // 查找资源
func findResource(resourceId int32) (*ResourceView, error) {
resource, err := q.Resource. resource, err := q.Resource.
Preload(field.Associations). Preload(field.Associations).
Where( Where(
@@ -133,13 +135,21 @@ type ResourceView struct {
User m.User User m.User
} }
var (
allChansKey = "channel:all"
freeChansKey = "channel:free"
usedChansKey = "channel:used"
)
// 取用通道
func lockChans(batch string, count int, expire time.Time) ([]netip.AddrPort, error) { func lockChans(batch string, count int, expire time.Time) ([]netip.AddrPort, error) {
chans, err := g.Redis.Eval( chans, err := g.Redis.Eval(
context.Background(), context.Background(),
RedisScriptLockChans, RedisScriptLockChans,
[]string{ []string{
"channel:chans", freeChansKey,
"channel:lease:" + batch, usedChansKey,
usedChansKey + ":" + batch,
}, },
count, count,
expire.Unix(), expire.Unix(),
@@ -161,25 +171,24 @@ func lockChans(batch string, count int, expire time.Time) ([]netip.AddrPort, err
} }
var RedisScriptLockChans = ` var RedisScriptLockChans = `
local chans_key = KEYS[1] local free_key = KEYS[1]
local lease_key = KEYS[2] local used_key = KEYS[2]
local batch_key = KEYS[3]
local count = tonumber(ARGV[1]) local count = tonumber(ARGV[1])
local expire = tonumber(ARGV[2]) local expire = tonumber(ARGV[2])
if redis.call("SCARD", chans_key) < count then if redis.call("SCARD", free_key) < count then
return nil return nil
end end
local ports = redis.call("SPOP", chans_key, count) local ports = redis.call("SPOP", free_key, count)
redis.call("ZADD", used_key, expire, batch_key)
redis.call("SET", lease_key, cjson.encode({ redis.call("RPUSH", batch_key, unpack(ports))
p = ports,
e = expire
}))
return ports return ports
` `
// 归还通道
func freeChans(batch string, chans []string) error { func freeChans(batch string, chans []string) error {
values := make([]any, len(chans)) values := make([]any, len(chans))
for i, ch := range chans { for i, ch := range chans {
@@ -190,8 +199,10 @@ func freeChans(batch string, chans []string) error {
context.Background(), context.Background(),
RedisScriptFreeChans, RedisScriptFreeChans,
[]string{ []string{
"channel:chans", freeChansKey,
"channel:lease:" + batch, usedChansKey,
usedChansKey + ":" + batch,
allChansKey,
}, },
values..., values...,
).Err() ).Err()
@@ -203,30 +214,88 @@ func freeChans(batch string, chans []string) error {
} }
var RedisScriptFreeChans = ` var RedisScriptFreeChans = `
local chans_key = KEYS[1] local free_key = KEYS[1]
local lease_key = KEYS[2] local used_key = KEYS[2]
local batch_key = KEYS[3]
local all_key = KEYS[4]
local chans = ARGV local chans = ARGV
redis.call("SADD", chans_key, unpack(chans)) local count = 0
redis.call("DEL", lease_key) for i, chan in ipairs(chans) do
if redis.call("SISMEMBER", all_key, chan) == 1 then
redis.call("SADD", free_key, chan)
count = count + 1
end
end
redis.call("ZREM", used_key, batch_key)
redis.call("DEL", batch_key)
return chans return count
` `
func registerChans(chans []netip.AddrPort) error { // 扩容通道
func addChans(chans []netip.AddrPort) error {
strs := make([]string, len(chans)) strs := make([]string, len(chans))
for i, ch := range chans { for i, ch := range chans {
strs[i] = ch.String() strs[i] = ch.String()
} }
err := g.Redis.SAdd(context.Background(), "channel:chans", strs).Err() err := g.Redis.Eval(
context.Background(),
RedisScriptAddChans,
[]string{
freeChansKey,
allChansKey,
},
strs,
).Err()
if err != nil { if err != nil {
return core.NewBizErr("注册通道失败", err) return core.NewBizErr("扩容通道失败", err)
} }
return nil return nil
} }
var RedisScriptAddChans = `
local free_key = KEYS[1]
local all_key = KEYS[2]
local chans = ARGV
redis.call("SADD", free_key, unpack(chans))
redis.call("SADD", all_key, unpack(chans))
return 1
`
// 缩容通道
func removeChans(chans []string) error {
err := g.Redis.Eval(
context.Background(),
RedisScriptRemoveChans,
[]string{
freeChansKey,
allChansKey,
},
chans,
).Err()
if err != nil {
return core.NewBizErr("缩容通道失败", err)
}
return nil
}
var RedisScriptRemoveChans = `
local free_key = KEYS[1]
local all_key = KEYS[2]
local chans = ARGV
redis.call("SREM", free_key, unpack(chans))
redis.call("SREM", all_key, unpack(chans))
return 1
`
// 错误信息 // 错误信息
var ( var (
ErrResourceNotExist = core.NewBizErr("套餐不存在") ErrResourceNotExist = core.NewBizErr("套餐不存在")

View File

@@ -37,7 +37,7 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3
batch := ID.GenReadable("bat") batch := ID.GenReadable("bat")
// 获取用户套餐 // 获取用户套餐
resource, err := findResource(q.Q, resourceId, count, now) resource, err := findResource(resourceId)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -38,7 +38,7 @@ func (s *proxyService) RegisterBaiyin(Mac string, IP netip.Addr, username, passw
for i := range 10000 { for i := range 10000 {
chans[i] = netip.AddrPortFrom(IP, uint16(i+10000)) chans[i] = netip.AddrPortFrom(IP, uint16(i+10000))
} }
err := registerChans(chans) err := addChans(chans)
if err != nil { if err != nil {
return core.NewServErr("添加通道失败") return core.NewServErr("添加通道失败")
} }