Implement replace data files action #2106
Conversation
Hi @Shekharrajak, thanks for the contribution! However, even excluding the datafusion integration, I don't think the transaction action alone is going to work as we want. We will have to add validation and delete processing. Here is a stale draft that I haven't had a chance to work on; I'm planning to pick it up soon.
Yes, the plan is to use it in datafusion-comet. This would be the flow: Spark CALL system.rewrite_data_files('table') -> Spark Iceberg (plan compaction) -> Comet Native (execute in Rust) -> iceberg-rust (read old files using ArrowReader, write new files using ParquetWriter, commit the replace using ReplaceDataFilesAction)
Adds ReplaceDataFilesAction to Transaction for atomic replace operations (compaction/rewrite). Implements Operation::Replace end-to-end: snapshot summary accounting, manifest writing, and the transaction action. Ports and extends the approach from apache/iceberg-rust PR apache#2106, incorporating the delete-manifest bug fix from PR apache#2149.

Changes:
- `crates/iceberg/src/transaction/replace_data_files.rs` (new)
  - ReplaceDataFilesAction with `delete_files()`, `add_files()`, `set_commit_uuid()`, `set_key_metadata()`, `set_snapshot_properties()` builders
  - Validates no duplicate paths in `files_to_delete`
  - Calls `validate_duplicate_files()` to reject files already alive in the snapshot
  - Validates all `files_to_delete` paths exist in the current snapshot (phantom-delete protection)
  - ReplaceOperation implements SnapshotProduceOperation: scans existing manifests to build Deleted entries; per-manifest filtering in `existing_manifest()` preserves unaffected manifests
  - `delete_files()` doc comment describes the manifest-granularity constraint
  - Comment on `existing_manifest()` documents the O(2N) double-scan limitation
- `crates/iceberg/src/transaction/snapshot.rs`
  - Added `removed_data_files` field and `with_removed_data_files()` builder
  - Implemented `write_delete_manifest()` using `add_delete_entry()`
  - Fixed latent bug: `summary()` looked up the new snapshot ID (not yet in metadata) to find the previous snapshot, causing subtract-overflow panics
  - Fixed stale error message ("fast append" -> neutral wording)
  - Added comment explaining `ManifestContentType::Data` in `write_delete_manifest()`
- `crates/iceberg/src/spec/snapshot_summary.rs`
  - Added Operation::Replace to the `update_snapshot_summaries()` allowlist
  - Fixed `unwrap()` -> `unwrap_or(0)` on property string parsing
  - Fixed u64 subtraction -> `saturating_sub()` to prevent underflow
  - Removed stale `#[allow(dead_code)]` from all four helpers
- `crates/iceberg/src/spec/manifest/writer.rs`
  - Removed `#[allow(dead_code)]` from `add_delete_entry()`
- `crates/iceberg/src/transaction/mod.rs`
  - Wired `replace_data_files()` method on Transaction
  - Added `apply_updates_to_table()` test helper
- `.gitignore`: added `.private/`

Tests: 1052 pass (`cargo test -p iceberg --lib`)
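For orientation, here is a minimal sketch of how the builder-style API described above could be driven. The types below are simplified stand-ins, not the real iceberg-rust types; only the method names `delete_files` and `add_files` and the duplicate-path validation mirror the PR description.

```rust
use std::collections::HashSet;

// Hypothetical stand-in for the real ReplaceDataFilesAction in
// crates/iceberg/src/transaction/replace_data_files.rs.
#[derive(Default)]
pub struct ReplaceDataFilesAction {
    files_to_delete: Vec<String>,
    files_to_add: Vec<String>,
}

impl ReplaceDataFilesAction {
    pub fn delete_files(mut self, paths: impl IntoIterator<Item = String>) -> Self {
        self.files_to_delete.extend(paths);
        self
    }

    pub fn add_files(mut self, paths: impl IntoIterator<Item = String>) -> Self {
        self.files_to_add.extend(paths);
        self
    }

    // Mirrors the "no duplicate paths in files_to_delete" validation.
    pub fn validate(&self) -> Result<(), String> {
        let mut seen = HashSet::new();
        for p in &self.files_to_delete {
            if !seen.insert(p.as_str()) {
                return Err(format!("duplicate path in files_to_delete: {p}"));
            }
        }
        Ok(())
    }
}

fn main() {
    let action = ReplaceDataFilesAction::default()
        .delete_files(["s3://b/small-1.parquet".into(), "s3://b/small-2.parquet".into()])
        .add_files(["s3://b/compacted-1.parquet".into()]);
    assert!(action.validate().is_ok());

    let dup = ReplaceDataFilesAction::default()
        .delete_files(["a.parquet".into(), "a.parquet".into()]);
    assert!(dup.validate().is_err());
    println!("ok");
}
```

The real action additionally checks that added files are not already live in the snapshot and that every deleted path exists in the current snapshot, which requires table metadata and is omitted here.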
- fix(append): carry forward delete-only manifests after Replace (port of apache/iceberg-rust PR apache#2149): add `|| entry.has_deleted_files()` to `FastAppendOperation::existing_manifest()` so delete manifests created by Replace are not silently dropped by the next FastAppend
- feat(replace): add `validate_from_snapshot(snapshot_id)` builder to ReplaceDataFilesAction for concurrent compaction workflows; validates `files_to_delete` against a historical snapshot instead of the current one, avoiding false rejections when another writer commits between planning and committing (port of apache/iceberg-rust PR apache#2106)
- feat(replace): add `data_sequence_number(seq_num)` builder to ReplaceDataFilesAction and SnapshotProducer; threads an explicit sequence number into added manifest entries so compacted files can retain the original sequence number when equality deletes are present (port of apache/iceberg-rust PR apache#2106)
- test: four new tests covering all three changes (1056 total, all pass)
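The arithmetic hardening called out in the changelog (`unwrap()` -> `unwrap_or(0)` on summary property parsing, and `saturating_sub()` on file counters) can be illustrated in isolation. The helper names and property keys below are illustrative, not the exact ones from the PR.

```rust
use std::collections::HashMap;

// Parse a numeric summary property, defaulting to 0 on a missing or
// malformed value instead of panicking (the unwrap() -> unwrap_or(0) fix).
pub fn prop_u64(summary: &HashMap<String, String>, key: &str) -> u64 {
    summary
        .get(key)
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(0)
}

// Subtract removed-file counts without underflow
// (the `u64 -` -> `saturating_sub()` fix).
pub fn new_total(prev_total: u64, removed: u64) -> u64 {
    prev_total.saturating_sub(removed)
}

fn main() {
    let mut summary = HashMap::new();
    summary.insert("total-data-files".to_string(), "not-a-number".to_string());
    assert_eq!(prop_u64(&summary, "total-data-files"), 0);
    assert_eq!(prop_u64(&summary, "missing-key"), 0);
    // A stale or inconsistent summary could claim fewer files than a
    // Replace deletes; saturating_sub clamps at zero instead of panicking.
    assert_eq!(new_total(3, 5), 0);
    assert_eq!(new_total(10, 4), 6);
    println!("ok");
}
```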
Which issue does this PR close?
What changes are included in this PR?
This PR adds ReplaceDataFilesAction, a new transaction action for replacing data files in Iceberg tables. This is essential for compaction operations, where multiple small files are merged into larger ones. In Java/Spark Iceberg the chain is: RewriteDataFilesSparkAction -> calls the RewriteDataFiles API -> uses Operation.REPLACE.
This Rust implementation provides the same primitive that Spark uses.
This allows maintenance tools to safely compact files without affecting ongoing reads or
concurrent writes - a key property for production table maintenance.
The plan is to have these features available so that datafusion-comet can use them for Comet Native execution.
Spark CALL system.rewrite_data_files('table') -> Spark Iceberg (Plan compaction) -> Comet Native (Execute in Rust) -> iceberg-rust (Read old files using ArrowReader, Write new files using ParquetWriter, Commit replace using ReplaceDataFilesAction)
Note: ArrowReader and ParquetWriter are already available.
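The planned flow above can be sketched end to end with placeholder I/O. ArrowReader, ParquetWriter, and the commit are mocked as plain functions here, since the point is only the shape of the read -> rewrite -> replace loop, not the real iceberg-rust API.

```rust
// Hypothetical sketch of the compaction loop; real code would use
// iceberg-rust's ArrowReader, ParquetWriter, and ReplaceDataFilesAction.
pub struct DataFile {
    pub path: String,
    pub records: Vec<u64>,
}

// Stand-in for reading record batches out of the small input files.
pub fn read_all(files: &[DataFile]) -> Vec<u64> {
    files.iter().flat_map(|f| f.records.iter().copied()).collect()
}

// Stand-in for writing one compacted output file.
pub fn write_compacted(path: &str, records: Vec<u64>) -> DataFile {
    DataFile { path: path.to_string(), records }
}

// Stand-in for committing a Replace snapshot: old paths out, new files in.
pub fn commit_replace(table: &mut Vec<DataFile>, delete: &[String], add: Vec<DataFile>) {
    table.retain(|f| !delete.contains(&f.path));
    table.extend(add);
}

fn main() {
    let mut table = vec![
        DataFile { path: "small-1".into(), records: vec![1, 2] },
        DataFile { path: "small-2".into(), records: vec![3] },
    ];
    // Plan: rewrite every small file into one compacted file.
    let to_delete: Vec<String> = table.iter().map(|f| f.path.clone()).collect();
    let merged = read_all(&table);
    let compacted = write_compacted("compacted-1", merged);
    commit_replace(&mut table, &to_delete, vec![compacted]);
    assert_eq!(table.len(), 1);
    assert_eq!(table[0].records, vec![1, 2, 3]);
    println!("ok");
}
```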
Ref https://iceberg.apache.org/docs/latest/maintenance/
Are these changes tested?
Unit tests