Files
jh-zz/actions/update.go

241 lines
6.3 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package actions
import (
"fmt"
"log/slog"
"strconv"
"time"
"zzman/clients/jd"
"zzman/model"
u "zzman/util"
"gorm.io/gorm"
)
// UpdateArgs carries the optional settings accepted by Update.
type UpdateArgs struct {
// Mock, when true, makes the update pass skip the jd.GatewayConfigSet call;
// the database work inside the transaction is still performed.
Mock bool
}
// Update runs one gateway-configuration update pass inside a single database
// transaction opened on model.DB.
//
// At most one UpdateArgs value is honoured: the first element of args is used
// and any further elements are ignored. When no argument is supplied the zero
// value is used, i.e. Mock is false.
//
// The error returned by the transaction (including any error produced by the
// update pass itself) is returned unchanged.
func Update(args ...UpdateArgs) error {
	var opts UpdateArgs // zero value already means Mock == false
	if len(args) != 0 {
		opts = args[0]
	}

	run := func(tx *gorm.DB) error {
		return update(tx, opts)
	}
	return model.DB.Transaction(run)
}
// update performs one edge-reallocation pass using the transaction tx.
//
// Steps:
//  1. Load all gateways and all cities, and index every gateway's existing
//     configs by city hash.
//  2. For each city, keep the config of every gateway that already has one
//     whose IsChange flag is not 1. Every other gateway (no config yet, or
//     IsChange == 1) is assigned an edge taken from SliceActiveEdges starting
//     at the city's stored offset; if that slice is too short the query is
//     repeated from offset 0. Configs are created/updated, the city offset is
//     moved to the Id of the last edge returned, and change records are
//     written.
//  3. For each gateway, submit the resulting edge list through
//     jd.GatewayConfigSet (skipped when arg.Mock is true) and then call
//     AppendGatewayConfigVersion.
//
// Any error from a query, a write, or the cloud call is wrapped and returned,
// which makes the caller's transaction roll back.
//
// NOTE(review): jd.GatewayConfigSet is invoked while the transaction is still
// open. If a later gateway fails, the database work is rolled back but calls
// already made for earlier gateways are not undone here.
func update(tx *gorm.DB, arg UpdateArgs) error {
	// Kinds of entry placed in a gateway's outgoing config list.
	const (
		configUnchanged = 0 // existing config kept as is
		configUpdated   = 1 // existing config re-pointed to a new edge
		configCreated   = 2 // config did not exist before
	)
	type ConfigInfo struct {
		Item jd.EdgeInfo
		Type int // one of configUnchanged / configUpdated / configCreated
	}

	now := time.Now()

	gateways, err := FindGateways(tx)
	if err != nil {
		return fmt.Errorf("获取所有网关失败:%w", err)
	}
	cities, err := FindCitiesWithEdgesCount(tx)
	if err != nil {
		return fmt.Errorf("获取所有城市失败:%w", err)
	}

	// Build the per-gateway lookup of existing configs (keyed by city hash)
	// and allocate one outgoing slot per city for every gateway.
	oldConfigsMap := make(map[model.Gateway]map[string]model.Config, len(gateways))
	newConfigs := make(map[model.Gateway][]ConfigInfo, len(gateways))
	for _, gateway := range gateways {
		newConfigs[gateway] = make([]ConfigInfo, len(cities))

		oldConfigs, err := FindConfigsByGateway(tx, gateway.Macaddr)
		if err != nil {
			return fmt.Errorf("获取网关 %s 城市节点失败:%w", gateway.Macaddr, err)
		}
		byCity := make(map[string]model.Config, len(oldConfigs))
		for _, config := range oldConfigs {
			byCity[config.Cityhash] = config
		}
		oldConfigsMap[gateway] = byCity
	}

	// Walk the cities, compare against the existing configs and reassign
	// edges where needed.
	for iCity, city := range cities {
		// Gateways that already have a config for this city which is not
		// flagged for change keep it; their entry is simply re-submitted.
		var gateways2Change []model.Gateway
		for _, gateway := range gateways {
			oldConfig, exists := oldConfigsMap[gateway][city.Hash]
			if !exists || oldConfig.IsChange == 1 {
				gateways2Change = append(gateways2Change, gateway)
				continue
			}
			newConfigs[gateway][iCity] = ConfigInfo{
				Type: configUnchanged,
				Item: jd.EdgeInfo{
					Mac:  oldConfig.Macaddr,
					City: oldConfig.Cityhash,
				},
			}
		}
		count := len(gateways2Change)
		if count == 0 {
			continue
		}

		// Otherwise fetch enough fresh edges for the gateways that need one.
		if count > city.EdgesCount {
			// NOTE(review): the slots of the skipped gateways stay zero-valued
			// and are still part of the list submitted below — confirm that an
			// empty EdgeInfo is acceptable to the cloud API.
			slog.Warn(fmt.Sprintf("城市节点数量不足,跳过本次更新,城市:%s节点数%d网关数%d", city.Name, city.EdgesCount, count))
			continue
		}
		edges, err := SliceActiveEdges(tx, city.Id, city.Offset, count)
		if err != nil {
			return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err)
		}
		if len(edges) < count {
			// Not enough edges after the stored offset: start over from 0.
			slog.Debug(fmt.Sprintf("城市节点不足,将循环使用节点,城市:%s节点数%d网关数%d", city.Name, city.EdgesCount, count))
			edges, err = SliceActiveEdges(tx, city.Id, 0, count)
			if err != nil {
				return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err)
			}
		}
		// Guard the indexing below: without this check a short (or empty)
		// result would panic on edges[len(edges)-1] / edges[iGateway].
		if len(edges) < count {
			return fmt.Errorf("城市 %s 可用节点不足:需要 %d 个,实际 %d 个", city.Name, count, len(edges))
		}
		offset := edges[len(edges)-1].Id

		// Assign one edge per gateway and collect the resulting writes.
		var configs2Create []model.Config
		var configs2Update []model.ConfigUpdate
		changes := make([]model.ChangeEdge, 0, count)
		for iGateway, gateway := range gateways2Change {
			oldConfig, exists := oldConfigsMap[gateway][city.Hash]
			edge := edges[iGateway]

			kind := configCreated
			if exists {
				kind = configUpdated
				slog.Debug(fmt.Sprintf("网关配置变更,网关:%s旧节点%s新节点%s", gateway.Macaddr, oldConfig.Macaddr, edge.Macaddr))
				// Point the existing config at the newly assigned edge and
				// clear its change flag.
				configs2Update = append(configs2Update, model.ConfigUpdate{
					Id:       oldConfig.Id,
					Macaddr:  u.P(edge.Macaddr),
					IsChange: u.P(0),
				})
			} else {
				slog.Debug(fmt.Sprintf("网关配置新增,网关:%s新节点%s", gateway.Macaddr, edge.Macaddr))
				// Table, User, Network and InnerIp are derived from the city
				// index and the gateway Id.
				configs2Create = append(configs2Create, model.Config{
					Cityhash:   city.Hash,
					CityLabel:  city.Label,
					GatewayMac: gateway.Macaddr,
					Macaddr:    edge.Macaddr,
					Table:      strconv.Itoa(iCity + 1),
					User:       fmt.Sprintf("jdzz%ddt%d", gateway.Id, iCity+1),
					Network:    fmt.Sprintf("172.30.168.%d", iCity+2),
					InnerIp:    fmt.Sprintf("172.16.%d.%d", gateway.Id, iCity+2),
				})
			}

			// oldConfig is the zero value when no config existed, so OldEdge
			// is "" for newly created configs.
			changes = append(changes, model.ChangeEdge{
				Time:    now,
				CityId:  city.Id,
				Gateway: gateway.Macaddr,
				OldEdge: oldConfig.Macaddr,
				NewEdge: edge.Macaddr,
			})
			// The entry submitted to the cloud must carry the newly assigned
			// edge, not the previous (possibly empty) one.
			newConfigs[gateway][iCity] = ConfigInfo{
				Type: kind,
				Item: jd.EdgeInfo{
					Mac:  edge.Macaddr,
					City: city.Hash,
				},
			}
		}

		if err := CreateConfigs(tx, configs2Create); err != nil {
			return fmt.Errorf("创建新配置失败:%w", err)
		}
		if err := UpdateConfigs(tx, configs2Update); err != nil {
			return fmt.Errorf("更新配置失败:%w", err)
		}
		if err := UpdateCityOffset(tx, city.Id, offset); err != nil {
			return fmt.Errorf("更新城市 %s 的偏移量失败:%w", city.Name, err)
		}
		err = RecordChangeCity(tx, model.ChangeCity{
			Time:      now,
			CityId:    city.Id,
			Count:     count,
			OffsetOld: city.Offset,
			OffsetNew: offset,
		})
		if err != nil {
			return fmt.Errorf("记录城市变更失败:%w", err)
		}
		if err := RecordChangeEdges(tx, changes); err != nil {
			return fmt.Errorf("记录节点变更失败:%w", err)
		}
	}

	// Submit every gateway's config list to the cloud. Iterate the gateway
	// list rather than the map so the submission order is deterministic.
	for _, gateway := range gateways {
		infos := newConfigs[gateway]

		change, setup := 0, 0
		edges := make([]jd.EdgeInfo, len(infos))
		for i, info := range infos {
			edges[i] = info.Item
			switch info.Type {
			case configUpdated:
				change++
			case configCreated:
				setup++
			}
		}
		slog.Info(fmt.Sprintf("提交网关配置,网关:%s变更数%d新增数%d", gateway.Macaddr, change, setup))

		// Push to the cloud using the gateway's current config version,
		// unless running in mock mode.
		if !arg.Mock {
			if err := jd.GatewayConfigSet(gateway.ConfigVersion, gateway.Macaddr, edges); err != nil {
				return fmt.Errorf("配置网关 %s 失败:%w", gateway.Macaddr, err)
			}
		}

		// Advance the stored config version for this gateway.
		if err := AppendGatewayConfigVersion(tx, gateway.Id); err != nil {
			return fmt.Errorf("更新网关 %s 配置版本失败:%w", gateway.Macaddr, err)
		}
	}
	return nil
}