diff --git a/README.md b/README.md index e6fbf4d..962091e 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,4 @@ ## TODO -update 操作: -- 如果网关是禁用状态,则清空配置(目前是只查询启用的节点 - 此实现目前并不是完全并发安全的: - 目前事务等级没有对 cityhash 表的 offset 字段做防丢失,并发操作可能会出问题 \ No newline at end of file diff --git a/actions/cities.go b/actions/cities.go index 0c943de..81f8fed 100644 --- a/actions/cities.go +++ b/actions/cities.go @@ -37,7 +37,7 @@ func FindCitiesWithEdgesCount(tx *gorm.DB) ([]model.City, error) { return cities, nil } -func AppendCityOffset(tx *gorm.DB, city int, offset int) error { +func UpdateCityOffset(tx *gorm.DB, city int, offset int) error { if offset < 0 { return fmt.Errorf("offset must be non-negative") } diff --git a/actions/edges.go b/actions/edges.go index e2abc76..cb5115c 100644 --- a/actions/edges.go +++ b/actions/edges.go @@ -2,9 +2,10 @@ package actions import ( "fmt" + "zzman/model" + "gorm.io/gorm" "gorm.io/gorm/clause" - "zzman/model" ) const batchSize = 1000 @@ -18,9 +19,9 @@ func FindEdgesByCity(tx *gorm.DB, cityId int) ([]model.Edge, error) { return edges, nil } -func SliceActiveEdgesByCity(tx *gorm.DB, cityId int, offset int, limit int) ([]model.Edge, error) { +func SliceActiveEdges(tx *gorm.DB, cityId int, afterEdgeId int, count int) ([]model.Edge, error) { var edges []model.Edge - err := tx.Limit(limit).Offset(offset).Find(&edges, "city_id = ? and active = 1", cityId).Error + err := tx.Limit(count).Find(&edges, "id > ? and city_id = ? and active = 1", afterEdgeId, cityId).Error if err != nil { return nil, fmt.Errorf("failed to find edges with offset: %w", err) } @@ -34,10 +35,7 @@ func SaveEdges(tx *gorm.DB, edges []model.Edge) error { // 分批处理边缘设备数据 for i := 0; i < len(edges); i += batchSize { - end := i + batchSize - if end > len(edges) { - end = len(edges) - } + end := min(i+batchSize, len(edges)) batch := edges[i:end] err := tx.Clauses(clause.OnConflict{ diff --git a/actions/update.go b/actions/update.go index 7950a20..e19898b 100644 --- a/actions/update.go +++ b/actions/update.go @@ -7,6 +7,8 @@ import ( "zzman/clients/jd" "zzman/model" u "zzman/util" + + "gorm.io/gorm" ) type UpdateArgs struct { @@ -14,7 +16,6 @@ type UpdateArgs struct { } func Update(args ...UpdateArgs) error { - var tx = model.DB var arg UpdateArgs if len(args) > 0 { arg = args[0] @@ -22,6 +23,12 @@ func Update(args ...UpdateArgs) error { arg.Mock = false } + return model.DB.Transaction(func(tx *gorm.DB) error { + return update(tx, arg) + }) +} + +func update(tx *gorm.DB, arg UpdateArgs) error { gateways, err := FindGateways(tx) if err != nil { return fmt.Errorf("获取所有网关失败:%w", err) @@ -56,7 +63,7 @@ func Update(args ...UpdateArgs) error { // 按城市循环,对比并更新网关配置 for iCity, city := range cities { - // 如果每个网关在此城市都有节点且无需改变,就不需要从云端拉取城市节点信息 + // 如果每个网关在此城市都有节点且无需改变,就不需要重新分配节点 // 相当于直接重新提交配置,此流程下配置更新是幂等的 var gateways2Update []model.Gateway for _, gateway := range gateways { @@ -77,32 +84,29 @@ func Update(args ...UpdateArgs) error { continue } - // 否则,正常从云端拉取城市节点信息 + // 否则获取足量新节点 offset := city.Offset count := len(gateways2Update) + if count > city.EdgesCount { slog.Warn("城市节点数量不足,跳过本次更新", "城市", city.Name, "节点数", city.EdgesCount, "网关数", count) continue } - if offset+count > city.EdgesCount { - slog.Debug("城市节点不足,将循环使用节点", "城市", city.Name, "节点数", city.EdgesCount, "网关数", count) - offset = 0 - } - - edges, err := SliceActiveEdgesByCity(tx, city.Id, offset, count) + edges, err := SliceActiveEdges(tx, city.Id, offset, count) if err != nil { return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err) } if len(edges) < count { slog.Debug("城市节点不足,将循环使用节点", "城市", city.Name, "节点数", city.EdgesCount, "网关数", count) - edges, err = SliceActiveEdgesByCity(tx, city.Id, 0, count) + edges, err = SliceActiveEdges(tx, city.Id, 0, count) if err != nil { return fmt.Errorf("查询城市 %s 可用节点失败:%w", city.Name, err) } - offset = 0 } + offset = edges[len(edges)-1].Id + // 更新网关配置 var configs2Create []model.Config var configs2Update []model.ConfigUpdate @@ -113,7 +117,7 @@ func Update(args ...UpdateArgs) error { newConfig := edges[iGateway] if exists { - fmt.Printf("\t网关 %s 变更节点: %s -> %s\n", gateway.Macaddr, oldConfig.Macaddr, newConfig.Macaddr) + slog.Debug("网关配置变更", "网关", gateway.Macaddr, "旧节点", oldConfig.Macaddr, "新节点", newConfig.Macaddr) configs2Update = append(configs2Update, model.ConfigUpdate{ Id: oldConfig.Id, @@ -129,7 +133,7 @@ func Update(args ...UpdateArgs) error { }, } } else { - fmt.Printf("\t网关 %s 新增节点: %s\n", gateway.Macaddr, newConfig.Macaddr) + slog.Debug("网关配置新增", "网关", gateway.Macaddr, "新节点", newConfig.Macaddr) configs2Create = append(configs2Create, model.Config{ Cityhash: city.Hash, @@ -160,7 +164,7 @@ func Update(args ...UpdateArgs) error { if err != nil { return fmt.Errorf("更新配置失败:%w", err) } - err = AppendCityOffset(tx, city.Id, offset+count) + err = UpdateCityOffset(tx, city.Id, offset) if err != nil { return fmt.Errorf("更新城市 %s 的偏移量失败:%w", city.Name, err) } @@ -180,11 +184,11 @@ func Update(args ...UpdateArgs) error { setup++ } } - fmt.Printf("提交网关 %s 配置: %d 变更, %d 新增\n", gateway.Macaddr, change, setup) + slog.Info("提交网关配置", "网关", gateway.Macaddr, "变更", change, "新增", setup) // 提交配置到云端:配置版本 gateway.ConfigVersion if arg.Mock { - fmt.Printf("[MOCK] 配置网关 %s:\n%v\n", gateway.Macaddr, edges) + slog.Info("[MOCK] 配置网关", "网关", gateway.Macaddr, "配置", edges) } else { err := jd.GatewayConfigSet(gateway.ConfigVersion, gateway.Macaddr, edges) if err != nil { diff --git a/clients/redis.go b/clients/redis.go index c9da4e0..2024195 100644 --- a/clients/redis.go +++ b/clients/redis.go @@ -1,9 +1,10 @@ package clients import ( - redis "github.com/redis/go-redis/v9" "log/slog" "os" + + redis "github.com/redis/go-redis/v9" ) var Redis *redis.Client @@ -29,3 +30,12 @@ func InitRedis() { Password: password, }) } + +func CloseRedis() { + if Redis != nil { + err := Redis.Close() + if err != nil { + slog.Error("关闭 Redis 连接失败", "error", err) + } + } +} diff --git a/build.ps1 b/deploy.ps1 similarity index 50% rename from build.ps1 rename to deploy.ps1 index 35c3833..229d67d 100644 --- a/build.ps1 +++ b/deploy.ps1 @@ -1 +1,3 @@ -$env:GOOS="linux"; $env:GOARCH="amd64"; $env:CGO_ENABLED=0; go build -o dist/zz main.go \ No newline at end of file +$env:GOOS="linux"; $env:GOARCH="amd64"; $env:CGO_ENABLED=0; go build -o dist/zz main.go + +scp ./dist/zz wyk@43.226.58.254:~/zzcli \ No newline at end of file diff --git a/go.mod b/go.mod index 8a523de..ce0576c 100644 --- a/go.mod +++ b/go.mod @@ -16,5 +16,7 @@ require ( github.com/go-sql-driver/mysql v1.9.3 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect + github.com/systemd/slog-journal v0.1.0 // indirect + golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.27.0 // indirect ) diff --git a/go.sum b/go.sum index 6fcb52e..d746105 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,10 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/redis/go-redis/v9 v9.11.0 h1:E3S08Gl/nJNn5vkxd2i78wZxWAPNZgUNTp8WIJUAiIs= github.com/redis/go-redis/v9 v9.11.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/systemd/slog-journal v0.1.0 h1:/SdZsEp21ZzFWS3w6rcmiFCkaVKX3dt8W8mwcbNPdrM= +github.com/systemd/slog-journal v0.1.0/go.mod h1:RroeO1jghpRimGgddhGx+TS/CmRJdPjK9g2ZeYLr9P8= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.27.0 h1:4fGWRpyh641NLlecmyl4LOe6yDdfaYNrGb2zdfo4JV4= golang.org/x/text v0.27.0/go.mod h1:1D28KMCvyooCX9hBiosv5Tz/+YLxj0j7XhWjpSUF7CU= gorm.io/driver/mysql v1.6.0 h1:eNbLmNTpPpTOVZi8MMxCi2aaIm0ZpInbORNXDwyLGvg= diff --git a/main.go b/main.go index fcfd043..f528f7c 100644 --- a/main.go +++ b/main.go @@ -6,11 +6,13 @@ import ( "os" "path/filepath" "slices" + "strings" "zzman/actions" "zzman/clients" "zzman/model" "github.com/joho/godotenv" + slogjournal "github.com/systemd/slog-journal" ) func main() { @@ -29,8 +31,22 @@ func main() { slog.Error(fmt.Errorf("初始化变量失败:%w", err).Error()) } + // 初始化日志 + handler, err := slogjournal.NewHandler(&slogjournal.Options{ + ReplaceAttr: func(groups []string, a slog.Attr) slog.Attr { + a.Key = strings.ToUpper(a.Key) + a.Key = strings.ReplaceAll(a.Key, "-", "_") + return a + }, + }) + slog.SetDefault(slog.New(handler)) + + // 初始化数据库和 Redis model.Init() + defer model.Close() + clients.InitRedis() + defer clients.CloseRedis() // 执行命令 if len(os.Args) < 2 { diff --git a/model/common.go b/model/common.go index 81b4b2d..a94c3c8 100644 --- a/model/common.go +++ b/model/common.go @@ -2,10 +2,11 @@ package model import ( "fmt" - "gorm.io/driver/mysql" - "gorm.io/gorm" "os" "strconv" + + "gorm.io/driver/mysql" + "gorm.io/gorm" ) var DB *gorm.DB @@ -53,3 +54,13 @@ func Init() { panic(err) } } + +func Close() { + sqlDB, err := DB.DB() + if err != nil { + panic(fmt.Sprintf("Failed to get database instance: %s", err)) + } + if err := sqlDB.Close(); err != nil { + panic(fmt.Sprintf("Failed to close database connection: %s", err)) + } +}