修改节点分配方式

This commit is contained in:
2025-09-10 18:26:59 +08:00
parent 7e7d706f84
commit 1900f5e82d
13 changed files with 191 additions and 239 deletions

5
.gitignore vendored
View File

@@ -27,4 +27,7 @@ go.work.sum
.env
# editor file
.idea/
.idea/
cmd/
.vscode/

View File

@@ -1,4 +1,45 @@
## TODO
此实现目前并不是完全并发安全的:
- 目前事务等级没有对 cityhash 表的 offset 字段做防丢失,并发操作可能会出问题
- 目前事务等级没有对 cityhash 表的 offset 字段做防丢失,并发操作可能会出问题
## 模块逻辑
**更新网关配置**
```
获取所有网关
获取所有配置
构建配置表 `map[网关][配置]`
循环配置:
如果配置项需要更新,记录更新信息
循环更新信息(加速数据库查询)
上传配置到云端
```
**同步节点信息(全量更新)**
```
获取所有启用城市
循环城市:
获取云端节点信息
获取本地节点信息
更新列表 = 空
删除列表 = 本地节点
循环云端节点:
更新列表 <= 云端节点
删除列表 x= 本地节点(同云端 macaddr
更新并(软)删除节点信息
```
### 统一节点调度
节点上下线:
提供一个接口用来为节点加解锁

View File

