[SPARK-56241][SQL] Derive outputOrdering from KeyedPartitioning key expressions #55036

Closed
peter-toth wants to merge 5 commits into apache:master from peter-toth:SPARK-56241-outputordering-from-keyedpartitioning

Conversation

Contributor

@peter-toth peter-toth commented Mar 26, 2026

What changes were proposed in this pull request?

Within a KeyedPartitioning partition, all rows share the same key value, so the key expressions are trivially sorted within each partition.

This PR makes two plan nodes expose that structural guarantee via outputOrdering:

  • DataSourceV2ScanExecBase.outputOrdering
    Previously this returned the source-reported ordering (via SupportsReportOrdering) or fell back to the empty default. It now also handles the case where no ordering is reported but the output partitioning is a KeyedPartitioning: since every row in a partition evaluates to the same constant value for the key expressions, the partition is trivially sorted by those expressions.
    This feature can be enabled with a new spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled config.

  • GroupPartitionsExec.outputOrdering
    Previously the coalescing branch always returned super.outputOrdering (empty), discarding any ordering the child produced. It now distinguishes the following cases:

    • No coalescing (all groups contain exactly one partition): the child's within-partition ordering is fully preserved -- child.outputOrdering is returned as-is, including any key-derived ordering that DataSourceV2ScanExecBase already set.

    • Coalescing without reducers (multiple input partitions merged into one): all merged partitions share the same original key value, so key expressions are constant across all merged rows. The child's outputOrdering should already be in sync with the partitioning (it was either reported by the source or derived from KeyedPartitioning in DataSourceV2ScanExecBase), so we simply filter it to keep only the sort orders whose expression is a partition key expression.
      This feature can be enabled with a new spark.sql.sources.v2.bucketing.preserveKeyOrderingOnCoalesce.enabled config.

    • Coalescing with reducers: merged partitions no longer share the same original key values, so empty ordering is returned.
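The three `GroupPartitionsExec` cases above can be sketched with a toy model. This is only an illustration under stated assumptions: `SortOrder` and `ToyGroupPartitions` here are hypothetical stand-ins that use strings for expressions, not Spark's actual classes, and the boolean field merely mimics the new `preserveKeyOrderingOnCoalesce` config.

```scala
// Toy model: every type here is a hypothetical stand-in, not one of Spark's classes.
object GroupPartitionsSketch {
  final case class SortOrder(expr: String, ascending: Boolean = true)

  // childOrdering: ordering reported by the child (source-reported or key-derived)
  // partitionKeys: key expressions of the output partitioning
  // groupSizes: number of input partitions merged into each output partition
  // preserveKeyOrderingOnCoalesce: stands in for the new config flag
  final case class ToyGroupPartitions(
      childOrdering: Seq[SortOrder],
      partitionKeys: Set[String],
      groupSizes: Seq[Int],
      hasReducers: Boolean,
      preserveKeyOrderingOnCoalesce: Boolean) {

    def outputOrdering: Seq[SortOrder] =
      if (groupSizes.forall(_ <= 1)) {
        // No coalescing: the child's within-partition ordering is untouched.
        childOrdering
      } else if (!hasReducers && preserveKeyOrderingOnCoalesce) {
        // Coalescing without reducers: only orders over key expressions survive.
        childOrdering.filter(o => partitionKeys.contains(o.expr))
      } else {
        // Coalescing with reducers, or the flag is off: no ordering is claimed.
        Seq.empty
      }
  }

  val child: Seq[SortOrder] = Seq(SortOrder("a"), SortOrder("b"))

  val noCoalescing = ToyGroupPartitions(child, Set("a"), Seq(1, 1), false, false)
  val coalescing = ToyGroupPartitions(child, Set("a"), Seq(2, 1), false, true)
  val coalescingFlagOff = ToyGroupPartitions(child, Set("a"), Seq(2, 1), false, false)
  val withReducers = ToyGroupPartitions(child, Set("a"), Seq(2, 1), true, true)

  def main(args: Array[String]): Unit = {
    println(noCoalescing.outputOrdering)
    println(coalescing.outputOrdering)
    println(coalescingFlagOff.outputOrdering)
    println(withReducers.outputOrdering)
  }
}
```

In this sketch the child reports `[a, b]` and the partition key is `a`, so coalescing keeps only the order on `a`, while the non-coalescing case returns the child's ordering unchanged.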

Why are the changes needed?

Before this change, outputOrdering on both nodes returned an empty sequence (unless SupportsReportOrdering was implemented), even though the within-partition ordering was structurally guaranteed by the partitioning itself. As a result, EnsureRequirements would insert a redundant SortExec in some cases.

Does this PR introduce any user-facing change?

Yes. Queries that use key-partitioned tables may now avoid inserting some SortExecs.

How was this patch tested?

  • New GroupPartitionsExecSuite covering all branches of the updated outputOrdering logic.
  • New SQL-level tests in KeyGroupedPartitioningSuite validating end-to-end plan shapes.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6

Member

@dongjoon-hyun dongjoon-hyun left a comment

It's a nice improvement. I expected many generated query plan changes in the test case, but there is no change from the existing generated plan. Is there any reason, @peter-toth ?

@peter-toth
Contributor Author

peter-toth commented Mar 26, 2026

It's a nice improvement. I expected many generated query plan changes in the test case, but there is no change from the existing generated plan. Is there any reason, @peter-toth ?

We don't have any production-ready DSv2 file sources in Spark, so the generated test plans and expected outputs don't cover this feature either.

@dongjoon-hyun
Member

Got it~

dongjoon-hyun
dongjoon-hyun previously approved these changes Mar 26, 2026
Member

@dongjoon-hyun dongjoon-hyun left a comment

+1, LGTM. Thank you, @peter-toth .

@dongjoon-hyun
Member

cc @cloud-fan , @szehon-ho , @aokolnychyi , @gengliangwang , too

@peter-toth
Contributor Author

peter-toth commented Mar 26, 2026

Iceberg can benefit from the change.
I will add a follow-up improvement in the scope of SPARK-55715 to keep ordering even when we coalesce partitions, and once @anuragmantri's apache/iceberg#14948 is also merged it will be a major improvement.

@peter-toth peter-toth force-pushed the SPARK-56241-outputordering-from-keyedpartitioning branch from 7946dce to 4260f53 Compare March 26, 2026 19:39
@peter-toth peter-toth marked this pull request as draft March 26, 2026 20:21
@peter-toth
Contributor Author

peter-toth commented Mar 26, 2026

Marked as draft for now. Let me double-check a few edge cases: changing the reported ordering without the concept of a constant order, which would be safe to prepend to any ordering, can be problematic.

@dongjoon-hyun dongjoon-hyun dismissed their stale review March 26, 2026 21:07

Stale review.

@peter-toth peter-toth force-pushed the SPARK-56241-outputordering-from-keyedpartitioning branch from 4260f53 to f28c056 Compare March 27, 2026 10:01
@peter-toth peter-toth marked this pull request as ready for review March 27, 2026 15:05
@peter-toth
Contributor Author

peter-toth commented Mar 27, 2026

This PR now follows a safer approach and doesn't alter the reported ordering.

The failures don't seem related; they look like the same ones we hit here as well: #55048 (comment)

// Without reducers all merged partitions share the same original key value, so the key
// expressions remain constant within the output partition.
val keyedPartitionings = p.collect { case k: KeyedPartitioning => k }
keyedPartitionings.map(_.expressions).transpose.map { exprs =>

Member

Just trying to wrap my head around it: is it ordered because it's the same partition value?

Contributor Author

Let me refine this part and make it clearer.

Contributor Author

I refined the logic in the latest version and updated the PR description to summarize when this change is useful.

Member

@szehon-ho szehon-ho left a comment

Thanks for the patch, I think it mostly looks good. So it's not just the SPJ case but also 'order by', is that right? And now we don't need SupportsReportOrdering?

@szehon-ho
Member

I also feel we should gate this behind a flag, as it's a new feature with certain risk.

@peter-toth peter-toth force-pushed the SPARK-56241-outputordering-from-keyedpartitioning branch 2 times, most recently from 6f08ec5 to 7e489d0 Compare March 28, 2026 19:33
@peter-toth
Contributor Author

peter-toth commented Mar 28, 2026

So its not just SPJ case but also 'order by', is it right? And now we don't need SupportsReportOrdering?

Yes, this PR can help in all cases where an ordering is required. And no, it doesn't make SupportsReportOrdering obsolete, as an ordering reported by a source can be more granular than the one that we can derive from partitioning. E.g. a source can report partitioning by [a] and ordering by [a, b].
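The granularity point can be illustrated with a small sketch. It relies on a simplifying assumption: a required ordering counts as satisfied when it is a prefix of the provided ordering. The `satisfies` helper and the string-based expressions are hypothetical and not Spark's API.

```scala
object OrderingGranularity {
  // Simplified assumption: `required` is satisfied when it is a prefix of `provided`.
  def satisfies(provided: Seq[String], required: Seq[String]): Boolean =
    provided.startsWith(required)

  // Ordering that can be derived from partitioning by [a] alone.
  val derivedFromPartitioning: Seq[String] = Seq("a")
  // Ordering a source might report via SupportsReportOrdering.
  val reportedBySource: Seq[String] = Seq("a", "b")

  def main(args: Array[String]): Unit = {
    // Both orderings cover a requirement on [a].
    println(satisfies(derivedFromPartitioning, Seq("a")))
    println(satisfies(reportedBySource, Seq("a")))
    // Only the source-reported ordering covers a requirement on [a, b].
    println(satisfies(derivedFromPartitioning, Seq("a", "b")))
    println(satisfies(reportedBySource, Seq("a", "b")))
  }
}
```

Under this model the derived ordering removes a sort only when the requirement is on the partition keys, which is why a source-reported ordering remains useful.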

This PR is useful:

  • When the source doesn't report anything because we can derive some basic ordering from the partitioning in DataSourceV2ScanExecBase.
  • When we coalesce partitions in GroupPartitionsExec because we can keep some parts of the ordering we get from the child.

I also feel we should gate this behind a flag, as its a new feature with certain risk.

I can introduce a flag to disable the feature, but I'm not sure it is needed as the logic is very conservative now. (There is no change in the expected output of the ordering and partitioning reporting test of DataSourceV2Suite, which tests cases where ordering is reported by the data source.)

@peter-toth peter-toth force-pushed the SPARK-56241-outputordering-from-keyedpartitioning branch from 7e489d0 to 47b386c Compare March 29, 2026 10:44
…xpressions

### What changes were proposed in this pull request?

Within a `KeyedPartitioning` partition, all rows share the same key value, so
the key expressions are trivially sorted (ascending) within each partition.

This PR makes two plan nodes expose that structural guarantee via
`outputOrdering`:

- **`DataSourceV2ScanExecBase`**: when `outputPartitioning` is a
  `KeyedPartitioning` and the source reports no ordering via
  `SupportsReportOrdering`, derive one ascending `SortOrder` per key
  expression. When the source does report ordering, it is returned as-is.

- **`GroupPartitionsExec`**:
  - *Non-coalescing* (every group has ≤ 1 input partition): pass through
    `child.outputOrdering` unchanged.
  - *Coalescing without reducers*: re-derive ordering from the output
    `KeyedPartitioning` key expressions; a join may embed multiple
    `KeyedPartitioning`s with different expressions — expose equivalences
    via `sameOrderExpressions`.
  - *Coalescing with reducers*: fall back to `super.outputOrdering` (empty),
    because merged partitions share only the reduced key.

### Why are the changes needed?

Before this change, `outputOrdering` on both nodes returned an empty sequence
(unless `SupportsReportOrdering` was implemented), even though the within-
partition ordering was structurally guaranteed by the partitioning itself.
As a result, `EnsureRequirements` would insert a redundant `SortExec` before
`SortMergeJoin` inputs that are already in key order.

### Does this PR introduce _any_ user-facing change?

Yes. Queries involving storage-partitioned joins (v2 bucketing) no longer add
a redundant `SortExec` before `SortMergeJoin` when the join keys match the
partition keys, reducing CPU and memory overhead.

### How was this patch tested?

- New unit test class `GroupPartitionsExecSuite` covering all four
  `outputOrdering` branches (non-coalescing, coalescing without reducers with
  single and multi-key, join `sameOrderExpressions`, coalescing with reducers).
- New SQL integration tests in `KeyGroupedPartitioningSuite` (SPARK-56241):
  - Scan with `KeyedPartitioning` reports key-derived `outputOrdering`.
  - Non-coalescing `GroupPartitionsExec` (non-identical key sets) passes
    through child ordering — no pre-join `SortExec`.
  - Coalescing `GroupPartitionsExec` derives ordering from key expressions —
    no pre-join `SortExec`.
- Updated expected output in `DataSourceV2Suite` for the case where a source
  is partitioned by a key with no reported ordering — groupBy on the partition
  key no longer requires a sort.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Sonnet 4.6
@peter-toth peter-toth force-pushed the SPARK-56241-outputordering-from-keyedpartitioning branch from 47b386c to 67325e9 Compare March 30, 2026 09:04
@peter-toth
Contributor Author

peter-toth commented Mar 30, 2026

@szehon-ho, I added a new config in 492a875.

Member

@szehon-ho szehon-ho left a comment

Thanks @peter-toth, I think the changes mostly look good, just some comments.

Re: config flag, my worry is that we are now skipping an explicit Sort in these SMJ cases, which should be fine, but is still a bit risky if some edge cases are missed.

I also wonder: typically we have a feature flag for every new improvement that defaults to false. Does this improvement also need one? There's a flag now, but it defaults to true. cc @dongjoon-hyun @sunchao as well. Not sure if I'm being too cautious here.

// Coalescing: multiple input partitions are merged into one output partition. The child's
// within-partition ordering is lost due to concatenation -- for example, if two input
// partitions both belong to the same output group (same partition key value) and hold
// [1, 3] and [2, 5] respectively (each sorted ascending), concatenating them yields

Member

Sorry, I'm a bit confused about the comment. Are [1, 3] and [2, 5] values of a non-partition-key column? The scenario is more about the required ordering being the partitioning keys, right? Do you think we can limit the comment to that scenario (which, IIUC, is the one handled here)?


// [1, 3, 2, 5] which is no longer sorted. Only sort orders over partition key expressions
// (which are constant across all merged partitions) remain valid.
outputPartitioning match {
case p: Partitioning with Expression if reducers.isEmpty =>

Member

maybe add the gate here as well?

Contributor Author

Alright, added in 930f1e7.
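The concatenation example from the code comment under review can be checked with a short sketch. The `Row` type and the `isSortedBy` helper are hypothetical and exist only for this illustration. It assumes two input partitions that share partition key value 7 and hold the non-key values [1, 3] and [2, 5].

```scala
object CoalesceOrderingDemo {
  // Hypothetical row: `key` is the partition key column, `value` is a non-key column.
  final case class Row(key: Int, value: Int)

  // True when the rows are in ascending order of the extracted column.
  def isSortedBy(rows: Seq[Row])(f: Row => Int): Boolean =
    rows.map(f).sliding(2).forall {
      case Seq(x, y) => x <= y
      case _ => true // zero or one element is trivially sorted
    }

  val p1: Seq[Row] = Seq(Row(7, 1), Row(7, 3))
  val p2: Seq[Row] = Seq(Row(7, 2), Row(7, 5))

  // Coalescing modeled as plain concatenation of the input partitions.
  val merged: Seq[Row] = p1 ++ p2

  def main(args: Array[String]): Unit = {
    println(isSortedBy(p1)(_.value))     // each input is sorted by value
    println(isSortedBy(p2)(_.value))
    println(isSortedBy(merged)(_.value)) // values become 1, 3, 2, 5
    println(isSortedBy(merged)(_.key))   // the key column is constant
  }
}
```

The non-key column loses its order after concatenation, while the constant key column stays sorted, which matches the reasoning for keeping only sort orders over partition key expressions.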

@peter-toth
Contributor Author

peter-toth commented Mar 31, 2026

Re: config flag, my worry is that we are now skipping an explicit Sort in these SMJ cases, which should be fine, but is still a bit risky if there's some edge cases missing.

i also wonder, typically we have a new feature flag for every new improvement that default to false, does this improvement also need it? There's flag now but it default true. cc @dongjoon-hyun @sunchao as well. Not sure if i'm being too cautious here

Ok, we can do that and be very cautious with new features. I added a second config to control the outputOrdering logic in GroupPartitionsExec and disabled both by default. The PR description reflects these amendments.
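Since both configs are disabled by default, opting in might look like the following fragment. It assumes a Spark build that already contains this change. The app name and local master are placeholders, and only the two config keys come from the PR description.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: the Spark version in use includes SPARK-56241; older versions do not use these keys.
val spark = SparkSession.builder()
  .appName("key-ordering-opt-in") // placeholder name
  .master("local[*]")             // placeholder master for a local run
  // Derive ordering from KeyedPartitioning keys when the source reports none.
  .config("spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled", true)
  // Keep key-expression sort orders when GroupPartitionsExec coalesces partitions.
  .config("spark.sql.sources.v2.bucketing.preserveKeyOrderingOnCoalesce.enabled", true)
  .getOrCreate()
```

Whether a `SortExec` is actually avoided could then be checked by inspecting the physical plan of a query over a key-partitioned DSv2 table.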

@peter-toth
Contributor Author

Thank you @szehon-ho and @dongjoon-hyun for the review!

Merged to master (4.2.0).

@szehon-ho
Member

I added a second config to control the outputOrdering logic in GroupPartitionsExec and disabled both by default

Two flags may be a bit overkill and could share the same guard. Is there any use for the second one individually?

@peter-toth
Contributor Author

peter-toth commented Mar 31, 2026

I added a second config to control the outputOrdering logic in GroupPartitionsExec and disabled both by default

Two flags may be a bit overkill and can be the same guard, is there any use for the second individually?

But these two functions are different. Now one config controls whether we can derive the sort order, and the other controls whether we can keep some parts of it during grouping (regardless of whether the order is derived or not).

zhengruifeng pushed a commit to zhengruifeng/spark that referenced this pull request Apr 1, 2026
…ey expressions

Closes apache#55036 from peter-toth/SPARK-56241-outputordering-from-keyedpartitioning.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Peter Toth <peter.toth@gmail.com>