package actions import ( "fmt" "jhman/clients/jd" "jhman/model" u "jhman/util" "log/slog" "os" "time" "gorm.io/gorm" ) type UpdateArgs struct { Mock bool } func Update(args ...UpdateArgs) error { var arg UpdateArgs if len(args) > 0 { arg = args[0] } else { arg.Mock = false } return model.DB.Transaction(func(tx *gorm.DB) error { return update(tx, arg) }) } func update(tx *gorm.DB, arg UpdateArgs) error { var now = time.Now() var step = now // 获取所有网关 gateways, err := FindGateways(tx) if err != nil { return fmt.Errorf("获取所有网关失败:%w", err) } var findGateway = make(map[string]model.Gateway) for _, gateway := range gateways { findGateway[gateway.Macaddr] = gateway } println(fmt.Sprintf("获取网关:%v", time.Since(step))) step = time.Now() // 获取所有城市 cities, err := FindCitiesWithEdgesCount(tx) if err != nil { return fmt.Errorf("获取所有城市失败:%w", err) } var findCity = make(map[string]model.City) for _, city := range cities { findCity[city.Hash] = city } println(fmt.Sprintf("获取城市:%v", time.Since(step))) step = time.Now() // 获取所有配置 configs, err := FindConfigs(tx) if err != nil { return fmt.Errorf("获取所有配置失败:%w", err) } println(fmt.Sprintf("获取配置:%v", time.Since(step))) step = time.Now() // 查找需要更新的配置 oldConfigs := make(map[model.City][]model.Config) newConfigs := make(map[model.Gateway][]ConfigInfo) for _, gateway := range gateways { newConfigs[gateway] = make([]ConfigInfo, 250) // 预分配空间,减少扩容 } for _, config := range configs { city := findCity[config.Cityhash] gateway := findGateway[config.GatewayMac] if config.IsChange == 1 { oldConfigs[city] = append(oldConfigs[city], config) } else { newConfigs[gateway][config.Table-1] = ConfigInfo{ Change: false, Remote: jd.EdgeInfo{ Mac: config.EdgeMac, City: config.Cityhash, Network: config.Network, }, } } } println(fmt.Sprintf("查找更新:%v", time.Since(step))) step = time.Now() // 更新网关配置 var citiesUpdate []model.City var changesCreate []model.Change for city, configs := range oldConfigs { // 如果有需要变更的节点,获取足量新节点 count := len(configs) if count > city.EdgesCount { slog.Warn(fmt.Sprintf("城市节点数量不足,跳过本次更新,城市:%s,节点数:%d,网关数:%d", city.Name, city.EdgesCount, count)) continue } edges, err := SliceActiveEdges(tx, city.Id, city.Offset, count) if err != nil { return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err) } if len(edges) < count { slog.Debug(fmt.Sprintf("城市节点不足,将循环使用节点,城市:%s,节点数:%d,网关数:%d", city.Name, city.EdgesCount, count)) edges, err = SliceActiveEdges(tx, city.Id, 0, count) if err != nil { return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err) } } citiesUpdate = append(citiesUpdate, model.City{ Id: city.Id, Offset: edges[len(edges)-1].Id, }) // 分配新节点 for i, config := range configs { gateway := findGateway[config.GatewayMac] edge := edges[i] slog.Debug(fmt.Sprintf("网关配置变更,网关:%s,旧节点:%s,新节点:%s", gateway.Macaddr, config.EdgeMac, edge.Macaddr)) newConfigs[gateway][config.Table-1] = ConfigInfo{ Change: true, Remote: jd.EdgeInfo{ Mac: edge.Macaddr, City: city.Hash, Network: config.Network, }, Config: model.ConfigUpdate{ Id: config.Id, EdgeMac: u.P(edge.Macaddr), IsChange: u.P(0), IsOnline: u.P(0), }, } changesCreate = append(changesCreate, model.Change{ Time: now, CityId: city.Id, Gateway: gateway.Macaddr, OldEdge: config.EdgeMac, NewEdge: edge.Macaddr, }) } } println(fmt.Sprintf("更新配置:%v", time.Since(step))) step = time.Now() // 更新城市偏移量 err = UpdateCitiesOffset(tx, citiesUpdate) if err != nil { return fmt.Errorf("更新城市偏移量失败:%w", err) } println(fmt.Sprintf("更新城市偏移量:%v", time.Since(step))) step = time.Now() // 记录节点变更 if os.Getenv("DEBUG") == "true" { err = RecordChanges(tx, changesCreate) if err != nil { return fmt.Errorf("记录节点变更失败:%w", err) } } println(fmt.Sprintf("记录节点变更:%v", time.Since(step))) step = time.Now() // 提交所有网关配置到云端 for gateway, infos := range newConfigs { edges := make([]jd.EdgeInfo, len(infos)) configsUpdate := make([]model.ConfigUpdate, 0) // 统计变更数 change := 0 for i, info := range infos { edges[i] = info.Remote if info.Change { configsUpdate = append(configsUpdate, info.Config) change++ } } slog.Info(fmt.Sprintf("提交网关配置,网关:%s,变更数:%d", gateway.Macaddr, change)) // 更新配置版本 err = AppendGatewayConfigVersion(tx, gateway.Id) if err != nil { return fmt.Errorf("更新网关 %s 配置版本失败:%w", gateway.Macaddr, err) } // 更新本地配置 err = UpdateConfigs(tx, configsUpdate) if err != nil { return fmt.Errorf("更新网关 %s 本地配置失败:%w", gateway.Macaddr, err) } // 记录提交配置 if os.Getenv("DEBUG") == "true" { err = RecordSubmit(now, gateway, edges) if err != nil { return fmt.Errorf("记录网关 %s 提交配置失败:%w", gateway.Macaddr, err) } } // 提交配置到云端:配置版本 gateway.ConfigVersion if !arg.Mock { err := jd.GatewayConfigSet(gateway.ConfigVersion, gateway.Macaddr, edges) if err != nil { return fmt.Errorf("配置网关 %s 失败:%w", gateway.Macaddr, err) } } } println(fmt.Sprintf("提交配置:%v", time.Since(step))) println(fmt.Sprintf("总耗时:%v", time.Since(now))) println(fmt.Sprint("更新总数:", len(changesCreate))) return nil } type ConfigInfo struct { Change bool Remote jd.EdgeInfo Config model.ConfigUpdate }