@@ -6,12 +6,12 @@ import (
"gorm.io/gorm"
)
func RecordChangeEdges(tx *gorm.DB, changes []model.ChangeEdge) error {
func RecordChanges(tx *gorm.DB, changes []model.Change) error {
if len(changes) == 0 {
return nil
}
batchSize := 1000
batchSize := 500
for i := 0; i < len(changes); i += batchSize {
end := min(i+batchSize, len(changes))
batch := changes[i:end]
@@ -22,7 +22,3 @@ func RecordChangeEdges(tx *gorm.DB, changes []model.ChangeEdge) error {
}
return nil
}
func RecordChangeCity(tx *gorm.DB, change model.ChangeCity) error {
return tx.Create(&change).Error
}

View File

@@ -37,16 +37,18 @@ func FindCitiesWithEdgesCount(tx *gorm.DB) ([]model.City, error) {
return cities, nil
}
func UpdateCityOffset(tx *gorm.DB, city int, offset int) error {
if offset < 0 {
return fmt.Errorf("offset must be non-negative")
func UpdateCitiesOffset(tx *gorm.DB, updates []model.City) error {
if len(updates) == 0 {
return nil
}
err := tx.Model(&model.City{}).
Where("id = ?", city).
Update("offset", offset).Error
if err != nil {
return fmt.Errorf("failed to update cities offset: %w", err)
for _, city := range updates {
err := tx.Model(new(model.City)).
Where("id = ?", city.Id).
Update("offset", city.Offset).Error
if err != nil {
return fmt.Errorf("failed to update city id %d offset: %w", city.Id, err)
}
}
return nil

View File

@@ -6,24 +6,15 @@ import (
"gorm.io/gorm"
)
func FindConfigsByGateway(tx *gorm.DB, macaddr string) ([]model.Config, error) {
func FindConfigs(tx *gorm.DB) ([]model.Config, error) {
var configs []model.Config
err := tx.Find(&configs, "macaddr = ?", macaddr).Error
err := tx.Find(&configs).Error
if err != nil {
return nil, err
}
return configs, nil
}
func CreateConfigs(tx *gorm.DB, configs []model.Config) error {
if len(configs) == 0 {
return nil
}
// 使用事务批量插入配置
return tx.Omit("createtime", "updatetime").Create(&configs).Error
}
func UpdateConfigs(tx *gorm.DB, configs []model.ConfigUpdate) error {
if len(configs) == 0 {
return nil
@@ -31,7 +22,10 @@ func UpdateConfigs(tx *gorm.DB, configs []model.ConfigUpdate) error {
// 批量更新配置
for _, config := range configs {
err := tx.Updates(&config).Error
err := tx.
Where("id = ?", config.Id).
Updates(&config).
Error
if err != nil {
return err
}

View File

@@ -32,21 +32,6 @@ func SaveEdges(tx *gorm.DB, edges []model.Edge) error {
return nil
}
// 分批处理边缘设备数据
// for i := 0; i < len(edges); i += batchSize {
// end := min(i+batchSize, len(edges))
// batch := edges[i:end]
// err := tx.Clauses(clause.OnConflict{
// Columns: []clause.Column{{Name: "macaddr"}},
// UpdateAll: true,
// }).Create(&batch).Error
// if err != nil {
// return fmt.Errorf("failed to save edges batch %d-%d: %w", i, end-1, err)
// }
// }
// 手动区分需要创建和更新的节点,以避免 on conflict 更新的 id 占用问题
macAddrs := make([]string, len(edges))
for i, e := range edges {

View File

@@ -3,7 +3,6 @@ package actions
import (
"fmt"
"log/slog"
"strconv"
"time"
"zzman/clients/jd"
"zzman/model"
@@ -31,65 +30,75 @@ func Update(args ...UpdateArgs) error {
func update(tx *gorm.DB, arg UpdateArgs) error {
var now = time.Now()
var step = now
// 获取所有网关
gateways, err := FindGateways(tx)
if err != nil {
return fmt.Errorf("获取所有网关失败:%w", err)
}
var findGateway = make(map[string]model.Gateway)
for _, gateway := range gateways {
findGateway[gateway.Macaddr] = gateway
}
println(fmt.Sprintf("获取网关:%v", time.Since(step)))
step = time.Now()
// 获取所有城市
cities, err := FindCitiesWithEdgesCount(tx)
if err != nil {
return fmt.Errorf("获取所有城市失败:%w", err)
}
// 准备网关查找表,初始化网关配置表
type ConfigInfo struct {
Item jd.EdgeInfo
Type int // 0: 不变, 1: 更新, 2: 新增
var findCity = make(map[string]model.City)
for _, city := range cities {
findCity[city.Hash] = city
}
var oldConfigsMap = make(map[model.Gateway]map[string]model.Config)
println(fmt.Sprintf("获取城市:%v", time.Since(step)))
step = time.Now()
// 获取所有配置
configs, err := FindConfigs(tx)
if err != nil {
return fmt.Errorf("获取所有配置失败:%w", err)
}
var findConfig = make(map[string]model.Config)
for _, config := range configs {
findConfig[config.Cityhash+":"+config.GatewayMac] = config
}
println(fmt.Sprintf("获取配置:%v", time.Since(step)))
step = time.Now()
// 更新网关配置
var newConfigs = make(map[model.Gateway][]ConfigInfo)
for _, gateway := range gateways {
newConfigs[gateway] = make([]ConfigInfo, len(cities))
var changes []model.Change
oldConfigs, err := FindConfigsByGateway(tx, gateway.Macaddr)
if err != nil {
return fmt.Errorf("获取网关 %s 城市节点失败:%w", gateway.Macaddr, err)
}
for _, city := range cities {
oldConfigsMap[gateway] = make(map[string]model.Config)
for _, config := range oldConfigs {
oldConfigsMap[gateway][config.Cityhash] = config
}
}
// 按城市循环,对比并更新网关配置
for iCity, city := range cities {
// 如果每个网关在此城市都有节点且无需改变,就不需要重新分配节点
// 相当于直接重新提交配置,此流程下配置更新是幂等的
var gateways2Change []model.Gateway
// 先处理不变更的节点
configsUpdate := make([]model.Config, 0)
for _, gateway := range gateways {
oldConfig, exists := oldConfigsMap[gateway][city.Hash]
oldConfig, exists := findConfig[city.Hash+":"+gateway.Macaddr]
if exists && oldConfig.IsChange != 1 {
newConfigs[gateway][iCity] = ConfigInfo{
Type: 0, // 不变
Item: jd.EdgeInfo{
newConfigs[gateway] = append(newConfigs[gateway], ConfigInfo{
Change: false,
Remote: jd.EdgeInfo{
Mac: oldConfig.Macaddr,
City: oldConfig.Cityhash,
},
}
})
} else {
gateways2Change = append(gateways2Change, gateway)
configsUpdate = append(configsUpdate, oldConfig)
}
}
count := len(gateways2Change)
if len(gateways2Change) == 0 {
count := len(configsUpdate)
if len(configsUpdate) == 0 {
continue
}
// 否则获取足量新节点
// 如果有需要变更的节点,获取足量新节点
offset := city.Offset
if count > city.EdgesCount {
slog.Warn(fmt.Sprintf("城市节点数量不足,跳过本次更新,城市:%s节点数%d网关数%d", city.Name, city.EdgesCount, count))
@@ -108,118 +117,87 @@ func update(tx *gorm.DB, arg UpdateArgs) error {
}
}
offset = edges[len(edges)-1].Id
city.Offset = edges[len(edges)-1].Id
// 更新网关配置
var configs2Create []model.Config
var configs2Update []model.ConfigUpdate
var changes []model.ChangeEdge
// 分配新节点
for i, oldConfig := range configsUpdate {
for iGateway, gateway := range gateways2Change {
gateway := findGateway[oldConfig.GatewayMac]
edge := edges[i]
oldConfig, exists := oldConfigsMap[gateway][city.Hash]
newConfig := edges[iGateway]
slog.Debug(fmt.Sprintf("网关配置变更,网关:%s旧节点%s新节点%s", gateway.Macaddr, oldConfig.Macaddr, edge.Macaddr))
if exists {
slog.Debug(fmt.Sprintf("网关配置变更,网关:%s旧节点%s新节点%s", gateway.Macaddr, oldConfig.Macaddr, newConfig.Macaddr))
configs2Update = append(configs2Update, model.ConfigUpdate{
newConfigs[gateway] = append(newConfigs[gateway], ConfigInfo{
Change: true,
Remote: jd.EdgeInfo{
City: city.Hash,
Mac: edge.Macaddr,
},
Config: model.ConfigUpdate{
Id: oldConfig.Id,
Macaddr: u.P(oldConfig.Macaddr),
Macaddr: u.P(edge.Macaddr),
IsChange: u.P(0),
})
changes = append(changes, model.ChangeEdge{
Time: now,
CityId: city.Id,
Gateway: gateway.Macaddr,
OldEdge: oldConfig.Macaddr,
NewEdge: newConfig.Macaddr,
})
newConfigs[gateway][iCity] = ConfigInfo{
Type: 1, // 更新
Item: jd.EdgeInfo{
Mac: oldConfig.Macaddr,
City: city.Hash,
},
}
} else {
slog.Debug(fmt.Sprintf("网关配置新增,网关:%s新节点%s", gateway.Macaddr, newConfig.Macaddr))
configs2Create = append(configs2Create, model.Config{
Cityhash: city.Hash,
CityLabel: city.Label,
GatewayMac: gateway.Macaddr,
Macaddr: newConfig.Macaddr,
Table: strconv.Itoa(iCity + 1),
User: fmt.Sprintf("jdzz%ddt%d", gateway.Id, iCity+1),
Network: fmt.Sprintf("172.30.168.%d", iCity+2),
InnerIp: fmt.Sprintf("172.16.%d.%d", gateway.Id, iCity+2),
})
changes = append(changes, model.ChangeEdge{
Time: now,
CityId: city.Id,
Gateway: gateway.Macaddr,
OldEdge: "",
NewEdge: newConfig.Macaddr,
})
newConfigs[gateway][iCity] = ConfigInfo{
Type: 2, // 新增
Item: jd.EdgeInfo{
Mac: oldConfig.Macaddr,
City: city.Hash,
},
}
}
}
err = CreateConfigs(tx, configs2Create)
if err != nil {
return fmt.Errorf("创建新配置失败:%w", err)
}
err = UpdateConfigs(tx, configs2Update)
if err != nil {
return fmt.Errorf("更新配置失败:%w", err)
}
err = UpdateCityOffset(tx, city.Id, offset)
if err != nil {
return fmt.Errorf("更新城市 %s 的偏移量失败:%w", city.Name, err)
}
err = RecordChangeCity(tx, model.ChangeCity{
Time: now,
CityId: city.Id,
Count: count,
OffsetOld: city.Offset,
OffsetNew: offset,
})
if err != nil {
return fmt.Errorf("记录城市变更失败:%w", err)
}
err = RecordChangeEdges(tx, changes)
if err != nil {
return fmt.Errorf("记录节点变更失败:%w", err)
IsOnline: u.P(1),
},
})
changes = append(changes, model.Change{
Time: now,
CityId: city.Id,
Gateway: gateway.Macaddr,
OldEdge: oldConfig.Macaddr,
NewEdge: edge.Macaddr,
})
}
}
println(fmt.Sprintf("循环分配:%v", time.Since(step)))
step = time.Now()
// 更新城市偏移量
err = UpdateCitiesOffset(tx, cities)
if err != nil {
return fmt.Errorf("更新城市偏移量失败:%w", err)
}
println(fmt.Sprintf("更新城市偏移量:%v", time.Since(step)))
step = time.Now()
// 记录节点变更
err = RecordChanges(tx, changes)
if err != nil {
return fmt.Errorf("记录节点变更失败:%w", err)
}
println(fmt.Sprintf("记录节点变更:%v", time.Since(step)))
step = time.Now()
// 提交所有网关配置到云端
for gateway, infos := range newConfigs {
change := 0
setup := 0
edges := make([]jd.EdgeInfo, len(infos))
configsUpdate := make([]model.ConfigUpdate, 0)
// 统计变更数
change := 0
for i, info := range infos {
edges[i] = info.Item
switch info.Type {
case 1:
edges[i] = info.Remote
if info.Change {
configsUpdate = append(configsUpdate, info.Config)
change++
case 2:
setup++
}
}
slog.Info(fmt.Sprintf("提交网关配置,网关:%s变更数%d,新增数:%d", gateway.Macaddr, change, setup))
slog.Info(fmt.Sprintf("提交网关配置,网关:%s变更数%d", gateway.Macaddr, change))
// 更新配置版本
err = AppendGatewayConfigVersion(tx, gateway.Id)
if err != nil {
return fmt.Errorf("更新网关 %s 配置版本失败:%w", gateway.Macaddr, err)
}
// 更新本地配置
err = UpdateConfigs(tx, configsUpdate)
if err != nil {
return fmt.Errorf("更新网关 %s 本地配置失败:%w", gateway.Macaddr, err)
}
// 提交配置到云端:配置版本 gateway.ConfigVersion
if !arg.Mock {
@@ -228,13 +206,16 @@ func update(tx *gorm.DB, arg UpdateArgs) error {
return fmt.Errorf("配置网关 %s 失败:%w", gateway.Macaddr, err)
}
}
// 更新配置版本
err = AppendGatewayConfigVersion(tx, gateway.Id)
if err != nil {
return fmt.Errorf("更新网关 %s 配置版本失败:%w", gateway.Macaddr, err)
}
}
println(fmt.Sprintf("提交配置:%v", time.Since(step)))
println(fmt.Sprintf("总耗时:%v", time.Since(now)))
return nil
}
type ConfigInfo struct {
Change bool
Remote jd.EdgeInfo
Config model.ConfigUpdate
}

View File

@@ -4,7 +4,7 @@ import (
"log/slog"
"os"
redis "github.com/redis/go-redis/v9"
"github.com/redis/go-redis/v9"
)
var Redis *redis.Client

View File

@@ -1,34 +0,0 @@
package main
import (
"fmt"
"log/slog"
"zzman/actions"
"zzman/clients"
"zzman/model"
"github.com/joho/godotenv"
)
func main() {
// 初始化环境
slog.Debug("初始化环境变量")
err := godotenv.Load()
if err != nil {
slog.Error(fmt.Errorf("初始化变量失败:%w", err).Error())
}
// 初始化数据库和 Redis
model.Init()
defer model.Close()
clients.InitRedis()
defer clients.CloseRedis()
// 测试功能
actions.Update(actions.UpdateArgs{
Mock: true,
})
// actions.Sync()
}

View File

@@ -5,19 +5,19 @@ services:
redis:
image: redis:7
environment:
REDIS_PASSWORD: 123456
REDIS_PASSWORD: ${REDIS_PASSWORD}
ports:
- "6379:6379"
- "${REDIS_PORT}:6379"
volumes:
- redis_data:/data
mariadb:
image: mariadb:10
environment:
MYSQL_ROOT_PASSWORD: "hljuip916..."
MYSQL_DATABASE: jdbox
MYSQL_ROOT_PASSWORD: ${MYSQL_PASSWORD}
MYSQL_DATABASE: ${MYSQL_DATABASE}
ports:
- "3306:3306"
- "${MYSQL_PORT}:3306"
volumes:
- mariadb_data:/var/lib/mysql

2
go.mod
View File

@@ -5,6 +5,7 @@ go 1.24.0
require (
github.com/joho/godotenv v1.5.1
github.com/redis/go-redis/v9 v9.11.0
github.com/systemd/slog-journal v0.1.0
gorm.io/driver/mysql v1.6.0
gorm.io/gorm v1.30.1
)
@@ -16,7 +17,6 @@ require (
github.com/go-sql-driver/mysql v1.9.3 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/systemd/slog-journal v0.1.0 // indirect
golang.org/x/sys v0.29.0 // indirect
golang.org/x/text v0.27.0 // indirect
)

View File

@@ -1,16 +0,0 @@
package model
import "time"
type ChangeCity struct {
Id int `gorm:"column:id;primaryKey"`
Time time.Time `gorm:"column:time"`
CityId int `gorm:"column:city_id"`
Count int `gorm:"column:count"`
OffsetOld int `gorm:"column:offset_old"`
OffsetNew int `gorm:"column:offset_new"`
}
func (ChangeCity) TableName() string {
return "change_city"
}

View File

@@ -2,7 +2,7 @@ package model
import "time"
type ChangeEdge struct {
type Change struct {
Id int `gorm:"column:id;primaryKey"`
Time time.Time `gorm:"column:time"`
CityId int `gorm:"column:city"`
@@ -13,6 +13,6 @@ type ChangeEdge struct {
Info string `gorm:"column:info"`
}
func (ChangeEdge) TableName() string {
func (Change) TableName() string {
return "change"
}