重构白银节点分配方式,使用手动接口精确配置节点

This commit is contained in:
2025-12-08 14:22:30 +08:00
parent 9e237be21e
commit 983dbb4564
25 changed files with 651 additions and 630 deletions

View File

@@ -2,25 +2,42 @@ package services
import (
"context"
"fmt"
"math/rand/v2"
"net/netip"
"platform/web/core"
g "platform/web/globals"
m "platform/web/models"
q "platform/web/queries"
"strconv"
"time"
"github.com/redis/go-redis/v9"
"gorm.io/gen/field"
)
var Channel ChannelService = &channelBaiyinService{}
// 通道服务
type ChannelService interface {
var Channel = &channelServer{
provider: &channelBaiyinService{},
}
type ChanProviderAdapter interface {
CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter ...EdgeFilter) ([]*m.Channel, error)
RemoveChannels(batch string) error
}
type channelServer struct {
provider ChanProviderAdapter
}
func (s *channelServer) CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter ...EdgeFilter) ([]*m.Channel, error) {
return s.provider.CreateChannels(source, resourceId, authWhitelist, authPassword, count, edgeFilter...)
}
func (s *channelServer) RemoveChannels(batch string) error {
return s.provider.RemoveChannels(batch)
}
// 授权方式
type ChannelAuthType int
@@ -60,12 +77,14 @@ func findResource(resourceId int32) (*ResourceView, error) {
if err != nil {
return nil, ErrResourceNotExist
}
if resource.User == nil {
return nil, ErrResourceNotExist
}
var info = &ResourceView{
Id: resource.ID,
Active: resource.Active,
Type: resource.Type,
User: resource.User,
User: *resource.User,
}
switch resource.Type {
@@ -135,34 +154,127 @@ type ResourceView struct {
User m.User
}
// 检查用户是否可提取
func ensure(now time.Time, source netip.Addr, resourceId int32, count int) (*ResourceView, []string, error) {
if count > 400 {
return nil, nil, core.NewBizErr("单次最多提取 400 个")
}
// 获取用户套餐
resource, err := findResource(resourceId)
if err != nil {
return nil, nil, err
}
// 检查用户
user := resource.User
if user.IDToken == nil || *user.IDToken == "" {
return nil, nil, core.NewBizErr("账号未实名")
}
// 获取用户白名单并检查用户 ip 地址
whitelists, err := q.Whitelist.Where(
q.Whitelist.UserID.Eq(user.ID),
).Find()
if err != nil {
return nil, nil, err
}
ips := make([]string, len(whitelists))
pass := false
for i, item := range whitelists {
ips[i] = item.IP.String()
if item.IP.Addr == source {
pass = true
}
}
if !pass {
return nil, nil, core.NewBizErr(fmt.Sprintf("IP 地址 %s 不在白名单内", source.String()))
}
// 检查套餐使用情况
switch resource.Mode {
default:
return nil, nil, core.NewBizErr("不支持的套餐模式")
// 包时
case m.ResourceModeTime:
// 检查过期时间
if resource.Expire.Before(now) {
return nil, nil, ErrResourceExpired
}
// 检查每日限额
used := 0
if now.Format("2006-01-02") == resource.DailyLast.Format("2006-01-02") {
used = int(resource.DailyUsed)
}
excess := used+count > int(resource.DailyLimit)
if excess {
return nil, nil, ErrResourceDailyLimit
}
// 包量
case m.ResourceModeQuota:
// 检查可用配额
if int(resource.Quota)-int(resource.Used) < count {
return nil, nil, ErrResourceExhausted
}
}
return resource, ips, nil
}
var (
allChansKey = "channel:all"
freeChansKey = "channel:free"
usedChansKey = "channel:used"
)
// 扩容通道
func regChans(proxy int32, chans []netip.AddrPort) error {
strs := make([]any, len(chans))
for i, ch := range chans {
strs[i] = ch.String()
}
key := freeChansKey + ":" + strconv.Itoa(int(proxy))
err := g.Redis.SAdd(context.Background(), key, strs...).Err()
if err != nil {
return fmt.Errorf("扩容通道失败: %w", err)
}
return nil
}
// 缩容通道
func remChans(proxy int32) error {
key := freeChansKey + ":" + strconv.Itoa(int(proxy))
err := g.Redis.SRem(context.Background(), key).Err()
if err != nil {
return fmt.Errorf("缩容通道失败: %w", err)
}
return nil
}
// 取用通道
func lockChans(batch string, count int, expire time.Time) ([]netip.AddrPort, error) {
chans, err := g.Redis.Eval(
func lockChans(proxy int32, batch string, count int) ([]netip.AddrPort, error) {
pid := strconv.Itoa(int(proxy))
chans, err := RedisScriptLockChans.Run(
context.Background(),
RedisScriptLockChans,
g.Redis,
[]string{
freeChansKey,
usedChansKey,
usedChansKey + ":" + batch,
freeChansKey + ":" + pid,
usedChansKey + ":" + pid + ":" + batch,
},
count,
expire.Unix(),
).StringSlice()
if err != nil {
return nil, core.NewBizErr("获取通道失败", err)
return nil, fmt.Errorf("获取通道失败: %w", err)
}
addrs := make([]netip.AddrPort, len(chans))
for i, ch := range chans {
addr, err := netip.ParseAddrPort(ch)
if err != nil {
return nil, core.NewServErr("解析通道数据失败", err)
return nil, fmt.Errorf("解析通道数据失败: %w", err)
}
addrs[i] = addr
}
@@ -170,41 +282,31 @@ func lockChans(batch string, count int, expire time.Time) ([]netip.AddrPort, err
return addrs, nil
}
var RedisScriptLockChans = `
var RedisScriptLockChans = redis.NewScript(`
local free_key = KEYS[1]
local used_key = KEYS[2]
local batch_key = KEYS[3]
local batch_key = KEYS[2]
local count = tonumber(ARGV[1])
local expire = tonumber(ARGV[2])
if redis.call("SCARD", free_key) < count then
return nil
end
local ports = redis.call("SPOP", free_key, count)
redis.call("ZADD", used_key, expire, batch_key)
redis.call("RPUSH", batch_key, unpack(ports))
return ports
`
`)
// 归还通道
func freeChans(batch string, chans []string) error {
values := make([]any, len(chans))
for i, ch := range chans {
values[i] = ch
}
err := g.Redis.Eval(
func freeChans(proxy int32, batch string) error {
pid := strconv.Itoa(int(proxy))
err := RedisScriptFreeChans.Run(
context.Background(),
RedisScriptFreeChans,
g.Redis,
[]string{
freeChansKey,
usedChansKey,
usedChansKey + ":" + batch,
allChansKey,
freeChansKey + ":" + pid,
usedChansKey + ":" + pid + ":" + batch,
},
values...,
).Err()
if err != nil {
return core.NewBizErr("释放通道失败", err)
@@ -213,92 +315,19 @@ func freeChans(batch string, chans []string) error {
return nil
}
var RedisScriptFreeChans = `
var RedisScriptFreeChans = redis.NewScript(`
local free_key = KEYS[1]
local used_key = KEYS[2]
local batch_key = KEYS[3]
local all_key = KEYS[4]
local chans = ARGV
local batch_key = KEYS[2]
local count = 0
for i, chan in ipairs(chans) do
if redis.call("SISMEMBER", all_key, chan) == 1 then
redis.call("SADD", free_key, chan)
count = count + 1
end
end
redis.call("ZREM", used_key, batch_key)
local chans = redis.call("LRANGE", batch_key, 0, -1)
redis.call("DEL", batch_key)
return count
`
// 扩容通道
func addChans(chans []netip.AddrPort) error {
strs := make([]string, len(chans))
for i, ch := range chans {
strs[i] = ch.String()
}
err := g.Redis.Eval(
context.Background(),
RedisScriptAddChans,
[]string{
freeChansKey,
allChansKey,
},
strs,
).Err()
if err != nil {
return core.NewBizErr("扩容通道失败", err)
}
return nil
}
var RedisScriptAddChans = `
local free_key = KEYS[1]
local all_key = KEYS[2]
local chans = ARGV
local batch_size = 5000
for i = 1, #chans, batch_size do
local end_index = math.min(i + batch_size - 1, #chans)
redis.call("SADD", free_key, unpack(chans, i, end_index))
redis.call("SADD", all_key, unpack(chans, i, end_index))
if redis.call("EXISTS", free_key) == 1 then
redis.call("SADD", free_key, unpack(chans))
end
return 1
`
// 缩容通道
func removeChans(chans []string) error {
err := g.Redis.Eval(
context.Background(),
RedisScriptRemoveChans,
[]string{
freeChansKey,
allChansKey,
},
chans,
).Err()
if err != nil {
return core.NewBizErr("缩容通道失败", err)
}
return nil
}
var RedisScriptRemoveChans = `
local free_key = KEYS[1]
local all_key = KEYS[2]
local chans = ARGV
redis.call("SREM", free_key, unpack(chans))
redis.call("SREM", all_key, unpack(chans))
return 1
`
`)
// 错误信息
var (

View File

@@ -1,7 +1,6 @@
package services
import (
"database/sql/driver"
"encoding/json"
"fmt"
"log/slog"
@@ -24,10 +23,6 @@ import (
type channelBaiyinService struct{}
func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter ...EdgeFilter) ([]*m.Channel, error) {
if count > 400 {
return nil, core.NewBizErr("单次最多提取 400 个")
}
var filter *EdgeFilter = nil
if len(edgeFilter) > 0 {
filter = &edgeFilter[0]
@@ -36,148 +31,114 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3
now := time.Now()
batch := ID.GenReadable("bat")
// 获取用户套餐
resource, err := findResource(resourceId)
// 检查并获取套餐与白名单
resource, whitelists, err := ensure(now, source, resourceId, count)
if err != nil {
return nil, err
}
// 检查用户
user := resource.User
if user.IDToken == nil || *user.IDToken == "" {
return nil, core.NewBizErr("账号未实名")
}
// 获取用户白名单并检查用户 ip 地址
whitelists, err := q.Whitelist.Where(
q.Whitelist.UserID.Eq(user.ID),
).Find()
if err != nil {
return nil, err
}
whitelistIPs := make([]string, len(whitelists))
pass := false
for i, item := range whitelists {
whitelistIPs[i] = item.IP.String()
if item.IP.Addr == source {
pass = true
}
}
if !pass {
return nil, core.NewBizErr(fmt.Sprintf("IP 地址 %s 不在白名单内", source.String()))
}
// 检查套餐使用情况
switch resource.Mode {
default:
return nil, core.NewBizErr("不支持的套餐模式")
// 包时
case m.ResourceModeTime:
// 检查过期时间
if resource.Expire.Before(now) {
return nil, ErrResourceExpired
}
// 检查每日限额
used := 0
if now.Format("2006-01-02") == resource.DailyLast.Format("2006-01-02") {
used = int(resource.DailyUsed)
}
excess := used+count > int(resource.DailyLimit)
if excess {
return nil, ErrResourceDailyLimit
}
// 包量
case m.ResourceModeQuota:
// 检查可用配额
if int(resource.Quota)-int(resource.Used) < count {
return nil, ErrResourceExhausted
}
}
expire := now.Add(resource.Live)
// 选择代理
proxyResult := struct {
m.Proxy
Count int
}{}
err = q.Proxy.
LeftJoin(q.Channel, q.Channel.ProxyID.EqCol(q.Proxy.ID), q.Channel.ExpiredAt.Gt(now)).
Select(q.Proxy.ALL, field.NewUnsafeFieldRaw("10000 - count(*)").As("count")).
Where(
q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)),
q.Proxy.Status.Eq(int(m.ProxyStatusOnline)),
).
Group(q.Proxy.ID).
Order(field.NewField("", "count")).
Limit(1).Scan(&proxyResult)
if err != nil {
return nil, core.NewBizErr("获取可用代理失败", err)
}
if proxyResult.Count < count {
return nil, core.NewBizErr("无可用主机,请稍后再试")
}
proxy := proxyResult.Proxy
// 获取可用通道
chans, err := lockChans(batch, count, expire)
chans, err := lockChans(proxy.ID, batch, count)
if err != nil {
return nil, err
return nil, core.NewBizErr("无可用通道,请稍后再试", err)
}
// 获取对应代理
ips := make([]driver.Valuer, 0)
findProxy := make(map[orm.Inet]*m.Proxy)
for _, ch := range chans {
ip := orm.Inet{Addr: ch.Addr()}
if _, ok := findProxy[ip]; !ok {
ips = append(ips, ip)
findProxy[ip] = nil
}
}
proxies, err := q.Proxy.Where(
q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)),
q.Proxy.Status.Eq(int(m.ProxyStatusOnline)),
q.Proxy.IP.In(ips...),
).Find()
// 获取可用节点
edgesResp, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{
Province: filter.Prov,
City: filter.City,
Isp: u.X(filter.Isp.String()),
Limit: &count,
NoRepeat: u.P(true),
NoDayRepeat: u.P(true),
ActiveTime: u.P(3600),
IpUnchangedTime: u.P(3600),
Sort: u.P("ip_unchanged_time_asc"),
})
if err != nil {
return nil, core.NewBizErr("获取代理失败", err)
return nil, core.NewBizErr("获取可用节点失败", err)
}
groups := make(map[*m.Proxy][]*m.Channel)
for _, proxy := range proxies {
findProxy[proxy.IP] = proxy
groups[proxy] = make([]*m.Channel, 0)
if edgesResp.Total != count && len(edgesResp.Edges) != count {
return nil, core.NewBizErr("无可用节点,请稍后再试")
}
edges := edgesResp.Edges
// 准备通道数据
actions := make([]*m.LogsUserUsage, len(chans))
channels := make([]*m.Channel, len(chans))
for i, ch := range chans {
channels := make([]*m.Channel, count)
chanConfigs := make([]*g.PortConfigsReq, count)
edgeConfigs := make([]string, count)
for i := range count {
ch := chans[i]
edge := edges[i]
if err != nil {
return nil, core.NewBizErr("解析通道地址失败", err)
}
// 使用记录
actions[i] = &m.LogsUserUsage{
UserID: user.ID,
ResourceID: resourceId,
BatchNo: batch,
Count: int32(count),
ISP: u.P(filter.Isp.String()),
Prov: filter.Prov,
City: filter.City,
IP: orm.Inet{Addr: source},
Time: now,
}
// 通道数据
inet := orm.Inet{Addr: ch.Addr()}
channels[i] = &m.Channel{
UserID: user.ID,
ResourceID: resourceId,
BatchNo: batch,
ProxyID: findProxy[inet].ID,
ProxyID: proxy.ID,
Host: u.Else(proxy.Host, proxy.IP.String()),
Port: ch.Port(),
EdgeRef: u.P(edge.EdgeID),
FilterISP: filter.Isp,
FilterProv: filter.Prov,
FilterCity: filter.City,
ExpiredAt: expire,
Proxy: *findProxy[inet],
}
// 通道配置数据
chanConfigs[i] = &g.PortConfigsReq{
Port: int(ch.Port()),
Status: true,
Edge: &[]string{edge.EdgeID},
}
// 白名单模式
if authWhitelist {
channels[i].Whitelists = u.P(strings.Join(whitelistIPs, ","))
channels[i].Whitelists = u.P(strings.Join(whitelists, ","))
chanConfigs[i].Whitelist = &whitelists
}
// 密码模式
if authPassword {
username, password := genPassPair()
channels[i].Username = &username
channels[i].Password = &password
chanConfigs[i].Userpass = u.P(username + ":" + password)
}
// 关联代理
proxy := findProxy[inet]
groups[proxy] = append(groups[proxy], channels[i])
// 连接配置数据
edgeConfigs[i] = edge.EdgeID
}
// 提交异步任务关闭通道
@@ -230,7 +191,7 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3
return core.NewServErr("更新套餐使用记录失败", err)
}
// 保存通道和分配记录
// 保存通道
err = q.Channel.
Omit(field.AssociationFields).
Create(channels...)
@@ -238,7 +199,18 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3
return core.NewServErr("保存通道失败", err)
}
err = q.LogsUserUsage.Create(actions...)
// 保存提取记录
err = q.LogsUserUsage.Create(&m.LogsUserUsage{
UserID: user.ID,
ResourceID: resourceId,
BatchNo: batch,
Count: int32(count),
ISP: u.P(filter.Isp.String()),
Prov: filter.Prov,
City: filter.City,
IP: orm.Inet{Addr: source},
Time: now,
})
if err != nil {
return core.NewServErr("保存用户使用记录失败", err)
}
@@ -250,37 +222,29 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3
}
// 提交配置
for proxy, chanels := range groups {
secret := strings.Split(u.Z(proxy.Secret), ":")
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
secret := strings.Split(u.Z(proxy.Secret), ":")
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
if env.DebugExternalChange {
configs := make([]g.PortConfigsReq, len(chanels))
for i, channel := range chanels {
configs[i] = g.PortConfigsReq{
Port: int(channel.Port),
Status: true,
AutoEdgeConfig: &g.AutoEdgeConfig{
Isp: channel.FilterISP.String(),
Province: u.Z(channel.FilterProv),
City: u.Z(channel.FilterCity),
},
}
if authWhitelist {
configs[i].Whitelist = &whitelistIPs
}
if authPassword {
configs[i].Userpass = u.P(fmt.Sprintf("%s:%s", *channel.Username, *channel.Password))
}
// 连接节点到网关
err = g.Cloud.CloudConnect(&g.CloudConnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
if err != nil {
return nil, core.NewServErr("连接云平台失败", err)
}
if env.DebugExternalChange {
err := gateway.GatewayPortConfigs(configs)
if err != nil {
return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
}
} else {
bytes, _ := json.Marshal(configs)
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "config", string(bytes))
// 启用网关代理通道
err = gateway.GatewayPortConfigs(chanConfigs)
if err != nil {
return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
}
} else {
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String())
for _, item := range chanConfigs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
}
}
@@ -291,57 +255,58 @@ func (s *channelBaiyinService) RemoveChannels(batch string) error {
start := time.Now()
// 获取连接数据
channels, err := q.Channel.
Preload(q.Channel.Proxy).
Where(q.Channel.BatchNo.Eq(batch)).
Find()
channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find()
if err != nil {
return core.NewServErr("获取通道数据失败", err)
}
proxies := make(map[string]*m.Proxy, len(channels))
groups := make(map[string][]*m.Channel, len(channels))
chans := make([]string, len(channels))
for i, channel := range channels {
ip := channel.Proxy.IP.String()
groups[ip] = append(groups[ip], channel)
proxies[ip] = &channel.Proxy
chans[i] = fmt.Sprintf("%s:%d", ip, channel.Port)
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take()
if err != nil {
return core.NewServErr("获取代理数据失败", err)
}
addrs := make([]netip.AddrPort, len(channels))
// 准备配置数据
edgeConfigs := make([]string, len(channels))
configs := make([]*g.PortConfigsReq, len(channels))
for i, channel := range channels {
addrs[i] = netip.AddrPortFrom(channel.Proxy.IP.Addr, channel.Port)
}
// 清空配置
for ip, channels := range groups {
proxy := proxies[ip]
secret := strings.Split(*proxy.Secret, ":")
gateway := g.NewGateway(ip, secret[0], secret[1])
configs := make([]g.PortConfigsReq, len(channels))
for i, channel := range channels {
configs[i] = g.PortConfigsReq{
Status: false,
Port: int(channel.Port),
Edge: &[]string{},
}
if channel.EdgeRef != nil {
edgeConfigs[i] = *channel.EdgeRef
} else {
slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID))
}
if env.DebugExternalChange {
err := gateway.GatewayPortConfigs(configs)
if err != nil {
return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err)
}
} else {
bytes, _ := json.Marshal(configs)
slog.Debug("清除代理端口配置", "proxy", ip, "config", string(bytes))
configs[i] = &g.PortConfigsReq{
Status: false,
Port: int(channel.Port),
Edge: &[]string{},
}
}
// 提交配置
if env.DebugExternalChange {
// 断开节点连接
g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
// 清空通道配置
secret := strings.Split(*proxy.Secret, ":")
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
err := gateway.GatewayPortConfigs(configs)
if err != nil {
return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err)
}
} else {
slog.Debug("清除代理端口配置", "proxy", proxy.IP)
for _, item := range configs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
}
}
// 释放端口
err = freeChans(batch, chans)
err = freeChans(proxy.ID, batch)
if err != nil {
return err
}

View File

@@ -33,28 +33,29 @@ func (s *proxyService) AllProxies(proxyType m.ProxyType, channels bool) ([]*m.Pr
// RegisterBaiyin 注册新代理服务
func (s *proxyService) RegisterBaiyin(Mac string, IP netip.Addr, username, password string) error {
// 添加可用通道到 redis
chans := make([]netip.AddrPort, 10000)
for i := range 10000 {
chans[i] = netip.AddrPortFrom(IP, uint16(i+10000))
}
err := addChans(chans)
if err != nil {
return core.NewServErr("添加通道失败", err)
}
// 保存代理信息
if err := q.Proxy.Create(&m.Proxy{
proxy := &m.Proxy{
Version: 0,
Mac: Mac,
IP: orm.Inet{Addr: IP},
Secret: u.P(fmt.Sprintf("%s:%s", username, password)),
Type: m.ProxyTypeBaiYin,
Status: m.ProxyStatusOnline,
}); err != nil {
}
if err := q.Proxy.Create(proxy); err != nil {
return core.NewServErr("保存通道数据失败")
}
// 添加可用通道到 redis
chans := make([]netip.AddrPort, 10000)
for i := range 10000 {
chans[i] = netip.AddrPortFrom(IP, uint16(i+10000))
}
err := regChans(proxy.ID, chans)
if err != nil {
return core.NewServErr("添加通道失败", err)
}
return nil
}