建立仓库

This commit is contained in:
2025-08-05 10:51:35 +08:00
commit 4bbc05fe1f
36 changed files with 1946 additions and 0 deletions

43
actions/cities.go Normal file
View File

@@ -0,0 +1,43 @@
package actions
import (
"fmt"
"gorm.io/gorm"
"zzman/model"
)
func FindCities(tx *gorm.DB) ([]model.City, error) {
var cities []model.City
if err := tx.Find(&cities, "label is not null").Error; err != nil {
return nil, fmt.Errorf("failed to find cities: %w", err)
}
return cities, nil
}
func FindCitiesWithEdgesCount(tx *gorm.DB) ([]model.City, error) {
var cities []model.City
err := tx.Debug().
Select("cities.*, COUNT(edges.id) AS edges_count").
Joins("LEFT JOIN edges ON edges.city_id = cities.id").
Group("cities.id").
Find(&cities).Error
if err != nil {
return nil, fmt.Errorf("failed to find cities with edges count: %w", err)
}
return cities, nil
}
func AppendCityOffset(tx *gorm.DB, city int, offset int) error {
if offset < 0 {
return fmt.Errorf("offset must be non-negative")
}
err := tx.Model(&model.City{}).
Where("id = ?", city).
Update("offset", offset).Error
if err != nil {
return fmt.Errorf("failed to update cities offset: %w", err)
}
return nil
}

33
actions/configs.go Normal file
View File

@@ -0,0 +1,33 @@
package actions
import (
"gorm.io/gorm"
"zzman/model"
)
func FindConfigsByGateway(tx *gorm.DB, macaddr string) ([]model.Config, error) {
var configs []model.Config
err := tx.Find(&configs, "macaddr = ?", macaddr).Error
if err != nil {
return nil, err
}
return configs, nil
}
func CreateConfigs(tx *gorm.DB, configs []model.Config) error {
if len(configs) == 0 {
return nil
}
// 使用事务批量插入配置
return tx.Create(&configs).Error
}
func UpdateConfigs(tx *gorm.DB, configs []model.ConfigUpdate) error {
if len(configs) == 0 {
return nil
}
// 使用事务批量更新配置
return tx.Updates(&configs).Error
}

73
actions/edges.go Normal file
View File

@@ -0,0 +1,73 @@
package actions
import (
"fmt"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"zzman/model"
)
const batchSize = 1000
func FindEdgesByCity(tx *gorm.DB, cityId int) ([]model.Edge, error) {
var edges []model.Edge
err := tx.Find(&edges, "city_id = ?", cityId).Error
if err != nil {
return nil, fmt.Errorf("failed to find edges: %w", err)
}
return edges, nil
}
func SliceActiveEdgesByCity(tx *gorm.DB, cityId int, offset int, limit int) ([]model.Edge, error) {
var edges []model.Edge
err := tx.Limit(limit).Offset(offset).Find(&edges, "city_id = ? and active = 1", cityId).Error
if err != nil {
return nil, fmt.Errorf("failed to find edges with offset: %w", err)
}
return edges, nil
}
func SaveEdges(tx *gorm.DB, edges []model.Edge) error {
if len(edges) == 0 {
return nil
}
// 分批处理边缘设备数据
for i := 0; i < len(edges); i += batchSize {
end := i + batchSize
if end > len(edges) {
end = len(edges)
}
batch := edges[i:end]
err := tx.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "macaddr"}},
UpdateAll: true,
}).Create(&batch).Error
if err != nil {
return fmt.Errorf("failed to save edges batch %d-%d: %w", i, end-1, err)
}
}
return nil
}
func DisableEdgesByMacs(tx *gorm.DB, macs []string) error {
if len(macs) == 0 {
return nil
}
// 分批处理MAC地址列表
for i := 0; i < len(macs); i += batchSize {
end := i + batchSize
if end > len(macs) {
end = len(macs)
}
batch := macs[i:end]
err := tx.Model(&model.Edge{}).Where("macaddr IN ?", batch).Update("active", false).Error
if err != nil {
return fmt.Errorf("failed to disable edges batch %d-%d: %w", i, end-1, err)
}
}
return nil
}

28
actions/gateways.go Normal file
View File

@@ -0,0 +1,28 @@
package actions
import (
"fmt"
"gorm.io/gorm"
"math"
"zzman/model"
)
func FindGateways(tx *gorm.DB) ([]model.Gateway, error) {
var gateways []model.Gateway
err := tx.Find(gateways).Error
if err != nil {
return nil, err
}
return gateways, nil
}
func AppendGatewayConfigVersion(tx *gorm.DB, gateway int) error {
expr := fmt.Sprintf("if(setid = %d, 1, setid + 1)", math.MaxInt32)
tx.Model(&model.Gateway{}).
Where("id = ?", gateway).
Update("setid", gorm.Expr(expr))
if tx.Error != nil {
return fmt.Errorf("更新网关配置版本失败:%w", tx.Error)
}
return nil
}

