优化节点分配逻辑,使用 edge id 固定轮换顺位;日志与 journal 集成;完善命令执行后的资源清理

This commit is contained in:
2025-08-19 11:09:58 +08:00
parent b6731f8442
commit 0c32337168
10 changed files with 75 additions and 31 deletions

View File

@@ -1,7 +1,4 @@
## TODO ## TODO
update 操作:
- 如果网关是禁用状态,则清空配置(目前是只查询启用的节点
此实现目前并不是完全并发安全的: 此实现目前并不是完全并发安全的:
- 目前事务等级没有对 cityhash 表的 offset 字段做防丢失,并发操作可能会出问题 - 目前事务等级没有对 cityhash 表的 offset 字段做防丢失,并发操作可能会出问题

View File

@@ -37,7 +37,7 @@ func FindCitiesWithEdgesCount(tx *gorm.DB) ([]model.City, error) {
return cities, nil return cities, nil
} }
func AppendCityOffset(tx *gorm.DB, city int, offset int) error { func UpdateCityOffset(tx *gorm.DB, city int, offset int) error {
if offset < 0 { if offset < 0 {
return fmt.Errorf("offset must be non-negative") return fmt.Errorf("offset must be non-negative")
} }

View File

@@ -2,9 +2,10 @@ package actions
import ( import (
"fmt" "fmt"
"zzman/model"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/clause" "gorm.io/gorm/clause"
"zzman/model"
) )
const batchSize = 1000 const batchSize = 1000
@@ -18,9 +19,9 @@ func FindEdgesByCity(tx *gorm.DB, cityId int) ([]model.Edge, error) {
return edges, nil return edges, nil
} }
func SliceActiveEdgesByCity(tx *gorm.DB, cityId int, offset int, limit int) ([]model.Edge, error) { func SliceActiveEdges(tx *gorm.DB, cityId int, afterEdgeId int, count int) ([]model.Edge, error) {
var edges []model.Edge var edges []model.Edge
err := tx.Limit(limit).Offset(offset).Find(&edges, "city_id = ? and active = 1", cityId).Error err := tx.Limit(count).Find(&edges, "id > ? and city_id = ? and active = 1", afterEdgeId, cityId).Error
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to find edges with offset: %w", err) return nil, fmt.Errorf("failed to find edges with offset: %w", err)
} }
@@ -34,10 +35,7 @@ func SaveEdges(tx *gorm.DB, edges []model.Edge) error {
// 分批处理边缘设备数据 // 分批处理边缘设备数据
for i := 0; i < len(edges); i += batchSize { for i := 0; i < len(edges); i += batchSize {
end := i + batchSize end := min(i+batchSize, len(edges))
if end > len(edges) {
end = len(edges)
}
batch := edges[i:end] batch := edges[i:end]
err := tx.Clauses(clause.OnConflict{ err := tx.Clauses(clause.OnConflict{

View File

@@ -7,6 +7,8 @@ import (
"zzman/clients/jd" "zzman/clients/jd"
"zzman/model" "zzman/model"
u "zzman/util" u "zzman/util"
"gorm.io/gorm"
) )
type UpdateArgs struct { type UpdateArgs struct {
@@ -14,7 +16,6 @@ type UpdateArgs struct {
} }
func Update(args ...UpdateArgs) error { func Update(args ...UpdateArgs) error {
var tx = model.DB
var arg UpdateArgs var arg UpdateArgs
if len(args) > 0 { if len(args) > 0 {
arg = args[0] arg = args[0]
@@ -22,6 +23,12 @@ func Update(args ...UpdateArgs) error {
arg.Mock = false arg.Mock = false
} }
return model.DB.Transaction(func(tx *gorm.DB) error {
return update(tx, arg)
})
}
func update(tx *gorm.DB, arg UpdateArgs) error {
gateways, err := FindGateways(tx) gateways, err := FindGateways(tx)
if err != nil { if err != nil {
return fmt.Errorf("获取所有网关失败:%w", err) return fmt.Errorf("获取所有网关失败:%w", err)
@@ -56,7 +63,7 @@ func Update(args ...UpdateArgs) error {
// 按城市循环,对比并更新网关配置 // 按城市循环,对比并更新网关配置
for iCity, city := range cities { for iCity, city := range cities {
// 如果每个网关在此城市都有节点且无需改变,就不需要从云端拉取城市节点信息 // 如果每个网关在此城市都有节点且无需改变,就不需要重新分配节点
// 相当于直接重新提交配置,此流程下配置更新是幂等的 // 相当于直接重新提交配置,此流程下配置更新是幂等的
var gateways2Update []model.Gateway var gateways2Update []model.Gateway
for _, gateway := range gateways { for _, gateway := range gateways {
@@ -77,32 +84,29 @@ func Update(args ...UpdateArgs) error {
continue continue
} }
// 否则,正常从云端拉取城市节点信息 // 否则获取足量新节点
offset := city.Offset offset := city.Offset
count := len(gateways2Update) count := len(gateways2Update)
if count > city.EdgesCount { if count > city.EdgesCount {
slog.Warn("城市节点数量不足,跳过本次更新", "城市", city.Name, "节点数", city.EdgesCount, "网关数", count) slog.Warn("城市节点数量不足,跳过本次更新", "城市", city.Name, "节点数", city.EdgesCount, "网关数", count)
continue continue
} }
if offset+count > city.EdgesCount { edges, err := SliceActiveEdges(tx, city.Id, offset, count)
slog.Debug("城市节点不足,将循环使用节点", "城市", city.Name, "节点数", city.EdgesCount, "网关数", count)
offset = 0
}
edges, err := SliceActiveEdgesByCity(tx, city.Id, offset, count)
if err != nil { if err != nil {
return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err) return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err)
} }
if len(edges) < count { if len(edges) < count {
slog.Debug("城市节点不足,将循环使用节点", "城市", city.Name, "节点数", city.EdgesCount, "网关数", count) slog.Debug("城市节点不足,将循环使用节点", "城市", city.Name, "节点数", city.EdgesCount, "网关数", count)
edges, err = SliceActiveEdgesByCity(tx, city.Id, 0, count) edges, err = SliceActiveEdges(tx, city.Id, 0, count)
if err != nil { if err != nil {
return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err) return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err)
} }
offset = 0
} }
offset = edges[len(edges)-1].Id
// 更新网关配置 // 更新网关配置
var configs2Create []model.Config var configs2Create []model.Config
var configs2Update []model.ConfigUpdate var configs2Update []model.ConfigUpdate
@@ -113,7 +117,7 @@ func Update(args ...UpdateArgs) error {
newConfig := edges[iGateway] newConfig := edges[iGateway]
if exists { if exists {
fmt.Printf("\t网关 %s 变更节点: %s -> %s\n", gateway.Macaddr, oldConfig.Macaddr, newConfig.Macaddr) slog.Debug("网关配置变更", "网关", gateway.Macaddr, "旧节点", oldConfig.Macaddr, "新节点", newConfig.Macaddr)
configs2Update = append(configs2Update, model.ConfigUpdate{ configs2Update = append(configs2Update, model.ConfigUpdate{
Id: oldConfig.Id, Id: oldConfig.Id,
@@ -129,7 +133,7 @@ func Update(args ...UpdateArgs) error {
}, },
} }
} else { } else {
fmt.Printf("\t网关 %s 新增节点: %s\n", gateway.Macaddr, newConfig.Macaddr) slog.Debug("网关配置新增", "网关", gateway.Macaddr, "新节点", newConfig.Macaddr)
configs2Create = append(configs2Create, model.Config{ configs2Create = append(configs2Create, model.Config{
Cityhash: city.Hash, Cityhash: city.Hash,
@@ -160,7 +164,7 @@ func Update(args ...UpdateArgs) error {
if err != nil { if err != nil {
return fmt.Errorf("更新配置失败:%w", err) return fmt.Errorf("更新配置失败:%w", err)
} }
err = AppendCityOffset(tx, city.Id, offset+count) err = UpdateCityOffset(tx, city.Id, offset)
if err != nil { if err != nil {
return fmt.Errorf("更新城市 %s 的偏移量失败:%w", city.Name, err) return fmt.Errorf("更新城市 %s 的偏移量失败:%w", city.Name, err)
} }
@@ -180,11 +184,11 @@ func Update(args ...UpdateArgs) error {
setup++ setup++
} }
} }
fmt.Printf("提交网关 %s 配置: %d 变更, %d 新增\n", gateway.Macaddr, change, setup) slog.Info("提交网关配置", "网关", gateway.Macaddr, "变更", change, "新增", setup)
// 提交配置到云端:配置版本 gateway.ConfigVersion // 提交配置到云端:配置版本 gateway.ConfigVersion
if arg.Mock { if arg.Mock {
fmt.Printf("[MOCK] 配置网关 %s:\n%v\n", gateway.Macaddr, edges) slog.Info("[MOCK] 配置网关", "网关", gateway.Macaddr, "配置", edges)
} else { } else {
err := jd.GatewayConfigSet(gateway.ConfigVersion, gateway.Macaddr, edges) err := jd.GatewayConfigSet(gateway.ConfigVersion, gateway.Macaddr, edges)
if err != nil { if err != nil {

View File

@@ -1,9 +1,10 @@
package clients package clients
import ( import (
redis "github.com/redis/go-redis/v9"
"log/slog" "log/slog"
"os" "os"
redis "github.com/redis/go-redis/v9"
) )
var Redis *redis.Client var Redis *redis.Client
@@ -29,3 +30,12 @@ func InitRedis() {
Password: password, Password: password,
}) })
} }
func CloseRedis() {
if Redis != nil {
err := Redis.Close()
if err != nil {
slog.Error("关闭 Redis 连接失败", "error", err)
}
}
}

View File

@@ -1 +1,3 @@
$env:GOOS="linux"; $env:GOARCH="amd64"; $env:CGO_ENABLED=0; go build -o dist/zz main.go $env:GOOS="linux"; $env:GOARCH="amd64"; $env:CGO_ENABLED=0; go build -o dist/zz main.go
scp ./dist/zz wyk@43.226.58.254:~/zzcli

2
go.mod
View File

@@ -16,5 +16,7 @@ require (
github.com/go-sql-driver/mysql v1.9.3 // indirect github.com/go-sql-driver/mysql v1.9.3 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect github.com/jinzhu/now v1.1.5 // indirect
github.com/systemd/slog-journal v0.1.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.27.0 // indirect golang.org/x/text v0.27.0 // indirect
) )

4
go.sum
View File

@@ -18,6 +18,10 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/redis/go-redis/v9 v9.11.0 h1:E3S08Gl/nJNn5vkxd2i78wZxWAPNZgUNTp8WIJUAiIs= github.com/redis/go-redis/v9 v9.11.0 h1:E3S08Gl/nJNn5vkxd2i78wZxWAPNZgUNTp8WIJUAiIs=
github.com/redis/go-redis/v9 v9.11.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/redis/go-redis/v9 v9.11.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/systemd/slog-journal v0.1.0 h1:/SdZsEp21ZzFWS3w6rcmiFCkaVKX3dt8W8mwcbNPdrM=
github.com/systemd/slog-journal v0.1.0/go.mod h1:RroeO1jghpRimGgddhGx+TS/CmRJdPjK9g2ZeYLr9P8=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4=
golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU=
gorm.io/driver/mysql v1.6.0 h1:eNbLmNTpPpTOVZi8MMxCi2aaIm0ZpInbORNXDwyLGvg= gorm.io/driver/mysql v1.6.0 h1:eNbLmNTpPpTOVZi8MMxCi2aaIm0ZpInbORNXDwyLGvg=

16
main.go
View File

@@ -6,11 +6,13 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"slices" "slices"
"strings"
"zzman/actions" "zzman/actions"
"zzman/clients" "zzman/clients"
"zzman/model" "zzman/model"
"github.com/joho/godotenv" "github.com/joho/godotenv"
slogjournal "github.com/systemd/slog-journal"
) )
func main() { func main() {
@@ -29,8 +31,22 @@ func main() {
slog.Error(fmt.Errorf("初始化变量失败:%w", err).Error()) slog.Error(fmt.Errorf("初始化变量失败:%w", err).Error())
} }
// 初始化日志
handler, err := slogjournal.NewHandler(&slogjournal.Options{
ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr {
a.Key = strings.ToUpper(a.Key)
a.Key = strings.ReplaceAll(a.Key, "-", "_")
return a
},
})
slog.SetDefault(slog.New(handler))
// 初始化数据库和 Redis
model.Init() model.Init()
defer model.Close()
clients.InitRedis() clients.InitRedis()
defer clients.CloseRedis()
// 执行命令 // 执行命令
if len(os.Args) < 2 { if len(os.Args) < 2 {

View File

@@ -2,10 +2,11 @@ package model
import ( import (
"fmt" "fmt"
"gorm.io/driver/mysql"
"gorm.io/gorm"
"os" "os"
"strconv" "strconv"
"gorm.io/driver/mysql"
"gorm.io/gorm"
) )
var DB *gorm.DB var DB *gorm.DB
@@ -53,3 +54,13 @@ func Init() {
panic(err) panic(err)
} }
} }
func Close() {
sqlDB, err := DB.DB()
if err != nil {
panic(fmt.Sprintf("Failed to get database instance: %s", err))
}
if err := sqlDB.Close(); err != nil {
panic(fmt.Sprintf("Failed to close database connection: %s", err))
}
}