实现手动 proxy 同步接口

This commit is contained in:
2026-06-11 15:07:46 +08:00
parent ebac8042ea
commit 513fe78815
9 changed files with 362 additions and 22 deletions

View File

@@ -5010,6 +5010,42 @@ paths:
default: default:
$ref: "#/components/responses/PlainTextError" $ref: "#/components/responses/PlainTextError"
/api/admin/proxy/sync/ports:
post:
tags: [admin/proxy]
summary: 重建端口池
security:
- bearerAuth: []
requestBody:
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/IdRequest"
responses:
"200":
description: 成功,无响应体
default:
$ref: "#/components/responses/PlainTextError"
/api/admin/proxy/sync/chains:
post:
tags: [admin/proxy]
summary: 重建代理链
security:
- bearerAuth: []
requestBody:
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/IdRequest"
responses:
"200":
description: 成功,无响应体
default:
$ref: "#/components/responses/PlainTextError"
/api/admin/proxy/remove: /api/admin/proxy/remove:
post: post:
tags: [admin/proxy] tags: [admin/proxy]

View File

@@ -52,7 +52,8 @@ func ErrorHandler(c *fiber.Ctx, err error) error {
case errors.As(err, &servErr): case errors.As(err, &servErr):
code = fiber.StatusInternalServerError code = fiber.StatusInternalServerError
message = err.Error() slog.Warn("服务端错误", slog.String("error", servErr.Error()))
message = "服务端错误"
case errors.As(err, &timeErr): case errors.As(err, &timeErr):
code = fiber.StatusBadRequest code = fiber.StatusBadRequest

View File

@@ -19,7 +19,11 @@ func IsGostNotFound(err error) bool {
} }
type GostClient interface { type GostClient interface {
ListChains() ([]*GostChainConfig, error)
GetChain(name string) (*GostChainConfig, error) GetChain(name string) (*GostChainConfig, error)
CreateChain(chain *GostChainConfig) error
DeleteChain(name string) error
SaveConfig() error
CreateService(service *GostServiceConfig) error CreateService(service *GostServiceConfig) error
DeleteService(name string) error DeleteService(name string) error
CreateAuther(auther *GostAutherConfig) error CreateAuther(auther *GostAutherConfig) error
@@ -54,15 +58,37 @@ func NewGost(host string, port int, pathPrefix, username, password string) GostC
} }
type GostChainConfig struct { type GostChainConfig struct {
Name string `json:"name"` Name string `json:"name"`
Hops []GostHopConfig `json:"hops,omitempty"`
}
type GostHopConfig struct {
Name string `json:"name,omitempty"`
Nodes []GostNodeConfig `json:"nodes,omitempty"`
}
type GostNodeConfig struct {
Name string `json:"name,omitempty"`
Addr string `json:"addr"`
Connector GostConnectorConfig `json:"connector"`
Dialer GostDialerConfig `json:"dialer"`
}
type GostConnectorConfig struct {
Type string `json:"type"`
}
type GostDialerConfig struct {
Type string `json:"type"`
} }
type GostServiceConfig struct { type GostServiceConfig struct {
Name string `json:"name"` Name string `json:"name"`
Addr string `json:"addr"` Addr string `json:"addr"`
Admission string `json:"admission,omitempty"` Admission string `json:"admission,omitempty"`
Handler GostHandlerConfig `json:"handler"` Handler GostHandlerConfig `json:"handler"`
Listener GostListenerConfig `json:"listener"` Listener GostListenerConfig `json:"listener"`
Recorders []GostRecorderConfig `json:"recorders,omitempty"`
} }
type GostHandlerConfig struct { type GostHandlerConfig struct {
@@ -75,6 +101,11 @@ type GostListenerConfig struct {
Type string `json:"type"` Type string `json:"type"`
} }
type GostRecorderConfig struct {
Name string `json:"name"`
Record string `json:"record"`
}
type GostAutherConfig struct { type GostAutherConfig struct {
Name string `json:"name"` Name string `json:"name"`
Auths []GostAuthConfig `json:"auths"` Auths []GostAuthConfig `json:"auths"`
@@ -115,6 +146,40 @@ func (c *gostClient) GetChain(name string) (*GostChainConfig, error) {
return &GostChainConfig{Name: name}, nil return &GostChainConfig{Name: name}, nil
} }
func (c *gostClient) ListChains() ([]*GostChainConfig, error) {
body, err := c.get("/config/chains")
if err != nil {
return nil, err
}
if len(body) == 0 {
return nil, nil
}
var resp struct {
Data struct {
Count int `json:"count"`
List []*GostChainConfig `json:"list"`
} `json:"data"`
}
if err := json.Unmarshal(body, &resp); err != nil {
return nil, fmt.Errorf("parse gost chain list failed %s: %w", string(body), err)
}
return resp.Data.List, nil
}
func (c *gostClient) CreateChain(chain *GostChainConfig) error {
return c.create("/config/chains", chain)
}
func (c *gostClient) DeleteChain(name string) error {
return c.delete("/config/chains/" + url.PathEscape(name))
}
func (c *gostClient) SaveConfig() error {
return c.create("/config", nil)
}
func (c *gostClient) CreateService(service *GostServiceConfig) error { func (c *gostClient) CreateService(service *GostServiceConfig) error {
return c.create("/config/services", service) return c.create("/config/services", service)
} }

107
web/globals/gost_test.go Normal file
View File

@@ -0,0 +1,107 @@
package globals
import (
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
)
func TestGostClientChainOperations(t *testing.T) {
var (
created *GostChainConfig
deleted []string
saved bool
)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
username, password, ok := r.BasicAuth()
if !ok || username != "user" || password != "pass" {
t.Errorf("unexpected auth: ok=%v username=%q password=%q", ok, username, password)
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
switch {
case r.Method == http.MethodGet && r.URL.Path == "/api/config/chains":
_ = json.NewEncoder(w).Encode(map[string]any{
"count": 2,
"list": []map[string]any{
{"name": "old-a"},
{"name": "old-b"},
},
})
case r.Method == http.MethodPost && r.URL.Path == "/api/config/chains":
if err := json.NewDecoder(r.Body).Decode(&created); err != nil {
t.Errorf("Decode chain failed: %v", err)
http.Error(w, "bad request", http.StatusBadRequest)
return
}
_, _ = w.Write([]byte(`{}`))
case r.Method == http.MethodDelete && r.URL.Path == "/api/config/chains/old-a":
deleted = append(deleted, "old-a")
_, _ = w.Write([]byte(`{}`))
case r.Method == http.MethodDelete && r.URL.Path == "/api/config/chains/old-b":
deleted = append(deleted, "old-b")
_, _ = w.Write([]byte(`{}`))
case r.Method == http.MethodPost && r.URL.Path == "/api/config":
saved = true
_, _ = w.Write([]byte(`{}`))
default:
t.Errorf("unexpected request: %s %s", r.Method, r.URL.Path)
http.NotFound(w, r)
}
}))
defer server.Close()
client := NewGost(server.URL, 9700, "/api", "user", "pass")
chains, err := client.ListChains()
if err != nil {
t.Fatalf("ListChains returned error: %v", err)
}
if len(chains) != 2 || chains[0].Name != "old-a" || chains[1].Name != "old-b" {
t.Fatalf("unexpected chains: %#v", chains)
}
if err := client.DeleteChain(chains[0].Name); err != nil {
t.Fatalf("DeleteChain old-a returned error: %v", err)
}
if err := client.DeleteChain(chains[1].Name); err != nil {
t.Fatalf("DeleteChain old-b returned error: %v", err)
}
if len(deleted) != 2 {
t.Fatalf("unexpected deleted chains: %#v", deleted)
}
err = client.CreateChain(&GostChainConfig{
Name: "edge-a",
Hops: []GostHopConfig{{
Nodes: []GostNodeConfig{{
Addr: "192.0.2.1:1080",
Connector: GostConnectorConfig{Type: "socks5"},
Dialer: GostDialerConfig{Type: "tcp"},
}},
}},
})
if err != nil {
t.Fatalf("CreateChain returned error: %v", err)
}
if created == nil || created.Name != "edge-a" {
t.Fatalf("unexpected created chain: %#v", created)
}
if len(created.Hops) != 1 || len(created.Hops[0].Nodes) != 1 {
t.Fatalf("unexpected created chain hops: %#v", created.Hops)
}
node := created.Hops[0].Nodes[0]
if node.Addr != "192.0.2.1:1080" || node.Connector.Type != "socks5" || node.Dialer.Type != "tcp" {
t.Fatalf("unexpected created node: %#v", node)
}
if err := client.SaveConfig(); err != nil {
t.Fatalf("SaveConfig returned error: %v", err)
}
if !saved {
t.Fatal("expected SaveConfig request")
}
}

