Files
platform/web/services/channel_baiyin.go

472 lines
12 KiB
Go
Raw Normal View History

package services
import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/netip"
"platform/pkg/env"
"platform/pkg/u"
"platform/web/core"
e "platform/web/events"
g "platform/web/globals"
"platform/web/globals/orm"
m "platform/web/models"
q "platform/web/queries"
"strings"
"time"
"github.com/hibiken/asynq"
"gorm.io/gen/field"
)
2025-12-18 14:22:56 +08:00
// channelBaiyinProvider groups the channel operations for BaiYin proxies:
// CreateChannels, RemoveChannels and ClearExpiredChannels. It carries no state.
type channelBaiyinProvider struct{}
2026-04-17 16:27:29 +08:00
func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, filter *EdgeFilter) ([]*m.Channel, error) {
if filter == nil {
return nil, core.NewBizErr("缺少节点过滤条件")
}
now := time.Now()
batch := ID.GenReadable("bat")
// 检查并获取套餐与白名单
2026-04-22 17:11:55 +08:00
resource, whitelists, err := ensure(now, source, resourceId, authWhitelist, count)
if err != nil {
return nil, err
}
user := resource.User
expire := now.Add(resource.Live)
// 注册异步关闭任务
_, err = g.Asynq.Enqueue(
e.NewRemoveChannel(batch),
asynq.ProcessAt(expire),
)
if err != nil {
return nil, core.NewServErr("注册异步关闭通道任务失败", err)
}
// 选择代理
proxyResult := struct {
m.Proxy
Count int
}{}
err = q.Proxy.
LeftJoin(q.Channel, q.Channel.ProxyID.EqCol(q.Proxy.ID), q.Channel.ExpiredAt.Gt(now)).
Select(q.Proxy.ALL, field.NewUnsafeFieldRaw("10000 - count(*)").As("count")).
Where(
q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)),
q.Proxy.Status.Eq(int(m.ProxyStatusOnline)),
).
Group(q.Proxy.ID).
Order(field.NewField("", "count")).
Limit(1).Scan(&proxyResult)
if err != nil {
return nil, core.NewBizErr("获取可用代理失败", err)
}
if proxyResult.Count < count {
return nil, core.NewBizErr("无可用主机,请稍后再试")
}
proxy := proxyResult.Proxy
2026-05-07 14:58:11 +08:00
// 取用端口
2026-04-18 11:15:29 +08:00
var chans []netip.AddrPort
err = g.Redsync.WithLock(proxyStatusLockKey(proxy.ID), func() error {
lockedProxy, err := q.Proxy.Where(q.Proxy.ID.Eq(proxy.ID)).Take()
if err != nil {
return err
}
if lockedProxy.Status != m.ProxyStatusOnline {
return core.NewBizErr("无可用主机,请稍后再试")
}
chans, err = lockChans(proxy.ID, batch, count)
if err != nil {
return core.NewBizErr("无可用通道,请稍后再试", err)
}
proxy = *lockedProxy
return nil
})
if err != nil {
2026-04-18 11:15:29 +08:00
return nil, err
}
2026-05-07 14:58:11 +08:00
// 取用节点
2026-05-07 12:43:15 +08:00
secret := strings.Split(u.Z(proxy.Secret), ":")
if len(secret) != 2 {
return nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil)
}
2026-05-07 12:43:15 +08:00
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
2026-05-07 14:58:11 +08:00
2026-05-07 12:43:15 +08:00
edges, err := getAvailableEdges(gateway, filter, count)
if err != nil {
return nil, err
}
2026-05-07 14:58:11 +08:00
// 绑定节点到端口
channels := make([]*m.Channel, count)
chanConfigs := make([]*g.PortConfigsReq, count)
2026-05-07 12:43:15 +08:00
edgeConfigs := make([]string, 0, count)
for i := range count {
ch := chans[i]
edge := edges[i]
// 通道数据
channels[i] = &m.Channel{
UserID: user.ID,
ResourceID: resourceId,
BatchNo: batch,
ProxyID: proxy.ID,
Host: u.Else(proxy.Host, proxy.IP.String()),
Port: ch.Port(),
EdgeRef: u.P(edge.EdgeID),
FilterISP: filter.Isp,
FilterProv: filter.Prov,
FilterCity: filter.City,
ExpiredAt: expire,
2026-05-07 14:58:11 +08:00
Proxy: &proxy,
}
// 通道配置数据
chanConfigs[i] = &g.PortConfigsReq{
Port: int(ch.Port()),
Status: true,
Edge: &[]string{edge.EdgeID},
}
// 白名单模式
if authWhitelist {
channels[i].Whitelists = u.P(strings.Join(whitelists, ","))
chanConfigs[i].Whitelist = &whitelists
}
// 密码模式
if authPassword {
username, password := genPassPair()
channels[i].Username = &username
channels[i].Password = &password
chanConfigs[i].Userpass = u.P(username + ":" + password)
}
// 连接配置数据
2026-05-07 12:43:15 +08:00
if edge.Type == EdgeInfoCloud {
edgeConfigs = append(edgeConfigs, edge.EdgeID)
}
}
// 提交配置
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs))
if env.RunMode == env.RunModeProd {
// 连接节点到网关
err = g.Cloud.CloudConnect(&g.CloudConnectReq{
Uuid: proxy.Mac,
Edge: &edgeConfigs,
})
if err != nil {
return nil, core.NewServErr("连接云平台失败", err)
}
// 启用网关代理通道
err = gateway.GatewayPortConfigs(chanConfigs)
if err != nil {
slog.Warn("提交代理端口配置失败", "error", err.Error())
return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
}
} else {
for _, item := range chanConfigs {
str, _ := json.Marshal(item)
fmt.Println(string(str))
}
}
// 保存数据
err = q.Q.Transaction(func(q *q.Query) error {
2026-05-07 14:58:11 +08:00
// 更新使用记录
var err error
switch resource.Type {
case m.ResourceTypeShort:
2026-05-07 12:43:15 +08:00
_, err = q.ResourceShort.
Where(
q.ResourceShort.ID.Eq(*resource.ShortId),
q.ResourceShort.Used.Eq(resource.Used),
q.ResourceShort.Daily.Eq(resource.Daily),
).
UpdateSimple(
q.ResourceShort.Used.Add(int32(count)),
q.ResourceShort.Daily.Value(int32(resource.Today+count)),
q.ResourceShort.LastAt.Value(now),
)
2026-05-07 14:58:11 +08:00
case m.ResourceTypeLong:
2026-05-07 12:43:15 +08:00
_, err = q.ResourceLong.
Where(
q.ResourceLong.ID.Eq(*resource.LongId),
q.ResourceLong.Used.Eq(resource.Used),
q.ResourceLong.Daily.Eq(resource.Daily),
).
UpdateSimple(
q.ResourceLong.Used.Add(int32(count)),
q.ResourceLong.Daily.Value(int32(resource.Today+count)),
q.ResourceLong.LastAt.Value(now),
)
default:
return core.NewServErr("套餐类型不正确,无法更新", nil)
}
if err != nil {
return core.NewServErr("更新套餐使用记录失败", err)
}
// 保存通道
err = q.Channel.
Omit(field.AssociationFields).
Create(channels...)
if err != nil {
return core.NewServErr("保存通道失败", err)
}
// 保存提取记录
err = q.LogsUserUsage.Create(&m.LogsUserUsage{
UserID: user.ID,
ResourceID: resourceId,
BatchNo: batch,
Count: int32(count),
ISP: u.P(filter.Isp.String()),
Prov: filter.Prov,
City: filter.City,
IP: orm.Inet{Addr: source},
Time: now,
})
if err != nil {
return core.NewServErr("保存用户使用记录失败", err)
}
return nil
})
if err != nil {
return nil, err
}
return channels, nil
}
// RemoveChannels releases the ports of a channel batch.
//
// The whole operation runs under the Redsync lock named by
// batchRemoveExpiredKey(batch).
//
// When proxyId is nil, the batch is resolved from the database first:
//   - the channels of the batch are loaded; if there are none a warning is
//     logged and nil is returned without freeing anything;
//   - the proxy is taken from the first channel;
//   - if the used-ports key of the proxy and batch does not exist in Redis the
//     batch is treated as already cleaned and nil is returned;
//   - in production mode the ports are disabled on the gateway and the saved
//     edges are disconnected from the cloud; in any other mode the port
//     configuration is only printed to stdout.
//
// When proxyId is non-nil, all of the above is skipped and only the ports of
// that proxy and batch are freed.
//
// It returns the first error met; the ports are freed only when every earlier
// step succeeded.
func (s *channelBaiyinProvider) RemoveChannels(batch string, proxyId *int32) error {
	return g.Redsync.WithLock(batchRemoveExpiredKey(batch), func() error {
		start := time.Now()

		pid := int32(0)
		if proxyId == nil {
			// Load the channels of the batch.
			channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find()
			if err != nil {
				return core.NewServErr(fmt.Sprintf("获取通道数据失败batch%s", batch), err)
			}
			if len(channels) == 0 {
				slog.Warn(fmt.Sprintf("未找到通道数据batch%s", batch))
				return nil
			}

			proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take()
			if err != nil {
				return core.NewServErr(fmt.Sprintf("获取代理数据失败batch%s", batch), err)
			}

			// Check whether the batch still holds ports.
			exist, err := g.Redis.Exists(context.Background(), usedChansKey(proxy.ID, batch)).Result()
			if err != nil {
				return core.NewServErr("查询使用中通道失败", err)
			}
			if exist == 0 {
				return nil // No ports in use: the batch was already cleaned.
			}

			// Build the configuration. Channels without a saved edge reference
			// are skipped so that no empty edge ID is sent to the cloud.
			edgeConfigs := make([]string, 0, len(channels))
			configs := make([]*g.PortConfigsReq, len(channels))
			for i, channel := range channels {
				if channel.EdgeRef != nil {
					edgeConfigs = append(edgeConfigs, *channel.EdgeRef)
				} else {
					slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID))
				}
				configs[i] = &g.PortConfigsReq{
					Status: false,
					Port:   int(channel.Port),
					Edge:   &[]string{},
				}
			}

			// Submit the configuration.
			if env.RunMode == env.RunModeProd {
				// The proxy secret must have the form "<first>:<second>";
				// validate it before indexing to avoid a panic.
				secret := strings.Split(u.Z(proxy.Secret), ":")
				if len(secret) != 2 {
					return core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil)
				}

				// Clear the port configuration on the gateway.
				gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
				if err := gateway.GatewayPortConfigs(configs); err != nil {
					return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err)
				}

				// Disconnect the edges; nothing to do when no reference was saved.
				if len(edgeConfigs) > 0 {
					_, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{
						Uuid: proxy.Mac,
						Edge: &edgeConfigs,
					})
					if err != nil {
						slog.Warn("断开云平台连接失败", "error", err.Error())
						return core.NewServErr("断开云平台连接失败", err)
					}
				}
			} else {
				for _, item := range configs {
					// Diagnostic output only; a marshal failure just prints an empty line.
					str, _ := json.Marshal(item)
					fmt.Println(string(str))
				}
			}

			pid = proxy.ID
		} else {
			pid = *proxyId
		}

		// Release the ports.
		if err := freeChans(pid, batch); err != nil {
			return err
		}

		slog.Debug("清除代理端口配置", "proxy", pid, "batch", batch, "duration", time.Since(start).String())
		return nil
	})
}
// ClearExpiredChannels 定期清理过期通道,返回清理数量
// 通道有三种情况:
// - 过期等待清理,过期时间在一小时内,可以等待异步任务回收通道
// - 过期未清理,过期时间超过一小时,说明异步任务可能失败了,需要强制清理
// - 异常通道,取用后任务失败,导致通道悬空,需要强制清理
func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) {
now := time.Now()
// 获取使用中通道批次
keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result()
if err != nil {
return 0, core.NewServErr("查询使用中通道失败", err)
}
batchList := make([]string, len(keys))
batchSet := make(map[string]struct{}, len(keys))
for i, key := range keys {
parts := strings.Split(key, ":")
if len(parts) != 4 {
return 0, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), nil)
}
batchList[i] = parts[3]
batchSet[parts[3]] = struct{}{}
}
// 排除未过期通道
var batchQueried []struct{ BatchNo string }
err = q.Channel.
Select(q.Channel.BatchNo).
Where(
q.Channel.BatchNo.In(batchList...),
q.Channel.ExpiredAt.Gte(now),
).
Group(q.Channel.BatchNo).
Scan(&batchQueried)
if err != nil {
return 0, core.NewServErr("查询过期通道失败", err)
}
for _, batch := range batchQueried {
delete(batchSet, batch.BatchNo)
}
// 清理过期通道
slog.Info("批量清理过期通道", "count", len(batchSet))
for batchNo, _ := range batchSet {
err := s.RemoveChannels(batchNo, &proxyId)
2026-05-07 14:58:11 +08:00
if err != nil {
slog.Error("清理过期通道失败", "batch", batchNo, "error", err)
}
}
return len(batchSet), nil
2026-05-07 14:58:11 +08:00
}
// batchRemoveExpiredKey builds the lock name used while removing the batch
// bid: the fixed prefix "platform:batch:remove_expired:" followed by bid.
func batchRemoveExpiredKey(bid string) string {
	const prefix = "platform:batch:remove_expired:"
	// Both operands are strings, so Sprint joins them without a separator.
	return fmt.Sprint(prefix, bid)
}
2026-05-07 12:43:15 +08:00
func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]EdgeInfo, error) {
edges := make([]EdgeInfo, 0, count)
// 先查本地
localEdgesResp, err := gateway.GatewayEdge(&g.GatewayEdgeReq{
Province: filter.Prov,
City: filter.City,
Isp: u.X(filter.Isp.String()),
Limit: &count,
Assigned: u.P(false),
})
if err != nil {
2026-05-07 14:58:11 +08:00
return nil, core.NewBizErr("获取可用节点失败[1]", err)
2026-05-07 12:43:15 +08:00
}
for id, _ := range localEdgesResp {
edges = append(edges, EdgeInfo{
Type: EdgeInfoLocal,
EdgeID: id,
})
}
if len(edges) >= count {
return edges, nil
}
// 再查云端无重复
remaining := count - len(edges)
cloudEdgesResp, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{
Province: filter.Prov,
City: filter.City,
Isp: u.X(filter.Isp.String()),
Limit: &remaining,
NoRepeat: u.P(true),
ActiveTime: u.P(3600),
IpUnchangedTime: u.P(3600),
})
if err != nil {
2026-05-07 14:58:11 +08:00
return nil, core.NewBizErr("获取可用节点失败[2]", err)
2026-05-07 12:43:15 +08:00
}
for _, edge := range cloudEdgesResp.Edges {
edges = append(edges, EdgeInfo{
Type: EdgeInfoCloud,
EdgeID: edge.EdgeID,
})
}
if len(edges) >= count {
return edges, nil
}
// 不能和已有的重复,如果有重复则再次查询云端补足,二次提取还有重复则放弃
return nil, core.NewBizErr("地区可用节点数量不足")
}
// EdgeInfo describes one edge selected by getAvailableEdges.
type EdgeInfo struct {
// Type tells which query produced the edge: the gateway (EdgeInfoLocal) or
// the cloud (EdgeInfoCloud).
Type EdgeInfoType
// EdgeID is the edge identifier; CreateChannels stores it as the channel's
// edge reference and puts it into the port configuration.
EdgeID string
}
// EdgeInfoType names the source an EdgeInfo was obtained from.
type EdgeInfoType string
const (
// EdgeInfoLocal marks an edge returned by the gateway's own edge query.
EdgeInfoLocal EdgeInfoType = "local"
// EdgeInfoCloud marks an edge returned by the cloud edge query; only these
// are included in the cloud connect request made by CreateChannels.
EdgeInfoCloud EdgeInfoType = "cloud"
)