From 71554da54175e42d99d70eb96d10e57c207976ea Mon Sep 17 00:00:00 2001 From: luorijun Date: Mon, 18 May 2026 13:54:01 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=8F=90=E5=8F=96=E5=B9=B6?= =?UTF-8?q?=E5=8F=91=E9=97=AE=E9=A2=98=20&=20=E4=BF=AE=E5=A4=8D=E6=8E=A5?= =?UTF-8?q?=E5=8F=A3=E6=97=B6=E5=8C=BA=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 3 + pkg/u/u.go | 8 +- web/events/edges.go | 9 ++ web/handlers/balance_activity.go | 2 +- web/handlers/batch.go | 18 ++- web/handlers/bill.go | 6 +- web/handlers/channel.go | 182 ++++++++++++++------------- web/handlers/coupon_user.go | 18 ++- web/handlers/resource.go | 40 +++--- web/handlers/user.go | 24 +++- web/services/channel.go | 43 +++++++ web/services/channel_baiyin.go | 206 ++++++++++++++----------------- web/services/edge.go | 13 ++ web/services/proxy.go | 13 ++ web/tasks/task.go | 10 ++ web/web.go | 30 ++++- 16 files changed, 386 insertions(+), 239 deletions(-) create mode 100644 web/events/edges.go diff --git a/README.md b/README.md index b9b325b..9178785 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,9 @@ - 在提取流程中如果发生异常,可能导致端口被占用但通道信息没有被写入数据库,配置以及端口将无法解除 - 提交删除任务时,带上配置信息,无需再查数据库且接口总是会被正确清理 - 异常情况下,将只清理网关端口,而无法解除节点连接,这个问题需要额外处理 + - 数据库并发保存会导致失败,从而触发端口悬空,但是配置已经成功提交到网关,导致无法清理 + +redis 中没有记录 edge 信息,无法断开 --- diff --git a/pkg/u/u.go b/pkg/u/u.go index d83269d..c757a5b 100644 --- a/pkg/u/u.go +++ b/pkg/u/u.go @@ -82,13 +82,13 @@ func Map[T any, R any](src []T, convert func(T) R) []R { // ==================== func DateHead(date time.Time) time.Time { - var y, m, d = date.Date() - return time.Date(y, m, d, 0, 0, 0, 0, date.Location()) + var y, m, d = date.Local().Date() + return time.Date(y, m, d, 0, 0, 0, 0, time.Local).UTC() } func DateTail(date time.Time) time.Time { - var y, m, d = date.Date() - return time.Date(y, m, d, 23, 59, 59, 999999999, date.Location()) + var y, m, d = date.Local().Date() + return time.Date(y, m, d, 23, 59, 59, 999999999, time.Local).UTC() } func Today() time.Time { diff --git a/web/events/edges.go b/web/events/edges.go new file mode 100644 index 0000000..52e63e1 --- /dev/null +++ b/web/events/edges.go @@ -0,0 +1,9 @@ +package events + +import "github.com/hibiken/asynq" + +const RefreshEdge = "edge:refresh" + +func NewRefreshEdge() *asynq.Task { + return asynq.NewTask(RefreshEdge, nil) +} diff --git a/web/handlers/balance_activity.go b/web/handlers/balance_activity.go index 319579e..cd761e9 100644 --- a/web/handlers/balance_activity.go +++ b/web/handlers/balance_activity.go @@ -102,7 +102,7 @@ func PageBalanceActivityByAdmin(c *fiber.Ctx) error { } // 查询余额变动列表 - list, total, err := q.BalanceActivity.Debug(). + list, total, err := q.BalanceActivity. Joins(q.BalanceActivity.User, q.BalanceActivity.Admin, q.BalanceActivity.Bill). Select( q.BalanceActivity.ALL, diff --git a/web/handlers/batch.go b/web/handlers/batch.go index 4686290..b21325c 100644 --- a/web/handlers/batch.go +++ b/web/handlers/batch.go @@ -29,13 +29,20 @@ func PageBatch(ctx *fiber.Ctx) error { // 查询批次 conds := q.LogsUserUsage.Where(q.LogsUserUsage.UserID.Eq(authCtx.User.ID)) if req.TimeStart != nil { - conds.Where(q.LogsUserUsage.Time.Gte(*req.TimeStart)) + start := u.DateHead(*req.TimeStart) + conds.Where(q.LogsUserUsage.Time.Gte(start)) } if req.TimeEnd != nil { - conds.Where(q.LogsUserUsage.Time.Lte(*req.TimeEnd)) + end := u.DateTail(*req.TimeEnd) + conds.Where(q.LogsUserUsage.Time.Lte(end)) + } + if req.ResourceNo != nil { + conds.Where(q.Resource.As("Resource").ResourceNo.Eq(*req.ResourceNo)) } - list, total, err := q.LogsUserUsage.Where(conds). + list, total, err := q.LogsUserUsage.Debug(). + Joins(q.LogsUserUsage.Resource). + Where(conds). Order(q.LogsUserUsage.Time.Desc()). FindByPage(req.GetOffset(), req.GetLimit()) if err != nil { @@ -53,8 +60,9 @@ func PageBatch(ctx *fiber.Ctx) error { type PageResourceBatchReq struct { c.PageReq - TimeStart *time.Time `json:"time_start"` - TimeEnd *time.Time `json:"time_end"` + ResourceNo *string `json:"resource_no"` + TimeStart *time.Time `json:"time_start"` + TimeEnd *time.Time `json:"time_end"` } // PageBatchByAdmin 分页查询所有提取记录 diff --git a/web/handlers/bill.go b/web/handlers/bill.go index 54e5c7c..4e431d4 100644 --- a/web/handlers/bill.go +++ b/web/handlers/bill.go @@ -207,10 +207,12 @@ func ListBill(c *fiber.Ctx) error { do.Where(q.Bill.Type.Eq(int(*req.Type))) } if req.CreateAfter != nil { - do.Where(q.Bill.CreatedAt.Gte(*req.CreateAfter)) + start := u.DateHead(*req.CreateAfter) + do = do.Where(q.Bill.CreatedAt.Gte(start)) } if req.CreateBefore != nil { - do.Where(q.Bill.CreatedAt.Lte(*req.CreateBefore)) + end := u.DateTail(*req.CreateBefore) + do = do.Where(q.Bill.CreatedAt.Lte(end)) } if req.BillNo != nil && *req.BillNo != "" { do.Where(q.Bill.BillNo.Eq(*req.BillNo)) diff --git a/web/handlers/channel.go b/web/handlers/channel.go index 114aaae..7315fc4 100644 --- a/web/handlers/channel.go +++ b/web/handlers/channel.go @@ -15,90 +15,6 @@ import ( "github.com/gofiber/fiber/v2" ) -// PageChannelByAdmin 分页查询所有通道 -func PageChannelByAdmin(c *fiber.Ctx) error { - // 检查权限 - _, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeChannelRead) - if err != nil { - return err - } - - // 解析请求参数 - var req PageChannelsByAdminReq - if err := g.Validator.ParseBody(c, &req); err != nil { - return err - } - - // 构建查询条件 - do := q.Channel.Where() - if req.UserPhone != nil { - do = do.Where(q.User.As("User").Phone.Eq(*req.UserPhone)) - } - if req.ResourceNo != nil { - do = do.Where(q.Resource.As("Resource").ResourceNo.Eq(*req.ResourceNo)) - } - if req.BatchNo != nil { - do = do.Where(q.Channel.BatchNo.Eq(*req.BatchNo)) - } - if req.ProxyHost != nil { - do = do.Where(q.Channel.Host.Eq(*req.ProxyHost)) - } - if req.ProxyPort != nil { - do = do.Where(q.Channel.Port.Eq(*req.ProxyPort)) - } - if req.NodeIP != nil { - ip, err := orm.ParseInet(*req.NodeIP) - if err != nil { - return core.NewBizErr("查询参数 ip 格式不正确") - } - do = do.Where(q.Channel.IP.Eq(ip)) - } - if req.ExpiredAtStart != nil { - time := u.DateHead(*req.ExpiredAtStart) - do = do.Where(q.Channel.ExpiredAt.Gte(time)) - } - if req.ExpiredAtEnd != nil { - time := u.DateHead(*req.ExpiredAtEnd) - do = do.Where(q.Channel.ExpiredAt.Lte(time)) - } - - // 查询通道列表 - list, total, err := q.Channel. - Joins(q.Channel.User, q.Channel.Resource). - Select( - q.Channel.ALL, - q.Resource.As("Resource").ResourceNo.As("Resource__resource_no"), - q.User.As("User").Phone.As("User__phone"), - q.User.As("User").Name.As("User__name"), - ). - Where(do). - Order(q.Channel.CreatedAt.Desc()). - FindByPage(req.GetOffset(), req.GetLimit()) - if err != nil { - return err - } - - // 返回结果 - return c.JSON(core.PageResp{ - List: list, - Total: int(total), - Page: req.GetPage(), - Size: req.GetSize(), - }) -} - -type PageChannelsByAdminReq struct { - core.PageReq - UserPhone *string `json:"user_phone"` - ResourceNo *string `json:"resource_no"` - BatchNo *string `json:"batch_no"` - ProxyHost *string `json:"proxy_host"` - ProxyPort *uint16 `json:"proxy_port"` - NodeIP *string `json:"node_ip" validator:"omitempty,ip"` - ExpiredAtStart *time.Time `json:"expired_at_start"` - ExpiredAtEnd *time.Time `json:"expired_at_end"` -} - // ListChannel 分页查询当前用户通道 func ListChannel(c *fiber.Ctx) error { // 检查权限 @@ -126,10 +42,12 @@ func ListChannel(c *fiber.Ctx) error { } if req.ExpireAfter != nil { - cond.Where(q.Channel.ExpiredAt.Gte(*req.ExpireAfter)) + start := u.DateHead(*req.ExpireAfter) + cond = cond.Where(q.Channel.ExpiredAt.Gte(start)) } if req.ExpireBefore != nil { - cond.Where(q.Channel.ExpiredAt.Lte(*req.ExpireBefore)) + end := u.DateTail(*req.ExpireBefore) + cond = cond.Where(q.Channel.ExpiredAt.Lte(end)) } // 查询数据 @@ -269,6 +187,98 @@ type RemoveChannelsReq struct { Batch string `json:"batch" validate:"required"` } +// PageChannelByAdmin 分页查询所有通道 +func PageChannelByAdmin(c *fiber.Ctx) error { + // 检查权限 + _, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeChannelRead) + if err != nil { + return err + } + + // 解析请求参数 + var req PageChannelsByAdminReq + if err := g.Validator.ParseBody(c, &req); err != nil { + return err + } + + // 构建查询条件 + do := q.Channel.Where() + if req.UserPhone != nil { + do = do.Where(q.User.As("User").Phone.Eq(*req.UserPhone)) + } + if req.ResourceNo != nil { + do = do.Where(q.Resource.As("Resource").ResourceNo.Eq(*req.ResourceNo)) + } + if req.BatchNo != nil { + do = do.Where(q.Channel.BatchNo.Eq(*req.BatchNo)) + } + if req.ProxyHost != nil { + do = do.Where(q.Channel.Host.Eq(*req.ProxyHost)) + } + if req.ProxyPort != nil { + do = do.Where(q.Channel.Port.Eq(*req.ProxyPort)) + } + if req.NodeIP != nil { + ip, err := orm.ParseInet(*req.NodeIP) + if err != nil { + return core.NewBizErr("查询参数 ip 格式不正确") + } + do = do.Where(q.Channel.IP.Eq(ip)) + } + if req.ExpiredAtStart != nil { + time := u.DateHead(*req.ExpiredAtStart) + do = do.Where(q.Channel.ExpiredAt.Gte(time)) + } + if req.ExpiredAtEnd != nil { + time := u.DateHead(*req.ExpiredAtEnd) + do = do.Where(q.Channel.ExpiredAt.Lte(time)) + } + if req.Expired != nil { + if *req.Expired { + do = do.Where(q.Channel.ExpiredAt.Lte(time.Now())) + } else { + do = do.Where(q.Channel.ExpiredAt.Gt(time.Now())) + } + } + + // 查询通道列表 + list, total, err := q.Channel.Debug(). + Joins(q.Channel.User, q.Channel.Resource). + Select( + q.Channel.ALL, + q.Resource.As("Resource").ResourceNo.As("Resource__resource_no"), + q.User.As("User").Phone.As("User__phone"), + q.User.As("User").Name.As("User__name"), + ). + Where(do). + Order(q.Channel.CreatedAt.Desc()). + FindByPage(req.GetOffset(), req.GetLimit()) + if err != nil { + return err + } + + // 返回结果 + return c.JSON(core.PageResp{ + List: list, + Total: int(total), + Page: req.GetPage(), + Size: req.GetSize(), + }) +} + +type PageChannelsByAdminReq struct { + core.PageReq + UserPhone *string `json:"user_phone"` + ResourceNo *string `json:"resource_no"` + BatchNo *string `json:"batch_no"` + ProxyHost *string `json:"proxy_host"` + ProxyPort *uint16 `json:"proxy_port"` + NodeIP *string `json:"node_ip" validator:"omitempty,ip"` + ExpiredAtStart *time.Time `json:"expired_at_start"` + ExpiredAtEnd *time.Time `json:"expired_at_end"` + Expired *bool `json:"expired"` +} + // PageChannelOfUserByAdmin 分页查询指定用户的通道 func PageChannelOfUserByAdmin(c *fiber.Ctx) error { // 检查权限 diff --git a/web/handlers/coupon_user.go b/web/handlers/coupon_user.go index 5309d30..e1d7174 100644 --- a/web/handlers/coupon_user.go +++ b/web/handlers/coupon_user.go @@ -282,22 +282,28 @@ func couponUserPageConditions(req CouponUserPageFilter) []gen.Condition { } } if req.CreatedAtStart != nil { - conds = append(conds, q.CouponUser.CreatedAt.Gte(u.DateHead(*req.CreatedAtStart))) + start := u.DateHead(*req.CreatedAtStart) + conds = append(conds, q.CouponUser.CreatedAt.Gte(start)) } if req.CreatedAtEnd != nil { - conds = append(conds, q.CouponUser.CreatedAt.Lte(u.DateTail(*req.CreatedAtEnd))) + end := u.DateTail(*req.CreatedAtEnd) + conds = append(conds, q.CouponUser.CreatedAt.Lte(end)) } if req.ExpireAtStart != nil { - conds = append(conds, q.CouponUser.ExpireAt.Gte(u.DateHead(*req.ExpireAtStart))) + start := u.DateHead(*req.ExpireAtStart) + conds = append(conds, q.CouponUser.ExpireAt.Gte(start)) } if req.ExpireAtEnd != nil { - conds = append(conds, q.CouponUser.ExpireAt.Lte(u.DateTail(*req.ExpireAtEnd))) + end := u.DateTail(*req.ExpireAtEnd) + conds = append(conds, q.CouponUser.ExpireAt.Lte(end)) } if req.UsedAtStart != nil { - conds = append(conds, q.CouponUser.UsedAt.Gte(u.DateHead(*req.UsedAtStart))) + start := u.DateHead(*req.UsedAtStart) + conds = append(conds, q.CouponUser.UsedAt.Gte(start)) } if req.UsedAtEnd != nil { - conds = append(conds, q.CouponUser.UsedAt.Lte(u.DateTail(*req.UsedAtEnd))) + end := u.DateTail(*req.UsedAtEnd) + conds = append(conds, q.CouponUser.UsedAt.Lte(end)) } return conds } diff --git a/web/handlers/resource.go b/web/handlers/resource.go index 1153cb8..cd348ce 100644 --- a/web/handlers/resource.go +++ b/web/handlers/resource.go @@ -44,16 +44,20 @@ func PageResourceShort(c *fiber.Ctx) error { do.Where(q.ResourceShort.As(q.Resource.Short.Name()).Type.Eq(*req.Type)) } if req.CreateAfter != nil { - do.Where(q.Resource.CreatedAt.Gte(*req.CreateAfter)) + start := u.DateHead(*req.CreateAfter) + do = do.Where(q.Resource.CreatedAt.Gte(start)) } if req.CreateBefore != nil { - do.Where(q.Resource.CreatedAt.Lte(*req.CreateBefore)) + end := u.DateTail(*req.CreateBefore) + do = do.Where(q.Resource.CreatedAt.Lte(end)) } if req.ExpireAfter != nil { - do.Where(q.ResourceShort.As(q.Resource.Short.Name()).ExpireAt.Gte(*req.ExpireAfter)) + start := u.DateHead(*req.ExpireAfter) + do = do.Where(q.ResourceShort.As(q.Resource.Short.Name()).ExpireAt.Gte(start)) } if req.ExpireBefore != nil { - do.Where(q.ResourceShort.As(q.Resource.Short.Name()).ExpireAt.Lte(*req.ExpireBefore)) + end := u.DateTail(*req.ExpireBefore) + do = do.Where(q.ResourceShort.As(q.Resource.Short.Name()).ExpireAt.Lte(end)) } if req.Status != nil { var short = q.ResourceShort.As(q.Resource.Short.Name()) @@ -69,7 +73,7 @@ func PageResourceShort(c *fiber.Ctx) error { } } - resource, err := q.Resource.Where(do). + resource, err := q.Resource.Where(do).Debug(). Joins(q.Resource.Short). Order(q.Resource.CreatedAt.Desc()). Offset(req.GetOffset()). @@ -141,16 +145,20 @@ func PageResourceLong(c *fiber.Ctx) error { do.Where(q.ResourceLong.As(q.Resource.Long.Name()).Type.Eq(int(*req.Type))) } if req.CreateAfter != nil { - do.Where(q.Resource.CreatedAt.Gte(*req.CreateAfter)) + start := u.DateHead(*req.CreateAfter) + do = do.Where(q.Resource.CreatedAt.Gte(start)) } if req.CreateBefore != nil { - do.Where(q.Resource.CreatedAt.Lte(*req.CreateBefore)) + end := u.DateTail(*req.CreateBefore) + do = do.Where(q.Resource.CreatedAt.Lte(end)) } if req.ExpireAfter != nil { - do.Where(q.ResourceLong.As(q.Resource.Long.Name()).ExpireAt.Gte(*req.ExpireAfter)) + start := u.DateHead(*req.ExpireAfter) + do = do.Where(q.ResourceLong.As(q.Resource.Long.Name()).ExpireAt.Gte(start)) } if req.ExpireBefore != nil { - do.Where(q.ResourceLong.As(q.Resource.Long.Name()).ExpireAt.Lte(*req.ExpireBefore)) + end := u.DateTail(*req.ExpireBefore) + do = do.Where(q.ResourceLong.As(q.Resource.Long.Name()).ExpireAt.Lte(end)) } if req.Status != nil { var long = q.ResourceLong.As(q.Resource.Long.Name()) @@ -166,7 +174,7 @@ func PageResourceLong(c *fiber.Ctx) error { } } - resource, err := q.Resource.Where(do). + resource, err := q.Resource.Where(do).Debug(). Joins(q.Resource.Long). Order(q.Resource.CreatedAt.Desc()). Offset(req.GetOffset()). @@ -329,10 +337,12 @@ func PageResourceLongByAdmin(c *fiber.Ctx) error { do = do.Where(q.ResourceLong.As("Long").Type.Eq(*req.Mode)) } if req.CreatedAtStart != nil { - do = do.Where(q.Resource.CreatedAt.Gte(*req.CreatedAtStart)) + start := u.DateHead(*req.CreatedAtStart) + do = do.Where(q.Resource.CreatedAt.Gte(start)) } if req.CreatedAtEnd != nil { - do = do.Where(q.Resource.CreatedAt.Lte(*req.CreatedAtEnd)) + end := u.DateTail(*req.CreatedAtEnd) + do = do.Where(q.Resource.CreatedAt.Lte(end)) } if req.Expired != nil { if *req.Expired { @@ -755,10 +765,12 @@ func StatisticResourceUsage(c *fiber.Ctx) error { ) if req.TimeAfter != nil { - do.Where(q.LogsUserUsage.Time.Gte(*req.TimeAfter)) + start := u.DateHead(*req.TimeAfter) + do = do.Where(q.LogsUserUsage.Time.Gte(start)) } if req.TimeBefore != nil { - do.Where(q.LogsUserUsage.Time.Lte(*req.TimeBefore)) + end := u.DateTail(*req.TimeBefore) + do = do.Where(q.LogsUserUsage.Time.Lte(end)) } var data = new(StatisticResourceUsageResp) diff --git a/web/handlers/user.go b/web/handlers/user.go index 9fcf4f8..09f51b4 100644 --- a/web/handlers/user.go +++ b/web/handlers/user.go @@ -2,12 +2,14 @@ package handlers import ( "errors" + "platform/pkg/u" "platform/web/auth" "platform/web/core" g "platform/web/globals" m "platform/web/models" q "platform/web/queries" s "platform/web/services" + "time" "github.com/gofiber/fiber/v2" "github.com/shopspring/decimal" @@ -65,9 +67,17 @@ func PageUserByAdmin(c *fiber.Ctx) error { do = do.Where(q.User.AdminID.IsNull()) } } + if req.CreatedAtStart != nil { + start := u.DateHead(*req.CreatedAtStart) + do = do.Where(q.User.CreatedAt.Gte(start)) + } + if req.CreatedAtEnd != nil { + end := u.DateTail(*req.CreatedAtEnd) + do = do.Where(q.User.CreatedAt.Lte(end)) + } // 查询用户列表 - users, total, err := q.User. + users, total, err := q.User.Debug(). Preload(q.User.Admin, q.User.Discount). Omit(q.User.Password, q.Admin.Password). Where(do). @@ -102,11 +112,13 @@ func PageUserByAdmin(c *fiber.Ctx) error { type PageUserByAdminReq struct { core.PageReq - Account *string `json:"account,omitempty"` - Name *string `json:"name,omitempty"` - Identified *bool `json:"identified,omitempty"` - Enabled *bool `json:"enabled,omitempty"` - Assigned *bool `json:"assigned,omitempty"` + Account *string `json:"account,omitempty"` + Name *string `json:"name,omitempty"` + Identified *bool `json:"identified,omitempty"` + Enabled *bool `json:"enabled,omitempty"` + Assigned *bool `json:"assigned,omitempty"` + CreatedAtStart *time.Time `json:"created_at_start,omitempty"` + CreatedAtEnd *time.Time `json:"created_at_end,omitempty"` } // 管理员获取单个用户 diff --git a/web/services/channel.go b/web/services/channel.go index d859fdf..211f2e3 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -6,6 +6,7 @@ import ( "fmt" "math/rand/v2" "net/netip" + "platform/pkg/env" "platform/pkg/u" "platform/web/core" g "platform/web/globals" @@ -45,6 +46,48 @@ func (s *channelServer) ClearExpiredChannels(proxyId int32) (int, error) { return s.provider.ClearExpiredChannels(proxyId) } +func (s *channelServer) RefreshEdges() error { + if env.RunMode != env.RunModeProd { + return nil + } + + // 找到所有网关 + proxies, err := q.Proxy.Where( + q.Proxy.Status.Eq(int(m.ProxyStatusOnline)), + ).Find() + if err != nil { + return fmt.Errorf("查询网关失败: %w", err) + } + + for _, proxy := range proxies { + gateway, err := proxyGateway(proxy) + if err != nil { + return core.NewServErr("创建代理网关失败", err) + } + + // 选取随机节点 + edges, err := gateway.GatewayEdge(&g.GatewayEdgeReq{ + Assigned: u.P(false), + Limit: u.P(1000), + }) + if err != nil { + return fmt.Errorf("获取边缘节点失败: %w", err) + } + + // 提交断开配置 + edgeIds := make([]string, 0, len(edges)) + for id, _ := range edges { + edgeIds = append(edgeIds, id) + } + g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{ + Uuid: proxy.Mac, + Edge: &edgeIds, + }) + } + + return nil +} + // 授权方式 type ChannelAuthType int diff --git a/web/services/channel_baiyin.go b/web/services/channel_baiyin.go index 343bfee..44afa05 100644 --- a/web/services/channel_baiyin.go +++ b/web/services/channel_baiyin.go @@ -31,33 +31,27 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int now := time.Now() batchNo := ID.GenReadable("bat") - - // 检查并获取套餐与白名单 - resource, whitelists, err := ensure(now, source, resourceId, authWhitelist, count) - if err != nil { - return nil, err - } - - user := resource.User - expire := now.Add(resource.Live) - - // 选择代理 - proxy, gateway, err := selectProxy(count) - if err != nil { - return nil, err - } - - // 取用端口 - chans, err := selectPorts(proxy.ID, batchNo, count, expire) - if err != nil { - return nil, err - } - - // 节点查询到提交,需要锁定防止并发取用 channels := make([]*m.Channel, count) - err = g.Redsync.WithLock(lockChannelCreateKey(), func() error { - // 取用节点 - edges, err := selectEdges(gateway, filter, count) + + // 资源锁,防止并发扣减失败导致的端口悬空问题 + err := g.Redsync.WithLock(lockChannelCreateKey(resourceId), func() error { + // 检查并获取套餐与白名单 + resource, whitelists, err := ensure(now, source, resourceId, authWhitelist, count) + if err != nil { + return err + } + + user := resource.User + expire := now.Add(resource.Live) + + // 选择代理 + proxy, gateway, err := selectProxy(count) + if err != nil { + return err + } + + // 取用端口 + chans, err := selectPorts(proxy.ID, batchNo, count, expire) if err != nil { return err } @@ -67,7 +61,6 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int edgeConfigs := make([]string, 0, count) for i := range count { ch := chans[i] - edge := edges[i] // 通道数据 channels[i] = &m.Channel{ @@ -77,7 +70,6 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int ProxyID: proxy.ID, Host: u.Else(proxy.Host, proxy.IP.String()), Port: ch.Port(), - EdgeRef: u.P(edge.EdgeID), FilterISP: filter.Isp, FilterProv: filter.Prov, FilterCity: filter.City, @@ -89,7 +81,12 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int chanConfigs[i] = &g.PortConfigsReq{ Port: int(ch.Port()), Status: true, - Edge: &[]string{edge.EdgeID}, + AutoEdgeConfig: &g.AutoEdgeConfig{ + Province: u.Z(filter.Prov), + City: u.Z(filter.City), + Isp: filter.Isp.String(), + Count: u.P(1), + }, } // 白名单模式 @@ -105,26 +102,24 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int channels[i].Password = &password chanConfigs[i].Userpass = u.P(username + ":" + password) } - - // 连接配置数据 - if edge.Type == EdgeInfoCloud { - edgeConfigs = append(edgeConfigs, edge.EdgeID) - } } // 提交配置 slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs)) if env.RunMode == env.RunModeProd { - // 连接节点到网关 - if err := g.Cloud.CloudConnect(&g.CloudConnectReq{Uuid: proxy.Mac, Edge: &edgeConfigs}); err != nil { - return core.NewServErr("连接云平台失败", err) + // 从云端补足节点 + err := ensureEdges(proxy, gateway, filter, count) + if err != nil { + slog.Warn("ensureEdges 失败", "err", err) // 不阻止通道创建,继续走后续流程 } // 启用网关代理通道 - if err := gateway.GatewayPortConfigs(chanConfigs); err != nil { - slog.Warn("提交代理端口配置失败", "error", err.Error()) - return core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err) + if len(chanConfigs) > 0 { + if err := gateway.GatewayPortConfigs(chanConfigs); err != nil { + slog.Warn("提交代理端口配置失败", "error", err.Error()) + return core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err) + } } } else { for _, item := range chanConfigs { @@ -166,13 +161,13 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int ) default: - return core.NewBizErr("套餐类型不正确,无法更新", nil) + return core.NewBizErr("套餐类型不正确,无法更新") } if err != nil { return core.NewServErr("更新套餐使用记录失败", err) } if result.RowsAffected == 0 { - return core.NewBizErr("提取太频繁,请稍后再试", nil) + return core.NewBizErr("套餐状态已过期") } // 保存通道 @@ -214,75 +209,61 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int return channels, nil } -func (s *channelBaiyinProvider) RemoveChannels(batch string) error { - return g.Redsync.WithLock(lockChannelRemoveKey(batch), func() error { +func (s *channelBaiyinProvider) RemoveChannels(batchNo string) error { + return g.Redsync.WithLock(lockChannelRemoveKey(batchNo), func() error { start := time.Now() // 获取连接数据 - channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batch)).Find() + channels, err := q.Channel.Where(q.Channel.BatchNo.Eq(batchNo)).Find() if err != nil { - return core.NewServErr(fmt.Sprintf("获取通道数据失败,batch:%s", batch), err) + return core.NewServErr(fmt.Sprintf("获取通道数据失败,batch:%s", batchNo), err) } if len(channels) == 0 { - slog.Warn(fmt.Sprintf("未找到通道数据,batch:%s", batch)) + slog.Warn(fmt.Sprintf("未找到通道数据,batch:%s", batchNo)) return nil } proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(channels[0].ProxyID)).Take() if err != nil { - return core.NewServErr(fmt.Sprintf("获取代理数据失败,batch:%s", batch), err) + return core.NewServErr(fmt.Sprintf("获取代理数据失败,batch:%s", batchNo), err) } // 检查通道是否存在 - exist, err := g.Redis.Exists(context.Background(), usedChansKey(proxy.ID, batch)).Result() + chans, err := g.Redis.LRange(context.Background(), usedChansKey(proxy.ID, batchNo), 0, -1).Result() if err != nil { return core.NewServErr("查询使用中通道失败", err) } - if exist == 0 { + if len(chans) == 0 { + slog.Debug("通道为空,跳过清理", "key", usedChansKey(proxy.ID, batchNo)) return nil // 没有使用中通道,已经被清理过了 } // 准备配置数据 - edgeConfigs := make([]string, len(channels)) - configs := make([]*g.PortConfigsReq, len(channels)) - for i, channel := range channels { - if channel.EdgeRef != nil { - edgeConfigs[i] = *channel.EdgeRef - } else { - slog.Warn(fmt.Sprintf("通道 %d 没有保存节点引用", channel.ID)) + configs := make([]*g.PortConfigsReq, len(chans)) + for i, ch := range chans { + ap, err := netip.ParseAddrPort(ch) + if err != nil { + return core.NewServErr(fmt.Sprintf("解析通道数据失败: %s", ch), err) } configs[i] = &g.PortConfigsReq{ - Status: false, - Port: int(channel.Port), - Edge: &[]string{}, + Port: int(ap.Port()), + Edge: &[]string{}, + AutoEdgeConfig: &g.AutoEdgeConfig{Count: u.P(0)}, + Status: false, } } // 提交配置 if env.RunMode == env.RunModeProd { - - // 清空通道配置 - secret := strings.Split(u.Z(proxy.Secret), ":") - if len(secret) != 2 { - return core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil) - } - gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) - err := gateway.GatewayPortConfigs(configs) + gateway, err := proxyGateway(proxy) if err != nil { + return core.NewServErr("创建代理网关失败", err) + } + + if err = gateway.GatewayPortConfigs(configs); err != nil { return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err) } - - // 断开节点连接 - _, err = g.Cloud.CloudDisconnect(&g.CloudDisconnectReq{ - Uuid: proxy.Mac, - Edge: &edgeConfigs, - }) - if err != nil { - slog.Warn("断开云平台连接失败", "error", err.Error()) - return core.NewServErr("断开云平台连接失败", err) - } - } else { for _, item := range configs { str, _ := json.Marshal(item) @@ -291,12 +272,12 @@ func (s *channelBaiyinProvider) RemoveChannels(batch string) error { } // 释放端口 - err = freeChans(proxy.ID, batch) + err = freeChans(proxy.ID, batchNo) if err != nil { return err } - slog.Debug("清除代理端口配置", "proxy", proxy.ID, "batch", batch, "duration", time.Since(start).String()) + slog.Debug("清除代理端口配置", "proxy", proxy.ID, "batch", batchNo, "duration", time.Since(start).String()) return nil }) } @@ -354,8 +335,8 @@ func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) return len(batchSet), nil } -func lockChannelCreateKey() string { - return "platform:channel:create" +func lockChannelCreateKey(resourceId int32) string { + return fmt.Sprintf("platform:channel:create:%d", resourceId) } func lockChannelRemoveKey(bid string) string { @@ -399,13 +380,12 @@ func selectProxy(count int) (*m.Proxy, g.GatewayClient, error) { if maxCount < count { return nil, nil, core.NewBizErr("无可用代理") } - proxy := proxyMap[maxId] - secret := strings.Split(u.Z(proxy.Secret), ":") - if len(secret) != 2 { - return nil, nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil) + proxy := proxyMap[maxId] + gateway, err := proxyGateway(proxy) + if err != nil { + return nil, nil, core.NewServErr("创建代理网关失败", err) } - gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) return proxy, gateway, nil } @@ -427,14 +407,16 @@ func selectPorts(proxyId int32, batchNo string, count int, expire time.Time) ([] return chans, nil } -// selectEdges 选择节点,优先本地节点,失败重试,直到达到重试次数限制 +// ensureEdges 检查本地节点是否足够,如果不足从云端连入 // 本地节点通过 Assigned = false 排除已分配节点 // 云端节点通过 NoRepeat = true 排除已分配节点 -func selectEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]EdgeInfo, error) { - edges := make([]EdgeInfo, 0, count) +func ensureEdges(proxy *m.Proxy, gateway g.GatewayClient, filter *EdgeFilter, count int) error { + if filter.IsEmpty() { + return nil // 没有过滤条件,直接返回空,避免无意义的查询 + } // 先查本地 - localEdgesResp, err := gateway.GatewayEdge(&g.GatewayEdgeReq{ + localEdges, err := gateway.GatewayEdge(&g.GatewayEdgeReq{ Province: filter.Prov, City: filter.City, Isp: u.X(filter.Isp.String()), @@ -442,22 +424,15 @@ func selectEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]Edge Assigned: u.P(false), }) if err != nil { - return nil, core.NewBizErr("获取可用节点失败[1]", err) + return core.NewBizErr("检查可用节点失败[1]", err) } - - for id, _ := range localEdgesResp { - edges = append(edges, EdgeInfo{ - Type: EdgeInfoLocal, - EdgeID: id, - }) - } - if len(edges) >= count { - return edges, nil + if len(localEdges) >= count { + return nil // 本地节点足够,直接返回空,后续逻辑会优先使用本地节点 } // 再查云端 - remaining := count - len(edges) - cloudEdgesResp, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{ + remaining := count - len(localEdges) + cloudEdges, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{ Province: filter.Prov, City: filter.City, Isp: u.X(filter.Isp.String()), @@ -467,20 +442,23 @@ func selectEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]Edge IpUnchangedTime: u.P(3600), }) if err != nil { - return nil, core.NewBizErr("获取可用节点失败[2]", err) + return core.NewBizErr("检查可用节点失败[2]", err) + } + if len(cloudEdges.Edges) < remaining { + return core.NewBizErr("地区可用节点数量不足") } - for _, edge := range cloudEdgesResp.Edges { - edges = append(edges, EdgeInfo{ - Type: EdgeInfoCloud, - EdgeID: edge.EdgeID, - }) - } - if len(edges) < count { - return nil, core.NewBizErr("地区可用节点数量不足") + // 连入云端节点 + edges := make([]string, remaining) + for i, edge := range cloudEdges.Edges { + edges[i] = edge.EdgeID } - return edges, nil + if err := g.Cloud.CloudConnect(&g.CloudConnectReq{Uuid: proxy.Mac, Edge: &edges}); err != nil { + return core.NewServErr("连接云平台失败", err) + } + + return nil } type EdgeInfo struct { diff --git a/web/services/edge.go b/web/services/edge.go index ab35c2b..4a346c9 100644 --- a/web/services/edge.go +++ b/web/services/edge.go @@ -1,6 +1,7 @@ package services import ( + "platform/pkg/u" m "platform/web/models" q "platform/web/queries" ) @@ -37,3 +38,15 @@ type EdgeFilter struct { Prov *string `json:"prov"` City *string `json:"city"` } + +func (f *EdgeFilter) IsEmpty() bool { + if f == nil { + return true + } + + if f.Isp.String() == "" || u.Z(f.Prov) != "" || u.Z(f.City) != "" { + return false + } + + return false +} diff --git a/web/services/proxy.go b/web/services/proxy.go index e7c6930..e563604 100644 --- a/web/services/proxy.go +++ b/web/services/proxy.go @@ -2,6 +2,7 @@ package services import ( "context" + "fmt" "net/netip" "platform/pkg/u" "platform/web/core" @@ -9,6 +10,7 @@ import ( "platform/web/globals/orm" m "platform/web/models" q "platform/web/queries" + "strings" "time" "gorm.io/gen/field" @@ -208,3 +210,14 @@ func (s *proxyService) UpdateStatus(update *UpdateProxyStatus) error { return err }) } + +func proxyGateway(proxy *m.Proxy) (g.GatewayClient, error) { + + secret := strings.Split(u.Z(proxy.Secret), ":") + if len(secret) != 2 { + return nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil) + } + gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1]) + + return gateway, nil +} diff --git a/web/tasks/task.go b/web/tasks/task.go index 1540212..130da5f 100644 --- a/web/tasks/task.go +++ b/web/tasks/task.go @@ -52,3 +52,13 @@ func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) { } return nil } + +func HandleRefreshEdges(_ context.Context, task *asynq.Task) (err error) { + slog.Info("[event]刷新边缘节点") + + err = s.Channel.RefreshEdges() + if err != nil { + return fmt.Errorf("刷新边缘节点失败: %w", err) + } + return nil +} diff --git a/web/web.go b/web/web.go index 84c8469..a283601 100644 --- a/web/web.go +++ b/web/web.go @@ -42,6 +42,10 @@ func RunApp(pCtx context.Context) error { return RunTask(ctx) }) + g.Go(func() error { + return RunCron(ctx) + }) + return g.Wait() } @@ -80,7 +84,7 @@ func RunWeb(ctx context.Context) error { } func RunTask(ctx context.Context) error { - var server = asynq.NewServerFromRedisClient(deps.Redis, asynq.Config{ + server := asynq.NewServerFromRedisClient(deps.Redis, asynq.Config{ ShutdownTimeout: 5 * time.Second, ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) { slog.Error("任务执行失败", "task", task.Type(), "error", err) @@ -91,6 +95,7 @@ func RunTask(ctx context.Context) error { var mux = asynq.NewServeMux() mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel) mux.HandleFunc(events.CloseTrade, tasks.HandleCompleteTrade) + mux.HandleFunc(events.RefreshEdge, tasks.HandleRefreshEdges) // 停止服务 go func() { @@ -107,6 +112,29 @@ func RunTask(ctx context.Context) error { return nil } +func RunCron(ctx context.Context) error { + cron := asynq.NewSchedulerFromRedisClient(deps.Redis, &asynq.SchedulerOpts{ + Logger: &AppAsynqLogger{}, + Location: time.Local, + }) + + cron.Register("0/10 * * * *", events.NewRefreshEdge()) + + // 停止服务 + go func() { + <-ctx.Done() + cron.Shutdown() + }() + + // 启动服务 + err := cron.Run() + if err != nil { + return fmt.Errorf("定时任务服务运行失败: %w", err) + } + + return nil +} + type AppAsynqLogger struct{} func (l *AppAsynqLogger) Debug(args ...any) {