Files
jh-zz/actions/update.go

224 lines
5.7 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package actions
import (
"fmt"
"log/slog"
"time"
"zzman/clients/jd"
"zzman/model"
u "zzman/util"
"gorm.io/gorm"
)
type UpdateArgs struct {
Mock bool
}
func Update(args ...UpdateArgs) error {
var arg UpdateArgs
if len(args) > 0 {
arg = args[0]
} else {
arg.Mock = false
}
return model.DB.Transaction(func(tx *gorm.DB) error {
return update(tx, arg)
})
}
func update(tx *gorm.DB, arg UpdateArgs) error {
var now = time.Now()
var step = now
// 获取所有网关
gateways, err := FindGateways(tx)
if err != nil {
return fmt.Errorf("获取所有网关失败:%w", err)
}
var findGateway = make(map[string]model.Gateway)
for _, gateway := range gateways {
findGateway[gateway.Macaddr] = gateway
}
println(fmt.Sprintf("获取网关:%v", time.Since(step)))
step = time.Now()
// 获取所有城市
cities, err := FindCitiesWithEdgesCount(tx)
if err != nil {
return fmt.Errorf("获取所有城市失败:%w", err)
}
var findCity = make(map[string]model.City)
for _, city := range cities {
findCity[city.Hash] = city
}
println(fmt.Sprintf("获取城市:%v", time.Since(step)))
step = time.Now()
// 获取所有配置
configs, err := FindConfigs(tx)
if err != nil {
return fmt.Errorf("获取所有配置失败:%w", err)
}
println(fmt.Sprintf("获取配置:%v", time.Since(step)))
step = time.Now()
// 查找需要更新的配置
oldConfigs := make(map[model.City][]model.Config)
newConfigs := make(map[model.Gateway][]ConfigInfo)
for _, config := range configs {
city := findCity[config.Cityhash]
gateway := findGateway[config.GatewayMac]
if config.IsChange == 1 {
oldConfigs[city] = append(oldConfigs[city], config)
} else {
newConfigs[gateway] = append(newConfigs[gateway], ConfigInfo{
Change: false,
Remote: jd.EdgeInfo{
Mac: config.EdgeMac,
City: config.Cityhash,
Network: config.Network,
},
})
}
}
println(fmt.Sprintf("查找更新:%v", time.Since(step)))
step = time.Now()
// 更新网关配置
var citiesUpdate []model.City
var changesCreate []model.Change
for city, configs := range oldConfigs {
// 如果有需要变更的节点,获取足量新节点
count := len(configs)
if count > city.EdgesCount {
slog.Warn(fmt.Sprintf("城市节点数量不足,跳过本次更新,城市:%s节点数%d网关数%d", city.Name, city.EdgesCount, count))
continue
}
edges, err := SliceActiveEdges(tx, city.Id, city.Offset, count)
if err != nil {
return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err)
}
if len(edges) < count {
slog.Debug(fmt.Sprintf("城市节点不足,将循环使用节点,城市:%s节点数%d网关数%d", city.Name, city.EdgesCount, count))
edges, err = SliceActiveEdges(tx, city.Id, 0, count)
if err != nil {
return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err)
}
}
citiesUpdate = append(citiesUpdate, model.City{
Id: city.Id,
Offset: edges[len(edges)-1].Id,
})
// 分配新节点
for i, config := range configs {
gateway := findGateway[config.GatewayMac]
edge := edges[i]
slog.Debug(fmt.Sprintf("网关配置变更,网关:%s旧节点%s新节点%s", gateway.Macaddr, config.EdgeMac, edge.Macaddr))
newConfigs[gateway] = append(newConfigs[gateway], ConfigInfo{
Change: true,
Remote: jd.EdgeInfo{
Mac: edge.Macaddr,
City: city.Hash,
Network: config.Network,
},
Config: model.ConfigUpdate{
Id: config.Id,
EdgeMac: u.P(edge.Macaddr),
IsChange: u.P(0),
IsOnline: u.P(1),
},
})
changesCreate = append(changesCreate, model.Change{
Time: now,
CityId: city.Id,
Gateway: gateway.Macaddr,
OldEdge: config.EdgeMac,
NewEdge: edge.Macaddr,
})
}
}
println(fmt.Sprintf("更新配置:%v", time.Since(step)))
step = time.Now()
// 更新城市偏移量
err = UpdateCitiesOffset(tx, citiesUpdate)
if err != nil {
return fmt.Errorf("更新城市偏移量失败:%w", err)
}
println(fmt.Sprintf("更新城市偏移量:%v", time.Since(step)))
step = time.Now()
// 记录节点变更
err = RecordChanges(tx, changesCreate)
if err != nil {
return fmt.Errorf("记录节点变更失败:%w", err)
}
println(fmt.Sprintf("记录节点变更:%v", time.Since(step)))
step = time.Now()
// 提交所有网关配置到云端
for gateway, infos := range newConfigs {
edges := make([]jd.EdgeInfo, len(infos))
configsUpdate := make([]model.ConfigUpdate, 0)
// 统计变更数
change := 0
for i, info := range infos {
edges[i] = info.Remote
if info.Change {
configsUpdate = append(configsUpdate, info.Config)
change++
}
}
slog.Info(fmt.Sprintf("提交网关配置,网关:%s变更数%d", gateway.Macaddr, change))
// 更新配置版本
err = AppendGatewayConfigVersion(tx, gateway.Id)
if err != nil {
return fmt.Errorf("更新网关 %s 配置版本失败:%w", gateway.Macaddr, err)
}
// 更新本地配置
err = UpdateConfigs(tx, configsUpdate)
if err != nil {
return fmt.Errorf("更新网关 %s 本地配置失败:%w", gateway.Macaddr, err)
}
// 提交配置到云端:配置版本 gateway.ConfigVersion
if !arg.Mock {
err := jd.GatewayConfigSet(gateway.ConfigVersion, gateway.Macaddr, edges)
if err != nil {
return fmt.Errorf("配置网关 %s 失败:%w", gateway.Macaddr, err)
}
}
}
println(fmt.Sprintf("提交配置:%v", time.Since(step)))
println(fmt.Sprintf("总耗时:%v", time.Since(now)))
println(fmt.Sprint("更新总数:", len(changesCreate)))
return nil
}
type ConfigInfo struct {
Change bool
Remote jd.EdgeInfo
Config model.ConfigUpdate
}