feat(iceberg): Enable delete files processing in snapshot producer #2367
feat(iceberg): Enable delete files processing in snapshot producer #2367 — CTTY wants to merge 4 commits into apache:main from
Conversation
xanderbailey
left a comment
There was a problem hiding this comment.
Nice PR! Took a pass at reviewing, let me know if it makes sense!
| let snapshot_producer = SnapshotProducer::new( | ||
| table, | ||
| self.commit_uuid.unwrap_or_else(Uuid::now_v7), | ||
| self.key_metadata.clone(), | ||
| self.snapshot_properties.clone(), | ||
| self.added_data_files.clone(), | ||
| ); | ||
| let snapshot_producer = SnapshotProducer::builder() | ||
| .with_table(table) | ||
| .with_commit_uuid(self.commit_uuid.unwrap_or_else(Uuid::now_v7)) | ||
| .with_key_metadata(self.key_metadata.clone()) | ||
| .with_snapshot_properties(self.snapshot_properties.clone()) | ||
| .with_added_data_files(self.added_data_files.clone()) | ||
| .build(); | ||
|
|
| .default_partition_spec() | ||
| .as_ref() | ||
| .partition_spec_by_id(spec_id) | ||
| .ok_or(Error::new( |
There was a problem hiding this comment.
`ok_or` evaluates its argument eagerly, so the error is constructed even on the success path; `ok_or_else` might be better?
| /// Reference to [`Snapshot`]. | ||
| pub type SnapshotRef = Arc<Snapshot>; | ||
| #[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone)] | ||
| #[derive(Debug, Default, Serialize, Deserialize, PartialEq, Eq, Clone, Hash)] |
There was a problem hiding this comment.
Why does this need to be Hash now?
|
|
||
| summary_collector.set_partition_summary_limit(partition_summary_limit); | ||
|
|
||
| for data_file in &self.added_data_files { |
There was a problem hiding this comment.
Do we need to include these changes in the summary here?
| if added_files.is_empty() { | ||
| return Err(Error::new( | ||
| ErrorKind::PreconditionFailed, | ||
| "No added data files found when write an added manifest file", |
There was a problem hiding this comment.
Does this need to be updated to added data / delete files?
| async fn existing_manifest( | ||
| &self, | ||
| snapshot_produce: &SnapshotProducer<'_>, | ||
| snapshot_produce: &mut SnapshotProducer<'_>, |
There was a problem hiding this comment.
Does this need to be mut?
| if deleted_entries.is_empty() { | ||
| Ok(Vec::new()) | ||
| } else { |
There was a problem hiding this comment.
nit: can we early return here to avoid the nesting?
| if deleted_entries.is_empty() { | |
| Ok(Vec::new()) | |
| } else { | |
| if deleted_entries.is_empty() { | |
| return Ok(Vec::new()) | |
| } |
| } | ||
| } | ||
|
|
||
| pub(crate) fn validate_added_data_files(&self) -> Result<()> { |
There was a problem hiding this comment.
Java has a `validateDeleteFileForVersion` check; it might be nice to have the same validation here? https://github.com/apache/iceberg/blob/0f657edf12dc29f8487a679bfdd4210e9588d014/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java#L295-L316
Which issue does this PR close?
What changes are included in this PR?
Are these changes tested?
Added unit tests in snapshot.rs. We should add end-to-end tests once the actual rewrite/delete actions are supported.