From 5649a03c47f2464028d02b7a9a29083eb60a5569 Mon Sep 17 00:00:00 2001 From: luorijun Date: Fri, 5 Dec 2025 18:31:30 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=80=9A=E9=81=93=E5=85=B3?= =?UTF-8?q?=E9=97=AD=E6=B5=81=E7=A8=8B=EF=BC=8C=E5=8F=AA=E9=9D=A0=20batch?= =?UTF-8?q?=20id=20=E7=B4=A2=E5=BC=95=E9=80=9A=E9=81=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 15 +++++++++++-- web/events/channel.go | 17 ++------------- web/handlers/channel.go | 5 ++--- web/services/channel.go | 2 +- web/services/channel_baiyin.go | 40 +++++++++++++++------------------- web/tasks/task.go | 11 ++-------- 6 files changed, 37 insertions(+), 53 deletions(-) diff --git a/README.md b/README.md index 1ab4e0a..1103dd2 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,8 @@ jsonb 类型转换问题,考虑一个高效的 any 到 struct 转换工具 端口资源池的 gc 实现 +考虑一个方案限制接口请求速率,无侵入更好 + 标准化生产环境 cors 配置 底层调用集成 otel @@ -36,8 +38,17 @@ jsonb 类型转换问题,考虑一个高效的 any 到 struct 转换工具 cmd 调用 app, http 的初始化函数 http 调用 clients 的初始化函数 ``` - -考虑一个方案限制接口请求速率,无侵入更好 + +开号流程事务化 + +开号: +- 提交关闭任务 +- 保存数据 +- 开通端口 + +过期: +- 接口 +- redis 冷数据迁移方案 diff --git a/web/events/channel.go b/web/events/channel.go index 59d734a..83fd339 100644 --- a/web/events/channel.go +++ b/web/events/channel.go @@ -1,24 +1,11 @@ package events import ( - "encoding/json" - "log/slog" - "github.com/hibiken/asynq" ) const RemoveChannel = "channel:remove" -type RemoveChannelData struct { - Batch string `json:"batch"` - IDs []int32 `json:"ids"` -} - -func NewRemoveChannel(data RemoveChannelData) *asynq.Task { - bytes, err := json.Marshal(data) - if err != nil { - slog.Error("序列化删除通道任务失败", "error", err) - return nil - } - return asynq.NewTask(RemoveChannel, bytes) +func NewRemoveChannel(batch string) *asynq.Task { + return asynq.NewTask(RemoveChannel, []byte(batch)) } diff --git a/web/handlers/channel.go b/web/handlers/channel.go index b2cbd0b..b188c92 100644 --- a/web/handlers/channel.go +++ b/web/handlers/channel.go @@ -162,8 +162,7 @@ type CreateChannelResultType string // region RemoveChannels type RemoveChannelsReq struct { - Batch string `json:"batch" validate:"required"` - Ids []int32 `json:"ids" validate:"required"` + Batch string `json:"batch" validate:"required"` } func RemoveChannels(c *fiber.Ctx) error { @@ -180,7 +179,7 @@ func RemoveChannels(c *fiber.Ctx) error { } // 删除通道 - err = s.Channel.RemoveChannels(req.Batch, req.Ids) + err = s.Channel.RemoveChannels(req.Batch) if err != nil { return err } diff --git a/web/services/channel.go b/web/services/channel.go index e0c9bae..0c615ce 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -18,7 +18,7 @@ var Channel ChannelService = &channelBaiyinService{} // 通道服务 type ChannelService interface { CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter ...EdgeFilter) ([]*m.Channel, error) - RemoveChannels(batch string, ids []int32) error + RemoveChannels(batch string) error } // 授权方式 diff --git a/web/services/channel_baiyin.go b/web/services/channel_baiyin.go index 81b3b1c..82535a1 100644 --- a/web/services/channel_baiyin.go +++ b/web/services/channel_baiyin.go @@ -180,6 +180,15 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3 groups[proxy] = append(groups[proxy], channels[i]) } + // 提交异步任务关闭通道 + _, err = g.Asynq.Enqueue( + e.NewRemoveChannel(batch), + asynq.ProcessAt(expire), + ) + if err != nil { + return nil, core.NewServErr("提交关闭通道任务失败", err) + } + // 保存数据 err = q.Q.Transaction(func(q *q.Query) error { @@ -240,18 +249,6 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3 return nil, err } - // 提交异步任务关闭通道 - _, err = g.Asynq.Enqueue( - e.NewRemoveChannel(e.RemoveChannelData{ - Batch: batch, - IDs: core.GetIDs(channels), - }), - asynq.ProcessAt(expire), - ) - if err != nil { - return nil, core.NewServErr("提交关闭通道任务失败", err) - } - // 提交配置 for proxy, chanels := range groups { secret := strings.Split(u.Z(proxy.Secret), ":") @@ -290,20 +287,17 @@ func (s *channelBaiyinService) CreateChannels(source netip.Addr, resourceId int3 return channels, nil } -func (s *channelBaiyinService) RemoveChannels(batch string, ids []int32) error { +func (s *channelBaiyinService) RemoveChannels(batch string) error { start := time.Now() // 获取连接数据 channels, err := q.Channel. Preload(q.Channel.Proxy). - Where(q.Channel.ID.In(ids...)). + Where(q.Channel.BatchNo.Eq(batch)). Find() if err != nil { return core.NewServErr("获取通道数据失败", err) } - if len(channels) != len(ids) { - return core.NewServErr("获取通道数据不完整", err) - } proxies := make(map[string]*m.Proxy, len(channels)) groups := make(map[string][]*m.Channel, len(channels)) @@ -320,12 +314,6 @@ func (s *channelBaiyinService) RemoveChannels(batch string, ids []int32) error { addrs[i] = netip.AddrPortFrom(channel.Proxy.IP.Addr, channel.Port) } - // 释放端口 - err = freeChans(batch, chans) - if err != nil { - return err - } - // 清空配置 for ip, channels := range groups { proxy := proxies[ip] @@ -352,6 +340,12 @@ func (s *channelBaiyinService) RemoveChannels(batch string, ids []int32) error { } } + // 释放端口 + err = freeChans(batch, chans) + if err != nil { + return err + } + slog.Debug("清除代理端口配置", "time", time.Since(start).String()) return nil } diff --git a/web/tasks/task.go b/web/tasks/task.go index 3f519bb..26bbd65 100644 --- a/web/tasks/task.go +++ b/web/tasks/task.go @@ -8,7 +8,6 @@ import ( "platform/pkg/env" "platform/pkg/u" "platform/web/events" - e "platform/web/events" g "platform/web/globals" m "platform/web/models" q "platform/web/queries" @@ -45,17 +44,11 @@ func HandleCompleteTrade(_ context.Context, task *asynq.Task) (err error) { } func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) { - data := new(e.RemoveChannelData) - err = json.Unmarshal(task.Payload(), data) - if err != nil { - return fmt.Errorf("解析任务参数失败: %w", err) - } - - err = s.Channel.RemoveChannels(data.Batch, data.IDs) + batch := string(task.Payload()) + err = s.Channel.RemoveChannels(batch) if err != nil { return fmt.Errorf("删除通道失败: %w", err) } - return nil }