完善数据填充脚本 & 移除 cron 任务 & 兼容旧事件
This commit is contained in:
@@ -184,6 +184,7 @@ values
|
|||||||
('许昌', 2, (select id from area where name = '河南')),
|
('许昌', 2, (select id from area where name = '河南')),
|
||||||
('郑州', 2, (select id from area where name = '河南')),
|
('郑州', 2, (select id from area where name = '河南')),
|
||||||
('驻马店', 2, (select id from area where name = '河南')),
|
('驻马店', 2, (select id from area where name = '河南')),
|
||||||
|
('平顶山', 2, (select id from area where name = '河南')),
|
||||||
('鹤壁', 2, (select id from area where name = '河南')),
|
('鹤壁', 2, (select id from area where name = '河南')),
|
||||||
('丽水', 2, (select id from area where name = '浙江')),
|
('丽水', 2, (select id from area where name = '浙江')),
|
||||||
('台州', 2, (select id from area where name = '浙江')),
|
('台州', 2, (select id from area where name = '浙江')),
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -614,8 +614,8 @@ create table proxy (
|
|||||||
updated_at timestamptz default current_timestamp,
|
updated_at timestamptz default current_timestamp,
|
||||||
deleted_at timestamptz
|
deleted_at timestamptz
|
||||||
);
|
);
|
||||||
create unique index udx_proxy_mac on proxy (mac) where deleted_at is null;
|
create index idx_proxy_mac on proxy (mac) where deleted_at is null;
|
||||||
create unique index udx_proxy_ip on proxy (ip) where deleted_at is null;
|
create index idx_proxy_ip on proxy (ip) where deleted_at is null;
|
||||||
create index idx_proxy_created_at on proxy (created_at) where deleted_at is null;
|
create index idx_proxy_created_at on proxy (created_at) where deleted_at is null;
|
||||||
|
|
||||||
-- proxy表字段注释
|
-- proxy表字段注释
|
||||||
|
|||||||
@@ -1,9 +1,7 @@
|
|||||||
package events
|
package events
|
||||||
|
|
||||||
import "github.com/hibiken/asynq"
|
|
||||||
|
|
||||||
const RefreshEdge = "edge:refresh"
|
const RefreshEdge = "edge:refresh"
|
||||||
|
|
||||||
func NewRefreshEdge() *asynq.Task {
|
// func NewRefreshEdge() *asynq.Task {
|
||||||
return asynq.NewTask(RefreshEdge, nil)
|
// return asynq.NewTask(RefreshEdge, nil)
|
||||||
}
|
// }
|
||||||
|
|||||||
@@ -136,7 +136,12 @@ func (s *channelServer) RemoveChannels(batch string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.provider.removeRemote(batch, usedBatch); err != nil {
|
provider, err := channelProviderByProxyID(usedBatch.ProxyID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := provider.removeRemote(batch, usedBatch); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := freeChans(usedBatch.ProxyID, batch); err != nil {
|
if err := freeChans(usedBatch.ProxyID, batch); err != nil {
|
||||||
@@ -407,6 +412,22 @@ func selectProxyByType(proxyType m.ProxyType, count int) (*m.Proxy, error) {
|
|||||||
return bestProxy, nil
|
return bestProxy, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func channelProviderByProxyID(proxyID int32) (channelProvider, error) {
|
||||||
|
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(proxyID)).Take()
|
||||||
|
if err != nil {
|
||||||
|
return nil, core.NewServErr("获取代理数据失败", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch proxy.Type {
|
||||||
|
case m.ProxyTypeGost:
|
||||||
|
return &channelGostProvider{}, nil
|
||||||
|
case m.ProxyTypeBaiYin:
|
||||||
|
return &channelBaiyinProvider{}, nil
|
||||||
|
default:
|
||||||
|
return nil, core.NewBizErr(fmt.Sprintf("不支持的代理类型: %d", proxy.Type))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *channelServer) RefreshEdges() error {
|
func (s *channelServer) RefreshEdges() error {
|
||||||
|
|
||||||
// 仅白银网关支持边缘节点刷新,GOST 不参与此流程。
|
// 仅白银网关支持边缘节点刷新,GOST 不参与此流程。
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ import (
|
|||||||
"platform/web/core"
|
"platform/web/core"
|
||||||
g "platform/web/globals"
|
g "platform/web/globals"
|
||||||
m "platform/web/models"
|
m "platform/web/models"
|
||||||
q "platform/web/queries"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type channelBaiyinProvider struct{}
|
type channelBaiyinProvider struct{}
|
||||||
@@ -68,30 +67,31 @@ func (s *channelBaiyinProvider) prepareCreate(ctx *channelCreateContext) (*chann
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *channelBaiyinProvider) removeRemote(_ string, batch *usedChanBatch) error {
|
func (s *channelBaiyinProvider) removeRemote(_ string, batch *usedChanBatch) error {
|
||||||
configs := make([]*g.PortConfigsReq, len(batch.Chans))
|
|
||||||
for i, ch := range batch.Chans {
|
|
||||||
configs[i] = &g.PortConfigsReq{
|
|
||||||
Port: int(ch.Port()),
|
|
||||||
Edge: &[]string{},
|
|
||||||
AutoEdgeConfig: &g.AutoEdgeConfig{Count: u.P(0)},
|
|
||||||
Status: false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(batch.ProxyID)).Take()
|
|
||||||
if err != nil {
|
|
||||||
return core.NewServErr("获取代理数据失败", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
gateway, err := proxyGateway(proxy)
|
|
||||||
if err != nil {
|
|
||||||
return core.NewServErr("创建代理网关失败", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = gateway.GatewayPortConfigs(configs); err != nil {
|
|
||||||
return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
|
// configs := make([]*g.PortConfigsReq, len(batch.Chans))
|
||||||
|
// for i, ch := range batch.Chans {
|
||||||
|
// configs[i] = &g.PortConfigsReq{
|
||||||
|
// Port: int(ch.Port()),
|
||||||
|
// Edge: &[]string{},
|
||||||
|
// AutoEdgeConfig: &g.AutoEdgeConfig{Count: u.P(0)},
|
||||||
|
// Status: false,
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
|
// proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(batch.ProxyID)).Take()
|
||||||
|
// if err != nil {
|
||||||
|
// return core.NewServErr("获取代理数据失败", err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// gateway, err := proxyGateway(proxy)
|
||||||
|
// if err != nil {
|
||||||
|
// return core.NewServErr("创建代理网关失败", err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
// if err = gateway.GatewayPortConfigs(configs); err != nil {
|
||||||
|
// return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err)
|
||||||
|
// }
|
||||||
|
// return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensureEdges 检查本地节点是否足够,如果不足从云端连入
|
// ensureEdges 检查本地节点是否足够,如果不足从云端连入
|
||||||
|
|||||||
@@ -56,9 +56,9 @@ func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) {
|
|||||||
func HandleRefreshEdges(_ context.Context, task *asynq.Task) (err error) {
|
func HandleRefreshEdges(_ context.Context, task *asynq.Task) (err error) {
|
||||||
slog.Info("[event]刷新边缘节点")
|
slog.Info("[event]刷新边缘节点")
|
||||||
|
|
||||||
err = s.Channel.RefreshEdges()
|
// err = s.Channel.RefreshEdges()
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
return fmt.Errorf("刷新边缘节点失败: %w", err)
|
// return fmt.Errorf("刷新边缘节点失败: %w", err)
|
||||||
}
|
// }
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
27
web/web.go
27
web/web.go
@@ -42,10 +42,6 @@ func RunApp(pCtx context.Context) error {
|
|||||||
return RunTask(ctx)
|
return RunTask(ctx)
|
||||||
})
|
})
|
||||||
|
|
||||||
g.Go(func() error {
|
|
||||||
return RunCron(ctx)
|
|
||||||
})
|
|
||||||
|
|
||||||
return g.Wait()
|
return g.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -111,29 +107,6 @@ func RunTask(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunCron(ctx context.Context) error {
|
|
||||||
cron := asynq.NewSchedulerFromRedisClient(deps.Redis, &asynq.SchedulerOpts{
|
|
||||||
Logger: &AppAsynqLogger{},
|
|
||||||
Location: time.Local,
|
|
||||||
})
|
|
||||||
|
|
||||||
cron.Register("0/10 * * * *", events.NewRefreshEdge())
|
|
||||||
|
|
||||||
// 停止服务
|
|
||||||
go func() {
|
|
||||||
<-ctx.Done()
|
|
||||||
cron.Shutdown()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// 启动服务
|
|
||||||
err := cron.Run()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("定时任务服务运行失败: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type AppAsynqLogger struct{}
|
type AppAsynqLogger struct{}
|
||||||
|
|
||||||
func (l *AppAsynqLogger) Debug(args ...any) {
|
func (l *AppAsynqLogger) Debug(args ...any) {
|
||||||
|
|||||||
Reference in New Issue
Block a user