18 changes: 18 additions & 0 deletions crates/iceberg/src/arrow/reader/mod.rs
@@ -52,6 +52,7 @@ pub struct ArrowReaderBuilder {
concurrency_limit_data_files: usize,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
bloom_filter_enabled: bool,
parquet_read_options: ParquetReadOptions,
}

@@ -66,6 +67,7 @@ impl ArrowReaderBuilder {
concurrency_limit_data_files: num_cpus,
row_group_filtering_enabled: true,
row_selection_enabled: false,
bloom_filter_enabled: false,
parquet_read_options: ParquetReadOptions::builder().build(),
}
}
@@ -95,6 +97,20 @@ impl ArrowReaderBuilder {
self
}

/// Determines whether to enable bloom filter-based row group filtering.
///
/// When enabled, if a read is performed with an equality or IN predicate,
/// the bloom filter for relevant columns in each row group is read and
/// checked. Row groups where the bloom filter proves the value is absent
/// are skipped entirely.
///
/// Defaults to disabled, as reading bloom filters requires additional I/O
/// per column per row group.
pub fn with_bloom_filter_enabled(mut self, bloom_filter_enabled: bool) -> Self {
self.bloom_filter_enabled = bloom_filter_enabled;
self
}
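
For context, a minimal usage sketch of the new builder option, mirroring the calls used in the tests added in row_filter.rs below. `file_io` and `tasks` are placeholders for the `FileIO` and `FileScanTaskStream` built by the surrounding scan code; this is a sketch, not part of the diff.

```rust
// Sketch only: `file_io` and `tasks` are assumed to be constructed elsewhere
// (see the tests in row_filter.rs for a full end-to-end setup).
let reader = ArrowReaderBuilder::new(file_io)
    .with_bloom_filter_enabled(true) // opt in to bloom-filter row-group pruning
    .build();

// Row groups whose bloom filters prove the predicate value is absent are
// skipped before any row data is fetched.
let batches = reader.read(tasks)?.stream();
```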

/// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
///
/// This hint can help reduce the number of fetch requests. For more details see the
@@ -133,6 +149,7 @@ impl ArrowReaderBuilder {
concurrency_limit_data_files: self.concurrency_limit_data_files,
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
bloom_filter_enabled: self.bloom_filter_enabled,
parquet_read_options: self.parquet_read_options,
}
}
@@ -150,5 +167,6 @@ pub struct ArrowReader {

row_group_filtering_enabled: bool,
row_selection_enabled: bool,
bloom_filter_enabled: bool,
parquet_read_options: ParquetReadOptions,
}
93 changes: 93 additions & 0 deletions crates/iceberg/src/arrow/reader/pipeline.rs
@@ -36,6 +36,9 @@ use crate::arrow::int96::coerce_int96_timestamps;
use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult};
use crate::error::Result;
use crate::expr::visitors::bloom_filter_evaluator::{
BloomFilterEvaluator, collect_bloom_filter_field_ids,
};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field};
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
@@ -57,6 +60,7 @@ impl ArrowReader {
.with_scan_metrics(scan_metrics.clone()),
row_group_filtering_enabled: self.row_group_filtering_enabled,
row_selection_enabled: self.row_selection_enabled,
bloom_filter_enabled: self.bloom_filter_enabled,
parquet_read_options: self.parquet_read_options,
scan_metrics: scan_metrics.clone(),
};
@@ -98,6 +102,7 @@ struct FileScanTaskReader {
delete_file_loader: CachingDeleteFileLoader,
row_group_filtering_enabled: bool,
row_selection_enabled: bool,
bloom_filter_enabled: bool,
parquet_read_options: ParquetReadOptions,
scan_metrics: ScanMetrics,
}
@@ -336,6 +341,30 @@ impl FileScanTaskReader {
};
}

if self.bloom_filter_enabled {
let all_rgs;
let candidate_rgs = match &selected_row_group_indices {
Some(indices) => indices.as_slice(),
None => {
all_rgs = (0..record_batch_stream_builder.metadata().num_row_groups())
.collect::<Vec<_>>();
&all_rgs
}
};

let bloom_filtered = Self::filter_row_groups_by_bloom_filter(
&predicate,
&mut record_batch_stream_builder,
candidate_rgs,
&field_id_map,
)
.await?;

if bloom_filtered.len() < candidate_rgs.len() {
selected_row_group_indices = Some(bloom_filtered);
}
}

if self.row_selection_enabled {
row_selection = Some(ArrowReader::get_row_selection_for_filter_predicate(
&predicate,
@@ -395,6 +424,70 @@ impl FileScanTaskReader {

Ok(Box::pin(record_batch_stream) as ArrowRecordBatchStream)
}

/// Reads bloom filters for relevant columns and evaluates the predicate
/// against them to filter out row groups that definitely don't match.
async fn filter_row_groups_by_bloom_filter(
predicate: &crate::expr::BoundPredicate,
builder: &mut ParquetRecordBatchStreamBuilder<ArrowFileReader>,
Contributor Author: the `mut` reference is here because `get_row_group_column_bloom_filter` requires it.

candidate_row_groups: &[usize],
field_id_map: &std::collections::HashMap<i32, usize>,
) -> Result<Vec<usize>> {
use std::collections::HashMap;

// Only collect field IDs from eq/in predicates — the only types
// bloom filters can help with. Skip columns not in the parquet schema.
let bloom_filter_field_ids: Vec<i32> = collect_bloom_filter_field_ids(predicate)?
.into_iter()
.filter(|id| field_id_map.contains_key(id))
.collect();

if bloom_filter_field_ids.is_empty() {
return Ok(candidate_row_groups.to_vec());
}

let mut result = Vec::with_capacity(candidate_row_groups.len());

for &rg_idx in candidate_row_groups {
let mut bloom_filters: HashMap<
i32,
(parquet::bloom_filter::Sbbf, parquet::basic::Type),
> = HashMap::new();

for &field_id in &bloom_filter_field_ids {
let col_idx = field_id_map[&field_id];
let col_meta = builder.metadata().row_group(rg_idx).column(col_idx);

// Only attempt to load if this column chunk actually has a bloom filter
if col_meta.bloom_filter_offset().is_none() {
continue;
}

let physical_type = col_meta.column_type();

match builder
.get_row_group_column_bloom_filter(rg_idx, col_idx)
.await
{
Ok(Some(sbbf)) => {
bloom_filters.insert(field_id, (sbbf, physical_type));
}
Ok(None) => {}
                    Err(_) => {
                        // If the bloom filter can't be read, leave it out; without it,
                        // this column can't prune the row group.
                    }
}
}

match BloomFilterEvaluator::eval(predicate, &bloom_filters) {
Ok(true) => result.push(rg_idx),
Ok(false) => { /* Row group pruned by bloom filter */ }
Err(_) => result.push(rg_idx), // On error, conservatively include
}
}

Ok(result)
}
}

impl ArrowReader {
177 changes: 176 additions & 1 deletion crates/iceberg/src/arrow/reader/row_filter.rs
@@ -196,7 +196,7 @@ mod tests {
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::{ArrayRef, LargeStringArray, RecordBatch, StringArray};
use arrow_array::{ArrayRef, Int32Array, LargeStringArray, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use futures::TryStreamExt;
use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY};
@@ -616,4 +616,179 @@ mod tests {
assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
}
}

/// Tests that bloom filter pushdown correctly prunes row groups.
#[tokio::test]
async fn test_bloom_filter_pushdown_prunes_row_groups() {
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
]));

let tmp_dir = TempDir::new().unwrap();
let file_path = format!("{}/bloom_test.parquet", tmp_dir.path().to_str().unwrap());

// Write a Parquet file with 3 row groups, each containing distinct values,
// with bloom filters enabled.
// Row group 0: ids 0..100
// Row group 1: ids 100..200
// Row group 2: ids 200..300
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_row_count(Some(100))
.set_bloom_filter_enabled(true)
.build();

let file = File::create(&file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();

for batch_start in [0, 100, 200] {
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
Int32Array::from((batch_start..batch_start + 100).collect::<Vec<i32>>()),
)])
.unwrap();
writer.write(&batch).unwrap();
}
writer.close().unwrap();

let file_io = FileIO::new_with_fs();

// Query for id = 150, which is only in row group 1.
// With bloom filter pushdown, row groups 0 and 2 should be pruned.
let predicate = Reference::new("id").equal_to(Datum::int(150));

let reader = ArrowReaderBuilder::new(file_io.clone())
.with_bloom_filter_enabled(true)
.build();

let tasks = Box::pin(futures::stream::iter(vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
start: 0,
length: 0,
record_count: None,
data_file_path: file_path.clone(),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1],
predicate: Some(predicate.bind(schema.clone(), true).unwrap()),
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})])) as FileScanTaskStream;

let result = reader
.read(tasks)
.unwrap()
.stream()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

// Only row group 1 (ids 100..200) should be read. The row filter
// then further filters to just id=150.
let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
assert_eq!(total_rows, 1, "Should find exactly one row matching id=150");

let id_col = result[0]
.column(0)
.as_primitive::<arrow_array::types::Int32Type>();
assert_eq!(id_col.value(0), 150);
}

/// Tests that bloom filter pushdown skips all row groups when value is absent.
#[tokio::test]
async fn test_bloom_filter_pushdown_value_absent() {
let schema = Arc::new(
Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap(),
);

let arrow_schema = Arc::new(ArrowSchema::new(vec![
Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([(
PARQUET_FIELD_ID_META_KEY.to_string(),
"1".to_string(),
)])),
]));

let tmp_dir = TempDir::new().unwrap();
let file_path = format!("{}/bloom_absent.parquet", tmp_dir.path().to_str().unwrap());

let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.set_max_row_group_row_count(Some(100))
.set_bloom_filter_enabled(true)
.build();

let file = File::create(&file_path).unwrap();
let mut writer = ArrowWriter::try_new(file, arrow_schema.clone(), Some(props)).unwrap();

for batch_start in [0, 100, 200] {
let batch = RecordBatch::try_new(arrow_schema.clone(), vec![Arc::new(
Int32Array::from((batch_start..batch_start + 100).collect::<Vec<i32>>()),
)])
.unwrap();
writer.write(&batch).unwrap();
}
writer.close().unwrap();

let file_io = FileIO::new_with_fs();

// Query for id = 999, which doesn't exist in any row group.
// All row groups should be pruned by bloom filter.
let predicate = Reference::new("id").equal_to(Datum::int(999));

let reader = ArrowReaderBuilder::new(file_io)
.with_bloom_filter_enabled(true)
.build();

let tasks = Box::pin(futures::stream::iter(vec![Ok(FileScanTask {
file_size_in_bytes: std::fs::metadata(&file_path).unwrap().len(),
start: 0,
length: 0,
record_count: None,
data_file_path: file_path.clone(),
data_file_format: DataFileFormat::Parquet,
schema: schema.clone(),
project_field_ids: vec![1],
predicate: Some(predicate.bind(schema, true).unwrap()),
deletes: vec![],
partition: None,
partition_spec: None,
name_mapping: None,
case_sensitive: false,
})])) as FileScanTaskStream;

let result = reader
.read(tasks)
.unwrap()
.stream()
.try_collect::<Vec<RecordBatch>>()
.await
.unwrap();

let total_rows: usize = result.iter().map(|b| b.num_rows()).sum();
assert_eq!(
total_rows, 0,
"Should find zero rows when value is absent from all bloom filters"
);
}
}