feat(transaction): add ReplaceDataFilesAction for table compaction #1
This needs to be a PR into perpetual-main, not into main. We keep a separate main so we can periodically sync main to upstream main.

This is the Claude summary of where else we might find PRs that add replace-files support into the library. CTTY is the main shot-caller in that community, so Claude's suggestion there is probably a good one. But you already did the work. So instead, I asked how hard it would be to port the compactor to use CTTY's interface once that ships, and got this answer: So I'm good there. In that case, I think we should just take the advice to steal hints from the two ways that risingwave's patches are better: This one: And this one: Manifest merging will come next (as will expire snapshots). All three things are important. Compaction is just the one that is failing in Trino and urgently in need of getting right.
@@ -0,0 +1,449 @@
// Licensed to the Apache Software Foundation (ASF) under one

This patch isn't mergeable with compact here. Needs to move to simba.
    Ok(())
}

fn before_list_element(&mut self, field: &Field) -> Result<()> {

I would be hesitant to change anything in this file. Arrow schema seems pretty central.
///
/// # TODO
/// Remove this allow later
#[allow(dead_code)]

Let's try to keep the patch non-intrusive. The goal isn't just to make things work; it's to write a narrow patch that adds just what we need, so we can keep rebasing our patch onto their main until something they write actually ships and we can use it instead.
#[allow(dead_code)]
fn get_prop(previous_summary: &Summary, prop: &str) -> Result<i32> {
fn get_prop(previous_summary: &Summary, prop: &str) -> Result<i64> {

The point being that snapshots can be pretty big, and i32 will overflow? Seems like that needs to be a patch in the stack that we maintain independently of the rewrite.
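To make the overflow concern concrete, here is a minimal standalone sketch (not the crate's actual API; `Summary` is modeled as a plain string map) showing why the property getter needs to return i64: Iceberg summary counters are string-typed, and totals on large tables can exceed i32::MAX (2,147,483,647).

```rust
use std::collections::HashMap;

// Hypothetical stand-in for the snapshot Summary's string-typed properties.
fn get_prop(props: &HashMap<String, String>, prop: &str) -> Result<i64, String> {
    props
        .get(prop)
        .ok_or_else(|| format!("missing property: {prop}"))?
        .parse::<i64>()
        .map_err(|e| format!("invalid value for {prop}: {e}"))
}

fn main() {
    let mut props = HashMap::new();
    // A record count past i32::MAX -- parses fine as i64, would fail as i32.
    props.insert("total-records".to_string(), "3000000000".to_string());
    assert_eq!(get_prop(&props, "total-records"), Ok(3_000_000_000));
    assert!("3000000000".parse::<i32>().is_err());
    assert!(get_prop(&props, "total-files-size").is_err());
    println!("ok");
}
```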
    )?])
    return Ok(vec![]);
}

Tad bit scared to touch this file. Do you know why this edit is needed?
|
API inconsistencies with the other actions in this repo:
// Partition-aware fanout: split each batch by partition key and route rows to a
// dedicated writer per partition. This is required for correctness — a single
// RollingFileWriter fed interleaved multi-partition rows would violate the Iceberg
// invariant that every data file belongs to exactly one partition.

My biggest worry is this part of the approach, but I think we have a plan for that: schedule per-partition compaction more frequently, and run the cron over everything less frequently?
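The fanout the comment describes can be sketched with a toy model: rows tagged with a partition key are routed into one buffer per partition, each buffer standing in for a dedicated RollingFileWriter (the `Row`/`fanout` names here are illustrative, not the patch's actual types).

```rust
use std::collections::HashMap;

// Toy row: a partition key plus a payload value.
struct Row {
    partition: String,
    value: i64,
}

// Route each row to a per-partition buffer so every "file" (buffer) holds
// rows from exactly one partition, mirroring the Iceberg invariant.
fn fanout(rows: &[Row]) -> HashMap<String, Vec<i64>> {
    let mut writers: HashMap<String, Vec<i64>> = HashMap::new();
    for row in rows {
        writers
            .entry(row.partition.clone())
            .or_default()
            .push(row.value);
    }
    writers
}

fn main() {
    let rows = vec![
        Row { partition: "2024-01".into(), value: 1 },
        Row { partition: "2024-02".into(), value: 2 },
        Row { partition: "2024-01".into(), value: 3 },
    ];
    let out = fanout(&rows);
    assert_eq!(out["2024-01"], vec![1, 3]);
    assert_eq!(out["2024-02"], vec![2]);
    println!("ok");
}
```

In the real writer the buffers would be actual file writers keyed by the computed partition tuple, but the routing logic is the same shape.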
let snapshot = current_table
    .metadata()
    .current_snapshot()
    .expect("table has no snapshots — nothing to compact");

I think we probably only want to compact when we notice small files. 🤔 Otherwise this will read and write every file when it fires up. Even the old partitions that were already compacted and fine!
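A minimal sketch of that pre-filter, under assumed names and thresholds (nothing here is from the patch): only trigger compaction when a partition has enough files below a target size, so already-compacted partitions are skipped entirely.

```rust
// Assumed 128 MiB target file size; tune per table in practice.
const TARGET_FILE_SIZE: u64 = 128 * 1024 * 1024;

// Hypothetical trigger: compact only when at least `min_small_files`
// files fall below the target size.
fn needs_compaction(file_sizes: &[u64], min_small_files: usize) -> bool {
    file_sizes
        .iter()
        .filter(|&&size| size < TARGET_FILE_SIZE)
        .count()
        >= min_small_files
}

fn main() {
    // Two 1-2 MiB files alongside a 200 MiB file -> worth compacting.
    assert!(needs_compaction(&[1 << 20, 2 << 20, 200 << 20], 2));
    // Everything already at or above target size -> leave the partition alone.
    assert!(!needs_compaction(&[200 << 20, 180 << 20], 2));
    println!("ok");
}
```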
derrley left a comment:

Overall solid implementation — good validation, thorough tests, and the v2 equality-delete sequence number handling is correct. One blocking issue around data safety in existing_manifest().
    }
}

#[cfg(test)]

Bug: manifest-granularity exclusion can silently drop data files.

This drops the entire manifest if any file in it is being deleted. Files that are NOT being deleted but share a manifest with deleted files get silently excluded from the new snapshot — that's data loss.
Example: a manifest contains files A, B, C. Caller deletes only A. This code drops the entire manifest, so B and C vanish from the snapshot with no error.
This should be an error — if a manifest contains a mix of files-to-delete and files-to-keep, reject the operation rather than silently losing data. Something like:
let dominated = manifest.entries().iter().all(|e| {
    !e.is_alive() || self.files_to_delete.contains(e.file_path())
});
if has_file_to_delete && !dominated {
    return Err(Error::new(
        ErrorKind::DataInvalid,
        format!(
            "Manifest contains files to delete AND files to keep. \
             All alive files in a manifest must be included in files_to_delete \
             or the manifest must not contain any files to delete. \
             Manifest: {}",
            entry.manifest_path()
        ),
    ));
}

Alternatively (and probably better for Phase 2): rewrite the manifest to keep the surviving entries. But until then, an error is strictly better than silent data loss.
// NOTE: `commit` takes `Arc<Self>` and cannot move out of fields, so
// `files_to_add` and `files_to_delete` must be cloned here. For large
// compaction jobs with many DataFiles this is non-trivial overhead.
// Tracked as a known Phase 1 limitation; revisit if memory becomes an issue.

Part of the trait, unfort. Kind of stuck with it.
Adds ReplaceDataFilesAction -- an atomic transaction action that replaces a set of input data files with compacted outputs in a single Iceberg snapshot (Operation::Replace). This is the core primitive needed for compaction workflows.

Builder API:
- delete_files() / add_files() -- input and output file sets
- validate_from_snapshot(id) -- validate against a planning snapshot to avoid false rejections from concurrent writers between planning and committing
- data_sequence_number(seq_num) -- pin added entries to an explicit sequence number for v2 equality-delete correctness (compacted files that predate an equality delete must retain the original seq number)
- set_commit_uuid() / set_key_metadata() / set_snapshot_properties()

Validation: no duplicate paths in files_to_delete; all deleted files must exist as alive entries (phantom-delete protection); no added files may already exist alive (duplicate protection).

Mixed-manifest safety: existing_manifest() previously silently dropped entire manifests when any file in them was being deleted, discarding non-deleted files in the same manifest. Fixed with dual .any() calls that classify each manifest into three states: no deletions (carry forward), all alive entries deleted (exclude), mixed (Err DataInvalid). Phase 2 will replace the error with transparent rewriting via add_existing_entry.

New tests: test_replace_mixed_manifest_errors, test_replace_full_manifest_delete_no_regression, test_replace_single_file_manifests_no_false_positive.
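The three validation rules above can be modeled in isolation. This sketch uses plain path strings and a made-up `validate` function (the crate's real action operates on DataFile structs and snapshot manifests); it only illustrates the check order and error conditions.

```rust
use std::collections::HashSet;

// Illustrative model of the three validation checks: duplicate-delete,
// phantom-delete, and duplicate-add. Names are hypothetical, not the crate API.
fn validate(
    alive: &HashSet<&str>, // file paths alive in the base snapshot
    to_delete: &[&str],
    to_add: &[&str],
) -> Result<(), String> {
    // 1. No duplicate paths in files_to_delete.
    let mut seen = HashSet::new();
    for path in to_delete {
        if !seen.insert(*path) {
            return Err(format!("duplicate path in files_to_delete: {path}"));
        }
    }
    // 2. Phantom-delete protection: every deleted file must exist alive.
    for path in to_delete {
        if !alive.contains(path) {
            return Err(format!("file to delete not found alive: {path}"));
        }
    }
    // 3. Duplicate protection: no added file may already be alive.
    for path in to_add {
        if alive.contains(path) {
            return Err(format!("file to add already exists alive: {path}"));
        }
    }
    Ok(())
}

fn main() {
    let alive: HashSet<&str> = ["a.parquet", "b.parquet"].into();
    assert!(validate(&alive, &["a.parquet"], &["c.parquet"]).is_ok());
    assert!(validate(&alive, &["a.parquet", "a.parquet"], &[]).is_err());
    assert!(validate(&alive, &["x.parquet"], &[]).is_err());
    assert!(validate(&alive, &[], &["b.parquet"]).is_err());
    println!("ok");
}
```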
Supporting changes:
- snapshot.rs: removed_data_files field and write_delete_manifest() on SnapshotProducer; fixed summary() snapshot lookup (new snapshot ID does not exist yet, causing subtract-overflow panics on file counts)
- snapshot_summary.rs: Operation::Replace added to update_snapshot_summaries()
- append.rs: preserve delete-only manifests from prior Replace operations (regression guard for apache/iceberg-rust PR 2149)
- scan/mod.rs: scan plan is now pure async streams rather than channels, eliminating a class of deadlock; added incremental scan validation
- scan/task.rs: FileScanTask exposes column_sizes and split_offsets
- parquet_writer.rs: use HEAD stat for authoritative file_size_in_bytes
- datafusion/expr_to_predicate.rs: timestamp filter pushdown to Iceberg predicates (previously caused full table scans on time-based queries)
- datafusion/table/mod.rs: IcebergTableProvider::try_new is now pub
- storage/opendal/src/lib.rs: normalize s3a:// paths to s3://
- compact.rs (new): end-to-end compaction example using ReplaceDataFilesAction
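The dual-.any() manifest classification described in the commit message can be sketched standalone. Entries here are modeled as (path, is_alive) pairs and the enum names are illustrative; the real code operates on manifest entries and returns an iceberg Error for the mixed case.

```rust
use std::collections::HashSet;

#[derive(Debug, PartialEq)]
enum ManifestFate {
    CarryForward, // no deletions touch this manifest: keep it as-is
    Exclude,      // every alive entry is being deleted: drop the manifest
    Mixed,        // deletions AND survivors: error out instead of losing data
}

// Toy model of the dual .any() checks over a manifest's entries.
fn classify(entries: &[(&str, bool)], to_delete: &HashSet<&str>) -> ManifestFate {
    let any_deleted = entries
        .iter()
        .any(|(path, alive)| *alive && to_delete.contains(path));
    let any_kept = entries
        .iter()
        .any(|(path, alive)| *alive && !to_delete.contains(path));
    match (any_deleted, any_kept) {
        (false, _) => ManifestFate::CarryForward,
        (true, false) => ManifestFate::Exclude,
        (true, true) => ManifestFate::Mixed,
    }
}

fn main() {
    let del: HashSet<&str> = ["a"].into();
    // Manifest untouched by the deletion set: carried forward.
    assert_eq!(classify(&[("b", true), ("c", true)], &del), ManifestFate::CarryForward);
    // Every alive entry deleted: safely excluded.
    assert_eq!(classify(&[("a", true)], &del), ManifestFate::Exclude);
    // A to delete, B to keep in the same manifest: the data-loss case, now an error.
    assert_eq!(classify(&[("a", true), ("b", true)], &del), ManifestFate::Mixed);
    println!("ok");
}
```

The mixed case is exactly the reviewer's A/B/C example: dropping the whole manifest would silently lose B and C, so classification must surface it as an error until Phase 2 rewrites surviving entries.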
Summary

Adds ReplaceDataFilesAction -- an atomic transaction action that replaces a set of input data files with compacted output files in a single Iceberg snapshot. This is the core primitive needed for compaction workflows.

Also includes several supporting improvements: timestamp predicate pushdown, scan plan deadlock fix, IcebergTableProvider::try_new visibility, and s3a path normalization.

Changes

- crates/iceberg/src/transaction/replace_data_files.rs (new)
  - ReplaceDataFilesAction with delete_files(), add_files(), set_commit_uuid(), set_key_metadata(), set_snapshot_properties(), validate_from_snapshot(), and data_sequence_number() builders
  - Rejects duplicate paths in files_to_delete; validates all deleted files exist as alive entries (phantom delete protection); validates no added files already exist alive
  - validate_from_snapshot(snapshot_id) enables correct concurrent compaction workflows by validating against a historical planning snapshot
  - data_sequence_number(seq_num) sets an explicit data sequence number on added entries for v2 equality-delete correctness
  - existing_manifest() uses dual .any() checks to classify each manifest: no deletions (carry forward), all alive entries deleted (exclude), mixed (return DataInvalid). This replaces the previous behavior of silently dropping surviving files from mixed manifests. Phase 2 will rewrite mixed manifests transparently using ManifestWriter::add_existing_entry.
- crates/iceberg/src/transaction/snapshot.rs
  - removed_data_files field and with_removed_data_files() builder on SnapshotProducer
  - write_delete_manifest() using add_delete_entry() to write Deleted-status manifest entries
  - Fixed summary() snapshot lookup (was using the new snapshot ID, which does not exist yet, causing subtract-overflow panics)
- crates/iceberg/src/spec/snapshot_summary.rs
  - Added Operation::Replace to update_snapshot_summaries()
  - Changed unwrap() to unwrap_or(0) on parse calls; u64 subtraction to saturating_sub()
- crates/iceberg/src/transaction/append.rs
  - existing_manifest() now checks entry.has_deleted_files() (regression guard for apache/iceberg-rust fix(transaction): preserve delete-only manifests in fast_append apache/iceberg-rust#2149)
- crates/iceberg/src/spec/manifest/writer.rs
  - add_delete_entry() is now called (removed dead-code guard)
- crates/iceberg/src/writer/file_writer/parquet_writer.rs
  - Uses HEAD stat for authoritative file_size_in_bytes instead of a byte-count estimate
- crates/iceberg/src/scan/mod.rs + context.rs
- crates/iceberg/src/scan/task.rs
  - FileScanTask exposes column_sizes and split_offsets
- crates/integrations/datafusion/src/physical_plan/expr_to_predicate.rs
- crates/integrations/datafusion/src/table/mod.rs
  - IcebergTableProvider::try_new is now pub
- crates/storage/opendal/src/lib.rs
  - Normalizes s3a:// paths to s3://
- crates/examples/src/compact.rs (new)
  - Writes compacted output with RollingFileWriter, commits via ReplaceDataFilesAction
  - Configured via CATALOG_URI, CATALOG_WAREHOUSE, TABLE_NAMESPACE, TABLE_NAME, S3_REGION

Testing
Unit tests in replace_data_files.rs:
- test_replace_requires_files_to_add
- test_replace_add_file_already_in_snapshot_errors
- test_replace_phantom_delete_errors
- test_replace_duplicate_paths_in_delete_errors
- test_replace_action_full_table -- 2 files to 1; verifies Deleted entries and Replace snapshot summary totals
- test_replace_action_partial -- replaces 2 of 3 files across separate manifests; verifies unaffected file survives
- test_replace_then_fast_append_preserves_delete_manifest
- test_replace_validate_from_snapshot_succeeds
- test_replace_validate_from_snapshot_unknown_id_errors
- test_replace_data_sequence_number_is_set_on_entries
- test_replace_mixed_manifest_errors -- A+B in one manifest, delete only A -> DataInvalid
- test_replace_full_manifest_delete_no_regression -- A+B in one manifest, delete both -> success
- test_replace_single_file_manifests_no_false_positive -- A and B in separate manifests, delete A -> success, B survives