diff --git a/README.md b/README.md index 6e45c22..f7696b5 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,10 @@ ## TODO -交易信息持久化 +proxy 的删除和更新,锁粒度应该有问题 -用户请求需要检查数据权限 +最低价格 0.01 + +交易信息持久化 用反射实现环境变量解析,以简化函数签名 @@ -10,12 +12,8 @@ 分离 task 的客户端,支持多进程(prefork 必要!) -jsonb 类型转换问题,考虑一个高效的 any 到 struct 转换工具 - 慢速请求底层调用埋点监控 -数据库转模型文件 - 冷数据迁移方案 ## 开发环境 @@ -49,13 +47,6 @@ jsonb 类型转换问题,考虑一个高效的 any 到 struct 转换工具 3. 异步回调事件,收到支付成功事件后自动完成订单 4. 用户退出支付界面,客户端主动发起关闭订单 -### 产品字典表 - -| 代码 | 产品 | -| ----- | ------------ | -| short | 短效动态代理 | -| long | 长效动态代理 | - ### 节点分配与存储逻辑 提取: diff --git a/scripts/sql/fill.sql b/scripts/sql/fill.sql index d40d495..be812b9 100644 --- a/scripts/sql/fill.sql +++ b/scripts/sql/fill.sql @@ -126,7 +126,8 @@ insert into permission (name, description, sort) values ('channel', 'IP', 11), ('trade', '交易', 12), ('bill', '账单', 13), - ('balance_activity', '余额变动', 14); + ('balance_activity', '余额变动', 14), + ('proxy', '代理', 15); -- -------------------------- -- level 2 @@ -189,6 +190,11 @@ insert into permission (parent_id, name, description, sort) values ((select id from permission where name = 'channel' and deleted_at is null), 'channel:read', '读取 IP 列表', 1), ((select id from permission where name = 'channel' and deleted_at is null), 'channel:write', '写入 IP', 2); +-- proxy 子权限 +insert into permission (parent_id, name, description, sort) values + ((select id from permission where name = 'proxy' and deleted_at is null), 'proxy:read', '读取代理列表', 1), + ((select id from permission where name = 'proxy' and deleted_at is null), 'proxy:write', '写入代理', 2); + -- trade 子权限 insert into permission (parent_id, name, description, sort) values ((select id from permission where name = 'trade' and deleted_at is null), 'trade:read', '读取交易列表', 1), @@ -211,6 +217,10 @@ insert into permission (parent_id, name, description, sort) values insert into permission (parent_id, name, description, sort) values ((select id from permission where name = 'product_sku:write' and deleted_at is null), 'product_sku:write:status', '更改产品套餐状态', 1); +-- proxy:write 子权限 +insert into permission (parent_id, name, description, sort) values + ((select id from permission where name = 'proxy:write' and deleted_at is null), 'proxy:write:status', '更改代理状态', 1); + -- resource:short 子权限 insert into permission (parent_id, name, description, sort) values ((select id from permission where name = 'resource:short' and deleted_at is null), 'resource:short:read', '读取用户短效动态套餐列表', 1); diff --git a/web/core/scopes.go b/web/core/scopes.go index 5d638d9..2e7ef43 100644 --- a/web/core/scopes.go +++ b/web/core/scopes.go @@ -62,6 +62,11 @@ const ( ScopeChannelReadOfUser = string("channel:read:of_user") // 读取指定用户的 IP 列表 ScopeChannelWrite = string("channel:write") // 写入 IP + ScopeProxy = string("proxy") // 代理 + ScopeProxyRead = string("proxy:read") // 读取代理列表 + ScopeProxyWrite = string("proxy:write") // 写入代理 + ScopeProxyWriteStatus = string("proxy:write:status") // 更改代理状态 + ScopeTrade = string("trade") // 交易 ScopeTradeRead = string("trade:read") // 读取交易列表 ScopeTradeReadOfUser = string("trade:read:of_user") // 读取指定用户的交易列表 diff --git a/web/handlers/proxy.go b/web/handlers/proxy.go index aaa921e..ef7672e 100644 --- a/web/handlers/proxy.go +++ b/web/handlers/proxy.go @@ -1,61 +1,123 @@ package handlers import ( - "net/netip" - "platform/pkg/env" "platform/web/auth" "platform/web/core" - "platform/web/globals" + g "platform/web/globals" s "platform/web/services" "time" "github.com/gofiber/fiber/v2" ) -func DebugRegisterProxyBaiYin(c *fiber.Ctx) error { - if env.RunMode != env.RunModeDev { - return fiber.ErrNotFound - } - - err := s.Proxy.RegisterBaiyin("1a:2b:3c:4d:5e:6f", netip.AddrFrom4([4]byte{127, 0, 0, 1}), "test", "test") - if err != nil { - return core.NewServErr("注册失败", err) - } - - return nil -} - -// 注册白银代理网关 -func ProxyRegisterBaiYin(c *fiber.Ctx) error { - _, err := auth.GetAuthCtx(c).PermitOfficialClient() +func PageProxyByAdmin(c *fiber.Ctx) error { + _, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyRead) if err != nil { return err } - req := new(RegisterProxyBaiyinReq) - err = globals.Validator.ParseBody(c, req) + var req core.PageReq + if err := g.Validator.ParseBody(c, &req); err != nil { + return err + } + + list, total, err := s.Proxy.Page(req) if err != nil { return err } - addr, err := netip.ParseAddr(req.IP) - if err != nil { - return core.NewServErr("IP地址格式错误", err) - } - - err = s.Proxy.RegisterBaiyin(req.Name, addr, req.Username, req.Password) - if err != nil { - return core.NewServErr("注册失败", err) - } - - return nil + return c.JSON(core.PageResp{ + List: list, + Total: int(total), + Page: req.GetPage(), + Size: req.GetSize(), + }) } -type RegisterProxyBaiyinReq struct { - Name string `json:"name" validate:"required"` - IP string `json:"ip" validate:"required"` - Username string `json:"username" validate:"required"` - Password string `json:"password" validate:"required"` +func AllProxyByAdmin(c *fiber.Ctx) error { + _, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyRead) + if err != nil { + return err + } + + list, err := s.Proxy.All() + if err != nil { + return err + } + + return c.JSON(list) +} + +func CreateProxy(c *fiber.Ctx) error { + _, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyWrite) + if err != nil { + return err + } + + var req s.CreateProxy + if err := g.Validator.ParseBody(c, &req); err != nil { + return err + } + + if err := s.Proxy.Create(&req); err != nil { + return err + } + + return c.JSON(nil) +} + +func UpdateProxy(c *fiber.Ctx) error { + _, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyWrite) + if err != nil { + return err + } + + var req s.UpdateProxy + if err := g.Validator.ParseBody(c, &req); err != nil { + return err + } + + if err := s.Proxy.Update(&req); err != nil { + return err + } + + return c.JSON(nil) +} + +func UpdateProxyStatus(c *fiber.Ctx) error { + _, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyWriteStatus) + if err != nil { + return err + } + + var req s.UpdateProxyStatus + if err := g.Validator.ParseBody(c, &req); err != nil { + return err + } + + if err := s.Proxy.UpdateStatus(&req); err != nil { + return err + } + + return c.JSON(nil) +} + +func RemoveProxy(c *fiber.Ctx) error { + _, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyWrite) + if err != nil { + return err + } + + var req core.IdReq + if err := g.Validator.ParseBody(c, &req); err != nil { + return err + } + + if err := s.Proxy.Remove(req.Id); err != nil { + return err + } + + return c.JSON(nil) } // region 报告上线 diff --git a/web/routes.go b/web/routes.go index c9c3f36..7e29c80 100644 --- a/web/routes.go +++ b/web/routes.go @@ -25,7 +25,6 @@ func ApplyRouters(app *fiber.App) { if env.RunMode == env.RunModeDev { debug := app.Group("/debug") debug.Get("/sms/:phone", handlers.DebugGetSmsCode) - debug.Get("/proxy/register", handlers.DebugRegisterProxyBaiYin) debug.Get("/iden/clear/:phone", handlers.DebugIdentifyClear) debug.Get("/session/now", func(ctx *fiber.Ctx) error { rs, err := q.Session.Where(q.Session.AccessTokenExpires.Gt(time.Now())).Find() @@ -134,9 +133,6 @@ func clientRouter(api fiber.Router) { channel := client.Group("/channel") channel.Post("/remove", handlers.RemoveChannels) - // 代理网关注册 - proxy := client.Group("/proxy") - proxy.Post("/register/baidyin", handlers.ProxyRegisterBaiYin) } // 管理员接口路由 @@ -196,6 +192,15 @@ func adminRouter(api fiber.Router) { channel.Post("/page", handlers.PageChannelByAdmin) channel.Post("/page/of-user", handlers.PageChannelOfUserByAdmin) + // proxy 代理 + var proxy = api.Group("/proxy") + proxy.Post("/all", handlers.AllProxyByAdmin) + proxy.Post("/page", handlers.PageProxyByAdmin) + proxy.Post("/create", handlers.CreateProxy) + proxy.Post("/update", handlers.UpdateProxy) + proxy.Post("/update/status", handlers.UpdateProxyStatus) + proxy.Post("/remove", handlers.RemoveProxy) + // trade 交易 var trade = api.Group("/trade") trade.Post("/page", handlers.PageTradeByAdmin) diff --git a/web/services/channel_baiyin.go b/web/services/channel_baiyin.go index 047a67b..ce096e6 100644 --- a/web/services/channel_baiyin.go +++ b/web/services/channel_baiyin.go @@ -63,10 +63,27 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int } proxy := proxyResult.Proxy - // 获取可用通道 - chans, err := lockChans(proxy.ID, batch, count) + // 锁内确认状态并锁定端口,避免与状态切换并发穿透 + var chans []netip.AddrPort + err = g.Redsync.WithLock(proxyStatusLockKey(proxy.ID), func() error { + lockedProxy, err := q.Proxy.Where(q.Proxy.ID.Eq(proxy.ID)).Take() + if err != nil { + return err + } + if lockedProxy.Status != m.ProxyStatusOnline { + return core.NewBizErr("无可用主机,请稍后再试") + } + + chans, err = lockChans(proxy.ID, batch, count) + if err != nil { + return core.NewBizErr("无可用通道,请稍后再试", err) + } + + proxy = *lockedProxy + return nil + }) if err != nil { - return nil, core.NewBizErr("无可用通道,请稍后再试", err) + return nil, err } // 获取可用节点 diff --git a/web/services/proxy.go b/web/services/proxy.go index 14515be..fedf25a 100644 --- a/web/services/proxy.go +++ b/web/services/proxy.go @@ -1,65 +1,216 @@ package services import ( + "context" "fmt" "net/netip" "platform/pkg/u" "platform/web/core" + g "platform/web/globals" "platform/web/globals/orm" m "platform/web/models" q "platform/web/queries" + "strconv" "time" + + "gorm.io/gen/field" ) var Proxy = &proxyService{} type proxyService struct{} -// AllProxies 获取所有代理 -func (s *proxyService) AllProxies(proxyType m.ProxyType, channels bool) ([]*m.Proxy, error) { - proxies, err := q.Proxy.Where( - q.Proxy.Type.Eq(int(proxyType)), - q.Proxy.Status.Eq(int(m.ProxyStatusOnline)), - ).Preload( - q.Proxy.Channels.On(q.Channel.ExpiredAt.Gte(time.Now())), - ).Find() - if err != nil { - return nil, err - } - - return proxies, nil +func proxyStatusLockKey(id int32) string { + return fmt.Sprintf("platform:proxy:status:%d", id) } -// RegisterBaiyin 注册新代理服务 -func (s *proxyService) RegisterBaiyin(Name string, IP netip.Addr, username, password string) error { - - // 保存代理信息 - proxy := &m.Proxy{ - Version: 0, - Mac: Name, - IP: orm.Inet{Addr: IP}, - Secret: u.P(fmt.Sprintf("%s:%s", username, password)), - Type: m.ProxyTypeBaiYin, - Status: m.ProxyStatusOnline, +func hasUsedChans(proxyID int32) (bool, error) { + ctx := context.Background() + pattern := usedChansKey + ":" + strconv.Itoa(int(proxyID)) + ":*" + keys, _, err := g.Redis.Scan(ctx, 0, pattern, 1).Result() + if err != nil { + return false, err } - if err := q.Proxy.Create(proxy); err != nil { - return core.NewServErr("保存通道数据失败") + return len(keys) > 0, nil +} + +func rebuildFreeChans(proxyID int32, addr netip.Addr) error { + if err := remChans(proxyID); err != nil { + return err } - // 添加可用通道到 redis chans := make([]netip.AddrPort, 10000) for i := range 10000 { - chans[i] = netip.AddrPortFrom(IP, uint16(i+10000)) + chans[i] = netip.AddrPortFrom(addr, uint16(i+10000)) } - err := regChans(proxy.ID, chans) + + if err := regChans(proxyID, chans); err != nil { + return err + } + return nil +} + +func (s *proxyService) Page(req core.PageReq) (result []*m.Proxy, count int64, err error) { + return q.Proxy. + Omit(q.Proxy.Version, q.Proxy.Meta). + Order(q.Proxy.CreatedAt.Desc()). + FindByPage(req.GetOffset(), req.GetLimit()) +} + +func (s *proxyService) All() (result []*m.Proxy, err error) { + return q.Proxy. + Omit(q.Proxy.Version, q.Proxy.Meta). + Order(q.Proxy.CreatedAt.Desc()). + Find() +} + +type CreateProxy struct { + Mac string `json:"mac" validate:"required"` + IP string `json:"ip" validate:"required"` + Host *string `json:"host"` + Secret *string `json:"secret"` + Type *m.ProxyType `json:"type"` + Status *m.ProxyStatus `json:"status"` +} + +func (s *proxyService) Create(create *CreateProxy) error { + addr, err := netip.ParseAddr(create.IP) if err != nil { - return core.NewServErr("添加通道失败", err) + return core.NewServErr("IP地址格式错误", err) } + return q.Q.Transaction(func(tx *q.Query) error { + proxy := &m.Proxy{ + Mac: create.Mac, + IP: orm.Inet{Addr: addr}, + Host: create.Host, + Secret: create.Secret, + Type: u.Else(create.Type, m.ProxyTypeSelfHosted), + Status: u.Else(create.Status, m.ProxyStatusOffline), + } + if err := tx.Proxy.Create(proxy); err != nil { + return core.NewServErr("保存代理数据失败", err) + } + if err := rebuildFreeChans(proxy.ID, addr); err != nil { + return core.NewServErr("初始化代理通道失败", err) + } + return nil + }) +} + +type UpdateProxy struct { + ID int32 `json:"id" validate:"required"` + Mac *string `json:"mac"` + IP *string `json:"ip"` + Host *string `json:"host"` + Secret *string `json:"secret"` +} + +func (s *proxyService) Update(update *UpdateProxy) error { + simples := make([]field.AssignExpr, 0) + hasSideEffect := false + + if update.Mac != nil { + hasSideEffect = true + simples = append(simples, q.Proxy.Mac.Value(*update.Mac)) + } + if update.IP != nil { + addr, err := netip.ParseAddr(*update.IP) + if err != nil { + return core.NewServErr("IP地址格式错误", err) + } + hasSideEffect = true + simples = append(simples, q.Proxy.IP.Value(orm.Inet{Addr: addr})) + } + if update.Host != nil { + simples = append(simples, q.Proxy.Host.Value(*update.Host)) + } + if update.Secret != nil { + hasSideEffect = true + simples = append(simples, q.Proxy.Secret.Value(*update.Secret)) + } + + if len(simples) == 0 { + return nil + } + + if hasSideEffect { + used, err := hasUsedChans(update.ID) + if err != nil { + return core.NewServErr("检查代理通道状态失败", err) + } + if used { + return core.NewBizErr("代理存在未关闭通道,禁止修改") + } + } + + rs, err := q.Proxy. + Where( + q.Proxy.ID.Eq(update.ID), + q.Proxy.Status.Eq(int(m.ProxyStatusOffline)), + ). + UpdateSimple(simples...) + if err != nil { + return err + } + if rs.RowsAffected == 0 { + return core.NewBizErr("代理未下线,禁止修改") + } return nil } -// UnregisterBaiyin 注销代理服务 -func (s *proxyService) UnregisterBaiyin(id int) error { +func (s *proxyService) Remove(id int32) error { + used, err := hasUsedChans(id) + if err != nil { + return core.NewServErr("检查代理通道状态失败", err) + } + if used { + return core.NewBizErr("代理存在未关闭通道,禁止删除") + } + + rs, err := q.Proxy. + Where( + q.Proxy.ID.Eq(id), + q.Proxy.Status.Eq(int(m.ProxyStatusOffline)), + ). + UpdateColumn(q.Proxy.DeletedAt, time.Now()) + if err != nil { + return err + } + if rs.RowsAffected == 0 { + return core.NewBizErr("代理未下线,禁止删除") + } + if err := remChans(id); err != nil { + return core.NewServErr("注销代理通道失败", err) + } return nil } + +type UpdateProxyStatus struct { + ID int32 `json:"id" validate:"required"` + Status m.ProxyStatus `json:"status" validate:"required"` +} + +func (s *proxyService) UpdateStatus(update *UpdateProxyStatus) error { + return g.Redsync.WithLock(proxyStatusLockKey(update.ID), func() error { + proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(update.ID)).Take() + if err != nil { + return err + } + + if proxy.Status == update.Status { + return nil + } + + if update.Status == m.ProxyStatusOnline { + if err := rebuildFreeChans(proxy.ID, proxy.IP.Addr); err != nil { + return core.NewServErr("初始化代理通道失败", err) + } + } + + _, err = q.Proxy. + Where(q.Proxy.ID.Eq(update.ID)). + UpdateSimple(q.Proxy.Status.Value(int(update.Status))) + return err + }) +}