diff --git a/README.md b/README.md index ead0490..5a8b2c6 100644 --- a/README.md +++ b/README.md @@ -1,20 +1,12 @@ ## TODO -重新梳理提取逻辑: -- 注册代理 & 注销代理 - - 实现两个 set 池,分别保存可用端口和全部端口 - - 添加代理时,同时将端口加入全部池和可用池 - - 注销代理时,同时将端口从全部池和可用池中移除 -- 调整通道分配策略,提供一个 all set 和一个 free set,提取时取交集再取出,归还时取交集再归还。 -- redis channel lease 加一个 zset,定时处理没有成功释放的端口 - -### 低优先 - trade/create 性能问题,缩短事务时间,考虑其他方式实现可靠分布式事务 需要确认以下 ID.GenSerial 的分布式并发安全性 -jsonb 类型转换问题 +jsonb 类型转换问题,考虑一个高效的 any 到 struct 转换工具 + +端口资源池的 gc 实现 标准化生产环境 cors 配置 diff --git a/web/services/channel.go b/web/services/channel.go index 99f921e..e0c9bae 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -29,6 +29,7 @@ const ( ChannelAuthTypePass ) +// 生成用户名和密码对 func genPassPair() (string, string) { var alphabet = []rune("abcdefghjkmnpqrstuvwxyz") var numbers = []rune("23456789") @@ -47,7 +48,8 @@ func genPassPair() (string, string) { return string(username), string(password) } -func findResource(q *q.Query, resourceId int32, count int, now time.Time) (*ResourceView, error) { +// 查找资源 +func findResource(resourceId int32) (*ResourceView, error) { resource, err := q.Resource. Preload(field.Associations). Where( @@ -133,13 +135,21 @@ type ResourceView struct { User m.User } +var ( + allChansKey = "channel:all" + freeChansKey = "channel:free" + usedChansKey = "channel:used" +) + +// 取用通道 func lockChans(batch string, count int, expire time.Time) ([]netip.AddrPort, error) { chans, err := g.Redis.Eval( context.Background(), RedisScriptLockChans, []string{ - "channel:chans", - "channel:lease:" + batch, + freeChansKey, + usedChansKey, + usedChansKey + ":" + batch, }, count, expire.Unix(), @@ -161,25 +171,24 @@ func lockChans(batch string, count int, expire time.Time) ([]netip.AddrPort, err } var RedisScriptLockChans = ` -local chans_key = KEYS[1] -local lease_key = KEYS[2] +local free_key = KEYS[1] +local used_key = KEYS[2] +local batch_key = KEYS[3] local count = tonumber(ARGV[1]) local expire = tonumber(ARGV[2]) -if redis.call("SCARD", chans_key) < count then +if redis.call("SCARD", free_key) < count then return nil end -local ports = redis.call("SPOP", chans_key, count) - -redis.call("SET", lease_key, cjson.encode({ - p = ports, - e = expire -})) +local ports = redis.call("SPOP", free_key, count) +redis.call("ZADD", used_key, expire, batch_key) +redis.call("RPUSH", batch_key, unpack(ports)) return ports ` +// 归还通道 func freeChans(batch string, chans []string) error { values := make([]any, len(chans)) for i, ch := range chans { @@ -190,8 +199,10 @@ func freeChans(batch string, chans []string) error { context.Background(), RedisScriptFreeChans, []string{ - "channel:chans", - "channel:lease:" + batch, + freeChansKey, + usedChansKey, + usedChansKey + ":" + batch, + allChansKey, }, values..., ).Err() @@ -203,30 +214,88 @@ func freeChans(batch string, chans []string) error { } var RedisScriptFreeChans = ` -local chans_key = KEYS[1] -local lease_key = KEYS[2] +local free_key = KEYS[1] +local used_key = KEYS[2] +local batch_key = KEYS[3] +local all_key = KEYS[4] local chans = ARGV -redis.call("SADD", chans_key, unpack(chans)) -redis.call("DEL", lease_key) +local count = 0 +for i, chan in ipairs(chans) do + if redis.call("SISMEMBER", all_key, chan) == 1 then + redis.call("SADD", free_key, chan) + count = count + 1 + end +end +redis.call("ZREM", used_key, batch_key) +redis.call("DEL", batch_key) -return chans +return count ` -func registerChans(chans []netip.AddrPort) error { +// 扩容通道 +func addChans(chans []netip.AddrPort) error { strs := make([]string, len(chans)) for i, ch := range chans { strs[i] = ch.String() } - err := g.Redis.SAdd(context.Background(), "channel:chans", strs).Err() + err := g.Redis.Eval( + context.Background(), + RedisScriptAddChans, + []string{ + freeChansKey, + allChansKey, + }, + strs, + ).Err() if err != nil { - return core.NewBizErr("注册通道失败", err) + return core.NewBizErr("扩容通道失败", err) } return nil } +var RedisScriptAddChans = ` +local free_key = KEYS[1] +local all_key = KEYS[2] +local chans = ARGV + +redis.call("SADD", free_key, unpack(chans)) +redis.call("SADD", all_key, unpack(chans)) + +return 1 +` + +// 缩容通道 +func removeChans(chans []string) error { + err := g.Redis.Eval( + context.Background(), + RedisScriptRemoveChans, + []string{ + freeChansKey, + allChansKey, + }, + chans, + ).Err() + if err != nil { + return core.NewBizErr("缩容通道失败", err) + } + + return nil +} + +var RedisScriptRemoveChans = ` +local free_key = KEYS[1] +local all_key = KEYS[2] +local chans = ARGV + +redis.call("SREM", free_key, unpack(chans)) +redis.call("SREM", all_key, unpack(chans)) + +return 1 +` + // 错误信息 var ( ErrResourceNotExist = core.NewBizErr("套餐不存在") diff --git a/web/services/channel_baiyin.go b/web/services/channel_baiyin.go index ad1e02b..81b3b1c 100644 --- a/web/services/channel_baiyin.go +++ b/web/services/channel_baiyin.go @@ -37,7 +37,7 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3 batch := ID.GenReadable("bat") // 获取用户套餐 - resource, err := findResource(q.Q, resourceId, count, now) + resource, err := findResource(resourceId) if err != nil { return nil, err } diff --git a/web/services/proxy.go b/web/services/proxy.go index d51b247..a8db972 100644 --- a/web/services/proxy.go +++ b/web/services/proxy.go @@ -38,7 +38,7 @@ func (s *proxyService) RegisterBaiyin(Mac string, IP netip.Addr, username, passw for i := range 10000 { chans[i] = netip.AddrPortFrom(IP, uint16(i+10000)) } - err := registerChans(chans) + err := addChans(chans) if err != nil { return core.NewServErr("添加通道失败") }