View File

@@ -105,7 +105,7 @@ func UpdateProxyStatus(c *fiber.Ctx) error {
return c.JSON(nil) return c.JSON(nil)
} }
func SyncProxyPool(c *fiber.Ctx) error { func SyncProxyPorts(c *fiber.Ctx) error {
_, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyWrite) _, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyWrite)
if err != nil { if err != nil {
return err return err
@@ -116,7 +116,25 @@ func SyncProxyPool(c *fiber.Ctx) error {
return err return err
} }
if err := s.Proxy.SyncPool(req.Id); err != nil { if err := s.Proxy.SyncPorts(req.Id); err != nil {
return err
}
return c.JSON(nil)
}
func SyncProxyChains(c *fiber.Ctx) error {
_, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeProxyWrite)
if err != nil {
return err
}
var req core.IdReq
if err := g.Validator.ParseBody(c, &req); err != nil {
return err
}
if err := s.Proxy.SyncChains(req.Id); err != nil {
return err return err
} }

View File

@@ -129,9 +129,6 @@ func clientRouter(api fiber.Router) {
client.Post("/verify/sms", handlers.SendSmsCode) client.Post("/verify/sms", handlers.SendSmsCode)
// 网关 // 网关
proxy := client.Group("/proxy")
proxy.Post("/sync-pool", handlers.SyncProxyPool)
// 通道管理 // 通道管理
channel := client.Group("/channel") channel := client.Group("/channel")
channel.Post("/remove", handlers.RemoveChannels) channel.Post("/remove", handlers.RemoveChannels)
@@ -277,6 +274,8 @@ func adminRouter(api fiber.Router) {
proxy.Post("/create", handlers.CreateProxy) proxy.Post("/create", handlers.CreateProxy)
proxy.Post("/update", handlers.UpdateProxy) proxy.Post("/update", handlers.UpdateProxy)
proxy.Post("/update/status", handlers.UpdateProxyStatus) proxy.Post("/update/status", handlers.UpdateProxyStatus)
proxy.Post("/sync/ports", handlers.SyncProxyPorts)
proxy.Post("/sync/chains", handlers.SyncProxyChains)
proxy.Post("/remove", handlers.RemoveProxy) proxy.Post("/remove", handlers.RemoveProxy)
// trade 交易 // trade 交易

View File

@@ -397,7 +397,7 @@ func selectProxyByType(proxyType m.ProxyType, count int) (*m.Proxy, error) {
} }
} }
if maxCount < count { if maxCount < count {
return nil, core.NewBizErr("无可用代理") return nil, core.NewBizErr("无空闲代理")
} }
return bestProxy, nil return bestProxy, nil

