diff --git a/.gitignore b/.gitignore index 7e77fca..2ef9344 100644 --- a/.gitignore +++ b/.gitignore @@ -27,4 +27,7 @@ go.work.sum .env # editor file -.idea/ \ No newline at end of file +.idea/ + +cmd/ +.vscode/ \ No newline at end of file diff --git a/README.md b/README.md index 962091e..78161b3 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,45 @@ ## TODO 此实现目前并不是完全并发安全的: -- 目前事务等级没有对 cityhash 表的 offset 字段做防丢失,并发操作可能会出问题 \ No newline at end of file +- 目前事务等级没有对 cityhash 表的 offset 字段做防丢失,并发操作可能会出问题 + +## 模块逻辑 + +**更新网关配置** + +``` +获取所有网关 +获取所有配置 + +构建配置表 `map[网关][配置]` + +循环配置: + 如果配置项需要更新,记录更新信息 + +循环更新信息(加速数据库查询) + +上传配置到云端 +``` + +**同步节点信息(全量更新)** + +``` +获取所有启用城市 +循环城市: + 获取云端节点信息 + 获取本地节点信息 + + 更新列表 = 空 + 删除列表 = 本地节点 + 循环云端节点: + 更新列表 <= 云端节点 + 删除列表 x= 本地节点(同云端 macaddr) + + 更新并(软)删除节点信息 +``` + +### 统一节点调度 + +节点上下线: + +提供一个接口用来为节点加解锁 \ No newline at end of file diff --git a/actions/changes.go b/actions/changes.go index c2ff592..6b25a68 100644 --- a/actions/changes.go +++ b/actions/changes.go @@ -6,12 +6,12 @@ import ( "gorm.io/gorm" ) -func RecordChangeEdges(tx *gorm.DB, changes []model.ChangeEdge) error { +func RecordChanges(tx *gorm.DB, changes []model.Change) error { if len(changes) == 0 { return nil } - batchSize := 1000 + batchSize := 500 for i := 0; i < len(changes); i += batchSize { end := min(i+batchSize, len(changes)) batch := changes[i:end] @@ -22,7 +22,3 @@ func RecordChangeEdges(tx *gorm.DB, changes []model.ChangeEdge) error { } return nil } - -func RecordChangeCity(tx *gorm.DB, change model.ChangeCity) error { - return tx.Create(&change).Error -} diff --git a/actions/cities.go b/actions/cities.go index 81f8fed..2822584 100644 --- a/actions/cities.go +++ b/actions/cities.go @@ -37,16 +37,18 @@ func FindCitiesWithEdgesCount(tx *gorm.DB) ([]model.City, error) { return cities, nil } -func UpdateCityOffset(tx *gorm.DB, city int, offset int) error { - if offset < 0 { - return fmt.Errorf("offset must be non-negative") +func UpdateCitiesOffset(tx *gorm.DB, updates []model.City) error { + if len(updates) == 0 { + return nil } - err := tx.Model(&model.City{}). - Where("id = ?", city). - Update("offset", offset).Error - if err != nil { - return fmt.Errorf("failed to update cities offset: %w", err) + for _, city := range updates { + err := tx.Model(new(model.City)). + Where("id = ?", city.Id). + Update("offset", city.Offset).Error + if err != nil { + return fmt.Errorf("failed to update city id %d offset: %w", city.Id, err) + } } return nil diff --git a/actions/configs.go b/actions/configs.go index cd68ca9..0afa5fd 100644 --- a/actions/configs.go +++ b/actions/configs.go @@ -6,24 +6,15 @@ import ( "gorm.io/gorm" ) -func FindConfigsByGateway(tx *gorm.DB, macaddr string) ([]model.Config, error) { +func FindConfigs(tx *gorm.DB) ([]model.Config, error) { var configs []model.Config - err := tx.Find(&configs, "macaddr = ?", macaddr).Error + err := tx.Find(&configs).Error if err != nil { return nil, err } return configs, nil } -func CreateConfigs(tx *gorm.DB, configs []model.Config) error { - if len(configs) == 0 { - return nil - } - - // 使用事务批量插入配置 - return tx.Omit("createtime", "updatetime").Create(&configs).Error -} - func UpdateConfigs(tx *gorm.DB, configs []model.ConfigUpdate) error { if len(configs) == 0 { return nil @@ -31,7 +22,10 @@ func UpdateConfigs(tx *gorm.DB, configs []model.ConfigUpdate) error { // 批量更新配置 for _, config := range configs { - err := tx.Updates(&config).Error + err := tx. + Where("id = ?", config.Id). + Updates(&config). + Error if err != nil { return err } diff --git a/actions/edges.go b/actions/edges.go index fbce462..fea4c8b 100644 --- a/actions/edges.go +++ b/actions/edges.go @@ -32,21 +32,6 @@ func SaveEdges(tx *gorm.DB, edges []model.Edge) error { return nil } - // 分批处理边缘设备数据 - // for i := 0; i < len(edges); i += batchSize { - // end := min(i+batchSize, len(edges)) - - // batch := edges[i:end] - // err := tx.Clauses(clause.OnConflict{ - // Columns: []clause.Column{{Name: "macaddr"}}, - // UpdateAll: true, - // }).Create(&batch).Error - - // if err != nil { - // return fmt.Errorf("failed to save edges batch %d-%d: %w", i, end-1, err) - // } - // } - // 手动区分需要创建和更新的节点,以避免 on conflict 更新的 id 占用问题 macAddrs := make([]string, len(edges)) for i, e := range edges { diff --git a/actions/update.go b/actions/update.go index 37b1b6e..25ccb70 100644 --- a/actions/update.go +++ b/actions/update.go @@ -3,7 +3,6 @@ package actions import ( "fmt" "log/slog" - "strconv" "time" "zzman/clients/jd" "zzman/model" @@ -31,65 +30,75 @@ func Update(args ...UpdateArgs) error { func update(tx *gorm.DB, arg UpdateArgs) error { var now = time.Now() + var step = now + // 获取所有网关 gateways, err := FindGateways(tx) if err != nil { return fmt.Errorf("获取所有网关失败:%w", err) } + var findGateway = make(map[string]model.Gateway) + for _, gateway := range gateways { + findGateway[gateway.Macaddr] = gateway + } + println(fmt.Sprintf("获取网关:%v", time.Since(step))) + step = time.Now() + + // 获取所有城市 cities, err := FindCitiesWithEdgesCount(tx) if err != nil { return fmt.Errorf("获取所有城市失败:%w", err) } - - // 准备网关查找表,初始化网关配置表 - type ConfigInfo struct { - Item jd.EdgeInfo - Type int // 0: 不变, 1: 更新, 2: 新增 + var findCity = make(map[string]model.City) + for _, city := range cities { + findCity[city.Hash] = city } - var oldConfigsMap = make(map[model.Gateway]map[string]model.Config) + + println(fmt.Sprintf("获取城市:%v", time.Since(step))) + step = time.Now() + + // 获取所有配置 + configs, err := FindConfigs(tx) + if err != nil { + return fmt.Errorf("获取所有配置失败:%w", err) + } + var findConfig = make(map[string]model.Config) + for _, config := range configs { + findConfig[config.Cityhash+":"+config.GatewayMac] = config + } + + println(fmt.Sprintf("获取配置:%v", time.Since(step))) + step = time.Now() + + // 更新网关配置 var newConfigs = make(map[model.Gateway][]ConfigInfo) - for _, gateway := range gateways { - newConfigs[gateway] = make([]ConfigInfo, len(cities)) + var changes []model.Change - oldConfigs, err := FindConfigsByGateway(tx, gateway.Macaddr) - if err != nil { - return fmt.Errorf("获取网关 %s 城市节点失败:%w", gateway.Macaddr, err) - } + for _, city := range cities { - oldConfigsMap[gateway] = make(map[string]model.Config) - for _, config := range oldConfigs { - oldConfigsMap[gateway][config.Cityhash] = config - } - } - - // 按城市循环,对比并更新网关配置 - for iCity, city := range cities { - - // 如果每个网关在此城市都有节点且无需改变,就不需要重新分配节点 - // 相当于直接重新提交配置,此流程下配置更新是幂等的 - var gateways2Change []model.Gateway + // 先处理不变更的节点 + configsUpdate := make([]model.Config, 0) for _, gateway := range gateways { - oldConfig, exists := oldConfigsMap[gateway][city.Hash] + oldConfig, exists := findConfig[city.Hash+":"+gateway.Macaddr] if exists && oldConfig.IsChange != 1 { - newConfigs[gateway][iCity] = ConfigInfo{ - Type: 0, // 不变 - Item: jd.EdgeInfo{ + newConfigs[gateway] = append(newConfigs[gateway], ConfigInfo{ + Change: false, + Remote: jd.EdgeInfo{ Mac: oldConfig.Macaddr, City: oldConfig.Cityhash, }, - } + }) } else { - gateways2Change = append(gateways2Change, gateway) + configsUpdate = append(configsUpdate, oldConfig) } } - - count := len(gateways2Change) - if len(gateways2Change) == 0 { + count := len(configsUpdate) + if len(configsUpdate) == 0 { continue } - // 否则获取足量新节点 + // 如果有需要变更的节点,获取足量新节点 offset := city.Offset if count > city.EdgesCount { slog.Warn(fmt.Sprintf("城市节点数量不足,跳过本次更新,城市:%s,节点数:%d,网关数:%d", city.Name, city.EdgesCount, count)) @@ -108,118 +117,87 @@ func update(tx *gorm.DB, arg UpdateArgs) error { } } - offset = edges[len(edges)-1].Id + city.Offset = edges[len(edges)-1].Id - // 更新网关配置 - var configs2Create []model.Config - var configs2Update []model.ConfigUpdate - var changes []model.ChangeEdge + // 分配新节点 + for i, oldConfig := range configsUpdate { - for iGateway, gateway := range gateways2Change { + gateway := findGateway[oldConfig.GatewayMac] + edge := edges[i] - oldConfig, exists := oldConfigsMap[gateway][city.Hash] - newConfig := edges[iGateway] + slog.Debug(fmt.Sprintf("网关配置变更,网关:%s,旧节点:%s,新节点:%s", gateway.Macaddr, oldConfig.Macaddr, edge.Macaddr)) - if exists { - slog.Debug(fmt.Sprintf("网关配置变更,网关:%s,旧节点:%s,新节点:%s", gateway.Macaddr, oldConfig.Macaddr, newConfig.Macaddr)) - - configs2Update = append(configs2Update, model.ConfigUpdate{ + newConfigs[gateway] = append(newConfigs[gateway], ConfigInfo{ + Change: true, + Remote: jd.EdgeInfo{ + City: city.Hash, + Mac: edge.Macaddr, + }, + Config: model.ConfigUpdate{ Id: oldConfig.Id, - Macaddr: u.P(oldConfig.Macaddr), + Macaddr: u.P(edge.Macaddr), IsChange: u.P(0), - }) - - changes = append(changes, model.ChangeEdge{ - Time: now, - CityId: city.Id, - Gateway: gateway.Macaddr, - OldEdge: oldConfig.Macaddr, - NewEdge: newConfig.Macaddr, - }) - - newConfigs[gateway][iCity] = ConfigInfo{ - Type: 1, // 更新 - Item: jd.EdgeInfo{ - Mac: oldConfig.Macaddr, - City: city.Hash, - }, - } - } else { - slog.Debug(fmt.Sprintf("网关配置新增,网关:%s,新节点:%s", gateway.Macaddr, newConfig.Macaddr)) - - configs2Create = append(configs2Create, model.Config{ - Cityhash: city.Hash, - CityLabel: city.Label, - GatewayMac: gateway.Macaddr, - Macaddr: newConfig.Macaddr, - Table: strconv.Itoa(iCity + 1), - User: fmt.Sprintf("jdzz%ddt%d", gateway.Id, iCity+1), - Network: fmt.Sprintf("172.30.168.%d", iCity+2), - InnerIp: fmt.Sprintf("172.16.%d.%d", gateway.Id, iCity+2), - }) - - changes = append(changes, model.ChangeEdge{ - Time: now, - CityId: city.Id, - Gateway: gateway.Macaddr, - OldEdge: "", - NewEdge: newConfig.Macaddr, - }) - - newConfigs[gateway][iCity] = ConfigInfo{ - Type: 2, // 新增 - Item: jd.EdgeInfo{ - Mac: oldConfig.Macaddr, - City: city.Hash, - }, - } - } - } - - err = CreateConfigs(tx, configs2Create) - if err != nil { - return fmt.Errorf("创建新配置失败:%w", err) - } - err = UpdateConfigs(tx, configs2Update) - if err != nil { - return fmt.Errorf("更新配置失败:%w", err) - } - err = UpdateCityOffset(tx, city.Id, offset) - if err != nil { - return fmt.Errorf("更新城市 %s 的偏移量失败:%w", city.Name, err) - } - - err = RecordChangeCity(tx, model.ChangeCity{ - Time: now, - CityId: city.Id, - Count: count, - OffsetOld: city.Offset, - OffsetNew: offset, - }) - if err != nil { - return fmt.Errorf("记录城市变更失败:%w", err) - } - err = RecordChangeEdges(tx, changes) - if err != nil { - return fmt.Errorf("记录节点变更失败:%w", err) + IsOnline: u.P(1), + }, + }) + changes = append(changes, model.Change{ + Time: now, + CityId: city.Id, + Gateway: gateway.Macaddr, + OldEdge: oldConfig.Macaddr, + NewEdge: edge.Macaddr, + }) } } + println(fmt.Sprintf("循环分配:%v", time.Since(step))) + step = time.Now() + + // 更新城市偏移量 + err = UpdateCitiesOffset(tx, cities) + if err != nil { + return fmt.Errorf("更新城市偏移量失败:%w", err) + } + + println(fmt.Sprintf("更新城市偏移量:%v", time.Since(step))) + step = time.Now() + + // 记录节点变更 + err = RecordChanges(tx, changes) + if err != nil { + return fmt.Errorf("记录节点变更失败:%w", err) + } + + println(fmt.Sprintf("记录节点变更:%v", time.Since(step))) + step = time.Now() + // 提交所有网关配置到云端 for gateway, infos := range newConfigs { - change := 0 - setup := 0 edges := make([]jd.EdgeInfo, len(infos)) + configsUpdate := make([]model.ConfigUpdate, 0) + + // 统计变更数 + change := 0 for i, info := range infos { - edges[i] = info.Item - switch info.Type { - case 1: + edges[i] = info.Remote + if info.Change { + configsUpdate = append(configsUpdate, info.Config) change++ - case 2: - setup++ } } - slog.Info(fmt.Sprintf("提交网关配置,网关:%s,变更数:%d,新增数:%d", gateway.Macaddr, change, setup)) + slog.Info(fmt.Sprintf("提交网关配置,网关:%s,变更数:%d", gateway.Macaddr, change)) + + // 更新配置版本 + err = AppendGatewayConfigVersion(tx, gateway.Id) + if err != nil { + return fmt.Errorf("更新网关 %s 配置版本失败:%w", gateway.Macaddr, err) + } + + // 更新本地配置 + err = UpdateConfigs(tx, configsUpdate) + if err != nil { + return fmt.Errorf("更新网关 %s 本地配置失败:%w", gateway.Macaddr, err) + } // 提交配置到云端:配置版本 gateway.ConfigVersion if !arg.Mock { @@ -228,13 +206,16 @@ func update(tx *gorm.DB, arg UpdateArgs) error { return fmt.Errorf("配置网关 %s 失败:%w", gateway.Macaddr, err) } } - - // 更新配置版本 - err = AppendGatewayConfigVersion(tx, gateway.Id) - if err != nil { - return fmt.Errorf("更新网关 %s 配置版本失败:%w", gateway.Macaddr, err) - } } + println(fmt.Sprintf("提交配置:%v", time.Since(step))) + + println(fmt.Sprintf("总耗时:%v", time.Since(now))) return nil } + +type ConfigInfo struct { + Change bool + Remote jd.EdgeInfo + Config model.ConfigUpdate +} diff --git a/clients/redis.go b/clients/redis.go index 2024195..970ee18 100644 --- a/clients/redis.go +++ b/clients/redis.go @@ -4,7 +4,7 @@ import ( "log/slog" "os" - redis "github.com/redis/go-redis/v9" + "github.com/redis/go-redis/v9" ) var Redis *redis.Client diff --git a/cmd/test.go b/cmd/test.go deleted file mode 100644 index c88420c..0000000 --- a/cmd/test.go +++ /dev/null @@ -1,34 +0,0 @@ -package main - -import ( - "fmt" - "log/slog" - "zzman/actions" - "zzman/clients" - "zzman/model" - - "github.com/joho/godotenv" -) - -func main() { - // 初始化环境 - slog.Debug("初始化环境变量") - err := godotenv.Load() - if err != nil { - slog.Error(fmt.Errorf("初始化变量失败:%w", err).Error()) - } - - // 初始化数据库和 Redis - model.Init() - defer model.Close() - - clients.InitRedis() - defer clients.CloseRedis() - - // 测试功能 - actions.Update(actions.UpdateArgs{ - Mock: true, - }) - - // actions.Sync() -} diff --git a/docker-compose.yaml b/docker-compose.yaml index ef5a2fc..32baf66 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -5,19 +5,19 @@ services: redis: image: redis:7 environment: - REDIS_PASSWORD: 123456 + REDIS_PASSWORD: ${REDIS_PASSWORD} ports: - - "6379:6379" + - "${REDIS_PORT}:6379" volumes: - redis_data:/data mariadb: image: mariadb:10 environment: - MYSQL_ROOT_PASSWORD: "hljuip916..." - MYSQL_DATABASE: jdbox + MYSQL_ROOT_PASSWORD: ${MYSQL_PASSWORD} + MYSQL_DATABASE: ${MYSQL_DATABASE} ports: - - "3306:3306" + - "${MYSQL_PORT}:3306" volumes: - mariadb_data:/var/lib/mysql diff --git a/go.mod b/go.mod index ce0576c..9c8ec9b 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.24.0 require ( github.com/joho/godotenv v1.5.1 github.com/redis/go-redis/v9 v9.11.0 + github.com/systemd/slog-journal v0.1.0 gorm.io/driver/mysql v1.6.0 gorm.io/gorm v1.30.1 ) @@ -16,7 +17,6 @@ require ( github.com/go-sql-driver/mysql v1.9.3 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect - github.com/systemd/slog-journal v0.1.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.27.0 // indirect ) diff --git a/model/change-city.go b/model/change-city.go deleted file mode 100644 index e0866f6..0000000 --- a/model/change-city.go +++ /dev/null @@ -1,16 +0,0 @@ -package model - -import "time" - -type ChangeCity struct { - Id int `gorm:"column:id;primaryKey"` - Time time.Time `gorm:"column:time"` - CityId int `gorm:"column:city_id"` - Count int `gorm:"column:count"` - OffsetOld int `gorm:"column:offset_old"` - OffsetNew int `gorm:"column:offset_new"` -} - -func (ChangeCity) TableName() string { - return "change_city" -} diff --git a/model/change-edge.go b/model/change.go similarity index 86% rename from model/change-edge.go rename to model/change.go index 3561241..3f8defc 100644 --- a/model/change-edge.go +++ b/model/change.go @@ -2,7 +2,7 @@ package model import "time" -type ChangeEdge struct { +type Change struct { Id int `gorm:"column:id;primaryKey"` Time time.Time `gorm:"column:time"` CityId int `gorm:"column:city"` @@ -13,6 +13,6 @@ type ChangeEdge struct { Info string `gorm:"column:info"` } -func (ChangeEdge) TableName() string { +func (Change) TableName() string { return "change" }