diff --git a/crates/iceberg/src/arrow/reader/mod.rs b/crates/iceberg/src/arrow/reader/mod.rs
index c6c41accb7..429559fa3c 100644
--- a/crates/iceberg/src/arrow/reader/mod.rs
+++ b/crates/iceberg/src/arrow/reader/mod.rs
@@ -52,6 +52,7 @@ pub struct ArrowReaderBuilder {
     concurrency_limit_data_files: usize,
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    bloom_filter_enabled: bool,
     parquet_read_options: ParquetReadOptions,
 }

@@ -66,6 +67,7 @@ impl ArrowReaderBuilder {
             concurrency_limit_data_files: num_cpus,
             row_group_filtering_enabled: true,
             row_selection_enabled: false,
+            bloom_filter_enabled: false,
             parquet_read_options: ParquetReadOptions::builder().build(),
         }
     }
@@ -95,6 +97,20 @@ impl ArrowReaderBuilder {
         self
     }

+    /// Determines whether to enable bloom filter-based row group filtering.
+    ///
+    /// When enabled, if a read is performed with an equality or IN predicate,
+    /// the bloom filter for relevant columns in each row group is read and
+    /// checked. Row groups where the bloom filter proves the value is absent
+    /// are skipped entirely.
+    ///
+    /// Defaults to disabled, as reading bloom filters requires additional I/O
+    /// per column per row group.
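+    ///
+    /// # Example
+    ///
+    /// Illustrative sketch only (not a doctest); builds a reader from an
+    /// existing `FileIO` with the option switched on:
+    ///
+    /// ```ignore
+    /// let reader = ArrowReaderBuilder::new(file_io)
+    ///     .with_bloom_filter_enabled(true)
+    ///     .build();
+    /// ```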
+    pub fn with_bloom_filter_enabled(mut self, bloom_filter_enabled: bool) -> Self {
+        self.bloom_filter_enabled = bloom_filter_enabled;
+        self
+    }
+
     /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
     ///
     /// This hint can help reduce the number of fetch requests. For more details see the
@@ -133,6 +149,7 @@ impl ArrowReaderBuilder {
             concurrency_limit_data_files: self.concurrency_limit_data_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
+            bloom_filter_enabled: self.bloom_filter_enabled,
             parquet_read_options: self.parquet_read_options,
         }
     }
@@ -150,5 +167,6 @@ pub struct ArrowReader {
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    bloom_filter_enabled: bool,
     parquet_read_options: ParquetReadOptions,
 }
diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs
index 8ecee294c4..590a5cfc43 100644
--- a/crates/iceberg/src/arrow/reader/pipeline.rs
+++ b/crates/iceberg/src/arrow/reader/pipeline.rs
@@ -36,6 +36,9 @@ use crate::arrow::int96::coerce_int96_timestamps;
 use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
 use crate::error::Result;
+use crate::expr::visitors::bloom_filter_evaluator::{
+    BloomFilterEvaluator, collect_bloom_filter_field_ids,
+};
 use crate::io::{FileIO, FileMetadata, FileRead};
 use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
 use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
@@ -57,6 +60,7 @@ impl ArrowReader {
                 .with_scan_metrics(scan_metrics.clone()),
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
+            bloom_filter_enabled: self.bloom_filter_enabled,
             parquet_read_options: self.parquet_read_options,
             scan_metrics: scan_metrics.clone(),
         };
@@ -98,6 +102,7 @@ struct FileScanTaskReader {
     delete_file_loader: CachingDeleteFileLoader,
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    bloom_filter_enabled: bool,
     parquet_read_options: ParquetReadOptions,
     scan_metrics: ScanMetrics,
 }
@@ -336,6 +341,30 @@ impl FileScanTaskReader {
             };
         }

+        if self.bloom_filter_enabled {
+            let all_rgs;
+            let candidate_rgs = match &selected_row_group_indices {
+                Some(indices) => indices.as_slice(),
+                None => {
+                    all_rgs = (0..record_batch_stream_builder.metadata().num_row_groups())
+                        .collect::<Vec<usize>>();
+                    &all_rgs
+                }
+            };
+
+            let bloom_filtered = Self::filter_row_groups_by_bloom_filter(
+                &predicate,
+                &mut record_batch_stream_builder,
+                candidate_rgs,
+                &field_id_map,
+            )
+            .await?;
+
+            if bloom_filtered.len() < candidate_rgs.len() {
+                selected_row_group_indices = Some(bloom_filtered);
+            }
+        }
+
         if self.row_selection_enabled {
             row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate(
                 &predicate,
@@ -395,6 +424,70 @@ impl FileScanTaskReader {

         Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
     }
+
+    /// Reads bloom filters for relevant columns and evaluates the predicate
+    /// against them to filter out row groups that definitely don't match.
+    async fn filter_row_groups_by_bloom_filter<T>(
+        predicate: &crate::expr::BoundPredicate,
+        builder: &mut ParquetRecordBatchStreamBuilder<T>,
+        candidate_row_groups: &[usize],
+        field_id_map: &std::collections::HashMap<i32, usize>,
+    ) -> Result<Vec<usize>>
+    where
+        T: parquet::arrow::async_reader::AsyncFileReader + Send + 'static,
+    {
+        use std::collections::HashMap;
+
+        // Only collect field IDs from eq/in predicates — the only types
+        // bloom filters can help with. Skip columns not in the parquet schema.
+        let bloom_filter_field_ids: Vec<i32> = collect_bloom_filter_field_ids(predicate)?
+            .into_iter()
+            .filter(|id| field_id_map.contains_key(id))
+            .collect();
+
+        if bloom_filter_field_ids.is_empty() {
+            return Ok(candidate_row_groups.to_vec());
+        }
+
+        let mut result = Vec::with_capacity(candidate_row_groups.len());
+
+        for &rg_idx in candidate_row_groups {
+            let mut bloom_filters: HashMap<
+                i32,
+                (parquet::bloom_filter::Sbbf, parquet::basic::Type),
+            > = HashMap::new();
+
+            for &field_id in &bloom_filter_field_ids {
+                let col_idx = field_id_map[&field_id];
+                let col_meta = builder.metadata().row_group(rg_idx).column(col_idx);
+
+                // Only attempt to load if this column chunk actually has a bloom filter
+                if col_meta.bloom_filter_offset().is_none() {
+                    continue;
+                }

+                let physical_type = col_meta.column_type();
+
+                match builder
+                    .get_row_group_column_bloom_filter(rg_idx, col_idx)
+                    .await
+                {
+                    Ok(Some(sbbf)) => {
+                        bloom_filters.insert(field_id, (sbbf, physical_type));
+                    }
+                    Ok(None) => {}
+                    Err(_) => {
+                        // If we can't read the bloom filter, leave the column out of
+                        // the map; the evaluator treats a missing filter as "might
+                        // match", so the row group is conservatively included.
+                    }
+                }
+            }
+
+            match BloomFilterEvaluator::eval(predicate, &bloom_filters) {
+                Ok(true) => result.push(rg_idx),
+                Ok(false) => { /* Row group pruned by bloom filter */ }
+                Err(_) => result.push(rg_idx), // On error, conservatively include
+            }
+        }
+
+        Ok(result)
+    }
 }

 impl ArrowReader {
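The helper above leans on two `parquet` crate APIs: `ParquetRecordBatchStreamBuilder::get_row_group_column_bloom_filter` to fetch a column chunk's SBBF and `Sbbf::check` to probe it. A minimal sketch of that primitive, assuming an async builder and an INT32 `id` column; the function name and the probed value are illustrative only:

```rust
use parquet::arrow::async_reader::{AsyncFileReader, ParquetRecordBatchStreamBuilder};

// Sketch: can row group `rg_idx` be skipped for the predicate `id = 150`?
// A negative SBBF probe is definitive; a positive one only means "might contain".
async fn can_skip_row_group<R: AsyncFileReader + Send + 'static>(
    builder: &mut ParquetRecordBatchStreamBuilder<R>,
    rg_idx: usize,
    col_idx: usize,
) -> parquet::errors::Result<bool> {
    Ok(
        match builder.get_row_group_column_bloom_filter(rg_idx, col_idx).await? {
            // Filter present and the probe misses: the row group cannot contain 150.
            Some(sbbf) => !sbbf.check(&150_i32),
            // No bloom filter written for this chunk: nothing can be proven.
            None => false,
        },
    )
}
```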
diff --git a/crates/iceberg/src/arrow/reader/row_filter.rs b/crates/iceberg/src/arrow/reader/row_filter.rs
index 80432a0437..49cbbbf74e 100644
--- a/crates/iceberg/src/arrow/reader/row_filter.rs
+++ b/crates/iceberg/src/arrow/reader/row_filter.rs
@@ -196,7 +196,7 @@ mod tests {
     use std::sync::Arc;

     use arrow_array::cast::AsArray;
-    use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
+    use arrow_array::{ArrayRef, Int32Array, LargeStringArray, RecordBatch, StringArray};
     use arrow_schema::{DataType, Field, Schema as ArrowSchema};
     use futures::TryStreamExt;
     use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
@@ -616,4 +616,179 @@ mod tests {
         assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
         }
     }
+
+    /// Tests that bloom filter pushdown correctly prunes row groups.
+    #[tokio::test]
+    async fn test_bloom_filter_pushdown_prunes_row_groups() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let file_path = format!("{}/bloom_test.parquet", tmp_dir.path().to_str().unwrap());
+
+        // Write a Parquet file with 3 row groups, each containing distinct values,
+        // with bloom filters enabled.
+        // Row group 0: ids 0..100
+        // Row group 1: ids 100..200
+        // Row group 2: ids 200..300
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_max_row_group_size(100)
+            .set_bloom_filter_enabled(true)
+            .build();
+
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
+
+        for batch_start in [0, 100, 200] {
+            let batch = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+                Int32Array::from((batch_start..batch_start + 100).collect::<Vec<i32>>()),
+            )])
+            .unwrap();
+            writer.write(&batch).unwrap();
+        }
+        writer.close().unwrap();
+
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+
+        // Query for id = 150, which is only in row group 1.
+        // With bloom filter pushdown, row groups 0 and 2 should be pruned.
+        let predicate = Reference::new("id").equal_to(Datum::int(150));
+
+        let reader = ArrowReaderBuilder::new(file_io.clone())
+            .with_bloom_filter_enabled(true)
+            .build();
+
+        let tasks = Box::pin(futures::stream::iter(vec![Ok(FileScanTask {
+            file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
+            start: 0,
+            length: 0,
+            record_count: None,
+            data_file_path: file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![1],
+            predicate: Some(predicate.bind(schema.clone(), true).unwrap()),
+            deletes: vec![],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
+            case_sensitive: false,
+        })])) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .stream()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        // Only row group 1 (ids 100..200) should be read. The row filter
+        // then further filters to just id=150.
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 1, "Should find exactly one row matching id=150");
+
+        let id_col = result[0]
+            .column(0)
+            .as_primitive::<arrow_array::types::Int32Type>();
+        assert_eq!(id_col.value(0), 150);
+    }
+
+    /// Tests that bloom filter pushdown skips all row groups when value is absent.
+    #[tokio::test]
+    async fn test_bloom_filter_pushdown_value_absent() {
+        let schema = Arc::new(
+            Schema::builder()
+                .with_schema_id(1)
+                .with_fields(vec![
+                    NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                ])
+                .build()
+                .unwrap(),
+        );
+
+        let arrow_schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "1".to_string(),
+            )])),
+        ]));
+
+        let tmp_dir = TempDir::new().unwrap();
+        let file_path = format!("{}/bloom_absent.parquet", tmp_dir.path().to_str().unwrap());
+
+        let props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .set_max_row_group_size(100)
+            .set_bloom_filter_enabled(true)
+            .build();
+
+        let file = File::create(&file_path).unwrap();
+        let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();
+
+        for batch_start in [0, 100, 200] {
+            let batch = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
+                Int32Array::from((batch_start..batch_start + 100).collect::<Vec<i32>>()),
+            )])
+            .unwrap();
+            writer.write(&batch).unwrap();
+        }
+        writer.close().unwrap();
+
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+
+        // Query for id = 999, which doesn't exist in any row group.
+        // All row groups should be pruned by bloom filter.
+        let predicate = Reference::new("id").equal_to(Datum::int(999));
+
+        let reader = ArrowReaderBuilder::new(file_io)
+            .with_bloom_filter_enabled(true)
+            .build();
+
+        let tasks = Box::pin(futures::stream::iter(vec![Ok(FileScanTask {
+            file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
+            start: 0,
+            length: 0,
+            record_count: None,
+            data_file_path: file_path.clone(),
+            data_file_format: DataFileFormat::Parquet,
+            schema: schema.clone(),
+            project_field_ids: vec![1],
+            predicate: Some(predicate.bind(schema, true).unwrap()),
+            deletes: vec![],
+            partition: None,
+            partition_spec: None,
+            name_mapping: None,
+            case_sensitive: false,
+        })])) as FileScanTaskStream;
+
+        let result = reader
+            .read(tasks)
+            .unwrap()
+            .stream()
+            .try_collect::<Vec<_>>()
+            .await
+            .unwrap();
+
+        let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(
+            total_rows, 0,
+            "Should find zero rows when value is absent from all bloom filters"
+        );
+    }
 }
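Both tests depend on the writer actually emitting bloom filters. For reference, a sketch of the writer-side knobs; `set_bloom_filter_fpp` and `set_bloom_filter_ndv` are real `parquet` crate options the tests leave at their defaults, and the values shown are illustrative only:

```rust
use parquet::basic::Compression;
use parquet::file::properties::WriterProperties;

// Illustrative configuration: a lower FPP or a larger NDV means bigger filters.
fn bloom_writer_props() -> WriterProperties {
    WriterProperties::builder()
        .set_compression(Compression::SNAPPY)
        .set_max_row_group_size(100) // one row group per 100 rows, as in the tests
        .set_bloom_filter_enabled(true) // write an SBBF per column chunk
        .set_bloom_filter_fpp(0.01) // target false-positive probability
        .set_bloom_filter_ndv(100) // expected distinct values per row group
        .build()
}
```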
diff --git a/crates/iceberg/src/expr/visitors/bloom_filter_evaluator.rs b/crates/iceberg/src/expr/visitors/bloom_filter_evaluator.rs
new file mode 100644
index 0000000000..7b94ec1690
--- /dev/null
+++ b/crates/iceberg/src/expr/visitors/bloom_filter_evaluator.rs
@@ -0,0 +1,968 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Evaluates predicates against Parquet bloom filters to determine whether
+//! a row group can be skipped.
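+//!
+//! Illustrative sketch only (the types here are crate-private, so this is
+//! not a doctest):
+//!
+//! ```ignore
+//! // bloom_filters: HashMap<i32, (Sbbf, PhysicalType)> for one row group
+//! if !BloomFilterEvaluator::eval(&bound_predicate, &bloom_filters)? {
+//!     // The row group provably contains no matching rows; skip it.
+//! }
+//! ```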
+
+use std::collections::{HashMap, HashSet};
+
+use fnv::FnvHashSet;
+use parquet::basic::Type as PhysicalType;
+use parquet::bloom_filter::Sbbf;
+use parquet::data_type::ByteArray;
+
+use crate::Result;
+use crate::expr::visitors::bound_predicate_visitor::{BoundPredicateVisitor, visit};
+use crate::expr::{BoundPredicate, BoundReference};
+use crate::spec::decimal_utils::decimal_to_fixed_length_bytes;
+use crate::spec::{Datum, PrimitiveLiteral};
+
+const ROW_GROUP_MIGHT_MATCH: Result<bool> = Ok(true);
+const ROW_GROUP_CANT_MATCH: Result<bool> = Ok(false);
+
+pub(crate) struct BloomFilterEvaluator<'a> {
+    /// Maps Iceberg field_id -> (bloom filter, Parquet physical type) for this row group
+    bloom_filters: &'a HashMap<i32, (Sbbf, PhysicalType)>,
+}
+
+impl<'a> BloomFilterEvaluator<'a> {
+    /// Evaluate the predicate against the provided bloom filters.
+    /// Returns `false` if the row group definitely does not match,
+    /// `true` if it might match.
+    pub(crate) fn eval(
+        filter: &BoundPredicate,
+        bloom_filters: &HashMap<i32, (Sbbf, PhysicalType)>,
+    ) -> Result<bool> {
+        if bloom_filters.is_empty() {
+            return ROW_GROUP_MIGHT_MATCH;
+        }
+
+        let mut evaluator = BloomFilterEvaluator { bloom_filters };
+        visit(&mut evaluator, filter)
+    }
+
+    fn check_datum(&self, reference: &BoundReference, datum: &Datum) -> bool {
+        let field_id = reference.field().id;
+        let Some((sbbf, physical_type)) = self.bloom_filters.get(&field_id) else {
+            // No bloom filter for this column — conservatively might match
+            return true;
+        };
+
+        check_in_bloom_filter(sbbf, datum, *physical_type)
+    }
+}
+
+/// Collects field IDs that appear in `eq` or `in` predicates — the only
+/// predicate types that benefit from bloom filter checks.
+pub(crate) fn collect_bloom_filter_field_ids(predicate: &BoundPredicate) -> Result<HashSet<i32>> {
+    let mut visitor = BloomFilterFieldIdCollector {
+        field_ids: HashSet::new(),
+    };
+    visit(&mut visitor, predicate)?;
+    Ok(visitor.field_ids)
+}
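+
+/// Visitor that records the field id of every `eq` and `in` predicate it
+/// visits; every other predicate kind contributes nothing. For example,
+/// `id = 5 AND ts > 100` collects only the field id of `id`.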
+struct BloomFilterFieldIdCollector {
+    field_ids: HashSet<i32>,
+}
+
+impl BoundPredicateVisitor for BloomFilterFieldIdCollector {
+    type T = ();
+
+    fn always_true(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn always_false(&mut self) -> Result<()> {
+        Ok(())
+    }
+
+    fn and(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn or(&mut self, _lhs: (), _rhs: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn not(&mut self, _inner: ()) -> Result<()> {
+        Ok(())
+    }
+
+    fn is_null(&mut self, _r: &BoundReference, _p: &BoundPredicate) -> Result<()> {
+        Ok(())
+    }
+
+    fn not_null(&mut self, _r: &BoundReference, _p: &BoundPredicate) -> Result<()> {
+        Ok(())
+    }
+
+    fn is_nan(&mut self, _r: &BoundReference, _p: &BoundPredicate) -> Result<()> {
+        Ok(())
+    }
+
+    fn not_nan(&mut self, _r: &BoundReference, _p: &BoundPredicate) -> Result<()> {
+        Ok(())
+    }
+
+    fn less_than(&mut self, _r: &BoundReference, _l: &Datum, _p: &BoundPredicate) -> Result<()> {
+        Ok(())
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        _r: &BoundReference,
+        _l: &Datum,
+        _p: &BoundPredicate,
+    ) -> Result<()> {
+        Ok(())
+    }
+
+    fn greater_than(&mut self, _r: &BoundReference, _l: &Datum, _p: &BoundPredicate) -> Result<()> {
+        Ok(())
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        _r: &BoundReference,
+        _l: &Datum,
+        _p: &BoundPredicate,
+    ) -> Result<()> {
+        Ok(())
+    }
+
+    fn eq(&mut self, r: &BoundReference, _l: &Datum, _p: &BoundPredicate) -> Result<()> {
+        self.field_ids.insert(r.field().id);
+        Ok(())
+    }
+
+    fn not_eq(&mut self, _r: &BoundReference, _l: &Datum, _p: &BoundPredicate) -> Result<()> {
+        Ok(())
+    }
+
+    fn starts_with(&mut self, _r: &BoundReference, _l: &Datum, _p: &BoundPredicate) -> Result<()> {
+        Ok(())
+    }
+
+    fn not_starts_with(
+        &mut self,
+        _r: &BoundReference,
+        _l: &Datum,
+        _p: &BoundPredicate,
+    ) -> Result<()> {
+        Ok(())
+    }
+
+    fn r#in(
+        &mut self,
+        r: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _p: &BoundPredicate,
+    ) -> Result<()> {
+        self.field_ids.insert(r.field().id);
+        Ok(())
+    }
+
+    fn not_in(
+        &mut self,
+        _r: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _p: &BoundPredicate,
+    ) -> Result<()> {
+        Ok(())
+    }
+}
+
+/// Check whether a datum value might be present in the bloom filter.
+///
+/// The value must be checked using the same physical encoding the Parquet
+/// writer used when inserting into the bloom filter. We use the actual
+/// physical type from the column metadata to ensure correctness regardless
+/// of which writer produced the file.
+fn check_in_bloom_filter(sbbf: &Sbbf, datum: &Datum, physical_type: PhysicalType) -> bool {
+    match datum.literal() {
+        PrimitiveLiteral::Boolean(v) => sbbf.check(v),
+        PrimitiveLiteral::Int(v) => sbbf.check(v),
+        PrimitiveLiteral::Long(v) => sbbf.check(v),
+        PrimitiveLiteral::Float(v) => sbbf.check(&v.0),
+        PrimitiveLiteral::Double(v) => sbbf.check(&v.0),
+        PrimitiveLiteral::String(v) => sbbf.check(v.as_str()),
+        PrimitiveLiteral::Binary(v) => sbbf.check(v.as_slice()),
+        PrimitiveLiteral::Int128(v) => {
+            // Decimal: dispatch based on the actual Parquet physical type
+            // from the file, not inferred from precision.
+            match physical_type {
+                PhysicalType::INT32 => sbbf.check(&(*v as i32)),
+                PhysicalType::INT64 => sbbf.check(&(*v as i64)),
+                PhysicalType::FIXED_LEN_BYTE_ARRAY => {
+                    // to_be_bytes() truncated to the column's fixed length.
+                    // We use the Iceberg type's precision to determine the
+                    // byte length, which must match the file's fixed length.
+                    let crate::spec::PrimitiveType::Decimal { precision, .. } = datum.data_type()
+                    else {
+                        return true;
+                    };
+                    let bytes = decimal_to_fixed_length_bytes(*v, *precision);
+                    sbbf.check(&ByteArray::from(bytes))
+                }
+                _ => true, // Unexpected physical type — conservatively might match
+            }
+        }
+        PrimitiveLiteral::UInt128(v) => {
+            // UUID: stored as FIXED_LEN_BYTE_ARRAY(16), big-endian
+            let bytes = v.to_be_bytes();
+            sbbf.check(&ByteArray::from(bytes.to_vec()))
+        }
+        PrimitiveLiteral::AboveMax | PrimitiveLiteral::BelowMin => true,
+    }
+}
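+
+// Worked example of the dispatch above (illustrative): for an Iceberg
+// `decimal(9, 2)` column holding 123.45, an INT32-backed file must be probed
+// with `sbbf.check(&12345_i32)` (the unscaled mantissa), while a
+// FIXED_LEN_BYTE_ARRAY-backed file must be probed with the big-endian
+// two's-complement bytes of that mantissa. Probing with the wrong encoding
+// could report as absent a value that is actually present and incorrectly
+// prune a matching row group, which is why the file's physical type is
+// consulted instead of a precision-based guess.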
+
+impl<'a> BoundPredicateVisitor for BloomFilterEvaluator<'a> {
+    type T = bool;
+
+    fn always_true(&mut self) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn always_false(&mut self) -> Result<bool> {
+        ROW_GROUP_CANT_MATCH
+    }
+
+    fn and(&mut self, lhs: Self::T, rhs: Self::T) -> Result<bool> {
+        Ok(lhs && rhs)
+    }
+
+    fn or(&mut self, lhs: Self::T, rhs: Self::T) -> Result<bool> {
+        Ok(lhs || rhs)
+    }
+
+    fn not(&mut self, _inner: Self::T) -> Result<bool> {
+        // Bloom filters are not invertible — we cannot prove presence,
+        // so NOT of any result must conservatively return "might match".
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn is_null(
+        &mut self,
+        _reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn not_null(
+        &mut self,
+        _reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn is_nan(
+        &mut self,
+        _reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn not_nan(
+        &mut self,
+        _reference: &BoundReference,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn less_than(
+        &mut self,
+        _reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn less_than_or_eq(
+        &mut self,
+        _reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn greater_than(
+        &mut self,
+        _reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn greater_than_or_eq(
+        &mut self,
+        _reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn eq(
+        &mut self,
+        reference: &BoundReference,
+        literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        if self.check_datum(reference, literal) {
+            ROW_GROUP_MIGHT_MATCH
+        } else {
+            ROW_GROUP_CANT_MATCH
+        }
+    }
+
+    fn not_eq(
+        &mut self,
+        _reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn starts_with(
+        &mut self,
+        _reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn not_starts_with(
+        &mut self,
+        _reference: &BoundReference,
+        _literal: &Datum,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+
+    fn r#in(
+        &mut self,
+        reference: &BoundReference,
+        literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        let field_id = reference.field().id;
+        let Some((sbbf, physical_type)) = self.bloom_filters.get(&field_id) else {
+            return ROW_GROUP_MIGHT_MATCH;
+        };
+
+        // If ANY literal might be present, the row group might match
+        for literal in literals {
+            if check_in_bloom_filter(sbbf, literal, *physical_type) {
+                return ROW_GROUP_MIGHT_MATCH;
+            }
+        }
+
+        // All literals are definitely absent
+        ROW_GROUP_CANT_MATCH
+    }
+
+    fn not_in(
+        &mut self,
+        _reference: &BoundReference,
+        _literals: &FnvHashSet<Datum>,
+        _predicate: &BoundPredicate,
+    ) -> Result<bool> {
+        ROW_GROUP_MIGHT_MATCH
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::collections::HashMap;
+    use std::ops::Not;
+
+    use parquet::basic::Type as PhysicalType;
+    use parquet::bloom_filter::Sbbf;
+    use parquet::data_type::ByteArray;
+
+    use super::BloomFilterEvaluator;
+    use crate::expr::{Bind, Reference};
+    use crate::spec::decimal_utils::decimal_to_fixed_length_bytes;
+    use crate::spec::{Datum, NestedField, PrimitiveType, Schema, Type};
+
+    fn create_test_schema() -> Schema {
+        Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
+                NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(),
+            ])
+            .build()
+            .unwrap()
+    }
+
+    fn create_bloom_filter_with_values_i32(values: &[i32]) -> Sbbf {
+        let mut sbbf = Sbbf::new_with_ndv_fpp(values.len() as u64, 0.01).unwrap();
+        for v in values {
+            sbbf.insert(v);
+        }
+        sbbf
+    }
+
+    fn create_bloom_filter_with_values_str(values: &[&str]) -> Sbbf {
+        let mut sbbf = Sbbf::new_with_ndv_fpp(values.len() as u64, 0.01).unwrap();
+        for v in values {
+            sbbf.insert(*v);
+        }
+        sbbf
+    }
+
+    #[test]
+    fn test_eq_value_present() {
+        let schema = create_test_schema();
+        let bloom_filters = HashMap::from([(
+            1,
+            (
+                create_bloom_filter_with_values_i32(&[1, 2, 3]),
+                PhysicalType::INT32,
+            ),
+        )]);
+
+        let predicate = Reference::new("id")
+            .equal_to(Datum::int(2))
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(result, "Row group should might-match when value is present");
+    }
+
+    #[test]
+    fn test_eq_value_absent() {
+        let schema = create_test_schema();
+        let bloom_filters = HashMap::from([(
+            1,
+            (
+                create_bloom_filter_with_values_i32(&[1, 2, 3]),
+                PhysicalType::INT32,
+            ),
+        )]);
+
+        let predicate = Reference::new("id")
+            .equal_to(Datum::int(999))
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(
+            !result,
+            "Row group should not match when value is absent from bloom filter"
+        );
+    }
+
+    #[test]
+    fn test_eq_no_bloom_filter_for_column() {
+        let schema = create_test_schema();
+        let bloom_filters = HashMap::new(); // No bloom filters
+
+        let predicate = Reference::new("id")
+            .equal_to(Datum::int(1))
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(
+            result,
+            "Row group should might-match when no bloom filter available"
+        );
+    }
+
+    #[test]
+    fn test_in_all_absent() {
+        let schema = create_test_schema();
+        let bloom_filters = HashMap::from([(
+            1,
+            (
+                create_bloom_filter_with_values_i32(&[1, 2, 3]),
+                PhysicalType::INT32,
+            ),
+        )]);
+
+        let predicate = Reference::new("id")
+            .is_in([Datum::int(100), Datum::int(200), Datum::int(300)])
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(
+            !result,
+            "Row group should not match when all IN values are absent"
+        );
+    }
+
+    #[test]
+    fn test_in_some_present() {
+        let schema = create_test_schema();
+        let bloom_filters = HashMap::from([(
+            1,
+            (
+                create_bloom_filter_with_values_i32(&[1, 2, 3]),
+                PhysicalType::INT32,
+            ),
+        )]);
+
+        let predicate = Reference::new("id")
+            .is_in([Datum::int(2), Datum::int(200)])
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(
+            result,
+            "Row group should might-match when at least one IN value is present"
+        );
+    }
+
+    #[test]
+    fn test_and_one_absent() {
+        let schema = create_test_schema();
+        let bloom_filters = HashMap::from([
+            (
+                1,
+                (
+                    create_bloom_filter_with_values_i32(&[1, 2, 3]),
+                    PhysicalType::INT32,
+                ),
+            ),
+            (
+                2,
+                (
+                    create_bloom_filter_with_values_str(&["alice", "bob"]),
+                    PhysicalType::BYTE_ARRAY,
+                ),
+            ),
+        ]);
+
+        // id = 999 AND name = 'alice'
+        // id=999 is absent, so AND should be false
+        let predicate = Reference::new("id")
+            .equal_to(Datum::int(999))
+            .and(Reference::new("name").equal_to(Datum::string("alice")))
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(
+            !result,
+            "AND should be false when one operand is definitely absent"
+        );
+    }
+
+    #[test]
+    fn test_or_one_present() {
+        let schema = create_test_schema();
+        let bloom_filters = HashMap::from([(
+            1,
+            (
+                create_bloom_filter_with_values_i32(&[1, 2, 3]),
+                PhysicalType::INT32,
+            ),
+        )]);
+
+        // id = 999 OR id = 2
+        // id=2 is present, so OR should be true
+        let predicate = Reference::new("id")
+            .equal_to(Datum::int(999))
+            .or(Reference::new("id").equal_to(Datum::int(2)))
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(result, "OR should be true when one operand might match");
+    }
+
+    #[test]
+    fn test_not_always_might_match() {
+        let schema = create_test_schema();
+        let bloom_filters = HashMap::from([(
+            1,
+            (
+                create_bloom_filter_with_values_i32(&[1, 2, 3]),
+                PhysicalType::INT32,
+            ),
+        )]);
+
+        // NOT(id = 999) — even though 999 is absent, NOT should still return true
+        let predicate = Reference::new("id")
+            .equal_to(Datum::int(999))
+            .not()
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(result, "NOT should always return might-match");
+    }
+
+    #[test]
+    fn test_range_predicates_always_might_match() {
+        let schema = create_test_schema();
+        let bloom_filters = HashMap::from([(
+            1,
+            (
+                create_bloom_filter_with_values_i32(&[1, 2, 3]),
+                PhysicalType::INT32,
+            ),
+        )]);
+
+        let predicate = Reference::new("id")
+            .less_than(Datum::int(0))
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(result, "Range predicates should always return might-match");
+    }
+
+    #[test]
+    fn test_string_eq_present() {
+        let schema = create_test_schema();
+        let bloom_filters = HashMap::from([(
+            2,
+            (
+                create_bloom_filter_with_values_str(&["alice", "bob"]),
+                PhysicalType::BYTE_ARRAY,
+            ),
+        )]);
+
+        let predicate = Reference::new("name")
+            .equal_to(Datum::string("alice"))
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(result, "Should might-match when string is in bloom filter");
+    }
+
+    #[test]
+    fn test_string_eq_absent() {
+        let schema = create_test_schema();
+        let bloom_filters = HashMap::from([(
+            2,
+            (
+                create_bloom_filter_with_values_str(&["alice", "bob"]),
+                PhysicalType::BYTE_ARRAY,
+            ),
+        )]);
+
+        let predicate = Reference::new("name")
+            .equal_to(Datum::string("charlie"))
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(
+            !result,
+            "Should not match when string is absent from bloom filter"
+        );
+    }
+
+    // --- Decimal tests ---
+
+    fn create_decimal_schema(precision: u32, scale: u32) -> Schema {
+        Schema::builder()
+            .with_schema_id(1)
+            .with_fields(vec![
+                NestedField::required(
+                    1,
+                    "amount",
+                    Type::Primitive(PrimitiveType::Decimal { precision, scale }),
+                )
+                .into(),
+            ])
+            .build()
+            .unwrap()
+    }
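+
+    // Parquet's decimal physical-type mapping, as conventionally produced by
+    // arrow-rs and the Java writers: precision 1..=9 -> INT32, 10..=18 ->
+    // INT64, 19+ -> FIXED_LEN_BYTE_ARRAY. The tests below cover each
+    // representation; the evaluator itself trusts the file's metadata rather
+    // than this convention.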
+
+    /// Decimal with precision <= 9 is stored as INT32 in Parquet.
+    /// The bloom filter contains i32 values (the unscaled mantissa).
+    #[test]
+    fn test_decimal_int32_present() {
+        let schema = create_decimal_schema(9, 2);
+
+        // Parquet stores decimal(9,2) as INT32 with unscaled value
+        // Value "123.45" has mantissa 12345
+        let mut sbbf = Sbbf::new_with_ndv_fpp(10, 0.01).unwrap();
+        sbbf.insert(&12345_i32);
+        sbbf.insert(&67890_i32);
+
+        let bloom_filters = HashMap::from([(1, (sbbf, PhysicalType::INT32))]);
+
+        let predicate = Reference::new("amount")
+            .equal_to(
+                Datum::decimal_with_precision(
+                    crate::spec::decimal_utils::decimal_from_i128_with_scale(12345, 2),
+                    9,
+                )
+                .unwrap(),
+            )
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(result, "Decimal INT32 value present should might-match");
+    }
+
+    #[test]
+    fn test_decimal_int32_absent() {
+        let schema = create_decimal_schema(9, 2);
+
+        let mut sbbf = Sbbf::new_with_ndv_fpp(10, 0.01).unwrap();
+        sbbf.insert(&12345_i32);
+        sbbf.insert(&67890_i32);
+
+        let bloom_filters = HashMap::from([(1, (sbbf, PhysicalType::INT32))]);
+
+        // Value "999.99" has mantissa 99999, not in the bloom filter
+        let predicate = Reference::new("amount")
+            .equal_to(
+                Datum::decimal_with_precision(
+                    crate::spec::decimal_utils::decimal_from_i128_with_scale(99999, 2),
+                    9,
+                )
+                .unwrap(),
+            )
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(!result, "Decimal INT32 value absent should not match");
+    }
+
+    /// Decimal with precision 10-18 is stored as INT64 in Parquet.
+    #[test]
+    fn test_decimal_int64_present() {
+        let schema = create_decimal_schema(15, 2);
+
+        // "1234567890123.45" has mantissa 123456789012345
+        let mantissa: i64 = 123456789012345;
+        let mut sbbf = Sbbf::new_with_ndv_fpp(10, 0.01).unwrap();
+        sbbf.insert(&mantissa);
+
+        let bloom_filters = HashMap::from([(1, (sbbf, PhysicalType::INT64))]);
+
+        let predicate = Reference::new("amount")
+            .equal_to(
+                Datum::decimal_with_precision(
+                    crate::spec::decimal_utils::decimal_from_i128_with_scale(mantissa as i128, 2),
+                    15,
+                )
+                .unwrap(),
+            )
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(result, "Decimal INT64 value present should might-match");
+    }
+
+    #[test]
+    fn test_decimal_int64_absent() {
+        let schema = create_decimal_schema(15, 2);
+
+        let mut sbbf = Sbbf::new_with_ndv_fpp(10, 0.01).unwrap();
+        sbbf.insert(&123456789012345_i64);
+
+        let bloom_filters = HashMap::from([(1, (sbbf, PhysicalType::INT64))]);
+
+        let predicate = Reference::new("amount")
+            .equal_to(
+                Datum::decimal_with_precision(
+                    crate::spec::decimal_utils::decimal_from_i128_with_scale(999999999999999, 2),
+                    15,
+                )
+                .unwrap(),
+            )
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(!result, "Decimal INT64 value absent should not match");
+    }
+
+    /// Decimal with precision 19+ is stored as FIXED_LEN_BYTE_ARRAY in Parquet.
+    #[test]
+    fn test_decimal_fixed_bytes_present() {
+        let schema = create_decimal_schema(25, 2);
+
+        // Large mantissa that requires FIXED_LEN_BYTE_ARRAY
+        let mantissa: i128 = 12345678901234567890;
+        let bytes = decimal_to_fixed_length_bytes(mantissa, 25);
+
+        let mut sbbf = Sbbf::new_with_ndv_fpp(10, 0.01).unwrap();
+        sbbf.insert(&ByteArray::from(bytes));
+
+        let bloom_filters = HashMap::from([(1, (sbbf, PhysicalType::FIXED_LEN_BYTE_ARRAY))]);
+
+        let predicate = Reference::new("amount")
+            .equal_to(
+                Datum::decimal_with_precision(
+                    crate::spec::decimal_utils::decimal_from_i128_with_scale(mantissa, 2),
+                    25,
+                )
+                .unwrap(),
+            )
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(
+            result,
+            "Decimal FIXED_LEN_BYTE_ARRAY value present should might-match"
+        );
+    }
+
+    #[test]
+    fn test_decimal_fixed_bytes_absent() {
+        let schema = create_decimal_schema(25, 2);
+
+        let mantissa: i128 = 12345678901234567890;
+        let bytes = decimal_to_fixed_length_bytes(mantissa, 25);
+
+        let mut sbbf = Sbbf::new_with_ndv_fpp(10, 0.01).unwrap();
+        sbbf.insert(&ByteArray::from(bytes));
+
+        let bloom_filters = HashMap::from([(1, (sbbf, PhysicalType::FIXED_LEN_BYTE_ARRAY))]);
+
+        // Different value not in the bloom filter
+        let predicate = Reference::new("amount")
+            .equal_to(
+                Datum::decimal_with_precision(
+                    crate::spec::decimal_utils::decimal_from_i128_with_scale(
+                        99999999999999999999,
+                        2,
+                    ),
+                    25,
+                )
+                .unwrap(),
+            )
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(
+            !result,
+            "Decimal FIXED_LEN_BYTE_ARRAY value absent should not match"
+        );
+    }
+
+    /// Negative decimal values should also work correctly.
+    #[test]
+    fn test_decimal_negative_int32() {
+        let schema = create_decimal_schema(9, 2);
+
+        // "-123.45" has mantissa -12345
+        let mut sbbf = Sbbf::new_with_ndv_fpp(10, 0.01).unwrap();
+        sbbf.insert(&(-12345_i32));
+
+        let bloom_filters = HashMap::from([(1, (sbbf, PhysicalType::INT32))]);
+
+        let predicate = Reference::new("amount")
+            .equal_to(
+                Datum::decimal_with_precision(
+                    crate::spec::decimal_utils::decimal_from_i128_with_scale(-12345, 2),
+                    9,
+                )
+                .unwrap(),
+            )
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(result, "Negative decimal INT32 present should might-match");
+    }
+
+    #[test]
+    fn test_decimal_negative_fixed_bytes() {
+        let schema = create_decimal_schema(25, 2);
+
+        let mantissa: i128 = -12345678901234567890;
+        let bytes = decimal_to_fixed_length_bytes(mantissa, 25);
+
+        let mut sbbf = Sbbf::new_with_ndv_fpp(10, 0.01).unwrap();
+        sbbf.insert(&ByteArray::from(bytes));
+
+        let bloom_filters = HashMap::from([(1, (sbbf, PhysicalType::FIXED_LEN_BYTE_ARRAY))]);
+
+        let predicate = Reference::new("amount")
+            .equal_to(
+                Datum::decimal_with_precision(
+                    crate::spec::decimal_utils::decimal_from_i128_with_scale(mantissa, 2),
+                    25,
+                )
+                .unwrap(),
+            )
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(
+            result,
+            "Negative decimal FIXED_LEN_BYTE_ARRAY present should might-match"
+        );
+    }
+
+    /// A Parquet writer other than arrow-rs (e.g. Spark, Java Parquet) might
+    /// choose FIXED_LEN_BYTE_ARRAY for a low-precision decimal rather than INT32.
+    /// The evaluator must use the physical type from the file metadata, not assume
+    /// a mapping based on precision.
+    #[test]
+    fn test_decimal_low_precision_stored_as_fixed_len_byte_array() {
+        let schema = create_decimal_schema(5, 2);
+
+        // Simulate a file where decimal(5,2) was stored as FIXED_LEN_BYTE_ARRAY
+        let mantissa: i128 = 12345;
+        let bytes = decimal_to_fixed_length_bytes(mantissa, 5);
+
+        let mut sbbf = Sbbf::new_with_ndv_fpp(10, 0.01).unwrap();
+        sbbf.insert(&ByteArray::from(bytes));
+
+        let bloom_filters = HashMap::from([(1, (sbbf, PhysicalType::FIXED_LEN_BYTE_ARRAY))]);
+
+        let predicate = Reference::new("amount")
+            .equal_to(
+                Datum::decimal_with_precision(
+                    crate::spec::decimal_utils::decimal_from_i128_with_scale(mantissa, 2),
+                    5,
+                )
+                .unwrap(),
+            )
+            .bind(schema.into(), true)
+            .unwrap();
+
+        let result = BloomFilterEvaluator::eval(&predicate, &bloom_filters).unwrap();
+        assert!(
+            result,
+            "Should match when physical type is FIXED_LEN_BYTE_ARRAY even for low-precision decimal"
+        );
+    }
+}
diff --git a/crates/iceberg/src/expr/visitors/mod.rs b/crates/iceberg/src/expr/visitors/mod.rs
index 38332ea6a0..c5aca6838d 100644
--- a/crates/iceberg/src/expr/visitors/mod.rs
+++ b/crates/iceberg/src/expr/visitors/mod.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.

+pub(crate) mod bloom_filter_evaluator;
 pub(crate) mod bound_predicate_visitor;
 pub(crate) mod expression_evaluator;
 pub(crate) mod inclusive_metrics_evaluator;
diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs
index 27f479183a..41a592d0aa 100644
--- a/crates/iceberg/src/scan/mod.rs
+++ b/crates/iceberg/src/scan/mod.rs
@@ -61,6 +61,7 @@ pub struct TableScanBuilder<'a> {
     concurrency_limit_manifest_files: usize,
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    bloom_filter_enabled: bool,
 }

 impl<'a> TableScanBuilder<'a> {
@@ -79,6 +80,7 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_files: num_cpus,
             row_group_filtering_enabled: true,
             row_selection_enabled: false,
+            bloom_filter_enabled: false,
         }
     }

@@ -185,6 +187,20 @@ impl<'a> TableScanBuilder<'a> {
         self
     }

+    /// Determines whether to enable bloom filter-based row group filtering.
+    ///
+    /// When enabled, if a read is performed with an equality or IN predicate,
+    /// the bloom filter for relevant columns in each row group is read and
+    /// checked. Row groups where the bloom filter proves the value is absent
+    /// are skipped entirely.
+    ///
+    /// Defaults to disabled, as reading bloom filters requires additional I/O
+    /// per column per row group.
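+    ///
+    /// # Example
+    ///
+    /// Illustrative sketch (not a doctest; assumes an already-loaded `table`):
+    ///
+    /// ```ignore
+    /// let scan = table
+    ///     .scan()
+    ///     .with_filter(Reference::new("id").equal_to(Datum::long(42)))
+    ///     .with_bloom_filter_enabled(true)
+    ///     .build()?;
+    /// ```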
+    pub fn with_bloom_filter_enabled(mut self, bloom_filter_enabled: bool) -> Self {
+        self.bloom_filter_enabled = bloom_filter_enabled;
+        self
+    }
+
     /// Build the table scan.
     pub fn build(self) -> Result<TableScan> {
         let snapshot = match self.snapshot_id {
@@ -211,6 +227,7 @@ impl<'a> TableScanBuilder<'a> {
                     concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
                     row_group_filtering_enabled: self.row_group_filtering_enabled,
                     row_selection_enabled: self.row_selection_enabled,
+                    bloom_filter_enabled: self.bloom_filter_enabled,
                 });
             };
             current_snapshot_id.clone()
@@ -304,6 +321,7 @@ impl<'a> TableScanBuilder<'a> {
             concurrency_limit_manifest_files: self.concurrency_limit_manifest_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
+            bloom_filter_enabled: self.bloom_filter_enabled,
         })
     }
 }
@@ -332,6 +350,7 @@ pub struct TableScan {
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
+    bloom_filter_enabled: bool,
 }

 impl TableScan {
@@ -436,7 +455,8 @@ impl TableScan {
         let mut arrow_reader_builder = ArrowReaderBuilder::new(self.file_io.clone())
             .with_data_file_concurrency_limit(self.concurrency_limit_data_files)
             .with_row_group_filtering_enabled(self.row_group_filtering_enabled)
-            .with_row_selection_enabled(self.row_selection_enabled);
+            .with_row_selection_enabled(self.row_selection_enabled)
+            .with_bloom_filter_enabled(self.bloom_filter_enabled);

         if let Some(batch_size) = self.batch_size {
             arrow_reader_builder = arrow_reader_builder.with_batch_size(batch_size);
diff --git a/crates/iceberg/src/spec/values/decimal_utils.rs b/crates/iceberg/src/spec/values/decimal_utils.rs
index 88e3f72f65..21f74de074 100644
--- a/crates/iceberg/src/spec/values/decimal_utils.rs
+++ b/crates/iceberg/src/spec/values/decimal_utils.rs
@@ -186,6 +186,40 @@ pub fn i128_to_be_bytes_min(value: i128) -> Vec<u8> {
     bytes[start..].to_vec()
 }

+/// Encode an i128 decimal value as a fixed-length big-endian byte array,
+/// matching how Parquet stores `FIXED_LEN_BYTE_ARRAY` decimals.
+///
+/// The result is sign-extended or trimmed to exactly the number of bytes
+/// required for the given precision, matching the Java implementation in
+/// `DecimalUtil.toReusedFixLengthBytes`.
+pub fn decimal_to_fixed_length_bytes(value: i128, precision: u32) -> Vec<u8> {
+    let required_len = parquet_decimal_byte_length(precision);
+    let be_bytes = value.to_be_bytes(); // 16 bytes, big-endian, two's complement
+
+    if required_len >= 16 {
+        // Sign-extend to the required length
+        let fill_byte = if value < 0 { 0xFF } else { 0x00 };
+        let mut buf = vec![fill_byte; required_len];
+        let offset = required_len - 16;
+        buf[offset..].copy_from_slice(&be_bytes);
+        buf
+    } else {
+        // Trim leading bytes (value fits in fewer bytes)
+        let offset = 16 - required_len;
+        be_bytes[offset..].to_vec()
+    }
+}
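+
+// Worked example (illustrative): precision 20 needs
+// ceil((log2(10^20 + 1) + 1) / 8) = ceil(67.44 / 8) = 9 bytes, so
+// `decimal_to_fixed_length_bytes(12345, 20)` returns
+// `[0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x30, 0x39]` (12345 = 0x3039)
+// and `decimal_to_fixed_length_bytes(-1, 20)` returns nine `0xFF` bytes.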
+
+/// Returns the number of bytes required to store a decimal with the given
+/// precision as a Parquet `FIXED_LEN_BYTE_ARRAY`.
+///
+/// Mirrors `parquet::arrow::schema::decimal_length_from_precision`, which is
+/// not publicly accessible outside the parquet crate without the `experimental`
+/// feature flag.
+fn parquet_decimal_byte_length(precision: u32) -> usize {
+    (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -319,4 +353,61 @@ mod tests {
             );
         }
     }
+
+    #[test]
+    fn test_parquet_decimal_byte_length() {
+        // INT32 range (precision 1-9) should need <= 4 bytes
+        assert!(parquet_decimal_byte_length(1) <= 4);
+        assert!(parquet_decimal_byte_length(9) <= 4);
+        // INT64 range (precision 10-18) should need <= 8 bytes
+        assert!(parquet_decimal_byte_length(10) <= 8);
+        assert!(parquet_decimal_byte_length(18) <= 8);
+        // FIXED_LEN_BYTE_ARRAY range (precision 19+)
+        assert_eq!(parquet_decimal_byte_length(19), 9);
+        assert_eq!(parquet_decimal_byte_length(38), 16);
+    }
+
+    #[test]
+    fn test_decimal_to_parquet_fixed_bytes_positive() {
+        // 12345 with precision 20 (requires 9 bytes)
+        let bytes = decimal_to_fixed_length_bytes(12345, 20);
+        assert_eq!(bytes.len(), parquet_decimal_byte_length(20));
+        // Should be big-endian, zero-padded on the left
+        assert_eq!(bytes[bytes.len() - 2], 0x30); // 12345 = 0x3039
+        assert_eq!(bytes[bytes.len() - 1], 0x39);
+        // Leading bytes should be 0x00 (positive)
+        assert!(bytes[..bytes.len() - 2].iter().all(|&b| b == 0x00));
+    }
+
+    #[test]
+    fn test_decimal_to_parquet_fixed_bytes_negative() {
+        // -1 with precision 20
+        let bytes = decimal_to_fixed_length_bytes(-1, 20);
+        assert_eq!(bytes.len(), parquet_decimal_byte_length(20));
+        // All bytes should be 0xFF (-1 in two's complement)
+        assert!(bytes.iter().all(|&b| b == 0xFF));
+    }
+
+    #[test]
+    fn test_decimal_to_parquet_fixed_bytes_round_trip() {
+        // Verify that encoding then decoding via i128_from_be_bytes gives back
+        // the original value
+        for (value, precision) in [
+            (0i128, 20),
+            (1, 20),
+            (-1, 20),
+            (12345, 20),
+            (-12345, 20),
+            (i64::MAX as i128, 20),
+            (i64::MIN as i128, 20),
+        ] {
+            let bytes = decimal_to_fixed_length_bytes(value, precision);
+            let decoded = i128_from_be_bytes(&bytes);
+            assert_eq!(
+                decoded,
+                Some(value),
+                "Round trip failed for value={value}, precision={precision}"
+            );
+        }
+    }
 }
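For completeness, a hedged end-to-end sketch of how the new switch is meant to be used from the scan API. It assumes a `Table` already loaded from a catalog (setup omitted), and the helper name is invented for illustration; the builder methods are the ones added or already present in this diff:

```rust
use futures::TryStreamExt;
use iceberg::expr::Reference;
use iceberg::spec::Datum;
use iceberg::table::Table;

// `table` is assumed to be already loaded from a catalog.
async fn scan_with_bloom_pruning(table: &Table) -> iceberg::Result<usize> {
    let stream = table
        .scan()
        .with_filter(Reference::new("id").equal_to(Datum::long(42)))
        .with_bloom_filter_enabled(true) // new switch; defaults to false
        .build()?
        .to_arrow()
        .await?;

    // Row groups whose bloom filters prove `id != 42` are never decoded.
    let batches: Vec<_> = stream.try_collect().await?;
    Ok(batches.iter().map(|b| b.num_rows()).sum())
}
```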