View File

@@ -50,6 +50,9 @@ func (s *channelGostProvider) prepareCreate(ctx *channelCreateContext) (*channel
Listener: g.GostListenerConfig{ Listener: g.GostListenerConfig{
Type: "tcp", Type: "tcp",
}, },
Recorders: []g.GostRecorderConfig{
{Name: "record-file", Record: "recorder.service.handler"},
},
} }
if ctx.AuthWhitelist { if ctx.AuthWhitelist {

View File

@@ -2,6 +2,7 @@ package services
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"net/netip" "net/netip"
"platform/pkg/u" "platform/pkg/u"
@@ -14,6 +15,7 @@ import (
"time" "time"
"gorm.io/gen/field" "gorm.io/gen/field"
"gorm.io/gorm"
) )
var Proxy = &proxyService{} var Proxy = &proxyService{}
@@ -23,11 +25,20 @@ type proxyService struct{}
func hasUsedChans(proxyID int32) (bool, error) { func hasUsedChans(proxyID int32) (bool, error) {
ctx := context.Background() ctx := context.Background()
pattern := usedChansKey(proxyID, "*") pattern := usedChansKey(proxyID, "*")
keys, _, err := g.Redis.Scan(ctx, 0, pattern, 1).Result() var cursor uint64
if err != nil { for {
return false, err keys, next, err := g.Redis.Scan(ctx, cursor, pattern, 100).Result()
if err != nil {
return false, err
}
if len(keys) > 0 {
return true, nil
}
if next == 0 {
return false, nil
}
cursor = next
} }
return len(keys) > 0, nil
} }
func rebuildFreeChans(proxyID int32, addr netip.Addr) error { func rebuildFreeChans(proxyID int32, addr netip.Addr) error {
@@ -161,17 +172,117 @@ func (s *proxyService) Update(update *UpdateProxy) error {
return nil return nil
} }
func (s *proxyService) SyncPool(id int32) error { func (s *proxyService) SyncPorts(id int32) error {
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(id)).Select(q.Proxy.ID, q.Proxy.IP).First() proxy, err := findOfflineProxy(id)
if err != nil { if err != nil {
return core.NewServErr("获取代理数据失败", err) return err
} }
if proxy == nil {
return core.NewBizErr("代理不存在") used, err := hasUsedChans(id)
if err != nil {
return core.NewServErr("检查代理通道状态失败", err)
} }
if used {
return core.NewBizErr("代理存在未关闭通道,禁止重建端口池")
}
return rebuildFreeChans(id, proxy.IP.Addr) return rebuildFreeChans(id, proxy.IP.Addr)
} }
func (s *proxyService) SyncChains(id int32) error {
proxy, err := findOfflineProxy(id)
if err != nil {
return err
}
if proxy.Type != m.ProxyTypeGost {
return core.NewBizErr("仅 GOST 代理支持重建代理链")
}
chains, err := buildGostChainsFromEdges()
if err != nil {
return err
}
client, err := proxyGost(proxy)
if err != nil {
return core.NewServErr("创建 GOST 客户端失败", err)
}
oldChains, err := client.ListChains()
if err != nil {
return core.NewServErr("查询 GOST chains 失败", err)
}
for _, chain := range oldChains {
if err := client.DeleteChain(chain.Name); err != nil {
return core.NewServErr(fmt.Sprintf("删除 GOST chain 失败: %s", chain.Name), err)
}
}
for _, chain := range chains {
if err := client.CreateChain(chain); err != nil {
return core.NewServErr(fmt.Sprintf("创建 GOST chain 失败: %s", chain.Name), err)
}
}
if err := client.SaveConfig(); err != nil {
return core.NewServErr("保存 GOST 配置失败", err)
}
return nil
}
func findOfflineProxy(id int32) (*m.Proxy, error) {
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(id)).Take()
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, core.NewBizErr("代理不存在")
}
if err != nil {
return nil, core.NewServErr("获取代理数据失败", err)
}
if proxy.Status != m.ProxyStatusOffline {
return nil, core.NewBizErr("代理未下线,禁止同步")
}
return proxy, nil
}
func buildGostChainsFromEdges() ([]*g.GostChainConfig, error) {
edges, err := q.Edge.
Where(q.Edge.Type.Eq(int(m.EdgeTypeGostChain))).
Order(q.Edge.ID).
Find()
if err != nil {
return nil, core.NewServErr("查询 GOST edge 数据失败", err)
}
chains := make([]*g.GostChainConfig, len(edges))
for i, edge := range edges {
if strings.TrimSpace(edge.Mac) == "" {
return nil, core.NewBizErr(fmt.Sprintf("GOST edge %d chain 名称为空", edge.ID))
}
if !edge.IP.Addr.IsValid() {
return nil, core.NewBizErr(fmt.Sprintf("GOST edge %s IP 无效", edge.Mac))
}
if edge.Port == nil || *edge.Port == 0 {
return nil, core.NewBizErr(fmt.Sprintf("GOST edge %s 端口为空", edge.Mac))
}
chains[i] = &g.GostChainConfig{
Name: edge.Mac,
Hops: []g.GostHopConfig{{
Nodes: []g.GostNodeConfig{{
Addr: netip.AddrPortFrom(edge.IP.Addr, *edge.Port).String(),
Connector: g.GostConnectorConfig{
Type: "socks5",
},
Dialer: g.GostDialerConfig{
Type: "tcp",
},
}},
}},
}
}
return chains, nil
}
func (s *proxyService) Remove(id int32) error { func (s *proxyService) Remove(id int32) error {
used, err := hasUsedChans(id) used, err := hasUsedChans(id)
if err != nil { if err != nil {