Files
platform/web/services/proxy.go

352 lines
8.4 KiB
Go

package services
import (
"context"
"errors"
"fmt"
"net/netip"
"platform/pkg/u"
"platform/web/core"
g "platform/web/globals"
"platform/web/globals/orm"
m "platform/web/models"
q "platform/web/queries"
"strings"
"time"
"gorm.io/gen/field"
"gorm.io/gorm"
)
var Proxy = &proxyService{}
type proxyService struct{}
func hasUsedChans(proxyID int32) (bool, error) {
ctx := context.Background()
pattern := usedChansKey(proxyID, "*")
var cursor uint64
for {
keys, next, err := g.Redis.Scan(ctx, cursor, pattern, 100).Result()
if err != nil {
return false, err
}
if len(keys) > 0 {
return true, nil
}
if next == 0 {
return false, nil
}
cursor = next
}
}
func rebuildFreeChans(proxyID int32, addr netip.Addr) error {
if err := remChans(proxyID); err != nil {
return err
}
chans := make([]netip.AddrPort, 10000)
for i := range 10000 {
chans[i] = netip.AddrPortFrom(addr, uint16(i+10000))
}
if err := regChans(proxyID, chans); err != nil {
return err
}
return nil
}
func (s *proxyService) Page(req core.PageReq) (result []*m.Proxy, count int64, err error) {
return q.Proxy.
Omit(q.Proxy.Version, q.Proxy.Meta).
Order(q.Proxy.CreatedAt.Desc()).
FindByPage(req.GetOffset(), req.GetLimit())
}
func (s *proxyService) All() (result []*m.Proxy, err error) {
return q.Proxy.
Omit(q.Proxy.Version, q.Proxy.Meta).
Order(q.Proxy.CreatedAt.Desc()).
Find()
}
type CreateProxy struct {
Mac string `json:"mac" validate:"required"`
IP string `json:"ip" validate:"required"`
Host *string `json:"host"`
Port *int `json:"port"`
Secret *string `json:"secret"`
Type *m.ProxyType `json:"type"`
Status *m.ProxyStatus `json:"status"`
}
func (s *proxyService) Create(create *CreateProxy) error {
addr, err := netip.ParseAddr(create.IP)
if err != nil {
return core.NewServErr("IP地址格式错误", err)
}
return q.Q.Transaction(func(tx *q.Query) error {
proxy := &m.Proxy{
Mac: create.Mac,
IP: orm.Inet{Addr: addr},
Host: create.Host,
Port: create.Port,
Secret: create.Secret,
Type: u.Else(create.Type, m.ProxyTypeSelfHosted),
Status: u.Else(create.Status, m.ProxyStatusOffline),
}
if err := tx.Proxy.Create(proxy); err != nil {
return core.NewServErr("保存代理数据失败", err)
}
if err := rebuildFreeChans(proxy.ID, addr); err != nil {
return core.NewServErr("初始化代理通道失败", err)
}
return nil
})
}
type UpdateProxy struct {
ID int32 `json:"id" validate:"required"`
Mac *string `json:"mac"`
IP *string `json:"ip"`
Host *string `json:"host"`
Port *int `json:"port"`
Secret *string `json:"secret"`
}
func (s *proxyService) Update(update *UpdateProxy) error {
simples := make([]field.AssignExpr, 0)
hasSideEffect := false
if update.Mac != nil {
hasSideEffect = true
simples = append(simples, q.Proxy.Mac.Value(*update.Mac))
}
if update.IP != nil {
addr, err := netip.ParseAddr(*update.IP)
if err != nil {
return core.NewServErr("IP地址格式错误", err)
}
hasSideEffect = true
simples = append(simples, q.Proxy.IP.Value(orm.Inet{Addr: addr}))
}
if update.Host != nil {
simples = append(simples, q.Proxy.Host.Value(*update.Host))
}
if update.Port != nil {
simples = append(simples, q.Proxy.Port.Value(*update.Port))
}
if update.Secret != nil {
hasSideEffect = true
simples = append(simples, q.Proxy.Secret.Value(*update.Secret))
}
if len(simples) == 0 {
return nil
}
if hasSideEffect {
used, err := hasUsedChans(update.ID)
if err != nil {
return core.NewServErr("检查代理通道状态失败", err)
}
if used {
return core.NewBizErr("代理存在未关闭通道,禁止修改")
}
}
rs, err := q.Proxy.
Where(
q.Proxy.ID.Eq(update.ID),
q.Proxy.Status.Eq(int(m.ProxyStatusOffline)),
).
UpdateSimple(simples...)
if err != nil {
return err
}
if rs.RowsAffected == 0 {
return core.NewBizErr("代理未下线,禁止修改")
}
return nil
}
func (s *proxyService) SyncPorts(id int32) error {
proxy, err := findOfflineProxy(id)
if err != nil {
return err
}
used, err := hasUsedChans(id)
if err != nil {
return core.NewServErr("检查代理通道状态失败", err)
}
if used {
return core.NewBizErr("代理存在未关闭通道,禁止重建端口池")
}
return rebuildFreeChans(id, proxy.IP.Addr)
}
func (s *proxyService) SyncChains(id int32) error {
proxy, err := findOfflineProxy(id)
if err != nil {
return err
}
if proxy.Type != m.ProxyTypeGost {
return core.NewBizErr("仅 GOST 代理支持重建代理链")
}
chains, err := buildGostChainsFromEdges()
if err != nil {
return err
}
client, err := proxyGost(proxy)
if err != nil {
return core.NewServErr("创建 GOST 客户端失败", err)
}
oldChains, err := client.ListChains()
if err != nil {
return core.NewServErr("查询 GOST chains 失败", err)
}
for _, chain := range oldChains {
if err := client.DeleteChain(chain.Name); err != nil {
return core.NewServErr(fmt.Sprintf("删除 GOST chain 失败: %s", chain.Name), err)
}
}
for _, chain := range chains {
if err := client.CreateChain(chain); err != nil {
return core.NewServErr(fmt.Sprintf("创建 GOST chain 失败: %s", chain.Name), err)
}
}
if err := client.SaveConfig(); err != nil {
return core.NewServErr("保存 GOST 配置失败", err)
}
return nil
}
func findOfflineProxy(id int32) (*m.Proxy, error) {
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(id)).Take()
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, core.NewBizErr("代理不存在")
}
if err != nil {
return nil, core.NewServErr("获取代理数据失败", err)
}
if proxy.Status != m.ProxyStatusOffline {
return nil, core.NewBizErr("代理未下线,禁止同步")
}
return proxy, nil
}
func buildGostChainsFromEdges() ([]*g.GostChainConfig, error) {
edges, err := q.Edge.
Where(q.Edge.Type.Eq(int(m.EdgeTypeGostChain))).
Order(q.Edge.ID).
Find()
if err != nil {
return nil, core.NewServErr("查询 GOST edge 数据失败", err)
}
chains := make([]*g.GostChainConfig, len(edges))
for i, edge := range edges {
if strings.TrimSpace(edge.Mac) == "" {
return nil, core.NewBizErr(fmt.Sprintf("GOST edge %d chain 名称为空", edge.ID))
}
if !edge.IP.Addr.IsValid() {
return nil, core.NewBizErr(fmt.Sprintf("GOST edge %s IP 无效", edge.Mac))
}
if edge.Port == nil || *edge.Port == 0 {
return nil, core.NewBizErr(fmt.Sprintf("GOST edge %s 端口为空", edge.Mac))
}
chains[i] = &g.GostChainConfig{
Name: edge.Mac,
Hops: []g.GostHopConfig{{
Nodes: []g.GostNodeConfig{{
Addr: netip.AddrPortFrom(edge.IP.Addr, *edge.Port).String(),
Connector: g.GostConnectorConfig{
Type: "socks5",
},
Dialer: g.GostDialerConfig{
Type: "tcp",
},
}},
}},
}
}
return chains, nil
}
func (s *proxyService) Remove(id int32) error {
used, err := hasUsedChans(id)
if err != nil {
return core.NewServErr("检查代理通道状态失败", err)
}
if used {
return core.NewBizErr("代理存在未关闭通道,禁止删除")
}
rs, err := q.Proxy.
Where(
q.Proxy.ID.Eq(id),
q.Proxy.Status.Eq(int(m.ProxyStatusOffline)),
).
UpdateColumn(q.Proxy.DeletedAt, time.Now())
if err != nil {
return err
}
if rs.RowsAffected == 0 {
return core.NewBizErr("代理未下线,禁止删除")
}
if err := remChans(id); err != nil {
return core.NewServErr("注销代理通道失败", err)
}
return nil
}
type UpdateProxyStatus struct {
ID int32 `json:"id" validate:"required"`
Status m.ProxyStatus `json:"status"`
}
func (s *proxyService) UpdateStatus(update *UpdateProxyStatus) error {
return q.Q.Transaction(func(tx *q.Query) error {
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(update.ID)).Take()
if err != nil {
return err
}
if proxy.Status == update.Status {
return nil
}
if update.Status == m.ProxyStatusOnline {
if err := rebuildFreeChans(proxy.ID, proxy.IP.Addr); err != nil {
return core.NewServErr("初始化代理通道失败", err)
}
}
_, err = q.Proxy.
Where(q.Proxy.ID.Eq(update.ID)).
UpdateSimple(q.Proxy.Status.Value(int(update.Status)))
return err
})
}
func proxyGateway(proxy *m.Proxy) (g.GatewayClient, error) {
secret := strings.Split(u.Z(proxy.Secret), ":")
if len(secret) != 2 {
return nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil)
}
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
return gateway, nil
}