// File: platform/web/services/channel_baiyin.go (Go, 358 lines, 8.7 KiB)

package services
import (
"database/sql/driver"
"encoding/json"
"fmt"
"log/slog"
"net/netip"
"platform/pkg/env"
"platform/pkg/u"
"platform/web/core"
e "platform/web/events"
g "platform/web/globals"
"platform/web/globals/orm"
m "platform/web/models"
q "platform/web/queries"
"strings"
"time"
"github.com/hibiken/asynq"
"gorm.io/gen/field"
)
// channelBaiyinService groups the channel allocation (CreateChannels) and
// release (RemoveChannels) operations. It carries no state; CreateChannels
// only selects proxies whose type is m.ProxyTypeBaiYin.
type channelBaiyinService struct{}
func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter ...EdgeFilter) ([]*m.Channel, error) {
2025-11-28 18:53:08 +08:00
if count > 400 {
return nil, core.NewBizErr("单次最多提取 400 个")
}
var filter *EdgeFilter = nil
if len(edgeFilter) > 0 {
filter = &edgeFilter[0]
}
now := time.Now()
batch := ID.GenReadable("bat")
// 获取用户套餐
2025-12-05 16:52:40 +08:00
resource, err := findResource(resourceId)
if err != nil {
return nil, err
}
// 检查用户
user := resource.User
if user.IDToken == nil || *user.IDToken == "" {
return nil, core.NewBizErr("账号未实名")
}
// 获取用户白名单并检查用户 ip 地址
whitelists, err := q.Whitelist.Where(
q.Whitelist.UserID.Eq(user.ID),
).Find()
if err != nil {
return nil, err
}
whitelistIPs := make([]string, len(whitelists))
pass := false
for i, item := range whitelists {
whitelistIPs[i] = item.IP.String()
2025-12-01 10:05:55 +08:00
if item.IP.Addr == source {
pass = true
}
}
if !pass {
2025-12-01 10:05:55 +08:00
return nil, core.NewBizErr(fmt.Sprintf("IP 地址 %s 不在白名单内", source.String()))
}
// 检查套餐使用情况
switch resource.Mode {
default:
return nil, core.NewBizErr("不支持的套餐模式")
// 包时
case m.ResourceModeTime:
// 检查过期时间
if resource.Expire.Before(now) {
return nil, ErrResourceExpired
}
// 检查每日限额
used := 0
if now.Format("2006-01-02") == resource.DailyLast.Format("2006-01-02") {
used = int(resource.DailyUsed)
}
excess := used+count > int(resource.DailyLimit)
if excess {
return nil, ErrResourceDailyLimit
}
// 包量
case m.ResourceModeQuota:
// 检查可用配额
if int(resource.Quota)-int(resource.Used) < count {
return nil, ErrResourceExhausted
}
}
expire := now.Add(resource.Live)
// 获取可用通道
chans, err := lockChans(batch, count, expire)
if err != nil {
return nil, err
}
// 获取对应代理
ips := make([]driver.Valuer, 0)
findProxy := make(map[orm.Inet]*m.Proxy)
for _, ch := range chans {
ip := orm.Inet{Addr: ch.Addr()}
if _, ok := findProxy[ip]; !ok {
ips = append(ips, ip)
findProxy[ip] = nil
}
}
proxies, err := q.Proxy.Where(
q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)),
q.Proxy.Status.Eq(int(m.ProxyStatusOnline)),
q.Proxy.IP.In(ips...),
).Find()
if err != nil {
return nil, core.NewBizErr("获取代理失败", err)
}
groups := make(map[*m.Proxy][]*m.Channel)
for _, proxy := range proxies {
findProxy[proxy.IP] = proxy
groups[proxy] = make([]*m.Channel, 0)
}
// 准备通道数据
actions := make([]*m.LogsUserUsage, len(chans))
channels := make([]*m.Channel, len(chans))
for i, ch := range chans {
if err != nil {
return nil, core.NewBizErr("解析通道地址失败", err)
}
// 使用记录
actions[i] = &m.LogsUserUsage{
UserID: user.ID,
ResourceID: resourceId,
BatchNo: batch,
Count: int32(count),
ISP: u.P(filter.Isp.String()),
Prov: filter.Prov,
City: filter.City,
IP: orm.Inet{Addr: source},
Time: now,
}
// 通道数据
inet := orm.Inet{Addr: ch.Addr()}
channels[i] = &m.Channel{
UserID: user.ID,
ResourceID: resourceId,
BatchNo: batch,
ProxyID: findProxy[inet].ID,
Port: ch.Port(),
FilterISP: filter.Isp,
FilterProv: filter.Prov,
FilterCity: filter.City,
ExpiredAt: expire,
Proxy: *findProxy[inet],
}
if authWhitelist {
channels[i].Whitelists = u.P(strings.Join(whitelistIPs, ","))
}
if authPassword {
username, password := genPassPair()
channels[i].Username = &username
channels[i].Password = &password
}
// 关联代理
proxy := findProxy[inet]
groups[proxy] = append(groups[proxy], channels[i])
}
// 保存数据
err = q.Q.Transaction(func(q *q.Query) error {
// 更新套餐用量
used := int32(count)
if u.IsSameDate(now, resource.DailyLast) {
used += resource.DailyUsed
}
switch resource.Type {
case m.ResourceTypeShort:
_, err = q.ResourceShort.
Where(
q.ResourceShort.ResourceID.Eq(resource.Id),
q.ResourceShort.Used.Eq(resource.Used),
q.ResourceShort.DailyUsed.Eq(resource.DailyUsed),
q.ResourceShort.DailyLast.Eq(resource.DailyLast),
).
UpdateSimple(
q.ResourceShort.Used.Add(int32(count)),
q.ResourceShort.DailyUsed.Value(used),
q.ResourceShort.DailyLast.Value(now),
)
case m.ResourceTypeLong:
_, err = q.ResourceLong.
Where(
q.ResourceLong.ResourceID.Eq(resource.Id),
q.ResourceLong.Used.Eq(resource.Used),
q.ResourceLong.DailyUsed.Eq(resource.DailyUsed),
q.ResourceLong.DailyLast.Eq(resource.DailyLast),
).
UpdateSimple(
q.ResourceLong.Used.Add(int32(count)),
q.ResourceLong.DailyUsed.Value(used),
q.ResourceLong.DailyLast.Value(now),
)
}
if err != nil {
return core.NewServErr("更新套餐使用记录失败", err)
}
// 保存通道和分配记录
err = q.Channel.
Omit(field.AssociationFields).
Create(channels...)
if err != nil {
return core.NewServErr("保存通道失败", err)
}
err = q.LogsUserUsage.Create(actions...)
if err != nil {
return core.NewServErr("保存用户使用记录失败", err)
}
return nil
})
if err != nil {
return nil, err
}
// 提交异步任务关闭通道
_, err = g.Asynq.Enqueue(
e.NewRemoveChannel(e.RemoveChannelData{
Batch: batch,
IDs: core.GetIDs(channels),
}),
asynq.ProcessAt(expire),
)
if err != nil {
return nil, core.NewServErr("提交关闭通道任务失败", err)
}
// 提交配置
for proxy, chanels := range groups {
secret := strings.Split(u.Z(proxy.Secret), ":")
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
configs := make([]g.PortConfigsReq, len(chanels))
for i, channel := range chanels {
configs[i] = g.PortConfigsReq{
Port: int(channel.Port),
Status: true,
AutoEdgeConfig: &g.AutoEdgeConfig{
Isp: channel.FilterISP.String(),
Province: u.Z(channel.FilterProv),
City: u.Z(channel.FilterCity),
},
}
if authWhitelist {
configs[i].Whitelist = &whitelistIPs
}
if authPassword {
configs[i].Userpass = u.P(fmt.Sprintf("%s:%s", *channel.Username, *channel.Password))
}
}
if env.DebugExternalChange {
err := gateway.GatewayPortConfigs(configs)
if err != nil {
return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
}
} else {
bytes, _ := json.Marshal(configs)
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "config", string(bytes))
}
}
return channels, nil
}
// RemoveChannels releases the channels with the given ids, which belong to
// batch, and disables their ports on the proxy gateways.
//
// It loads the channels together with their proxies, frees the
// "ip:port" entries through freeChans, and then sends a disabling port
// configuration (Status false, empty Edge list) to each proxy gateway.
//
// An error is returned when the channels cannot be loaded, when fewer or
// more rows than len(ids) are found, when freeChans fails, when a proxy
// secret is missing or not of the form "user:password", or when a gateway
// call fails.
func (s *channelBaiyinService) RemoveChannels(batch string, ids []int32) error {
	start := time.Now()

	// Load the channels with their proxies.
	channels, err := q.Channel.
		Preload(q.Channel.Proxy).
		Where(q.Channel.ID.In(ids...)).
		Find()
	if err != nil {
		return core.NewServErr("获取通道数据失败", err)
	}
	if len(channels) != len(ids) {
		// err is nil at this point, so describe the mismatch explicitly.
		return core.NewServErr("获取通道数据不完整", fmt.Errorf("requested %d channels, found %d", len(ids), len(channels)))
	}

	// Group the channels by proxy IP and collect their "ip:port" keys.
	proxies := make(map[string]*m.Proxy, len(channels))
	groups := make(map[string][]*m.Channel, len(channels))
	chans := make([]string, len(channels))
	for i, channel := range channels {
		ip := channel.Proxy.IP.String()
		groups[ip] = append(groups[ip], channel)
		proxies[ip] = &channel.Proxy
		chans[i] = fmt.Sprintf("%s:%d", ip, channel.Port)
	}

	// Release the ports.
	err = freeChans(batch, chans)
	if err != nil {
		return err
	}

	// Clear the port configuration on every proxy gateway.
	for ip, grouped := range groups {
		proxy := proxies[ip]

		// u.Z tolerates a nil secret; the length check rejects secrets that
		// are not "user:password" instead of indexing out of range.
		secret := strings.Split(u.Z(proxy.Secret), ":")
		if len(secret) < 2 {
			return core.NewServErr(
				fmt.Sprintf("清空代理 %s 端口配置失败", ip),
				fmt.Errorf("proxy secret is not of the form user:password"),
			)
		}
		gateway := g.NewGateway(ip, secret[0], secret[1])

		configs := make([]g.PortConfigsReq, len(grouped))
		for i, channel := range grouped {
			configs[i] = g.PortConfigsReq{
				Status: false,
				Port:   int(channel.Port),
				Edge:   &[]string{},
			}
		}

		// The gateway is only contacted when env.DebugExternalChange is
		// set; otherwise the configuration is just logged.
		if env.DebugExternalChange {
			err := gateway.GatewayPortConfigs(configs)
			if err != nil {
				return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", ip), err)
			}
		} else {
			// Marshal error ignored on purpose: this is debug output only.
			payload, _ := json.Marshal(configs)
			slog.Debug("清除代理端口配置", "proxy", ip, "config", string(payload))
		}
	}

	slog.Debug("清除代理端口配置", "time", time.Since(start).String())
	return nil
}