98
actions/sync.go Normal file
View File

@@ -0,0 +1,98 @@
package actions
import (
"fmt"
"gorm.io/gorm"
"log/slog"
"zzman/clients/jd"
"zzman/model"
)
// Sync 同步城市节点数据
//
// 数据库中保留云端已经移除的节点是有必要的,因为在节点轮换时需要保证能够从正确的位置开始,
// 而其之前所用的节点可能已经被云端移除,因此需要保留这些节点的记录以保持正确的轮换顺位。
func Sync() (err error) {
slog.Info("开始同步城市节点数据")
// 获取需要更新的城市列表
cities, err := FindCities(model.DB)
if err != nil {
return fmt.Errorf("获取所有城市失败: %w", err)
}
slog.Info("成功获取城市列表", slog.Int("城市数量", len(cities)))
// 获取所有城市的节点数据
for i, city := range cities {
slog.Info("正在同步城市", slog.String("城市", city.Name), slog.String("哈希", city.Hash))
// 新节点信息
resp, err := jd.EdgeDevice(jd.EdgeDeviceReq{
Geo: city.Hash,
Offset: 0,
Num: 1000000,
})
if err != nil {
return fmt.Errorf("获取城市 %s:%s 的边缘设备失败: %w", city.Name, city.Hash, err)
}
var newEdges = resp.Edges
slog.Info("获取节点数据完成", slog.String("城市", city.Name), slog.Int("节点数量", len(newEdges)))
err = model.DB.Transaction(func(tx *gorm.DB) error {
// 旧节点信息
oldEdges, err := FindEdgesByCity(tx, city.Id)
if err != nil {
return fmt.Errorf("获取所有边缘节点MAC地址失败: %w", err)
}
var oldEdgesMacSet = make(map[string]struct{}, len(oldEdges))
for _, edge := range oldEdges {
oldEdgesMacSet[edge.Macaddr] = struct{}{}
}
// 对比并更新节点信息(全量更新)
var edgeSaves = make([]model.Edge, 0)
for _, edge := range newEdges {
edgeSaves = append(edgeSaves, model.Edge{
CityId: city.Id,
Macaddr: edge.Macaddr,
Public: edge.Public,
Isp: edge.Isp,
Single: edge.Single,
Sole: edge.Sole,
Arch: edge.Arch,
Online: edge.Online,
Active: true,
})
delete(oldEdgesMacSet, edge.Macaddr)
}
// 更新现有数据
if err = SaveEdges(tx, edgeSaves); err != nil {
return fmt.Errorf("批量保存边缘节点失败: %w", err)
}
// 删除旧节点信息
var oldEdgesMacs = make([]string, 0, len(oldEdgesMacSet))
for mac := range oldEdgesMacSet {
oldEdgesMacs = append(oldEdgesMacs, mac)
}
if err = DisableEdgesByMacs(tx, oldEdgesMacs); err != nil {
return fmt.Errorf("通过MAC地址删除边缘节点失败: %w", err)
}
slog.Info("城市同步完成",
slog.Int("新节点", len(newEdges)),
slog.Int("旧节点", len(oldEdges)),
slog.Int("移除", len(oldEdgesMacs)),
slog.Int("同步", len(edgeSaves)),
slog.String("进度", fmt.Sprintf("%d/%d", i+1, len(cities))),
)
return nil
})
if err != nil {
return fmt.Errorf("城市 %s 事务处理失败: %w", city.Name, err)
}
}
return nil
}

196
actions/update.go Normal file
View File

