diff --git a/actions/edges.go b/actions/edges.go index 937d9a9..fbce462 100644 --- a/actions/edges.go +++ b/actions/edges.go @@ -5,7 +5,6 @@ import ( "zzman/model" "gorm.io/gorm" - "gorm.io/gorm/clause" ) const batchSize = 1000 @@ -34,19 +33,90 @@ func SaveEdges(tx *gorm.DB, edges []model.Edge) error { } // 分批处理边缘设备数据 - for i := 0; i < len(edges); i += batchSize { - end := min(i+batchSize, len(edges)) + // for i := 0; i < len(edges); i += batchSize { + // end := min(i+batchSize, len(edges)) - batch := edges[i:end] - err := tx.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "macaddr"}}, - UpdateAll: true, - }).Create(&batch).Error + // batch := edges[i:end] + // err := tx.Clauses(clause.OnConflict{ + // Columns: []clause.Column{{Name: "macaddr"}}, + // UpdateAll: true, + // }).Create(&batch).Error - if err != nil { - return fmt.Errorf("failed to save edges batch %d-%d: %w", i, end-1, err) + // if err != nil { + // return fmt.Errorf("failed to save edges batch %d-%d: %w", i, end-1, err) + // } + // } + + // 手动区分需要创建和更新的节点,以避免 on conflict 更新的 id 占用问题 + macAddrs := make([]string, len(edges)) + for i, e := range edges { + macAddrs[i] = e.Macaddr + } + + var existingEdges []model.Edge + if err := tx.Select("id", "macaddr").Where("macaddr IN ?", macAddrs).Find(&existingEdges).Error; err != nil { + return fmt.Errorf("failed to find existing edges: %w", err) + } + existingEdgesMap := make(map[string]model.Edge) + for _, e := range existingEdges { + existingEdgesMap[e.Macaddr] = e + } + + var toCreate, toUpdate []model.Edge + for _, edge := range edges { + if existing, ok := existingEdgesMap[edge.Macaddr]; ok { + edge.Id = existing.Id + toUpdate = append(toUpdate, edge) + } else { + toCreate = append(toCreate, edge) } } + + // 批量创建 + if len(toCreate) > 0 { + if err := tx.CreateInBatches(toCreate, batchSize).Error; err != nil { + return fmt.Errorf("failed to create edge: %w", err) + } + } + + // 批量更新(使用临时表避免性能问题) + if len(toUpdate) > 0 { + // 使用临时表进行批量更新 + tempTableName := "temp_edge_update" + if err := tx.Exec(fmt.Sprintf("CREATE TEMPORARY TABLE %s LIKE edge", tempTableName)).Error; err != nil { + return fmt.Errorf("failed to create temporary table: %w", err) + } + + // 将待更新数据插入临时表 + if err := tx.Table(tempTableName).CreateInBatches(toUpdate, batchSize).Error; err != nil { + return fmt.Errorf("failed to insert into temporary table: %w", err) + } + + // 使用 JOIN 更新主表 + updateQuery := fmt.Sprintf(` + UPDATE edge e + JOIN %s te ON e.id = te.id + SET + e.macaddr = te.macaddr, + e.public = te.public, + e.isp = te.isp, + e.single = te.single, + e.sole = te.sole, + e.arch = te.arch, + e.online = te.online, + e.city_id = te.city_id, + e.active = te.active + `, tempTableName) + if err := tx.Exec(updateQuery).Error; err != nil { + return fmt.Errorf("failed to batch update edges from temporary table: %w", err) + } + + // 删除临时表 + if err := tx.Exec(fmt.Sprintf("DROP TEMPORARY TABLE %s", tempTableName)).Error; err != nil { + fmt.Printf("warning: failed to drop temporary table %s: %v\n", tempTableName, err) + } + } + return nil } diff --git a/cmd/test.go b/cmd/test.go index ba705ff..bd106b6 100644 --- a/cmd/test.go +++ b/cmd/test.go @@ -26,7 +26,9 @@ func main() { defer clients.CloseRedis() // 测试功能 - actions.Update(actions.UpdateArgs{ - Mock: true, - }) + // actions.Update(actions.UpdateArgs{ + // Mock: true, + // }) + + actions.Sync() }