Files
platform/web/services/channel.go

897 lines
22 KiB
Go

package services
import (
"context"
"database/sql"
"fmt"
"log/slog"
"math"
"math/rand/v2"
"platform/pkg/env"
"platform/pkg/u"
"platform/web/core"
channel2 "platform/web/domains/channel"
edge2 "platform/web/domains/edge"
proxy2 "platform/web/domains/proxy"
resource2 "platform/web/domains/resource"
"platform/web/events"
g "platform/web/globals"
"platform/web/globals/orm"
m "platform/web/models"
q "platform/web/queries"
"strconv"
"strings"
"time"
"github.com/gofiber/fiber/v2"
"github.com/hibiken/asynq"
"gorm.io/gen/field"
"github.com/redis/go-redis/v9"
)
var Channel = &channelService{}
type channelService struct{}
// region 删除通道
func (s *channelService) RemoveChannels(id []int32, userId ...int32) error {
var now = time.Now()
err := q.Q.Transaction(func(tx *q.Query) error {
// 查找通道
var do = tx.Channel.Where(q.Channel.ID.In(id...))
if len(userId) > 0 {
do.Where(q.Channel.UserID.Eq(userId[0]))
}
channels, err := tx.Channel.Where(do).Find()
if err != nil {
return core.NewBizErr("查找通道失败", err)
}
proxyMap := make(map[int32]*m.Proxy)
proxyIds := make([]int32, 0)
resourceMap := make(map[int32]*m.Resource)
resourceIds := make([]int32, 0)
for _, channel := range channels {
if _, ok := proxyMap[channel.ProxyID]; !ok {
proxyIds = append(proxyIds, channel.ProxyID)
proxyMap[channel.ProxyID] = &m.Proxy{}
}
if _, ok := resourceMap[channel.ResourceID]; !ok {
resourceIds = append(resourceIds, channel.ResourceID)
resourceMap[channel.ResourceID] = &m.Resource{}
}
}
// 查找资源
resources, err := tx.Resource.Where(tx.Resource.ID.In(resourceIds...)).Find()
if err != nil {
return core.NewBizErr("查找资源失败", err)
}
for _, res := range resources {
resourceMap[res.ID] = res
}
// 查找代理
proxies, err := tx.Proxy.Where(q.Proxy.ID.In(proxyIds...)).Find()
if err != nil {
return core.NewBizErr("查找代理失败", err)
}
for _, proxy := range proxies {
proxyMap[proxy.ID] = proxy
}
// 区分通道类型
shortToRemove := make([]*m.Channel, 0)
longToRemove := make([]*m.Channel, 0)
for _, channel := range channels {
resource := resourceMap[channel.ResourceID]
switch resource2.Type(resource.Type) {
case resource2.TypeShort:
shortToRemove = append(shortToRemove, channel)
case resource2.TypeLong:
longToRemove = append(longToRemove, channel)
}
}
// 删除指定的通道
result, err := tx.Channel.
Where(q.Channel.ID.In(id...)).
Update(q.Channel.DeletedAt, now)
if err != nil {
return core.NewBizErr("删除通道失败", err)
}
if result.RowsAffected != int64(len(channels)) {
return core.NewBizErr("删除通道数量不匹配")
}
// 禁用代理端口并下线用过的节点
if env.DebugExternalChange {
var step = time.Now()
if len(shortToRemove) > 0 {
err := removeShortChannelExternal(proxies, shortToRemove)
if err != nil {
return core.NewBizErr("提交删除通道配置失败", err)
}
}
slog.Debug("提交删除通道配置", "step", time.Since(step))
}
return nil
})
if err != nil {
return err
}
return nil
}
func removeShortChannelExternal(proxies []*m.Proxy, channels []*m.Channel) error {
// 组织数据
var configMap = make(map[int32][]g.PortConfigsReq, len(proxies))
var proxyMap = make(map[int32]*m.Proxy, len(proxies))
for _, proxy := range proxies {
configMap[proxy.ID] = make([]g.PortConfigsReq, 0)
proxyMap[proxy.ID] = proxy
}
var portMap = make(map[uint64]struct{})
for _, channel := range channels {
var config = g.PortConfigsReq{
Port: int(channel.ProxyPort),
Edge: &[]string{},
AutoEdgeConfig: &g.AutoEdgeConfig{
Count: u.P(0),
},
Status: false,
}
configMap[channel.ProxyID] = append(configMap[channel.ProxyID], config)
key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort)
portMap[key] = struct{}{}
}
// 更新配置
for proxyId, configs := range configMap {
if len(configs) == 0 {
continue
}
proxy, ok := proxyMap[proxyId]
if !ok {
return core.NewBizErr("代理不存在")
}
if proxy.Secret == nil {
return core.NewBizErr("代理未配置密钥")
}
var secret = strings.Split(*proxy.Secret, ":")
gateway := g.NewGateway(
proxy.Host,
secret[0],
secret[1],
)
// 查询节点配置
actives, err := gateway.GatewayPortActive()
if err != nil {
return core.NewBizErr("查询节点配置失败", err)
}
// 更新节点配置
err = gateway.GatewayPortConfigs(configs)
if err != nil {
return core.NewBizErr("提交删除通道配置失败", err)
}
// 下线对应节点
var edges []string
for portStr, active := range actives {
port, err := strconv.Atoi(portStr)
if err != nil {
return core.NewBizErr("端口转换失败", err)
}
key := uint64(proxyId)<<32 | uint64(port)
if _, ok := portMap[key]; ok {
edges = append(edges, active.Edge...)
}
}
if len(edges) > 0 {
_, err := g.Cloud.CloudDisconnect(g.CloudDisconnectReq{
Uuid: proxy.Name,
Edge: edges,
})
if err != nil {
return core.NewBizErr("下线节点失败", err)
}
}
}
return nil
}
// endregion
// region 创建通道
func (s *channelService) CreateChannel(
c *fiber.Ctx,
userId int32,
resourceId int32,
protocol channel2.Protocol,
authType ChannelAuthType,
count int,
edgeFilter ...EdgeFilter,
) (channels []*m.Channel, err error) {
var now = time.Now()
var filter = EdgeFilter{}
if len(edgeFilter) > 0 {
filter = edgeFilter[0]
}
var resource *ResourceInfo
err = q.Q.Transaction(func(q *q.Query) (err error) {
// 查找套餐
resource, err = findResource(q, resourceId, userId, count, now)
if err != nil {
return core.NewBizErr("查找套餐失败", err)
}
// 查找白名单
var whitelist []string
if authType == ChannelAuthTypeIp {
whitelist, err = findWhitelist(q, userId)
if err != nil {
return core.NewBizErr("查找白名单失败", err)
}
}
// 分配节点
var config = ChannelCreator{
Protocol: protocol,
AuthIp: authType == ChannelAuthTypeIp,
Whitelists: whitelist,
AuthPass: authType == ChannelAuthTypePass,
}
switch resource.Type {
case resource2.TypeShort:
config.Expiration = now.Add(time.Duration(resource.Live) * time.Second)
channels, err = assignShortChannels(q, userId, resourceId, count, config, filter, now)
case resource2.TypeLong:
config.Expiration = now.Add(time.Duration(resource.Live) * time.Hour)
channels, err = assignLongChannels(q, userId, resourceId, count, config, filter)
}
if err != nil {
return core.NewBizErr("分配通道失败", err)
}
// 保存通道开通结果
err = saveAssigns(q, resource, channels, now)
if err != nil {
return core.NewBizErr("保存通道失败", err)
}
return nil
}, &sql.TxOptions{Isolation: sql.LevelRepeatableRead})
if err != nil {
return nil, err
}
// 定时异步删除过期通道
var duration time.Duration
switch resource.Type {
case resource2.TypeShort:
duration = time.Duration(resource.Live) * time.Second
case resource2.TypeLong:
duration = time.Duration(resource.Live) * time.Minute
}
var ids = make([]int32, len(channels))
for i := range channels {
ids[i] = channels[i].ID
}
_, err = g.Asynq.Enqueue(
events.NewRemoveChannel(ids),
asynq.ProcessIn(duration),
)
if err != nil {
return nil, core.NewBizErr("提交异步删除通道任务失败", err)
}
// 记录通道创建日志
slog.Info("创建通道",
slog.Int("user_id", int(userId)),
slog.Int("resource_id", int(resourceId)),
slog.Int("count", count),
slog.String("prov", filter.Prov),
slog.String("city", filter.City),
slog.String("isp", filter.Isp),
slog.String("ip", c.IP()),
slog.Time("time", now),
)
return channels, nil
}
func findResource(q *q.Query, resourceId int32, userId int32, count int, now time.Time) (*ResourceInfo, error) {
resource, err := q.Resource.
Preload(
q.Resource.Short,
q.Resource.Long,
).
Where(
q.Resource.ID.Eq(resourceId),
q.Resource.UserID.Eq(userId),
).
Take()
if err != nil {
return nil, ErrResourceNotExist
}
var info = &ResourceInfo{
Id: resource.ID,
Active: resource.Active,
Type: resource2.Type(resource.Type),
}
switch resource2.Type(resource.Type) {
case resource2.TypeShort:
var sub = resource.Short
var dailyLast = time.Time{}
if sub.DailyLast != nil {
dailyLast = time.Time(*sub.DailyLast)
}
var expire = time.Time{}
if sub.Expire != nil {
expire = time.Time(*sub.Expire)
}
var quota int32
if sub.Quota != nil {
quota = *sub.Quota
}
info.Mode = resource2.Mode(sub.Type)
info.Live = sub.Live
info.DailyLimit = sub.DailyLimit
info.DailyUsed = sub.DailyUsed
info.DailyLast = dailyLast
info.Expire = expire
info.Quota = quota
info.Used = sub.Used
case resource2.TypeLong:
var sub = resource.Long
var dailyLast = time.Time{}
if sub.DailyLast != nil {
dailyLast = time.Time(*sub.DailyLast)
}
var expire = time.Time{}
if sub.Expire != nil {
expire = time.Time(*sub.Expire)
}
var quota int32
if sub.Quota != nil {
quota = *sub.Quota
}
info.Mode = resource2.Mode(sub.Type)
info.Live = sub.Live
info.DailyLimit = sub.DailyLimit
info.DailyUsed = sub.DailyUsed
info.DailyLast = dailyLast
info.Expire = expire
info.Quota = quota
info.Used = sub.Used
}
// 检查套餐状态
if !info.Active {
return nil, ErrResourceInvalid
}
// 检查套餐使用情况
switch info.Mode {
default:
return nil, core.NewBizErr("不支持的套餐模式")
// 包时
case resource2.ModeTime:
// 检查过期时间
if info.Expire.Before(now) {
return nil, ErrResourceExpired
}
// 检查每日限额
used := 0
if now.Format("2006-01-02") == info.DailyLast.Format("2006-01-02") {
used = int(info.DailyUsed)
}
excess := used+count > int(info.DailyLimit)
if excess {
return nil, ErrResourceDailyLimit
}
// 包量
case resource2.ModeCount:
// 检查可用配额
if int(info.Quota)-int(info.Used) < count {
return nil, ErrResourceExhausted
}
}
return info, nil
}
func findWhitelist(q *q.Query, userId int32) ([]string, error) {
var whitelist []string
err := q.Whitelist.
Where(q.Whitelist.UserID.Eq(userId)).
Select(q.Whitelist.Host).
Scan(&whitelist)
if err != nil {
return nil, core.NewBizErr("查询白名单失败", err)
}
if len(whitelist) == 0 {
return nil, core.NewBizErr("没有配置白名单")
}
return whitelist, nil
}
func assignShortChannels(q *q.Query, userId int32, resourceId int32, count int, config ChannelCreator, filter EdgeFilter, now time.Time) ([]*m.Channel, error) {
// 查找网关
proxies, err := q.Proxy.
Where(q.Proxy.Type.Eq(int32(proxy2.TypeThirdParty))).
Find()
if err != nil {
return nil, core.NewBizErr("查找网关失败", err)
}
// 查找已使用的节点
var proxyIds = make([]int32, len(proxies))
for i, proxy := range proxies {
proxyIds[i] = proxy.ID
}
allChannels, err := q.Channel.
Select(
q.Channel.ProxyID,
q.Channel.ProxyPort).
Where(
q.Channel.ProxyID.In(proxyIds...),
q.Channel.Expiration.Gt(orm.LocalDateTime(now))).
Group(
q.Channel.ProxyPort,
q.Channel.ProxyID).
Find()
if err != nil {
return nil, core.NewBizErr("查找已使用的节点失败", err)
}
// 查询已配置的节点
remoteConfigs, err := g.Cloud.CloudAutoQuery()
if err != nil {
return nil, core.NewBizErr("查询远端节点配置失败", err)
}
// 统计已用节点量与端口查找表
var proxyUses = make(map[int32]int, len(allChannels))
var portsMap = make(map[uint64]struct{})
for _, channel := range allChannels {
proxyUses[channel.ProxyID]++
key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort)
portsMap[key] = struct{}{}
}
// 计算分配额度
var total = len(allChannels) + count
var avg = int(math.Ceil(float64(total) / float64(len(proxies))))
// 分配节点
var newChannels = make([]*m.Channel, 0, count)
for _, proxy := range proxies {
var prev = proxyUses[proxy.ID]
var next = int(math.Max(float64(prev), float64(int(math.Min(float64(avg), float64(total))))))
total -= next
var acc = next - prev
if acc <= 0 {
continue
}
// 获取远端配置量
var count = 0
remoteConfig, ok := remoteConfigs[proxy.Name]
if ok {
for _, config := range remoteConfig {
if config.Isp == filter.Isp && config.City == filter.City && config.Province == filter.Prov {
count = config.Count
break
}
}
}
// 提交节点配置
if env.DebugExternalChange && next > count {
var step = time.Now()
var multiple float64 = 2 // 扩张倍数
var newConfig = g.AutoConfig{
Province: filter.Prov,
City: filter.City,
Isp: filter.Isp,
Count: int(math.Ceil(float64(next) * multiple)),
}
var newConfigs []g.AutoConfig
if count == 0 {
newConfigs = append(newConfigs, newConfig)
} else {
newConfigs = make([]g.AutoConfig, len(remoteConfig))
for i, config := range remoteConfig {
if config.Isp == filter.Isp && config.City == filter.City && config.Province == filter.Prov {
count = config.Count
break
}
newConfigs[i] = config
}
}
err := g.Cloud.CloudConnect(g.CloudConnectReq{
Uuid: proxy.Name,
Edge: nil,
AutoConfig: newConfigs,
})
if err != nil {
return nil, core.NewBizErr("提交节点配置失败", err)
}
slog.Debug("提交节点配置",
slog.Duration("step", time.Since(step)),
slog.String("proxy", proxy.Name),
slog.Int("used", prev),
slog.Int("count", next),
)
}
// 筛选可用端口
var portConfigs = make([]g.PortConfigsReq, 0, acc)
for port := 10000; port < 20000 && len(portConfigs) < acc; port++ {
// 跳过存在的端口
key := uint64(proxy.ID)<<32 | uint64(port)
_, ok := portsMap[key]
if ok {
continue
}
// 配置新端口
var portConf = g.PortConfigsReq{
Port: port,
Edge: nil,
Status: true,
AutoEdgeConfig: &g.AutoEdgeConfig{
Province: filter.Prov,
City: filter.City,
Isp: filter.Isp,
Count: u.P(1),
PacketLoss: 30,
},
}
var newChannel = &m.Channel{
UserID: userId,
ProxyID: proxy.ID,
ResourceID: resourceId,
ProxyHost: proxy.Host,
ProxyPort: int32(port),
Protocol: u.P(int32(config.Protocol)),
Expiration: orm.LocalDateTime(config.Expiration),
}
if config.AuthIp {
portConf.Whitelist = &config.Whitelists
newChannel.AuthIP = true
newChannel.Whitelists = u.P(strings.Join(config.Whitelists, ","))
}
if config.AuthPass {
username, password := genPassPair()
portConf.Userpass = u.P(fmt.Sprintf("%s:%s", username, password))
newChannel.AuthPass = true
newChannel.Username = &username
newChannel.Password = &password
}
portConfigs = append(portConfigs, portConf)
newChannels = append(newChannels, newChannel)
}
if len(portConfigs) < acc {
return nil, core.NewBizErr("网关端口数量到达上限,无法分配")
}
// 提交端口配置
if env.DebugExternalChange {
var step = time.Now()
if proxy.Secret == nil {
return nil, core.NewBizErr("代理未配置密钥")
}
var secret = strings.Split(*proxy.Secret, ":")
gateway := g.NewGateway(
proxy.Host,
secret[0],
secret[1],
)
err = gateway.GatewayPortConfigs(portConfigs)
if err != nil {
return nil, core.NewBizErr("提交端口配置失败", err)
}
slog.Debug("提交端口配置", "step", time.Since(step))
}
}
if len(newChannels) != count {
return nil, core.NewBizErr("分配节点失败")
}
return newChannels, nil
}
func assignLongChannels(q *q.Query, userId int32, resourceId int32, count int, config ChannelCreator, filter EdgeFilter) ([]*m.Channel, error) {
// 查询符合条件的节点,根据 channel 统计使用次数
var edges = make([]struct {
m.Edge
Count int
ProxyHost string
ProxySecret string
}, 0)
do := q.Edge.Where(q.Edge.Status.Eq(1))
if filter.Prov != "" {
do = do.Where(q.Edge.Prov.Eq(filter.Prov))
}
if filter.City != "" {
do = do.Where(q.Edge.City.Eq(filter.City))
}
if filter.Isp != "" {
do = do.Where(q.Edge.Isp.Eq(int32(edge2.ISPFromStr(filter.Isp))))
}
err := q.Edge.
LeftJoin(q.Channel, q.Channel.EdgeID.EqCol(q.Edge.ID)).
LeftJoin(q.Proxy, q.Proxy.ID.EqCol(q.Edge.ProxyID)).
Select(
q.Edge.ALL,
q.Channel.ALL.Count().As("count"),
q.Proxy.Host.As("proxy_host"),
q.Proxy.Secret.As("proxy_secret"),
).
Group(q.Edge.ID, q.Proxy.Host, q.Proxy.Secret).
Where(do).
Order(field.NewField("", "count").Asc()).
Limit(count).
Scan(&edges)
if err != nil {
return nil, core.NewBizErr("查询符合条件的节点失败", err)
}
if len(edges) == 0 {
return nil, ErrEdgesNoAvailable
}
// 计算分配负载(考虑去重,维护一个节点使用记录表,优先分配未使用节点,达到算法额定负载后再选择负载最少的节点)
var total = count
for _, edge := range edges {
total += edge.Count
}
var avg = int(math.Ceil(float64(total) / float64(len(edges))))
var channels = make([]*m.Channel, 0, count)
var proxies = make(map[int32]*m.Proxy)
var reqs = make(map[int32][]*g.ProxyPermitConfig)
for _, edge := range edges {
if edge.ProxyID == nil || edge.ProxyPort == nil {
return nil, core.NewBizErr("节点配置不完整,缺少代理信息")
}
if _, ok := proxies[*edge.ProxyID]; !ok {
proxies[*edge.ProxyID] = &m.Proxy{
ID: *edge.ProxyID,
Host: edge.ProxyHost,
Secret: &edge.ProxySecret,
}
}
prev := edge.Count
next := int(math.Max(float64(prev), float64(int(math.Min(float64(avg), float64(total))))))
total -= next
acc := next - prev
if acc <= 0 {
continue
}
for range acc {
var channel = &m.Channel{
UserID: userId,
ProxyID: *edge.ProxyID,
EdgeID: &edge.ID,
ResourceID: resourceId,
Protocol: u.P(int32(config.Protocol)),
AuthIP: config.AuthIp,
AuthPass: config.AuthPass,
Expiration: orm.LocalDateTime(config.Expiration),
ProxyHost: edge.ProxyHost,
ProxyPort: *edge.ProxyPort,
}
if config.AuthIp {
channel.Whitelists = u.P(strings.Join(config.Whitelists, ","))
}
if config.AuthPass {
username, password := genPassPair()
channel.Username = &username
channel.Password = &password
}
channels = append(channels, channel)
req := &g.ProxyPermitConfig{
Id: *channel.EdgeID,
Expire: time.Time(channel.Expiration),
}
if channel.AuthIP {
req.Whitelists = &config.Whitelists
}
if channel.AuthPass {
req.Username = channel.Username
req.Password = channel.Password
}
reqs[*edge.ProxyID] = append(reqs[*edge.ProxyID], req)
}
}
// 发送配置到网关
if env.DebugExternalChange {
var step = time.Now()
for id, reqs := range reqs {
proxy := proxies[id]
err := g.Proxy.Permit(proxy.Host, *proxy.Secret, reqs)
if err != nil {
return nil, core.NewBizErr("提交端口配置失败", err)
}
}
slog.Debug("提交端口配置", "step", time.Since(step))
}
return channels, nil
}
func saveAssigns(q *q.Query, resource *ResourceInfo, channels []*m.Channel, now time.Time) (err error) {
if len(channels) == 0 {
return nil
}
// 缓存通道数据
pipe := g.Redis.TxPipeline()
zList := make([]redis.Z, 0, len(channels))
for _, channel := range channels {
expiration := time.Time(channel.Expiration)
zList = append(zList, redis.Z{
Score: float64(expiration.Unix()),
Member: channel.ID,
})
}
pipe.ZAdd(context.Background(), "tasks:channel", zList...)
_, err = pipe.Exec(context.Background())
if err != nil {
return core.NewBizErr("缓存通道数据失败", err)
}
// 保存通道
err = q.Channel.
Create(channels...)
if err != nil {
return core.NewBizErr("保存通道失败", err)
}
// 更新套餐使用记录
var count = len(channels)
var last = resource.DailyLast
var dailyUsed int32
if now.Year() != last.Year() || now.Month() != last.Month() || now.Day() != last.Day() {
dailyUsed = int32(count)
} else {
dailyUsed = resource.DailyUsed + int32(count)
}
switch resource.Type {
case resource2.TypeShort:
_, err = q.ResourceShort.
Where(q.ResourceShort.ResourceID.Eq(resource.Id)).
UpdateSimple(
q.ResourceShort.Used.Add(int32(count)),
q.ResourceShort.DailyUsed.Value(dailyUsed),
q.ResourceShort.DailyLast.Value(orm.LocalDateTime(now)),
)
case resource2.TypeLong:
_, err = q.ResourceLong.
Where(q.ResourceLong.ResourceID.Eq(resource.Id)).
UpdateSimple(
q.ResourceLong.Used.Add(int32(count)),
q.ResourceLong.DailyUsed.Value(dailyUsed),
q.ResourceLong.DailyLast.Value(orm.LocalDateTime(now)),
)
}
if err != nil {
return core.NewBizErr("更新套餐使用记录失败", err)
}
return nil
}
func genPassPair() (string, string) {
//goland:noinspection SpellCheckingInspection
var alphabet = []rune("abcdefghjkmnpqrstuvwxyz")
var numbers = []rune("23456789")
var username = make([]rune, 6)
var password = make([]rune, 6)
for i := range 6 {
if i < 2 {
username[i] = alphabet[rand.N(len(alphabet))]
} else {
username[i] = numbers[rand.N(len(numbers))]
}
password[i] = numbers[rand.N(len(numbers))]
}
return string(username), string(password)
}
// endregion
type ChannelAuthType int
const (
ChannelAuthTypeIp ChannelAuthType = iota + 1
ChannelAuthTypePass
)
type ChannelCreator struct {
Protocol channel2.Protocol
AuthIp bool
Whitelists []string
AuthPass bool
Expiration time.Time
}
type ResourceInfo struct {
Id int32
Active bool
Type resource2.Type
Mode resource2.Mode
Live int32
DailyLimit int32
DailyUsed int32
DailyLast time.Time
Quota int32
Used int32
Expire time.Time
}
var (
ErrResourceNotExist = core.NewBizErr("套餐不存在")
ErrResourceInvalid = core.NewBizErr("套餐不可用")
ErrResourceExhausted = core.NewBizErr("套餐已用完")
ErrResourceExpired = core.NewBizErr("套餐已过期")
ErrResourceDailyLimit = core.NewBizErr("套餐每日配额已用完")
ErrEdgesNoAvailable = core.NewBizErr("没有可用的节点")
)