优化通道关闭流程,只靠 batch id 索引通道
This commit is contained in:
13
README.md
13
README.md
@@ -6,6 +6,8 @@ jsonb 类型转换问题,考虑一个高效的 any 到 struct 转换工具
|
|||||||
|
|
||||||
端口资源池的 gc 实现
|
端口资源池的 gc 实现
|
||||||
|
|
||||||
|
考虑一个方案限制接口请求速率,无侵入更好
|
||||||
|
|
||||||
标准化生产环境 cors 配置
|
标准化生产环境 cors 配置
|
||||||
|
|
||||||
底层调用集成 otel
|
底层调用集成 otel
|
||||||
@@ -37,7 +39,16 @@ cmd 调用 app, http 的初始化函数
|
|||||||
http 调用 clients 的初始化函数
|
http 调用 clients 的初始化函数
|
||||||
```
|
```
|
||||||
|
|
||||||
考虑一个方案限制接口请求速率,无侵入更好
|
开号流程事务化
|
||||||
|
|
||||||
|
开号:
|
||||||
|
- 提交关闭任务
|
||||||
|
- 保存数据
|
||||||
|
- 开通端口
|
||||||
|
|
||||||
|
过期:
|
||||||
|
- 接口
|
||||||
|
- redis
|
||||||
|
|
||||||
冷数据迁移方案
|
冷数据迁移方案
|
||||||
|
|
||||||
|
|||||||
@@ -1,24 +1,11 @@
|
|||||||
package events
|
package events
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
|
||||||
"log/slog"
|
|
||||||
|
|
||||||
"github.com/hibiken/asynq"
|
"github.com/hibiken/asynq"
|
||||||
)
|
)
|
||||||
|
|
||||||
const RemoveChannel = "channel:remove"
|
const RemoveChannel = "channel:remove"
|
||||||
|
|
||||||
type RemoveChannelData struct {
|
func NewRemoveChannel(batch string) *asynq.Task {
|
||||||
Batch string `json:"batch"`
|
return asynq.NewTask(RemoveChannel, []byte(batch))
|
||||||
IDs []int32 `json:"ids"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRemoveChannel(data RemoveChannelData) *asynq.Task {
|
|
||||||
bytes, err := json.Marshal(data)
|
|
||||||
if err != nil {
|
|
||||||
slog.Error("序列化删除通道任务失败", "error", err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return asynq.NewTask(RemoveChannel, bytes)
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -163,7 +163,6 @@ type CreateChannelResultType string
|
|||||||
|
|
||||||
type RemoveChannelsReq struct {
|
type RemoveChannelsReq struct {
|
||||||
Batch string `json:"batch" validate:"required"`
|
Batch string `json:"batch" validate:"required"`
|
||||||
Ids []int32 `json:"ids" validate:"required"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func RemoveChannels(c *fiber.Ctx) error {
|
func RemoveChannels(c *fiber.Ctx) error {
|
||||||
@@ -180,7 +179,7 @@ func RemoveChannels(c *fiber.Ctx) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// 删除通道
|
// 删除通道
|
||||||
err = s.Channel.RemoveChannels(req.Batch, req.Ids)
|
err = s.Channel.RemoveChannels(req.Batch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ var Channel ChannelService = &channelBaiyinService{}
|
|||||||
// 通道服务
|
// 通道服务
|
||||||
type ChannelService interface {
|
type ChannelService interface {
|
||||||
CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter ...EdgeFilter) ([]*m.Channel, error)
|
CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter ...EdgeFilter) ([]*m.Channel, error)
|
||||||
RemoveChannels(batch string, ids []int32) error
|
RemoveChannels(batch string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// 授权方式
|
// 授权方式
|
||||||
|
|||||||
@@ -180,6 +180,15 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3
|
|||||||
groups[proxy] = append(groups[proxy], channels[i])
|
groups[proxy] = append(groups[proxy], channels[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 提交异步任务关闭通道
|
||||||
|
_, err = g.Asynq.Enqueue(
|
||||||
|
e.NewRemoveChannel(batch),
|
||||||
|
asynq.ProcessAt(expire),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, core.NewServErr("提交关闭通道任务失败", err)
|
||||||
|
}
|
||||||
|
|
||||||
// 保存数据
|
// 保存数据
|
||||||
err = q.Q.Transaction(func(q *q.Query) error {
|
err = q.Q.Transaction(func(q *q.Query) error {
|
||||||
|
|
||||||
@@ -240,18 +249,6 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// 提交异步任务关闭通道
|
|
||||||
_, err = g.Asynq.Enqueue(
|
|
||||||
e.NewRemoveChannel(e.RemoveChannelData{
|
|
||||||
Batch: batch,
|
|
||||||
IDs: core.GetIDs(channels),
|
|
||||||
}),
|
|
||||||
asynq.ProcessAt(expire),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, core.NewServErr("提交关闭通道任务失败", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 提交配置
|
// 提交配置
|
||||||
for proxy, chanels := range groups {
|
for proxy, chanels := range groups {
|
||||||
secret := strings.Split(u.Z(proxy.Secret), ":")
|
secret := strings.Split(u.Z(proxy.Secret), ":")
|
||||||
@@ -290,20 +287,17 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3
|
|||||||
return channels, nil
|
return channels, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *channelBaiyinService) RemoveChannels(batch string, ids []int32) error {
|
func (s *channelBaiyinService) RemoveChannels(batch string) error {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
// 获取连接数据
|
// 获取连接数据
|
||||||
channels, err := q.Channel.
|
channels, err := q.Channel.
|
||||||
Preload(q.Channel.Proxy).
|
Preload(q.Channel.Proxy).
|
||||||
Where(q.Channel.ID.In(ids...)).
|
Where(q.Channel.BatchNo.Eq(batch)).
|
||||||
Find()
|
Find()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return core.NewServErr("获取通道数据失败", err)
|
return core.NewServErr("获取通道数据失败", err)
|
||||||
}
|
}
|
||||||
if len(channels) != len(ids) {
|
|
||||||
return core.NewServErr("获取通道数据不完整", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
proxies := make(map[string]*m.Proxy, len(channels))
|
proxies := make(map[string]*m.Proxy, len(channels))
|
||||||
groups := make(map[string][]*m.Channel, len(channels))
|
groups := make(map[string][]*m.Channel, len(channels))
|
||||||
@@ -320,12 +314,6 @@ func (s *channelBaiyinService) RemoveChannels(batch string, ids []int32) error {
|
|||||||
addrs[i] = netip.AddrPortFrom(channel.Proxy.IP.Addr, channel.Port)
|
addrs[i] = netip.AddrPortFrom(channel.Proxy.IP.Addr, channel.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 释放端口
|
|
||||||
err = freeChans(batch, chans)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// 清空配置
|
// 清空配置
|
||||||
for ip, channels := range groups {
|
for ip, channels := range groups {
|
||||||
proxy := proxies[ip]
|
proxy := proxies[ip]
|
||||||
@@ -352,6 +340,12 @@ func (s *channelBaiyinService) RemoveChannels(batch string, ids []int32) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 释放端口
|
||||||
|
err = freeChans(batch, chans)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
slog.Debug("清除代理端口配置", "time", time.Since(start).String())
|
slog.Debug("清除代理端口配置", "time", time.Since(start).String())
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,7 +8,6 @@ import (
|
|||||||
"platform/pkg/env"
|
"platform/pkg/env"
|
||||||
"platform/pkg/u"
|
"platform/pkg/u"
|
||||||
"platform/web/events"
|
"platform/web/events"
|
||||||
e "platform/web/events"
|
|
||||||
g "platform/web/globals"
|
g "platform/web/globals"
|
||||||
m "platform/web/models"
|
m "platform/web/models"
|
||||||
q "platform/web/queries"
|
q "platform/web/queries"
|
||||||
@@ -45,17 +44,11 @@ func HandleCompleteTrade(_ context.Context, task *asynq.Task) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) {
|
func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) {
|
||||||
data := new(e.RemoveChannelData)
|
batch := string(task.Payload())
|
||||||
err = json.Unmarshal(task.Payload(), data)
|
err = s.Channel.RemoveChannels(batch)
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("解析任务参数失败: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.Channel.RemoveChannels(data.Batch, data.IDs)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("删除通道失败: %w", err)
|
return fmt.Errorf("删除通道失败: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user