779 lines
17 KiB
Go
779 lines
17 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"log/slog"
|
|
"math"
|
|
"math/rand/v2"
|
|
"platform/pkg/env"
|
|
"platform/pkg/orm"
|
|
"platform/pkg/rds"
|
|
"platform/pkg/remote"
|
|
"platform/pkg/v"
|
|
"platform/web/common"
|
|
"platform/web/models"
|
|
q "platform/web/queries"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gofiber/fiber/v2/middleware/requestid"
|
|
"github.com/redis/go-redis/v9"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
// channelService groups the channel create/remove operations. It carries no
// state of its own.
type channelService struct{}

// Channel is the package-wide channelService instance.
var Channel = new(channelService)
|
|
|
|
// ChannelAuthType selects how clients authenticate against a channel.
type ChannelAuthType int

// Authentication modes accepted by CreateChannel. The constants are untyped
// integers, so they can be compared with both ChannelAuthType and plain ints.
const (
	// ChannelAuthTypeIp authenticates by the user's whitelisted hosts.
	ChannelAuthTypeIp = 0
	// ChannelAuthTypePass authenticates with a generated username/password pair.
	ChannelAuthTypePass = 1
)
|
|
|
|
// ChannelProtocol names the proxy protocol a channel is served over.
type ChannelProtocol string