@@ -0,0 +1,196 @@
package actions
import (
"fmt"
"log/slog"
"strconv"
"zzman/clients/jd"
"zzman/model"
u "zzman/util"
"gorm.io/gorm"
)
type UpdateArgs struct {
Mock bool
}
func Update(tx *gorm.DB, args ...UpdateArgs) error {
var arg UpdateArgs
if len(args) > 0 {
arg = args[0]
} else {
arg.Mock = false
}
gateways, err := FindGateways(tx)
if err != nil {
return fmt.Errorf("获取所有网关失败:%w", err)
}
cities, err := FindCitiesWithEdgesCount(tx)
if err != nil {
return fmt.Errorf("获取所有城市失败:%w", err)
}
// 准备网关查找表,初始化网关配置表
type ConfigInfo struct {
Item jd.EdgeInfo
Type int // 0: 不变, 1: 更新, 2: 新增
}
var oldConfigsMap = make(map[model.Gateway]map[string]model.Config)
var newConfigs = make(map[model.Gateway][]ConfigInfo)
for _, gateway := range gateways {
newConfigs[gateway] = make([]ConfigInfo, len(cities))
oldConfigs, err := FindConfigsByGateway(tx, gateway.Macaddr)
if err != nil {
return fmt.Errorf("获取网关 %s 城市节点失败:%w", gateway.Macaddr, err)
}
oldConfigsMap[gateway] = make(map[string]model.Config)
for _, config := range oldConfigs {
oldConfigsMap[gateway][config.Cityhash] = config
}
}
// 按城市循环,对比并更新网关配置
for iCity, city := range cities {
// 如果每个网关在此城市都有节点且无需改变,就不需要从云端拉取城市节点信息
// 相当于直接重新提交配置,此流程下配置更新是幂等的
var gateways2Update []model.Gateway
for _, gateway := range gateways {
oldConfig, exists := oldConfigsMap[gateway][city.Hash]
if exists && oldConfig.IsChange != 1 {
newConfigs[gateway][iCity] = ConfigInfo{
Type: 0, // 不变
Item: jd.EdgeInfo{
Mac: oldConfig.Macaddr,
City: oldConfig.Cityhash,
},
}
} else {
gateways2Update = append(gateways2Update, gateway)
}
}
if len(gateways2Update) == 0 {
continue
}
// 否则,正常从云端拉取城市节点信息
offset := city.Offset
count := len(gateways2Update)
if count > city.EdgesCount {
slog.Warn("城市节点数量不足,跳过本次更新", "城市", city.Name, "节点数", city.EdgesCount, "网关数", count)
continue
}
if offset+count > city.EdgesCount {
slog.Debug("城市节点不足,将循环使用节点", "城市", city.Name, "节点数", city.EdgesCount, "网关数", count)
offset = 0
}
edges, err := SliceActiveEdgesByCity(tx, city.Id, offset, count)
if err != nil {
return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err)
}
// 更新网关配置
var configs2Create []model.Config
var configs2Update []model.ConfigUpdate
for iGateway, gateway := range gateways2Update {
oldConfig, exists := oldConfigsMap[gateway][city.Hash]
newConfig := edges[iGateway]
if exists {
fmt.Printf("\t网关 %s 变更节点: %s -> %s\n", gateway.Macaddr, oldConfig.Macaddr, newConfig.Macaddr)
configs2Update = append(configs2Update, model.ConfigUpdate{
Id: oldConfig.Id,
Macaddr: u.P(oldConfig.Macaddr),
IsChange: u.P(0),
})
newConfigs[gateway][iCity] = ConfigInfo{
Type: 1, // 更新
Item: jd.EdgeInfo{
Mac: oldConfig.Macaddr,
City: city.Hash,
},
}
} else {
fmt.Printf("\t网关 %s 新增节点: %s\n", gateway.Macaddr, newConfig.Macaddr)
configs2Create = append(configs2Create, model.Config{
Cityhash: city.Hash,
CityLabel: city.Label,
GatewayMac: gateway.Macaddr,
Macaddr: newConfig.Macaddr,
Table: strconv.Itoa(iCity + 1),
User: fmt.Sprintf("jdzz%ddt%d", iGateway+1, iCity+1),
Network: fmt.Sprintf("172.30.168.%d", iCity+2),
InnerIp: fmt.Sprintf("172.16.%d.%d", iGateway+1, iCity+2),
})
newConfigs[gateway][iCity] = ConfigInfo{
Type: 2, // 新增
Item: jd.EdgeInfo{
Mac: oldConfig.Macaddr,
City: city.Hash,
},
}
}
}
err = CreateConfigs(tx, configs2Create)
if err != nil {
return fmt.Errorf("创建新配置失败:%w", err)
}
err = UpdateConfigs(tx, configs2Update)
if err != nil {
return fmt.Errorf("更新配置失败:%w", err)
}
err = AppendCityOffset(tx, city.Id, offset+count)
if err != nil {
return fmt.Errorf("更新城市 %s 的偏移量失败:%w", city.Name, err)
}
}
// 提交所有网关配置到云端
for gateway, infos := range newConfigs {
change := 0
setup := 0
edges := make([]jd.EdgeInfo, len(infos))
for i, info := range infos {
edges[i] = info.Item
switch info.Type {
case 1:
change++
case 2:
setup++
}
}
fmt.Printf("提交网关 %s 配置: %d 变更, %d 新增\n", gateway.Macaddr, change, setup)
// 提交配置到云端:配置版本 gateway.ConfigVersion
if arg.Mock {
fmt.Printf("[MOCK] 配置网关 %s:\n%v\n", gateway.Macaddr, edges)
} else {
err := jd.GatewayConfigSet(gateway.ConfigVersion, gateway.Macaddr, edges)
if err != nil {
return fmt.Errorf("配置网关 %s 失败:%w", gateway.Macaddr, err)
}
}
// 更新配置版本
err = AppendGatewayConfigVersion(tx, gateway.Id)
if err != nil {
return fmt.Errorf("更新网关 %s 配置版本失败:%w", gateway.Macaddr, err)
}
}
return nil
}