优化通道处理
This commit is contained in:
@@ -7,11 +7,11 @@ import (
|
||||
"log/slog"
|
||||
"math/rand/v2"
|
||||
"net/netip"
|
||||
"platform/pkg/env"
|
||||
"platform/pkg/u"
|
||||
"platform/web/core"
|
||||
e "platform/web/events"
|
||||
g "platform/web/globals"
|
||||
"platform/web/globals/orm"
|
||||
m "platform/web/models"
|
||||
q "platform/web/queries"
|
||||
"strconv"
|
||||
@@ -20,6 +20,7 @@ import (
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"gorm.io/gen"
|
||||
"gorm.io/gen/field"
|
||||
)
|
||||
|
||||
@@ -28,26 +29,278 @@ var Channel = &channelServer{
|
||||
provider: &channelGostProvider{},
|
||||
}
|
||||
|
||||
type ChannelServiceProvider interface {
|
||||
CreateChannels(source netip.Addr, resourceNo string, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error)
|
||||
RemoveChannels(batch string) error
|
||||
ClearExpiredChannels(proxyId int32) (int, error)
|
||||
type channelProvider interface {
|
||||
selectProxy(count int) (*m.Proxy, error)
|
||||
prepareCreate(ctx *channelCreateContext) (*channelCreateResult, error)
|
||||
removeRemote(batchNo string, batch *usedChanBatch) error
|
||||
}
|
||||
|
||||
type channelServer struct {
|
||||
provider ChannelServiceProvider
|
||||
provider channelProvider
|
||||
}
|
||||
|
||||
func (s *channelServer) CreateChannels(source netip.Addr, resourceNo string, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error) {
|
||||
return s.provider.CreateChannels(source, resourceNo, authWhitelist, authPassword, count, edgeFilter)
|
||||
now := time.Now()
|
||||
batchNo := ID.GenReadable("bat")
|
||||
var channels []*m.Channel
|
||||
if edgeFilter == nil {
|
||||
edgeFilter = &EdgeFilter{}
|
||||
}
|
||||
|
||||
var whitelistText *string
|
||||
err := g.Redsync.WithLock(lockChannelCreateKey(resourceNo), func() error {
|
||||
resource, whitelists, err := ensure(now, source, resourceNo, authWhitelist, count)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if authWhitelist {
|
||||
joined := strings.Join(whitelists, ",")
|
||||
whitelistText = &joined
|
||||
}
|
||||
|
||||
expire := now.Add(resource.Live)
|
||||
proxy, err := s.provider.selectProxy(count)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ports, err := selectPorts(proxy.ID, batchNo, count, expire)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
createCtx := &channelCreateContext{
|
||||
Now: now,
|
||||
Source: source,
|
||||
Resource: resource,
|
||||
Proxy: proxy,
|
||||
BatchNo: batchNo,
|
||||
Ports: ports,
|
||||
Expire: expire,
|
||||
Count: count,
|
||||
Filter: edgeFilter,
|
||||
AuthWhitelist: authWhitelist,
|
||||
AuthPassword: authPassword,
|
||||
Whitelists: whitelists,
|
||||
WhitelistText: whitelistText,
|
||||
}
|
||||
|
||||
result, err := s.provider.prepareCreate(createCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if result.applyRemote != nil {
|
||||
if err := result.applyRemote(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := persistChannelCreate(createCtx, result.Channels); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
channels = result.Channels
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
func (s *channelServer) RemoveChannels(batch string) error {
|
||||
return s.provider.RemoveChannels(batch)
|
||||
return g.Redsync.WithLock(lockChannelRemoveKey(batch), func() error {
|
||||
start := time.Now()
|
||||
|
||||
usedBatch, err := findUsedChanBatch(batch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if usedBatch == nil {
|
||||
slog.Debug("通道为空,跳过清理", "batch", batch)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := s.provider.removeRemote(batch, usedBatch); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := freeChans(usedBatch.ProxyID, batch); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Debug("清除通道配置", "proxy", usedBatch.ProxyID, "batch", batch, "duration", time.Since(start).String())
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *channelServer) ClearExpiredChannels(proxyId int32) (int, error) {
|
||||
return s.provider.ClearExpiredChannels(proxyId)
|
||||
batchSet, err := findExpiredChannelBatches(proxyId, time.Now())
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
slog.Info("批量清理过期通道", "count", len(batchSet))
|
||||
for batchNo := range batchSet {
|
||||
if err := s.RemoveChannels(batchNo); err != nil {
|
||||
slog.Error("清理过期通道失败", "batch", batchNo, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
return len(batchSet), nil
|
||||
}
|
||||
|
||||
type channelCreateContext struct {
|
||||
Now time.Time
|
||||
Source netip.Addr
|
||||
Resource *ResourceView
|
||||
Proxy *m.Proxy
|
||||
BatchNo string
|
||||
Ports []netip.AddrPort
|
||||
Expire time.Time
|
||||
Count int
|
||||
Filter *EdgeFilter
|
||||
AuthWhitelist bool
|
||||
AuthPassword bool
|
||||
Whitelists []string
|
||||
WhitelistText *string
|
||||
}
|
||||
|
||||
type channelCreateResult struct {
|
||||
Channels []*m.Channel
|
||||
applyRemote func() error
|
||||
}
|
||||
|
||||
func newBaseChannel(ctx *channelCreateContext, port uint16) *m.Channel {
|
||||
return &m.Channel{
|
||||
UserID: ctx.Resource.User.ID,
|
||||
ResourceID: ctx.Resource.ID,
|
||||
BatchNo: ctx.BatchNo,
|
||||
ProxyID: ctx.Proxy.ID,
|
||||
Host: u.Else(ctx.Proxy.Host, ctx.Proxy.IP.String()),
|
||||
Port: port,
|
||||
FilterISP: ctx.Filter.Isp,
|
||||
FilterProv: ctx.Filter.Prov,
|
||||
FilterCity: ctx.Filter.City,
|
||||
ExpiredAt: ctx.Expire,
|
||||
Proxy: ctx.Proxy,
|
||||
}
|
||||
}
|
||||
|
||||
func applyChannelAuth(ctx *channelCreateContext, channel *m.Channel) (username string, password string, ok bool) {
|
||||
if ctx.AuthWhitelist {
|
||||
channel.Whitelists = ctx.WhitelistText
|
||||
}
|
||||
if !ctx.AuthPassword {
|
||||
return "", "", false
|
||||
}
|
||||
|
||||
username, password = genPassPair()
|
||||
channel.Username = &username
|
||||
channel.Password = &password
|
||||
return username, password, true
|
||||
}
|
||||
|
||||
func persistChannelCreate(ctx *channelCreateContext, channels []*m.Channel) error {
|
||||
return q.Q.Transaction(func(tx *q.Query) error {
|
||||
var (
|
||||
result gen.ResultInfo
|
||||
err error
|
||||
)
|
||||
switch ctx.Resource.Type {
|
||||
case m.ResourceTypeShort:
|
||||
result, err = tx.ResourceShort.
|
||||
Where(
|
||||
tx.ResourceShort.ID.Eq(*ctx.Resource.ShortId),
|
||||
tx.ResourceShort.Used.Eq(ctx.Resource.Used),
|
||||
tx.ResourceShort.Daily.Eq(ctx.Resource.Daily),
|
||||
).
|
||||
UpdateSimple(
|
||||
tx.ResourceShort.Used.Add(int32(ctx.Count)),
|
||||
tx.ResourceShort.Daily.Value(int32(ctx.Resource.Today+ctx.Count)),
|
||||
tx.ResourceShort.LastAt.Value(ctx.Now),
|
||||
)
|
||||
case m.ResourceTypeLong:
|
||||
result, err = tx.ResourceLong.
|
||||
Where(
|
||||
tx.ResourceLong.ID.Eq(*ctx.Resource.LongId),
|
||||
tx.ResourceLong.Used.Eq(ctx.Resource.Used),
|
||||
tx.ResourceLong.Daily.Eq(ctx.Resource.Daily),
|
||||
).
|
||||
UpdateSimple(
|
||||
tx.ResourceLong.Used.Add(int32(ctx.Count)),
|
||||
tx.ResourceLong.Daily.Value(int32(ctx.Resource.Today+ctx.Count)),
|
||||
tx.ResourceLong.LastAt.Value(ctx.Now),
|
||||
)
|
||||
default:
|
||||
return core.NewBizErr("套餐类型不正确,无法更新")
|
||||
}
|
||||
if err != nil {
|
||||
return core.NewServErr("更新套餐使用记录失败", err)
|
||||
}
|
||||
if result.RowsAffected == 0 {
|
||||
return core.NewBizErr("套餐状态已过期")
|
||||
}
|
||||
|
||||
if err := tx.Channel.Omit(field.AssociationFields).Create(channels...); err != nil {
|
||||
return core.NewServErr("保存通道失败", err)
|
||||
}
|
||||
|
||||
if err := tx.LogsUserUsage.Create(&m.LogsUserUsage{
|
||||
UserID: ctx.Resource.User.ID,
|
||||
ResourceID: ctx.Resource.ID,
|
||||
BatchNo: ctx.BatchNo,
|
||||
Count: int32(ctx.Count),
|
||||
ISP: u.X(ctx.Filter.Isp.String()),
|
||||
Prov: ctx.Filter.Prov,
|
||||
City: ctx.Filter.City,
|
||||
IP: orm.Inet{Addr: ctx.Source},
|
||||
Time: ctx.Now,
|
||||
}); err != nil {
|
||||
return core.NewServErr("保存用户使用记录失败", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func findExpiredChannelBatches(proxyId int32, now time.Time) (map[string]struct{}, error) {
|
||||
keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result()
|
||||
if err != nil {
|
||||
return nil, core.NewServErr("查询使用中通道失败", err)
|
||||
}
|
||||
if len(keys) == 0 {
|
||||
return map[string]struct{}{}, nil
|
||||
}
|
||||
|
||||
batchList := make([]string, len(keys))
|
||||
batchSet := make(map[string]struct{}, len(keys))
|
||||
for i, key := range keys {
|
||||
parsed, err := parseUsedChanKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
batchList[i] = parsed.BatchNo
|
||||
batchSet[parsed.BatchNo] = struct{}{}
|
||||
}
|
||||
|
||||
var batchQueried []struct{ BatchNo string }
|
||||
err = q.Channel.
|
||||
Select(q.Channel.BatchNo).
|
||||
Where(
|
||||
q.Channel.BatchNo.In(batchList...),
|
||||
q.Channel.ExpiredAt.Gte(now.UTC()),
|
||||
).
|
||||
Group(q.Channel.BatchNo).
|
||||
Scan(&batchQueried)
|
||||
if err != nil {
|
||||
return nil, core.NewServErr("查询过期通道失败", err)
|
||||
}
|
||||
for _, batch := range batchQueried {
|
||||
delete(batchSet, batch.BatchNo)
|
||||
}
|
||||
|
||||
return batchSet, nil
|
||||
}
|
||||
|
||||
func lockChannelCreateKey(resourceNo string) string {
|
||||
@@ -87,36 +340,26 @@ func selectProxyByType(proxyType m.ProxyType, count int) (*m.Proxy, error) {
|
||||
return nil, core.NewBizErr("无可用代理")
|
||||
}
|
||||
|
||||
proxyIDs := make([]int32, 0, len(proxies))
|
||||
proxyMap := make(map[int32]*m.Proxy, len(proxies))
|
||||
for _, item := range proxies {
|
||||
proxyIDs = append(proxyIDs, item.ID)
|
||||
proxyMap[item.ID] = item
|
||||
}
|
||||
|
||||
maxID := int32(0)
|
||||
var bestProxy *m.Proxy
|
||||
maxCount := -1
|
||||
for _, id := range proxyIDs {
|
||||
idCount, err := g.Redis.SCard(context.Background(), freeChansKey(id)).Result()
|
||||
for _, proxy := range proxies {
|
||||
idCount, err := g.Redis.SCard(context.Background(), freeChansKey(proxy.ID)).Result()
|
||||
if err != nil {
|
||||
return nil, core.NewServErr("查询可用通道数量失败", err)
|
||||
}
|
||||
if idCount > int64(maxCount) {
|
||||
maxCount = int(idCount)
|
||||
maxID = id
|
||||
bestProxy = proxy
|
||||
}
|
||||
}
|
||||
if maxCount < count {
|
||||
return nil, core.NewBizErr("无可用代理")
|
||||
}
|
||||
|
||||
return proxyMap[maxID], nil
|
||||
return bestProxy, nil
|
||||
}
|
||||
|
||||
func (s *channelServer) RefreshEdges() error {
|
||||
if env.RunMode != env.RunModeProd {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 仅白银网关支持边缘节点刷新,GOST 不参与此流程。
|
||||
proxies, err := q.Proxy.Where(
|
||||
@@ -355,6 +598,11 @@ type usedChanBatch struct {
|
||||
Chans []netip.AddrPort
|
||||
}
|
||||
|
||||
type usedChanKey struct {
|
||||
ProxyID int32
|
||||
BatchNo string
|
||||
}
|
||||
|
||||
func findUsedChanBatch(batch string) (*usedChanBatch, error) {
|
||||
keys, err := g.Redis.Keys(context.Background(), "channel:used:*:"+batch).Result()
|
||||
if err != nil {
|
||||
@@ -393,7 +641,7 @@ func selectUsedChanBatchKey(batch string, keys []string) (string, bool, error) {
|
||||
}
|
||||
|
||||
func parseUsedChanBatch(key string, chans []string) (*usedChanBatch, error) {
|
||||
proxyID, err := parseUsedChansKey(key)
|
||||
parsed, err := parseUsedChanKey(key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -408,23 +656,26 @@ func parseUsedChanBatch(key string, chans []string) (*usedChanBatch, error) {
|
||||
}
|
||||
|
||||
return &usedChanBatch{
|
||||
ProxyID: proxyID,
|
||||
ProxyID: parsed.ProxyID,
|
||||
Chans: addrs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func parseUsedChansKey(key string) (int32, error) {
|
||||
func parseUsedChanKey(key string) (*usedChanKey, error) {
|
||||
parts := strings.Split(key, ":")
|
||||
if len(parts) != 4 {
|
||||
return 0, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), nil)
|
||||
return nil, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), nil)
|
||||
}
|
||||
|
||||
proxyID, err := strconv.Atoi(parts[2])
|
||||
if err != nil {
|
||||
return 0, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), err)
|
||||
return nil, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), err)
|
||||
}
|
||||
|
||||
return int32(proxyID), nil
|
||||
return &usedChanKey{
|
||||
ProxyID: int32(proxyID),
|
||||
BatchNo: parts[3],
|
||||
}, nil
|
||||
}
|
||||
|
||||
// 扩容通道
|
||||
@@ -530,9 +781,7 @@ return 1
|
||||
// 错误信息
|
||||
var (
|
||||
ErrResourceNotExist = core.NewBizErr("套餐不存在")
|
||||
ErrResourceInvalid = core.NewBizErr("套餐不可用")
|
||||
ErrResourceExhausted = core.NewBizErr("套餐已用完")
|
||||
ErrResourceExpired = core.NewBizErr("套餐已过期")
|
||||
ErrResourceDailyLimit = core.NewBizErr("套餐每日配额已用完")
|
||||
ErrEdgesNoAvailable = core.NewBizErr("没有可用的节点")
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user