// Protocols a channel can be created with.
const (
	ProtocolSocks5 ChannelProtocol = "socks5"
	ProtocolHTTP   ChannelProtocol = "http"
	ProtocolHttps  ChannelProtocol = "https"
)
|
|
|
|
// ResourceInfo is the flattened view of a resource joined with its "pss" row
// that CreateChannel scans its lookup query into.
type ResourceInfo struct {
	// Identity and state of the resource itself.
	Id, UserId int32
	Active     bool

	// Type is compared against 1 by checkUser: 1 is handled as a time-based
	// plan (Expire applies), anything else as a quota-based plan.
	// Live is the channel lifetime in seconds used by CreateChannel.
	Type, Live int32

	// Daily accounting: limit, amount used, and when usage was last recorded.
	DailyLimit, DailyUsed int32
	DailyLast             time.Time

	// Overall accounting for quota-based plans.
	Quota, Used int32

	// Expire is the end of validity checked for time-based plans.
	Expire time.Time
}
|
|
|
|
// region RemoveChannel
|
|
|
|
func (s *channelService) RemoveChannels(ctx context.Context, auth *AuthContext, id ...int32) error {
|
|
var step = time.Now()
|
|
var rid = ctx.Value(requestid.ConfigDefault.ContextKey).(string)
|
|
|
|
err := q.Q.Transaction(func(tx *q.Query) error {
|
|
|
|
// 查找通道
|
|
channels, err := tx.Channel.Where(
|
|
q.Channel.ID.In(id...),
|
|
).Find()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
slog.Debug("查找通道", "rid", rid, "step", time.Since(step))
|
|
|
|
// 检查权限,如果为用户操作的话,则只能删除自己的通道
|
|
for _, channel := range channels {
|
|
if auth.Payload.Type == PayloadUser && auth.Payload.Id != channel.UserID {
|
|
return common.AuthForbiddenErr("无权限访问")
|
|
}
|
|
}
|
|
|
|
// 查找代理
|
|
step = time.Now()
|
|
|
|
proxySet := make(map[int32]struct{})
|
|
proxyIds := make([]int32, 0)
|
|
for _, channel := range channels {
|
|
if _, ok := proxySet[channel.ProxyID]; !ok {
|
|
proxyIds = append(proxyIds, channel.ProxyID)
|
|
proxySet[channel.ProxyID] = struct{}{}
|
|
}
|
|
}
|
|
proxies, err := tx.Proxy.Where(
|
|
q.Proxy.ID.In(proxyIds...),
|
|
).Find()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
slog.Debug("查找代理", "rid", rid, "step", time.Since(step))
|
|
|
|
// 删除指定的通道
|
|
result, err := tx.Channel.
|
|
Where(q.Channel.ID.In(id...)).
|
|
Update(q.Channel.DeletedAt, time.Now())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if result.RowsAffected != int64(len(channels)) {
|
|
return ChannelServiceErr("删除通道失败")
|
|
}
|
|
|
|
// 删除缓存,异步任务直接在消费端处理删除
|
|
step = time.Now()
|
|
|
|
err = deleteCache(ctx, channels)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
slog.Debug("删除缓存", "rid", rid, "step", time.Since(step))
|
|
|
|
// 禁用代理端口并下线用过的节点
|
|
if env.DebugExternalChange {
|
|
step = time.Now()
|
|
|
|
// 组织数据
|
|
var configMap = make(map[int32][]remote.PortConfigsReq, len(proxies))
|
|
var proxyMap = make(map[int32]*models.Proxy, len(proxies))
|
|
for _, proxy := range proxies {
|
|
configMap[proxy.ID] = make([]remote.PortConfigsReq, 0)
|
|
proxyMap[proxy.ID] = proxy
|
|
}
|
|
var portMap = make(map[uint64]struct{})
|
|
for _, channel := range channels {
|
|
var config = remote.PortConfigsReq{
|
|
Port: int(channel.ProxyPort),
|
|
Edge: &[]string{},
|
|
AutoEdgeConfig: &remote.AutoEdgeConfig{
|
|
Count: v.P(0),
|
|
},
|
|
Status: false,
|
|
}
|
|
configMap[channel.ProxyID] = append(configMap[channel.ProxyID], config)
|
|
|
|
key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort)
|
|
portMap[key] = struct{}{}
|
|
}
|
|
|
|
slog.Debug("组织数据", "rid", rid, "step", time.Since(step))
|
|
|
|
// 更新配置
|
|
for proxyId, configs := range configMap {
|
|
if len(configs) == 0 {
|
|
continue
|
|
}
|
|
proxy, ok := proxyMap[proxyId]
|
|
if !ok {
|
|
return ChannelServiceErr("代理不存在")
|
|
}
|
|
|
|
var secret = strings.Split(proxy.Secret, ":")
|
|
gateway := remote.NewGateway(
|
|
proxy.Host,
|
|
secret[0],
|
|
secret[1],
|
|
)
|
|
|
|
// 查询节点配置
|
|
step = time.Now()
|
|
|
|
actives, err := gateway.GatewayPortActive()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
slog.Debug("查询节点配置", "rid", rid, "step", time.Since(step))
|
|
|
|
// 更新节点配置
|
|
step = time.Now()
|
|
|
|
err = gateway.GatewayPortConfigs(configs)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
slog.Debug("更新节点配置", "rid", rid, "step", time.Since(step))
|
|
|
|
// 下线对应节点
|
|
step = time.Now()
|
|
|
|
var edges []string
|
|
for portStr, active := range actives {
|
|
port, err := strconv.Atoi(portStr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
key := uint64(proxyId)<<32 | uint64(port)
|
|
if _, ok := portMap[key]; ok {
|
|
edges = append(edges, active.Edge...)
|
|
}
|
|
}
|
|
if len(edges) > 0 {
|
|
_, err := remote.Cloud.CloudDisconnect(remote.CloudDisconnectReq{
|
|
Uuid: proxy.Name,
|
|
Edge: edges,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
slog.Debug("下线对应节点", "rid", rid, "step", time.Since(step))
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// endregion
|
|
|
|
// region CreateChannel
|
|
|
|
func (s *channelService) CreateChannel(
|
|
ctx context.Context,
|
|
auth *AuthContext,
|
|
resourceId int32,
|
|
protocol ChannelProtocol,
|
|
authType ChannelAuthType,
|
|
count int,
|
|
nodeFilter ...NodeFilterConfig,
|
|
) ([]*PortInfo, error) {
|
|
var step = time.Now()
|
|
var rid = ctx.Value(requestid.ConfigDefault.ContextKey).(string)
|
|
|
|
filter := NodeFilterConfig{}
|
|
if len(nodeFilter) > 0 {
|
|
filter = nodeFilter[0]
|
|
}
|
|
|
|
var addr []*PortInfo
|
|
err := q.Q.Transaction(func(q *q.Query) error {
|
|
|
|
// 查找套餐
|
|
step = time.Now()
|
|
|
|
var resource = new(ResourceInfo)
|
|
data := q.Resource.As("data")
|
|
pss := q.ResourcePss.As("pss")
|
|
err := data.Scopes(orm.Alias(data)).
|
|
Select(
|
|
data.ID, data.UserID, data.Active,
|
|
pss.Type, pss.Live, pss.DailyUsed, pss.DailyLimit, pss.DailyLast, pss.Quota, pss.Used, pss.Expire,
|
|
).
|
|
LeftJoin(q.ResourcePss.As("pss"), pss.ResourceID.EqCol(data.ID)).
|
|
Where(data.ID.Eq(resourceId)).
|
|
Scan(&resource)
|
|
if err != nil {
|
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
|
// 禁止 id 猜测
|
|
return ChannelServiceErr("无权限访问")
|
|
}
|
|
return err
|
|
}
|
|
|
|
slog.Debug("查找套餐", "rid", rid, "step", time.Since(step))
|
|
|
|
// 检查用户权限
|
|
err = checkUser(auth, resource, count)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// 申请节点
|
|
step = time.Now()
|
|
|
|
edgeAssigns, err := assignEdge(q, count, filter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
slog.Debug("申请节点", "rid", rid, "total", time.Since(step))
|
|
|
|
// 分配端口
|
|
step = time.Now()
|
|
|
|
now := time.Now()
|
|
expiration := now.Add(time.Duration(resource.Live) * time.Second)
|
|
_addr, channels, err := assignPort(q, edgeAssigns, auth.Payload.Id, protocol, authType, expiration, filter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
addr = _addr
|
|
|
|
slog.Debug("分配端口", "rid", rid, "total", time.Since(step))
|
|
|
|
// 更新套餐使用记录
|
|
step = time.Now()
|
|
|
|
toUpdate := models.ResourcePss{
|
|
Used: resource.Used + int32(count),
|
|
DailyLast: now,
|
|
}
|
|
last := resource.DailyLast
|
|
if now.Year() != last.Year() || now.Month() != last.Month() || now.Day() != last.Day() {
|
|
toUpdate.DailyUsed = int32(count)
|
|
} else {
|
|
toUpdate.DailyUsed = resource.DailyUsed + int32(count)
|
|
}
|
|
_, err = q.ResourcePss.
|
|
Where(q.ResourcePss.ResourceID.Eq(resourceId)).
|
|
Select(
|
|
q.ResourcePss.Used,
|
|
q.ResourcePss.DailyUsed,
|
|
q.ResourcePss.DailyLast,
|
|
).
|
|
Updates(toUpdate)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
slog.Debug("更新套餐使用记录", "rid", rid, "step", time.Since(step))
|
|
|
|
// 缓存通道数据
|
|
step = time.Now()
|
|
|
|
err = cache(ctx, channels)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
slog.Debug("缓存通道数据", "rid", rid, "step", time.Since(step))
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return addr, nil
|
|
}
|
|
|
|
func checkUser(auth *AuthContext, resource *ResourceInfo, count int) error {
|
|
|
|
// 检查使用人
|
|
if auth.Payload.Type == PayloadUser && auth.Payload.Id != resource.UserId {
|
|
return common.AuthForbiddenErr("无权限访问")
|
|
}
|
|
|
|
// 检查套餐状态
|
|
if !resource.Active {
|
|
return ChannelServiceErr("套餐已失效")
|
|
}
|
|
|
|
// 检查每日限额
|
|
today := time.Now().Format("2006-01-02") == resource.DailyLast.Format("2006-01-02")
|
|
dailyRemain := int(math.Max(float64(resource.DailyLimit-resource.DailyUsed), 0))
|
|
if today && dailyRemain < count {
|
|
return ChannelServiceErr("套餐每日配额不足")
|
|
}
|
|
|
|
// 检查时间或配额
|
|
if resource.Type == 1 { // 包时
|
|
if resource.Expire.Before(time.Now()) {
|
|
return ChannelServiceErr("套餐已过期")
|
|
}
|
|
} else { // 包量
|
|
remain := int(math.Max(float64(resource.Quota-resource.Used), 0))
|
|
if remain < count {
|
|
return ChannelServiceErr("套餐配额不足")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// assignEdge 分配边缘节点数量
|
|
func assignEdge(q *q.Query, count int, filter NodeFilterConfig) (*AssignEdgeResult, error) {
|
|
|
|
// 查询可以使用的网关
|
|
var step = time.Now()
|
|
|
|
proxies, err := q.Proxy.
|
|
Where(q.Proxy.Type.Eq(1)).
|
|
Find()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
slog.Debug("查找网关", "step", time.Since(step))
|
|
|
|
// 查询已配置的节点
|
|
step = time.Now()
|
|
|
|
rProxyConfigs, err := remote.Cloud.CloudAutoQuery()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
slog.Debug("查询已配置节点 (remote)", "step", time.Since(step))
|
|
|
|
// 查询已使用的节点
|
|
step = time.Now()
|
|
|
|
var proxyIds = make([]int32, len(proxies))
|
|
for i, proxy := range proxies {
|
|
proxyIds[i] = proxy.ID
|
|
}
|
|
channels, err := q.Channel.Debug().
|
|
Select(
|
|
q.Channel.ProxyID,
|
|
q.Channel.ProxyPort).
|
|
Where(
|
|
q.Channel.ProxyID.In(proxyIds...),
|
|
q.Channel.Expiration.Gt(time.Now())).
|
|
Group(
|
|
q.Channel.ProxyPort,
|
|
q.Channel.ProxyID).
|
|
Find()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var proxyUses = make(map[int32]int, len(channels))
|
|
for _, channel := range channels {
|
|
proxyUses[channel.ProxyID]++
|
|
}
|
|
|
|
slog.Debug("查找已使用节点", "step", time.Since(step))
|
|
|
|
// 组织数据
|
|
var infos = make([]*ProxyInfo, len(proxies))
|
|
for i, proxy := range proxies {
|
|
infos[i] = &ProxyInfo{
|
|
proxy: proxy,
|
|
used: proxyUses[proxy.ID],
|
|
}
|
|
|
|
rConfigs, ok := rProxyConfigs[proxy.Name]
|
|
if !ok {
|
|
infos[i].count = 0
|
|
continue
|
|
}
|
|
|
|
for _, rConfig := range rConfigs {
|
|
if rConfig.Isp == filter.Isp && rConfig.City == filter.City && rConfig.Province == filter.Prov {
|
|
infos[i].count = rConfig.Count
|
|
}
|
|
}
|
|
}
|
|
|
|
// 分配新增的节点
|
|
var configs = make([]*ProxyConfig, len(proxies))
|
|
var needed = len(channels) + count
|
|
avg := int(math.Ceil(float64(needed) / float64(len(proxies))))
|
|
for i, info := range infos {
|
|
var prev = info.used
|
|
var next = int(math.Min(float64(avg), float64(needed)))
|
|
|
|
info.used = int(math.Max(float64(prev), float64(next)))
|
|
needed -= info.used
|
|
|
|
if env.DebugExternalChange && info.used > info.count {
|
|
step = time.Now()
|
|
|
|
slog.Debug("新增新节点", "proxy", info.proxy.Name, "used", info.used, "count", info.count)
|
|
|
|
rConfigs := rProxyConfigs[info.proxy.Name]
|
|
|
|
var newConfig = remote.AutoConfig{
|
|
Province: filter.Prov,
|
|
City: filter.City,
|
|
Isp: filter.Isp,
|
|
Count: int(math.Ceil(float64(info.used) * 2)),
|
|
}
|
|
var newConfigs []remote.AutoConfig
|
|
var update = false
|
|
for _, rConfig := range rConfigs {
|
|
if rConfig.Isp == filter.Isp && rConfig.City == filter.City && rConfig.Province == filter.Prov {
|
|
newConfigs = append(newConfigs, newConfig)
|
|
update = true
|
|
} else {
|
|
newConfigs = append(newConfigs, rConfig)
|
|
}
|
|
}
|
|
if !update {
|
|
newConfigs = append(newConfigs, newConfig)
|
|
}
|
|
|
|
err := remote.Cloud.CloudConnect(remote.CloudConnectReq{
|
|
Uuid: info.proxy.Name,
|
|
Edge: nil,
|
|
AutoConfig: newConfigs,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
slog.Debug("分配新增的节点", "step", time.Since(step))
|
|
}
|
|
|
|
configs[i] = &ProxyConfig{
|
|
proxy: info.proxy,
|
|
count: int(math.Max(float64(next-prev), 0)),
|
|
}
|
|
}
|
|
|
|
return &AssignEdgeResult{
|
|
configs: configs,
|
|
channels: channels,
|
|
}, nil
|
|
}
|
|
|
|
type ProxyInfo struct {
|
|
proxy *models.Proxy
|
|
used int
|
|
count int
|
|
}
|
|
|
|
type AssignEdgeResult struct {
|
|
configs []*ProxyConfig
|
|
channels []*models.Channel
|
|
}
|
|
|
|
type ProxyConfig struct {
|
|
proxy *models.Proxy
|
|
count int
|
|
}
|
|
|
|
// assignPort 分配指定数量的端口
|
|
func assignPort(
|
|
q *q.Query,
|
|
proxies *AssignEdgeResult,
|
|
userId int32,
|
|
protocol ChannelProtocol,
|
|
authType ChannelAuthType,
|
|
expiration time.Time,
|
|
filter NodeFilterConfig,
|
|
) ([]*PortInfo, []*models.Channel, error) {
|
|
var step time.Time
|
|
|
|
var configs = proxies.configs
|
|
var exists = proxies.channels
|
|
|
|
// 端口查找表
|
|
var portsMap = make(map[uint64]struct{})
|
|
for _, channel := range exists {
|
|
key := uint64(channel.ProxyID)<<32 | uint64(channel.ProxyPort)
|
|
portsMap[key] = struct{}{}
|
|
}
|
|
|
|
// 查找用户白名单
|
|
var whitelist []string
|
|
if authType == ChannelAuthTypeIp {
|
|
err := q.Whitelist.
|
|
Where(q.Whitelist.UserID.Eq(userId)).
|
|
Select(q.Whitelist.Host).
|
|
Scan(&whitelist)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
// 配置启用代理
|
|
var result []*PortInfo
|
|
var channels []*models.Channel
|
|
for _, config := range configs {
|
|
var err error
|
|
var proxy = config.proxy
|
|
var count = config.count
|
|
|
|
// 筛选可用端口
|
|
var configs = make([]remote.PortConfigsReq, 0, count)
|
|
for port := 10000; port < 20000 && len(configs) < count; port++ {
|
|
// 跳过存在的端口
|
|
key := uint64(proxy.ID)<<32 | uint64(port)
|
|
_, ok := portsMap[key]
|
|
if ok {
|
|
continue
|
|
}
|
|
|
|
// 配置新端口
|
|
var i = len(configs)
|
|
configs = append(configs, remote.PortConfigsReq{
|
|
Port: port,
|
|
Edge: nil,
|
|
Status: true,
|
|
AutoEdgeConfig: &remote.AutoEdgeConfig{
|
|
Province: filter.Prov,
|
|
City: filter.City,
|
|
Isp: filter.Isp,
|
|
Count: v.P(1),
|
|
PacketLoss: 30,
|
|
},
|
|
})
|
|
|
|
switch authType {
|
|
case ChannelAuthTypeIp:
|
|
configs[i].Whitelist = &whitelist
|
|
configs[i].Userpass = v.P("")
|
|
for _, item := range whitelist {
|
|
channels = append(channels, &models.Channel{
|
|
UserID: userId,
|
|
ProxyID: proxy.ID,
|
|
UserHost: item,
|
|
ProxyPort: int32(port),
|
|
AuthIP: true,
|
|
AuthPass: false,
|
|
Protocol: string(protocol),
|
|
Expiration: expiration,
|
|
})
|
|
}
|
|
result = append(result, &PortInfo{
|
|
Proto: string(protocol),
|
|
Host: proxy.Host,
|
|
Port: port,
|
|
})
|
|
case ChannelAuthTypePass:
|
|
username, password := genPassPair()
|
|
configs[i].Whitelist = &[]string{}
|
|
configs[i].Userpass = v.P(fmt.Sprintf("%s:%s", username, password))
|
|
channels = append(channels, &models.Channel{
|
|
UserID: userId,
|
|
ProxyID: proxy.ID,
|
|
ProxyPort: int32(port),
|
|
AuthIP: false,
|
|
AuthPass: true,
|
|
Username: username,
|
|
Password: password,
|
|
Protocol: string(protocol),
|
|
Expiration: expiration,
|
|
})
|
|
result = append(result, &PortInfo{
|
|
Proto: string(protocol),
|
|
Host: proxy.Host,
|
|
Port: port,
|
|
Username: &username,
|
|
Password: &password,
|
|
})
|
|
}
|
|
}
|
|
|
|
if len(configs) < count {
|
|
return nil, nil, ChannelServiceErr("网关端口数量到达上限,无法分配")
|
|
}
|
|
|
|
// 保存到数据库
|
|
step = time.Now()
|
|
|
|
err = q.Channel.
|
|
Omit(
|
|
q.Channel.NodeID,
|
|
q.Channel.NodeHost,
|
|
q.Channel.DeletedAt,
|
|
).
|
|
Save(channels...)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
slog.Debug("保存到数据库", "step", time.Since(step))
|
|
|
|
// 提交端口配置并更新节点列表
|
|
if env.DebugExternalChange {
|
|
step = time.Now()
|
|
|
|
var secret = strings.Split(proxy.Secret, ":")
|
|
gateway := remote.NewGateway(
|
|
proxy.Host,
|
|
secret[0],
|
|
secret[1],
|
|
)
|
|
err = gateway.GatewayPortConfigs(configs)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
slog.Debug("提交端口配置", "step", time.Since(step))
|
|
}
|
|
}
|
|
|
|
return result, channels, nil
|
|
}
|
|
|
|
// PortInfo is the address of one opened port as returned by CreateChannel.
type PortInfo struct {
	// Proto is the channel protocol; it is excluded from the JSON output.
	Proto string `json:"-"`

	// Host and Port locate the gateway port.
	Host string `json:"host"`
	Port int    `json:"port"`

	// Username and Password are set for password-authenticated channels and
	// omitted from the JSON output when nil.
	Username *string `json:"username,omitempty"`
	Password *string `json:"password,omitempty"`
}
|
|
|
|
// endregion
|
|
|
|
func genPassPair() (string, string) {
|
|
var alphabet = []rune("abcdefghjkmnpqrstuvwxyz")
|
|
var numbers = []rune("23456789")
|
|
|
|
var username = make([]rune, 6)
|
|
var password = make([]rune, 6)
|
|
for i := range 6 {
|
|
if i < 2 {
|
|
username[i] = alphabet[rand.N(len(alphabet))]
|
|
} else {
|
|
username[i] = numbers[rand.N(len(numbers))]
|
|
}
|
|
password[i] = numbers[rand.N(len(numbers))]
|
|
}
|
|
|
|
return string(username), string(password)
|
|
}
|
|
|
|
func chKey(channel *models.Channel) string {
|
|
return fmt.Sprintf("channel:%d", channel.ID)
|
|
}
|
|
|
|
func cache(ctx context.Context, channels []*models.Channel) error {
|
|
if len(channels) == 0 {
|
|
return nil
|
|
}
|
|
|
|
pipe := rds.Client.TxPipeline()
|
|
|
|
zList := make([]redis.Z, 0, len(channels))
|
|
for _, channel := range channels {
|
|
marshal, err := json.Marshal(channel)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
pipe.Set(ctx, chKey(channel), string(marshal), time.Until(channel.Expiration))
|
|
zList = append(zList, redis.Z{
|
|
Score: float64(channel.Expiration.Unix()),
|
|
Member: channel.ID,
|
|
})
|
|
}
|
|
pipe.ZAdd(ctx, "tasks:channel", zList...)
|
|
|
|
_, err := pipe.Exec(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func deleteCache(ctx context.Context, channels []*models.Channel) error {
|
|
if len(channels) == 0 {
|
|
return nil
|
|
}
|
|
|
|
keys := make([]string, len(channels))
|
|
for i := range channels {
|
|
keys[i] = chKey(channels[i])
|
|
}
|
|
_, err := rds.Client.Del(ctx, keys...).Result()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ChannelServiceErr is an error raised by the channel service; the string
// value is the message.
type ChannelServiceErr string

// Error implements the error interface by returning the message unchanged.
func (e ChannelServiceErr) Error() string {
	msg := string(e)
	return msg
}
|