Commits (29)
37f2f7a feat: wrong version which did weird stuff (discord9, Dec 19, 2025)
f4d280c feat: gather filter support alias (discord9, Dec 25, 2025)
6ac3e10 feat: add support for detecting unknown columns in filter pushdown & … (discord9, Dec 25, 2025)
473d558 feat: update projection alias handling and enhance PhysicalColumnRewr… (discord9, Dec 25, 2025)
51dacbd feat: update deeply nested expression helper function and enhance tes… (discord9, Dec 25, 2025)
eeaa05e chore: clippy (discord9, Dec 25, 2025)
18dde3a typo (discord9, Dec 25, 2025)
844d571 feat: update test assertions for filter pushdown to reflect expected … (discord9, Dec 25, 2025)
470ca47 c (discord9, Dec 25, 2025)
73a6ce7 c (discord9, Dec 25, 2025)
90d6552 clippy (discord9, Dec 26, 2025)
c2f0ee6 test: update sqllogic test result (discord9, Dec 26, 2025)
37f045e test: more complex dyn filter (discord9, Dec 26, 2025)
25f7fca c (discord9, Dec 26, 2025)
2f0d953 refactor: rename function have_unknown_columns to has_unknown_columns… (discord9, Jan 4, 2026)
0fa54c3 test: topk with projection (discord9, Jan 5, 2026)
15feef6 test: slt test for projection dyn filter (discord9, Jan 5, 2026)
5199490 chore (discord9, Jan 5, 2026)
2a6b017 test: ignore time (discord9, Jan 5, 2026)
6040aa7 chore: fmt (discord9, Jan 5, 2026)
c34a5ad test: more slt test (discord9, Jan 5, 2026)
05104ed test: fix (discord9, Jan 7, 2026)
6171245 test: more ignore (discord9, Jan 7, 2026)
b50b1ab test: more ignore&proper sql (discord9, Jan 7, 2026)
03171ac feat: unmap column not pushdown (discord9, Jan 9, 2026)
51dc45f clippy (discord9, Jan 9, 2026)
9ca4de0 chore (discord9, Jan 9, 2026)
34c8a71 test: add pushdown assert (discord9, Jan 9, 2026)
bf744f9 refactor: ref column map (discord9, Jan 9, 2026)
1 change: 1 addition & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
@@ -175,6 +175,7 @@ env_logger = { workspace = true }
glob = { workspace = true }
insta = { workspace = true }
paste = { workspace = true }
pretty_assertions = "1.0"
Contributor comment: This appears to already be used elsewhere (this is not a net new dependency), so I think it is ok to add. (A usage sketch follows this hunk.)

rand = { workspace = true, features = ["small_rng"] }
rand_distr = "0.5"
regex = { workspace = true }
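As an aside, here is a minimal sketch (a hypothetical snippet, not part of this diff) of what pretty_assertions buys in the plan-string tests below: its assert_eq! macro is a drop-in for the std macro that prints a line-by-line diff on failure instead of dumping both strings whole.

// Hypothetical illustration of pretty_assertions in a plan-string test.
#[test]
fn plan_strings_match() {
    let expected = "- SortExec: TopK(fetch=2)\n  - ProjectionExec";
    let actual = "- SortExec: TopK(fetch=2)\n  - ProjectionExec";
    // On mismatch this prints a diff of the two strings, which is far
    // easier to read than two full plan dumps side by side.
    pretty_assertions::assert_eq!(actual, expected);
}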
229 changes: 229 additions & 0 deletions datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -59,6 +59,7 @@ use datafusion_physical_plan::{
coalesce_partitions::CoalescePartitionsExec,
collect,
filter::FilterExec,
projection::ProjectionExec,
repartition::RepartitionExec,
sorts::sort::SortExec,
};
@@ -1813,6 +1814,234 @@ fn schema() -> SchemaRef {
Arc::clone(&TEST_SCHEMA)
}

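/// Inputs for one projection-under-TopK dynamic filter scenario: the source
/// schema and batches, the projection placed between the scan and the sort,
/// the sort expression (in terms of the projection's output schema), and the
/// expected plan strings: the optimized plan first, then one snapshot per
/// polled batch.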
struct ProjectionDynFilterTestCase {
schema: SchemaRef,
batches: Vec<RecordBatch>,
projection: Vec<(Arc<dyn PhysicalExpr>, String)>,
sort_expr: PhysicalSortExpr,
expected_plans: Vec<String>,
}

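/// Builds scan -> projection -> TopK sort, runs the post-optimization
/// `FilterPushdown` pass, asserts the optimized plan against
/// `expected_plans[0]`, then executes the plan one batch at a time and checks
/// the dynamic filter snapshot after each poll against `expected_plans[1..]`.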
async fn run_projection_dyn_filter_case(case: ProjectionDynFilterTestCase) {
let ProjectionDynFilterTestCase {
schema,
batches,
projection,
sort_expr,
expected_plans,
} = case;

let scan = TestScanBuilder::new(Arc::clone(&schema))
.with_support(true)
.with_batches(batches)
.build();

let projection_exec = Arc::new(ProjectionExec::try_new(projection, scan).unwrap());

let sort = Arc::new(
SortExec::new(LexOrdering::new(vec![sort_expr]).unwrap(), projection_exec)
.with_fetch(Some(2)),
) as Arc<dyn ExecutionPlan>;

let mut config = ConfigOptions::default();
config.execution.parquet.pushdown_filters = true;
config.optimizer.enable_dynamic_filter_pushdown = true;

let optimized_plan = FilterPushdown::new_post_optimization()
.optimize(Arc::clone(&sort), &config)
.unwrap();

pretty_assertions::assert_eq!(
format_plan_for_test(&optimized_plan).trim(),
expected_plans[0].trim()
);

let config = SessionConfig::new().with_batch_size(2);
let session_ctx = SessionContext::new_with_config(config);
session_ctx.register_object_store(
ObjectStoreUrl::parse("test://").unwrap().as_ref(),
Arc::new(InMemory::new()),
);
let state = session_ctx.state();
let task_ctx = state.task_ctx();
let mut stream = optimized_plan.execute(0, Arc::clone(&task_ctx)).unwrap();
for (idx, expected_plan) in expected_plans.iter().enumerate().skip(1) {
stream.next().await.unwrap().unwrap();
let formatted_plan = format_plan_for_test(&optimized_plan);
pretty_assertions::assert_eq!(
formatted_plan.trim(),
expected_plan.trim(),
"Mismatch at iteration {}",
idx
);
}
}

#[tokio::test]
async fn test_topk_with_projection_transformation_on_dyn_filter() {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, false),
Field::new("b", DataType::Utf8, false),
Field::new("c", DataType::Float64, false),
]));
let simple_abc = vec![
record_batch!(
("a", Int32, [1, 2, 3]),
("b", Utf8, ["x", "y", "z"]),
("c", Float64, [1.0, 2.0, 3.0])
)
.unwrap(),
];

// Case 1: Reordering [b, a]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("b", &schema).unwrap(), "b".to_string()),
(col("a", &schema).unwrap(), "a".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 1)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@1 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@1 ASC], preserve_partitioning=[false], filter=[a@1 IS NULL OR a@1 < 2]
- ProjectionExec: expr=[b@1 as b, a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string()]
})
.await;
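// Note: above, the TopK filter references a@1 (the projection's output
// index) while the DataSourceExec predicate references a@0 (the source
// schema index): the pushdown remapped the column through the projection.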

// Case 2: Pruning [a]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![(col("a", &schema).unwrap(), "a".to_string())],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 2]
- ProjectionExec: expr=[a@0 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 3: Identity [a, b]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("a", &schema).unwrap(), "a".to_string()),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 2]
- ProjectionExec: expr=[a@0 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 4: Expressions [a + 1, b]
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(
Arc::new(BinaryExpr::new(
col("a", &schema).unwrap(),
Operator::Plus,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
)),
"a_plus_1".to_string(),
),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a_plus_1", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 + 1 as a_plus_1, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a_plus_1@0 ASC], preserve_partitioning=[false], filter=[a_plus_1@0 IS NULL OR a_plus_1@0 < 3]
- ProjectionExec: expr=[a@0 + 1 as a_plus_1, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 + 1 IS NULL OR a@0 + 1 < 3 ]"#.to_string(),
],
})
.await;
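// Note: here the projection expression itself is inlined into the pushed
// predicate: a_plus_1@0 < 3 at the TopK becomes a@0 + 1 < 3 at the source.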

// Case 5: [a as b, b as a] (swapped columns)
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(col("a", &schema).unwrap(), "b".to_string()),
(col("b", &schema).unwrap(), "a".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("b", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[b@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 as b, b@1 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[b@0 ASC], preserve_partitioning=[false], filter=[b@0 IS NULL OR b@0 < 2]
- ProjectionExec: expr=[a@0 as b, b@1 as a]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 IS NULL OR a@0 < 2 ]"#.to_string(),
],
})
.await;

// Case 6: Shadowing alias [a + 1 as a, b]: output `a` is not the source column a
run_projection_dyn_filter_case(ProjectionDynFilterTestCase {
schema: Arc::clone(&schema),
batches: simple_abc.clone(),
projection: vec![
(
Arc::new(BinaryExpr::new(
col("a", &schema).unwrap(),
Operator::Plus,
Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
)),
"a".to_string(),
),
(col("b", &schema).unwrap(), "b".to_string()),
],
sort_expr: PhysicalSortExpr::new(
Arc::new(Column::new("a", 0)),
SortOptions::default(),
),
expected_plans: vec![
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false]
- ProjectionExec: expr=[a@0 + 1 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ empty ]"#.to_string(),
r#" - SortExec: TopK(fetch=2), expr=[a@0 ASC], preserve_partitioning=[false], filter=[a@0 IS NULL OR a@0 < 3]
- ProjectionExec: expr=[a@0 + 1 as a, b@1 as b]
- DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=DynamicFilter [ a@0 + 1 IS NULL OR a@0 + 1 < 3 ]"#.to_string(),
],
})
.await;
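// Note: the alias shadows the source column name; the pushed predicate
// correctly uses a@0 + 1 (the projection expression) rather than the raw
// source column a@0.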
}

/// Returns a predicate that is a binary expression col = lit
fn col_lit_predicate(
column_name: &str,
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/lib.rs
@@ -92,6 +92,7 @@ pub mod streaming;
pub mod tree_node;
pub mod union;
pub mod unnest;
pub mod util;
pub mod windows;
pub mod work_table;
pub mod udaf {