使用临时表 save 以解决 on conflict 造成的 id 占用问题
This commit is contained in:
@@ -5,7 +5,6 @@ import (
|
||||
"zzman/model"
|
||||
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/clause"
|
||||
)
|
||||
|
||||
const batchSize = 1000
|
||||
@@ -34,19 +33,90 @@ func SaveEdges(tx *gorm.DB, edges []model.Edge) error {
|
||||
}
|
||||
|
||||
// 分批处理边缘设备数据
|
||||
for i := 0; i < len(edges); i += batchSize {
|
||||
end := min(i+batchSize, len(edges))
|
||||
// for i := 0; i < len(edges); i += batchSize {
|
||||
// end := min(i+batchSize, len(edges))
|
||||
|
||||
batch := edges[i:end]
|
||||
err := tx.Clauses(clause.OnConflict{
|
||||
Columns: []clause.Column{{Name: "macaddr"}},
|
||||
UpdateAll: true,
|
||||
}).Create(&batch).Error
|
||||
// batch := edges[i:end]
|
||||
// err := tx.Clauses(clause.OnConflict{
|
||||
// Columns: []clause.Column{{Name: "macaddr"}},
|
||||
// UpdateAll: true,
|
||||
// }).Create(&batch).Error
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save edges batch %d-%d: %w", i, end-1, err)
|
||||
// if err != nil {
|
||||
// return fmt.Errorf("failed to save edges batch %d-%d: %w", i, end-1, err)
|
||||
// }
|
||||
// }
|
||||
|
||||
// 手动区分需要创建和更新的节点,以避免 on conflict 更新的 id 占用问题
|
||||
macAddrs := make([]string, len(edges))
|
||||
for i, e := range edges {
|
||||
macAddrs[i] = e.Macaddr
|
||||
}
|
||||
|
||||
var existingEdges []model.Edge
|
||||
if err := tx.Select("id", "macaddr").Where("macaddr IN ?", macAddrs).Find(&existingEdges).Error; err != nil {
|
||||
return fmt.Errorf("failed to find existing edges: %w", err)
|
||||
}
|
||||
existingEdgesMap := make(map[string]model.Edge)
|
||||
for _, e := range existingEdges {
|
||||
existingEdgesMap[e.Macaddr] = e
|
||||
}
|
||||
|
||||
var toCreate, toUpdate []model.Edge
|
||||
for _, edge := range edges {
|
||||
if existing, ok := existingEdgesMap[edge.Macaddr]; ok {
|
||||
edge.Id = existing.Id
|
||||
toUpdate = append(toUpdate, edge)
|
||||
} else {
|
||||
toCreate = append(toCreate, edge)
|
||||
}
|
||||
}
|
||||
|
||||
// 批量创建
|
||||
if len(toCreate) > 0 {
|
||||
if err := tx.CreateInBatches(toCreate, batchSize).Error; err != nil {
|
||||
return fmt.Errorf("failed to create edge: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 批量更新(使用临时表避免性能问题)
|
||||
if len(toUpdate) > 0 {
|
||||
// 使用临时表进行批量更新
|
||||
tempTableName := "temp_edge_update"
|
||||
if err := tx.Exec(fmt.Sprintf("CREATE TEMPORARY TABLE %s LIKE edge", tempTableName)).Error; err != nil {
|
||||
return fmt.Errorf("failed to create temporary table: %w", err)
|
||||
}
|
||||
|
||||
// 将待更新数据插入临时表
|
||||
if err := tx.Table(tempTableName).CreateInBatches(toUpdate, batchSize).Error; err != nil {
|
||||
return fmt.Errorf("failed to insert into temporary table: %w", err)
|
||||
}
|
||||
|
||||
// 使用 JOIN 更新主表
|
||||
updateQuery := fmt.Sprintf(`
|
||||
UPDATE edge e
|
||||
JOIN %s te ON e.id = te.id
|
||||
SET
|
||||
e.macaddr = te.macaddr,
|
||||
e.public = te.public,
|
||||
e.isp = te.isp,
|
||||
e.single = te.single,
|
||||
e.sole = te.sole,
|
||||
e.arch = te.arch,
|
||||
e.online = te.online,
|
||||
e.city_id = te.city_id,
|
||||
e.active = te.active
|
||||
`, tempTableName)
|
||||
if err := tx.Exec(updateQuery).Error; err != nil {
|
||||
return fmt.Errorf("failed to batch update edges from temporary table: %w", err)
|
||||
}
|
||||
|
||||
// 删除临时表
|
||||
if err := tx.Exec(fmt.Sprintf("DROP TEMPORARY TABLE %s", tempTableName)).Error; err != nil {
|
||||
fmt.Printf("warning: failed to drop temporary table %s: %v\n", tempTableName, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -26,7 +26,9 @@ func main() {
|
||||
defer clients.CloseRedis()
|
||||
|
||||
// 测试功能
|
||||
actions.Update(actions.UpdateArgs{
|
||||
Mock: true,
|
||||
})
|
||||
// actions.Update(actions.UpdateArgs{
|
||||
// Mock: true,
|
||||
// })
|
||||
|
||||
actions.Sync()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user