[SPARK-56241][SQL] Derive outputOrdering from KeyedPartitioning key expressions#55036
[SPARK-56241][SQL] Derive outputOrdering from KeyedPartitioning key expressions#55036peter-toth wants to merge 5 commits intoapache:masterfrom
outputOrdering from KeyedPartitioning key expressions#55036Conversation
dongjoon-hyun
left a comment
There was a problem hiding this comment.
It's a nice improvement. I expected many generated query plan changes in the test case, but there is no change from the existing generated plan. Is there any reason, @peter-toth ?
We don't have any prodiction ready DSv2 filesources in Spark so the generated test plans / expected outputs doesn't cover this feature either. |
|
Got it~ |
dongjoon-hyun
left a comment
There was a problem hiding this comment.
+1, LGTM. Thank you, @peter-toth .
|
cc @cloud-fan , @szehon-ho , @aokolnychyi , @gengliangwang , too |
|
Iceberg can benefit from the change. |
7946dce to
4260f53
Compare
|
Marked as draft for now. Let me doublecheck a few edgecases as changing the reported ordering without the concept of constant order, which would be safe to prepend to any ordering, can be problematic. |
4260f53 to
f28c056
Compare
|
This PR now follows a safer approach and doesn't alter the reported ordering. The failures doesn't seem related, but looks like the same we hit here as well: #55048 (comment) |
.../src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
Outdated
Show resolved
Hide resolved
.../src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala
Outdated
Show resolved
Hide resolved
| // Without reducers all merged partitions share the same original key value, so the key | ||
| // expressions remain constant within the output partition. | ||
| val keyedPartitionings = p.collect { case k: KeyedPartitioning => k } | ||
| keyedPartitionings.map(_.expressions).transpose.map { exprs => |
There was a problem hiding this comment.
just trying to wrap my head around it, is it because its the same partition value? thats why its ordered?
There was a problem hiding this comment.
Let me refine this part and make it more clear.
There was a problem hiding this comment.
I refined the logic in the latest version and updated the PR description to summarize when this change is useful.
|
I also feel we should gate this behind a flag, as its a new feature with certain risk |
6f08ec5 to
7e489d0
Compare
Yes, this PR can help in all cases when ordering is required. And no, it doesn't make This PR is useful:
I can introduce a flag to disable the feature, but I'm not sure it is needed as the logic is very consevative now. (There is no change in the expected output of |
7e489d0 to
47b386c
Compare
…xpressions
### What changes were proposed in this pull request?
Within a `KeyedPartitioning` partition, all rows share the same key value, so
the key expressions are trivially sorted (ascending) within each partition.
This PR makes two plan nodes expose that structural guarantee via
`outputOrdering`:
- **`DataSourceV2ScanExecBase`**: when `outputPartitioning` is a
`KeyedPartitioning` and the source reports no ordering via
`SupportsReportOrdering`, derive one ascending `SortOrder` per key
expression. When the source does report ordering, it is returned as-is.
- **`GroupPartitionsExec`**:
- *Non-coalescing* (every group has ≤ 1 input partition): pass through
`child.outputOrdering` unchanged.
- *Coalescing without reducers*: re-derive ordering from the output
`KeyedPartitioning` key expressions; a join may embed multiple
`KeyedPartitioning`s with different expressions — expose equivalences
via `sameOrderExpressions`.
- *Coalescing with reducers*: fall back to `super.outputOrdering` (empty),
because merged partitions share only the reduced key.
### Why are the changes needed?
Before this change, `outputOrdering` on both nodes returned an empty sequence
(unless `SupportsReportOrdering` was implemented), even though the within-
partition ordering was structurally guaranteed by the partitioning itself.
As a result, `EnsureRequirements` would insert a redundant `SortExec` before
`SortMergeJoin` inputs that are already in key order.
### Does this PR introduce _any_ user-facing change?
Yes. Queries involving storage-partitioned joins (v2 bucketing) no longer add
a redundant `SortExec` before `SortMergeJoin` when the join keys match the
partition keys, reducing CPU and memory overhead.
### How was this patch tested?
- New unit test class `GroupPartitionsExecSuite` covering all four
`outputOrdering` branches (non-coalescing, coalescing without reducers with
single and multi-key, join `sameOrderExpressions`, coalescing with reducers).
- New SQL integration tests in `KeyGroupedPartitioningSuite` (SPARK-56241):
- Scan with `KeyedPartitioning` reports key-derived `outputOrdering`.
- Non-coalescing `GroupPartitionsExec` (non-identical key sets) passes
through child ordering — no pre-join `SortExec`.
- Coalescing `GroupPartitionsExec` derives ordering from key expressions —
no pre-join `SortExec`.
- Updated expected output in `DataSourceV2Suite` for the case where a source
is partitioned by a key with no reported ordering — groupBy on the partition
key no longer requires a sort.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6
47b386c to
67325e9
Compare
|
@szehon-ho, I added a new config in 492a875. |
67325e9 to
492a875
Compare
There was a problem hiding this comment.
Thanks @peter-toth i think changes mostly look good, just some comments.
Re: config flag, my worry is that we are now skipping an explicit Sort in these SMJ cases, which should be fine, but is still a bit risky if there's some edge cases missing.
i also wonder, typically we have a new feature flag for every new improvement that default to false, does this improvement also need it? There's flag now but it default true. cc @dongjoon-hyun @sunchao as well. Not sure if i'm being too cautious here
| // Coalescing: multiple input partitions are merged into one output partition. The child's | ||
| // within-partition ordering is lost due to concatenation -- for example, if two input | ||
| // partitions both belong to the same output group (same partition key value) and hold | ||
| // [1, 3] and [2, 5] respectively (each sorted ascending), concatenating them yields |
There was a problem hiding this comment.
sorry, im a bit confused about the comment. Is [1,3],[2,5] a non partitioned key? The scenario is more about required ordering being the partitioning keys, right? Do you think we can limit the comment to that scenario (which iiuc is the one handled here?)
There was a problem hiding this comment.
I see your point. I extended this comment in 930f1e7#diff-0ba234a494a1f66f06609ddce5d0643d69df21aa8e1f84fec39e5b487f814b5eR195-R198.
| // [1, 3, 2, 5] which is no longer sorted. Only sort orders over partition key expressions | ||
| // (which are constant across all merged partitions) remain valid. | ||
| outputPartitioning match { | ||
| case p: Partitioning with Expression if reducers.isEmpty => |
There was a problem hiding this comment.
maybe add the gate here as well?
Ok, we can do that and be very caotious with new features. I added a second config to control the |
|
Thank you @szehon-ho and @dongjoon-hyun for the review! Merged to |
Two flags may be a bit overkill and can be the same guard, is there any use for the second individually? |
But these 2 functions are different. Now one config controls if we can derive the sort order and the other controls if we can keep some parts of it during grouping (regardless if the order is derived or not). |
…ey expressions ### What changes were proposed in this pull request? Within a `KeyedPartitioning` partition, all rows share the same key value, so the key expressions are trivially sorted within each partition. This PR makes two plan nodes expose that structural guarantee via `outputOrdering`: - `DataSourceV2ScanExecBase.outputOrdering` Previously this returned the source-reported ordering (via `SupportsReportOrdering`) or fell back to the empty default. It now also handles the case where no ordering is reported but the output partitioning is a `KeyedPartitioning`: since every row in a partition evaluates to the same constant value for the key expressions, the partition is trivially sorted by those expressions. This feature can be enabled with a new `spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled` config. - `GroupPartitionsExec.outputOrdering` Previously the coalescing branch always returned `super.outputOrdering` (empty), discarding any ordering the child produced. It now distinguishes the following cases: - *No coalescing* (all groups contain exactly one partition): the child's within-partition ordering is fully preserved -- `child.outputOrdering` is returned as-is, including any key-derived ordering that `DataSourceV2ScanExecBase` already set. - *Coalescing without reducers* (multiple input partitions merged into one): all merged partitions share the same original key value, so key expressions are constant across all merged rows. The child's `outputOrdering` should already be in sync with the partitioning (it was either reported by the source or derived from `KeyedPartitioning` in `DataSourceV2ScanExecBase`), so we simply filter it to keep only the sort orders whose expression is a partition key expression. This feature can be enabled with a new `spark.sql.sources.v2.bucketing.preserveKeyOrderingOnCoalesce.enabled` config. - *Coalescing with reducers*: merged partitions no longer share the same original key values, so empty ordering is returned. ### Why are the changes needed? Before this change, `outputOrdering` on both nodes returned an empty sequence (unless `SupportsReportOrdering` was implemented), even though the within-partition ordering was structurally guaranteed by the partitioning itself. As a result, `EnsureRequirements` would insert a redundant `SortExec` in some cases. ### Does this PR introduce _any_ user-facing change? Yes. Queries that use key-partitioned tables may now avoid inserting some `SortExec`s. ### How was this patch tested? - New `GroupPartitionsExecSuite` covering all branches of the updated `outputOrdering` logic. - New SQL-level tests in `KeyGroupedPartitioningSuite` validating end-to-end plan shapes. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Sonnet 4.6 Closes apache#55036 from peter-toth/SPARK-56241-outputordering-from-keyedpartitioning. Authored-by: Peter Toth <peter.toth@gmail.com> Signed-off-by: Peter Toth <peter.toth@gmail.com>
What changes were proposed in this pull request?
Within a
KeyedPartitioningpartition, all rows share the same key value, so the key expressions are trivially sorted within each partition.This PR makes two plan nodes expose that structural guarantee via
outputOrdering:DataSourceV2ScanExecBase.outputOrderingPreviously this returned the source-reported ordering (via
SupportsReportOrdering) or fell back to the empty default. It now also handles the case where no ordering is reported but the output partitioning is aKeyedPartitioning: since every row in a partition evaluates to the same constant value for the key expressions, the partition is trivially sorted by those expressions.This feature can be enabled with a new
spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabledconfig.GroupPartitionsExec.outputOrderingPreviously the coalescing branch always returned
super.outputOrdering(empty), discarding any ordering the child produced. It now distinguishes the following cases:No coalescing (all groups contain exactly one partition): the child's within-partition ordering is fully preserved --
child.outputOrderingis returned as-is, including any key-derived ordering thatDataSourceV2ScanExecBasealready set.Coalescing without reducers (multiple input partitions merged into one): all merged partitions share the same original key value, so key expressions are constant across all merged rows. The child's
outputOrderingshould already be in sync with the partitioning (it was either reported by the source or derived fromKeyedPartitioninginDataSourceV2ScanExecBase), so we simply filter it to keep only the sort orders whose expression is a partition key expression.This feature can be enabled with a new
spark.sql.sources.v2.bucketing.preserveKeyOrderingOnCoalesce.enabledconfig.Coalescing with reducers: merged partitions no longer share the same original key values, so empty ordering is returned.
Why are the changes needed?
Before this change,
outputOrderingon both nodes returned an empty sequence (unlessSupportsReportOrderingwas implemented), even though the within-partition ordering was structurally guaranteed by the partitioning itself. As a result,EnsureRequirementswould insert a redundantSortExecin some cases.Does this PR introduce any user-facing change?
Yes. Queries that use key-partitioned tables may now avoid inserting some
SortExecs.How was this patch tested?
GroupPartitionsExecSuitecovering all branches of the updatedoutputOrderinglogic.KeyGroupedPartitioningSuitevalidating end-to-end plan shapes.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6