实现代理网关管理接口
This commit is contained in:
@@ -62,6 +62,11 @@ const (
|
||||
ScopeChannelReadOfUser = string("channel:read:of_user") // 读取指定用户的 IP 列表
|
||||
ScopeChannelWrite = string("channel:write") // 写入 IP
|
||||
|
||||
ScopeProxy = string("proxy") // 代理
|
||||
ScopeProxyRead = string("proxy:read") // 读取代理列表
|
||||
ScopeProxyWrite = string("proxy:write") // 写入代理
|
||||
ScopeProxyWriteStatus = string("proxy:write:status") // 更改代理状态
|
||||
|
||||
ScopeTrade = string("trade") // 交易
|
||||
ScopeTradeRead = string("trade:read") // 读取交易列表
|
||||
ScopeTradeReadOfUser = string("trade:read:of_user") // 读取指定用户的交易列表
|
||||
|
||||
@@ -1,61 +1,123 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"net/netip"
|
||||
"platform/pkg/env"
|
||||
"platform/web/auth"
|
||||
"platform/web/core"
|
||||
"platform/web/globals"
|
||||
g "platform/web/globals"
|
||||
s "platform/web/services"
|
||||
"time"
|
||||
|
||||
"github.com/gofiber/fiber/v2"
|
||||
)
|
||||
|
||||
func DebugRegisterProxyBaiYin(c *fiber.Ctx) error {
|
||||
if env.RunMode != env.RunModeDev {
|
||||
return fiber.ErrNotFound
|
||||
}
|
||||
|
||||
err := s.Proxy.RegisterBaiyin("1a:2b:3c:4d:5e:6f", netip.AddrFrom4([4]byte{127, 0, 0, 1}), "test", "test")
|
||||
if err != nil {
|
||||
return core.NewServErr("注册失败", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// 注册白银代理网关
|
||||
func ProxyRegisterBaiYin(c *fiber.Ctx) error {
|
||||
_, err := auth.GetAuthCtx(c).PermitOfficialClient()
|
||||
func PageProxyByAdmin(c *fiber.Ctx) error {
|
||||
_, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyRead)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req := new(RegisterProxyBaiyinReq)
|
||||
err = globals.Validator.ParseBody(c, req)
|
||||
var req core.PageReq
|
||||
if err := g.Validator.ParseBody(c, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
list, total, err := s.Proxy.Page(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
addr, err := netip.ParseAddr(req.IP)
|
||||
if err != nil {
|
||||
return core.NewServErr("IP地址格式错误", err)
|
||||
}
|
||||
|
||||
err = s.Proxy.RegisterBaiyin(req.Name, addr, req.Username, req.Password)
|
||||
if err != nil {
|
||||
return core.NewServErr("注册失败", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return c.JSON(core.PageResp{
|
||||
List: list,
|
||||
Total: int(total),
|
||||
Page: req.GetPage(),
|
||||
Size: req.GetSize(),
|
||||
})
|
||||
}
|
||||
|
||||
type RegisterProxyBaiyinReq struct {
|
||||
Name string `json:"name" validate:"required"`
|
||||
IP string `json:"ip" validate:"required"`
|
||||
Username string `json:"username" validate:"required"`
|
||||
Password string `json:"password" validate:"required"`
|
||||
func AllProxyByAdmin(c *fiber.Ctx) error {
|
||||
_, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyRead)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
list, err := s.Proxy.All()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.JSON(list)
|
||||
}
|
||||
|
||||
func CreateProxy(c *fiber.Ctx) error {
|
||||
_, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyWrite)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var req s.CreateProxy
|
||||
if err := g.Validator.ParseBody(c, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.Proxy.Create(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.JSON(nil)
|
||||
}
|
||||
|
||||
func UpdateProxy(c *fiber.Ctx) error {
|
||||
_, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyWrite)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var req s.UpdateProxy
|
||||
if err := g.Validator.ParseBody(c, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.Proxy.Update(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.JSON(nil)
|
||||
}
|
||||
|
||||
func UpdateProxyStatus(c *fiber.Ctx) error {
|
||||
_, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyWriteStatus)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var req s.UpdateProxyStatus
|
||||
if err := g.Validator.ParseBody(c, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.Proxy.UpdateStatus(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.JSON(nil)
|
||||
}
|
||||
|
||||
func RemoveProxy(c *fiber.Ctx) error {
|
||||
_, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyWrite)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var req core.IdReq
|
||||
if err := g.Validator.ParseBody(c, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := s.Proxy.Remove(req.Id); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.JSON(nil)
|
||||
}
|
||||
|
||||
// region 报告上线
|
||||
|
||||
@@ -25,7 +25,6 @@ func ApplyRouters(app *fiber.App) {
|
||||
if env.RunMode == env.RunModeDev {
|
||||
debug := app.Group("/debug")
|
||||
debug.Get("/sms/:phone", handlers.DebugGetSmsCode)
|
||||
debug.Get("/proxy/register", handlers.DebugRegisterProxyBaiYin)
|
||||
debug.Get("/iden/clear/:phone", handlers.DebugIdentifyClear)
|
||||
debug.Get("/session/now", func(ctx *fiber.Ctx) error {
|
||||
rs, err := q.Session.Where(q.Session.AccessTokenExpires.Gt(time.Now())).Find()
|
||||
@@ -134,9 +133,6 @@ func clientRouter(api fiber.Router) {
|
||||
channel := client.Group("/channel")
|
||||
channel.Post("/remove", handlers.RemoveChannels)
|
||||
|
||||
// 代理网关注册
|
||||
proxy := client.Group("/proxy")
|
||||
proxy.Post("/register/baidyin", handlers.ProxyRegisterBaiYin)
|
||||
}
|
||||
|
||||
// 管理员接口路由
|
||||
@@ -196,6 +192,15 @@ func adminRouter(api fiber.Router) {
|
||||
channel.Post("/page", handlers.PageChannelByAdmin)
|
||||
channel.Post("/page/of-user", handlers.PageChannelOfUserByAdmin)
|
||||
|
||||
// proxy 代理
|
||||
var proxy = api.Group("/proxy")
|
||||
proxy.Post("/all", handlers.AllProxyByAdmin)
|
||||
proxy.Post("/page", handlers.PageProxyByAdmin)
|
||||
proxy.Post("/create", handlers.CreateProxy)
|
||||
proxy.Post("/update", handlers.UpdateProxy)
|
||||
proxy.Post("/update/status", handlers.UpdateProxyStatus)
|
||||
proxy.Post("/remove", handlers.RemoveProxy)
|
||||
|
||||
// trade 交易
|
||||
var trade = api.Group("/trade")
|
||||
trade.Post("/page", handlers.PageTradeByAdmin)
|
||||
|
||||
@@ -63,10 +63,27 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
|
||||
}
|
||||
proxy := proxyResult.Proxy
|
||||
|
||||
// 获取可用通道
|
||||
chans, err := lockChans(proxy.ID, batch, count)
|
||||
// 锁内确认状态并锁定端口,避免与状态切换并发穿透
|
||||
var chans []netip.AddrPort
|
||||
err = g.Redsync.WithLock(proxyStatusLockKey(proxy.ID), func() error {
|
||||
lockedProxy, err := q.Proxy.Where(q.Proxy.ID.Eq(proxy.ID)).Take()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if lockedProxy.Status != m.ProxyStatusOnline {
|
||||
return core.NewBizErr("无可用主机,请稍后再试")
|
||||
}
|
||||
|
||||
chans, err = lockChans(proxy.ID, batch, count)
|
||||
if err != nil {
|
||||
return core.NewBizErr("无可用通道,请稍后再试", err)
|
||||
}
|
||||
|
||||
proxy = *lockedProxy
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, core.NewBizErr("无可用通道,请稍后再试", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 获取可用节点
|
||||
|
||||
@@ -1,65 +1,216 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"platform/pkg/u"
|
||||
"platform/web/core"
|
||||
g "platform/web/globals"
|
||||
"platform/web/globals/orm"
|
||||
m "platform/web/models"
|
||||
q "platform/web/queries"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"gorm.io/gen/field"
|
||||
)
|
||||
|
||||
var Proxy = &proxyService{}
|
||||
|
||||
type proxyService struct{}
|
||||
|
||||
// AllProxies 获取所有代理
|
||||
func (s *proxyService) AllProxies(proxyType m.ProxyType, channels bool) ([]*m.Proxy, error) {
|
||||
proxies, err := q.Proxy.Where(
|
||||
q.Proxy.Type.Eq(int(proxyType)),
|
||||
q.Proxy.Status.Eq(int(m.ProxyStatusOnline)),
|
||||
).Preload(
|
||||
q.Proxy.Channels.On(q.Channel.ExpiredAt.Gte(time.Now())),
|
||||
).Find()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return proxies, nil
|
||||
func proxyStatusLockKey(id int32) string {
|
||||
return fmt.Sprintf("platform:proxy:status:%d", id)
|
||||
}
|
||||
|
||||
// RegisterBaiyin 注册新代理服务
|
||||
func (s *proxyService) RegisterBaiyin(Name string, IP netip.Addr, username, password string) error {
|
||||
|
||||
// 保存代理信息
|
||||
proxy := &m.Proxy{
|
||||
Version: 0,
|
||||
Mac: Name,
|
||||
IP: orm.Inet{Addr: IP},
|
||||
Secret: u.P(fmt.Sprintf("%s:%s", username, password)),
|
||||
Type: m.ProxyTypeBaiYin,
|
||||
Status: m.ProxyStatusOnline,
|
||||
func hasUsedChans(proxyID int32) (bool, error) {
|
||||
ctx := context.Background()
|
||||
pattern := usedChansKey + ":" + strconv.Itoa(int(proxyID)) + ":*"
|
||||
keys, _, err := g.Redis.Scan(ctx, 0, pattern, 1).Result()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if err := q.Proxy.Create(proxy); err != nil {
|
||||
return core.NewServErr("保存通道数据失败")
|
||||
return len(keys) > 0, nil
|
||||
}
|
||||
|
||||
func rebuildFreeChans(proxyID int32, addr netip.Addr) error {
|
||||
if err := remChans(proxyID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// 添加可用通道到 redis
|
||||
chans := make([]netip.AddrPort, 10000)
|
||||
for i := range 10000 {
|
||||
chans[i] = netip.AddrPortFrom(IP, uint16(i+10000))
|
||||
chans[i] = netip.AddrPortFrom(addr, uint16(i+10000))
|
||||
}
|
||||
err := regChans(proxy.ID, chans)
|
||||
|
||||
if err := regChans(proxyID, chans); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *proxyService) Page(req core.PageReq) (result []*m.Proxy, count int64, err error) {
|
||||
return q.Proxy.
|
||||
Omit(q.Proxy.Version, q.Proxy.Meta).
|
||||
Order(q.Proxy.CreatedAt.Desc()).
|
||||
FindByPage(req.GetOffset(), req.GetLimit())
|
||||
}
|
||||
|
||||
func (s *proxyService) All() (result []*m.Proxy, err error) {
|
||||
return q.Proxy.
|
||||
Omit(q.Proxy.Version, q.Proxy.Meta).
|
||||
Order(q.Proxy.CreatedAt.Desc()).
|
||||
Find()
|
||||
}
|
||||
|
||||
type CreateProxy struct {
|
||||
Mac string `json:"mac" validate:"required"`
|
||||
IP string `json:"ip" validate:"required"`
|
||||
Host *string `json:"host"`
|
||||
Secret *string `json:"secret"`
|
||||
Type *m.ProxyType `json:"type"`
|
||||
Status *m.ProxyStatus `json:"status"`
|
||||
}
|
||||
|
||||
func (s *proxyService) Create(create *CreateProxy) error {
|
||||
addr, err := netip.ParseAddr(create.IP)
|
||||
if err != nil {
|
||||
return core.NewServErr("添加通道失败", err)
|
||||
return core.NewServErr("IP地址格式错误", err)
|
||||
}
|
||||
|
||||
return q.Q.Transaction(func(tx *q.Query) error {
|
||||
proxy := &m.Proxy{
|
||||
Mac: create.Mac,
|
||||
IP: orm.Inet{Addr: addr},
|
||||
Host: create.Host,
|
||||
Secret: create.Secret,
|
||||
Type: u.Else(create.Type, m.ProxyTypeSelfHosted),
|
||||
Status: u.Else(create.Status, m.ProxyStatusOffline),
|
||||
}
|
||||
if err := tx.Proxy.Create(proxy); err != nil {
|
||||
return core.NewServErr("保存代理数据失败", err)
|
||||
}
|
||||
if err := rebuildFreeChans(proxy.ID, addr); err != nil {
|
||||
return core.NewServErr("初始化代理通道失败", err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
type UpdateProxy struct {
|
||||
ID int32 `json:"id" validate:"required"`
|
||||
Mac *string `json:"mac"`
|
||||
IP *string `json:"ip"`
|
||||
Host *string `json:"host"`
|
||||
Secret *string `json:"secret"`
|
||||
}
|
||||
|
||||
func (s *proxyService) Update(update *UpdateProxy) error {
|
||||
simples := make([]field.AssignExpr, 0)
|
||||
hasSideEffect := false
|
||||
|
||||
if update.Mac != nil {
|
||||
hasSideEffect = true
|
||||
simples = append(simples, q.Proxy.Mac.Value(*update.Mac))
|
||||
}
|
||||
if update.IP != nil {
|
||||
addr, err := netip.ParseAddr(*update.IP)
|
||||
if err != nil {
|
||||
return core.NewServErr("IP地址格式错误", err)
|
||||
}
|
||||
hasSideEffect = true
|
||||
simples = append(simples, q.Proxy.IP.Value(orm.Inet{Addr: addr}))
|
||||
}
|
||||
if update.Host != nil {
|
||||
simples = append(simples, q.Proxy.Host.Value(*update.Host))
|
||||
}
|
||||
if update.Secret != nil {
|
||||
hasSideEffect = true
|
||||
simples = append(simples, q.Proxy.Secret.Value(*update.Secret))
|
||||
}
|
||||
|
||||
if len(simples) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if hasSideEffect {
|
||||
used, err := hasUsedChans(update.ID)
|
||||
if err != nil {
|
||||
return core.NewServErr("检查代理通道状态失败", err)
|
||||
}
|
||||
if used {
|
||||
return core.NewBizErr("代理存在未关闭通道,禁止修改")
|
||||
}
|
||||
}
|
||||
|
||||
rs, err := q.Proxy.
|
||||
Where(
|
||||
q.Proxy.ID.Eq(update.ID),
|
||||
q.Proxy.Status.Eq(int(m.ProxyStatusOffline)),
|
||||
).
|
||||
UpdateSimple(simples...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rs.RowsAffected == 0 {
|
||||
return core.NewBizErr("代理未下线,禁止修改")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnregisterBaiyin 注销代理服务
|
||||
func (s *proxyService) UnregisterBaiyin(id int) error {
|
||||
func (s *proxyService) Remove(id int32) error {
|
||||
used, err := hasUsedChans(id)
|
||||
if err != nil {
|
||||
return core.NewServErr("检查代理通道状态失败", err)
|
||||
}
|
||||
if used {
|
||||
return core.NewBizErr("代理存在未关闭通道,禁止删除")
|
||||
}
|
||||
|
||||
rs, err := q.Proxy.
|
||||
Where(
|
||||
q.Proxy.ID.Eq(id),
|
||||
q.Proxy.Status.Eq(int(m.ProxyStatusOffline)),
|
||||
).
|
||||
UpdateColumn(q.Proxy.DeletedAt, time.Now())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rs.RowsAffected == 0 {
|
||||
return core.NewBizErr("代理未下线,禁止删除")
|
||||
}
|
||||
if err := remChans(id); err != nil {
|
||||
return core.NewServErr("注销代理通道失败", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type UpdateProxyStatus struct {
|
||||
ID int32 `json:"id" validate:"required"`
|
||||
Status m.ProxyStatus `json:"status" validate:"required"`
|
||||
}
|
||||
|
||||
func (s *proxyService) UpdateStatus(update *UpdateProxyStatus) error {
|
||||
return g.Redsync.WithLock(proxyStatusLockKey(update.ID), func() error {
|
||||
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(update.ID)).Take()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if proxy.Status == update.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
if update.Status == m.ProxyStatusOnline {
|
||||
if err := rebuildFreeChans(proxy.ID, proxy.IP.Addr); err != nil {
|
||||
return core.NewServErr("初始化代理通道失败", err)
|
||||
}
|
||||
}
|
||||
|
||||
_, err = q.Proxy.
|
||||
Where(q.Proxy.ID.Eq(update.ID)).
|
||||
UpdateSimple(q.Proxy.Status.Value(int(update.Status)))
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user