Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2165,6 +2165,32 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

/**
 * When on, a V2 scan that exposes a `KeyedPartitioning` but does not report explicit ordering
 * gets a derived ascending sort over its partition key expressions (the key is constant
 * within each partition, so any direction is valid).
 */
val V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED =
  buildConf("spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled")
    .doc("When enabled, Spark derives output ordering from the partition key expressions of " +
      "a V2 data source that reports a KeyedPartitioning but does not report explicit ordering " +
      "via SupportsReportOrdering. Within a single partition all rows share the same key " +
      // No interpolation in this fragment, so no `s` interpolator is needed.
      "value, so the data is trivially sorted by those expressions. Requires " +
      s"${V2_BUCKETING_ENABLED.key} to be enabled.")
    .version("4.2.0")
    .withBindingPolicy(ConfigBindingPolicy.SESSION)
    .booleanConf
    .createWithDefault(false)

/**
 * When on, `GroupPartitionsExec` keeps sort orders over partition key expressions while
 * coalescing several input partitions into one output partition: all merged partitions share
 * the same key value, so such orders stay valid after concatenation.
 */
val V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED =
  buildConf("spark.sql.sources.v2.bucketing.preserveKeyOrderingOnCoalesce.enabled")
    .doc("When enabled, Spark preserves sort orders over partition key expressions when " +
      "GroupPartitionsExec coalesces multiple input partitions into one output partition. " +
      "Because all merged partitions share the same partition key value, sort orders over " +
      // Fixed wording: SupportsReportOrdering supplies explicit source-reported ordering;
      // the key-derived ordering comes from the partitionKeyOrdering conf.
      "those key expressions remain valid after the merge. This applies to both ordering " +
      "reported by the source via SupportsReportOrdering and key-derived ordering from " +
      s"${V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key}. Requires " +
      s"${V2_BUCKETING_ENABLED.key} to be enabled.")
    .version("4.2.0")
    .withBindingPolicy(ConfigBindingPolicy.SESSION)
    .booleanConf
    .createWithDefault(false)

val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets")
.doc("The maximum number of buckets allowed.")
.version("2.4.0")
Expand Down Expand Up @@ -7731,6 +7757,12 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
// Getter for SQLConf.V2_BUCKETING_SORTING_ENABLED.
def v2BucketingAllowSorting: Boolean =
getConf(SQLConf.V2_BUCKETING_SORTING_ENABLED)

// Getter for SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.
def v2BucketingPartitionKeyOrderingEnabled: Boolean =
getConf(SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED)

// Getter for SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.
def v2BucketingPreserveKeyOrderingOnCoalesceEnabled: Boolean =
getConf(SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED)

// Getter for DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY.
def dataFrameSelfJoinAutoResolveAmbiguity: Boolean =
getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, SortOrder}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, RowOrdering, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.KeyedPartitioning
import org.apache.spark.sql.catalyst.util.truncatedString
Expand Down Expand Up @@ -104,11 +104,21 @@ trait DataSourceV2ScanExecBase extends LeafExecNode {
}

/**
 * Returns the output ordering for this scan. When the source reports ordering via
 * `SupportsReportOrdering`, that ordering is returned as-is. Otherwise, when the output
 * partitioning is a `KeyedPartitioning` and
 * `spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled` is on, each partition
 * contains rows where the key expressions evaluate to a single constant value, so the data
 * is trivially sorted by those expressions within the partition (ascending is chosen; any
 * direction is valid for a constant). With neither available, no ordering is reported.
 */
override def outputOrdering: Seq[SortOrder] = {
  ordering match {
    // Explicit ordering from SupportsReportOrdering always wins.
    case Some(reported) => reported
    case None =>
      outputPartitioning match {
        case keyed: KeyedPartitioning if conf.v2BucketingPartitionKeyOrderingEnabled =>
          keyed.expressions.map(SortOrder(_, Ascending))
        case _ => Seq.empty
      }
  }
}

override def supportsColumnar: Boolean = {
scan.columnarSupportMode() match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,34 @@ case class GroupPartitionsExec(
copy(child = newChild)

/**
 * Output ordering of the grouped partitions.
 *
 * When every group holds at most one input partition (no coalescing), the child's
 * within-partition ordering passes through untouched. When partitions are coalesced,
 * concatenation destroys ordering on non-key columns; only sort orders over the partition
 * key expressions can survive, and only when no reducers are applied and the preserve-key-
 * ordering-on-coalesce conf is enabled.
 */
override def outputOrdering: Seq[SortOrder] = {
  if (groupedPartitions.forall(_._2.size <= 1)) {
    // No coalescing: each output partition is exactly one input partition, so the child's
    // within-partition ordering (including any key-derived ordering prepended by
    // DataSourceV2ScanExecBase) is fully preserved.
    child.outputOrdering
  } else {
    // Coalescing: multiple input partitions are merged into one output partition by
    // concatenation, which breaks ordering on non-key columns. For example, two input
    // partitions that both share key=A and hold (A,1),(A,3) and (A,2),(A,5) (each sorted
    // ascending by the data column) concatenate to (A,1),(A,3),(A,2),(A,5), which is no
    // longer sorted by the data column. Only sort orders over partition key expressions
    // remain valid: the key evaluates to the same value (A) in every merged partition.
    outputPartitioning match {
      case p: Partitioning with Expression
          if reducers.isEmpty && conf.v2BucketingPreserveKeyOrderingOnCoalesceEnabled =>
        // Without reducers, all merged partitions share the same original key value, so key
        // expressions are constant within each output partition. The child's outputOrdering
        // is expected to be in sync with the partitioning (reported by the source or derived
        // in DataSourceV2ScanExecBase), so keep only the sort orders whose expression is a
        // partition key expression; everything else is lost by concatenation.
        val keyedPartitionings = p.collect { case k: KeyedPartitioning => k }
        val keyExprs = ExpressionSet(keyedPartitionings.flatMap(_.expressions))
        child.outputOrdering.filter(order => keyExprs.contains(order.child))
      case _ =>
        // Reached when the feature flag is off, when the partitioning carries no key
        // expressions, or when reducers are present -- merged partitions then share only the
        // reduced key, and the original key expressions may vary within an output partition.
        super.outputOrdering
    }
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import java.util.Collections
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression}
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Literal, TransformExpression}
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog}
import org.apache.spark.sql.connector.catalog.functions._
import org.apache.spark.sql.connector.distributions.Distributions
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.expressions.Expressions._
import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SparkPlan}
import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SortExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation, GroupPartitionsExec}
import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
Expand Down Expand Up @@ -3568,4 +3568,126 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with
}
}
}

test("SPARK-56241: scan with KeyedPartitioning reports key-derived outputOrdering") {
  // Partition the items table by identity(id) so the scan reports a KeyedPartitioning.
  val partitionTransforms = Array(identity("id"))
  createTable(items, itemsColumns, partitionTransforms)
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    "(3, 'cc', 30.0, cast('2021-01-01' as timestamp)), " +
    "(1, 'aa', 10.0, cast('2022-01-01' as timestamp)), " +
    "(2, 'bb', 20.0, cast('2022-01-01' as timestamp))")

  val query = sql(s"SELECT id, name FROM testcat.ns.$items")
  val defaultScans = collectScans(query.queryExecution.executedPlan)
  assert(defaultScans.size === 1)
  // With the config disabled (the default), no ordering is derived from the key.
  assert(defaultScans.head.outputOrdering.isEmpty)

  withSQLConf(SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key -> "true") {
    // With the config on, the scan derives an ascending sort on the partition key `id`;
    // identity transforms are unwrapped to AttributeReferences by V2ExpressionUtils.
    val enabledScans = collectScans(query.queryExecution.executedPlan)
    assert(enabledScans.size === 1)
    enabledScans.head.outputOrdering match {
      case Seq(sortOrder) =>
        assert(sortOrder.direction === Ascending)
        sortOrder.child match {
          case attr: AttributeReference => assert(attr.name === "id")
          case other => fail(s"expected AttributeReference, got $other")
        }
      case other => fail(s"expected a single sort order, got $other")
    }
  }
}

test("SPARK-56241: GroupPartitionsExec non-coalescing passes through child ordering, " +
  "no pre-join SortExec needed before SortMergeJoin") {
  // Non-identical key sets force GroupPartitionsExec to be inserted on both sides to align
  // them, but each group contains exactly one input partition, so no coalescing occurs.
  val items_partitions = Array(identity("id"))
  createTable(items, itemsColumns, items_partitions)
  // items has keys {1, 2, 3}; purchases below has keys {1, 2}.
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " +
    "(2, 'bb', 20.0, cast('2021-01-01' as timestamp)), " +
    "(3, 'cc', 30.0, cast('2021-01-01' as timestamp))")

  val purchases_partitions = Array(identity("item_id"))
  createTable(purchases, purchasesColumns, purchases_partitions)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
    "(1, 100.0, cast('2021-01-01' as timestamp)), " +
    "(2, 200.0, cast('2021-01-01' as timestamp))")

  // GroupPartitionsExec passes through the child's key-derived outputOrdering.
  // EnsureRequirements checks outputOrdering directly, so no SortExec should be inserted
  // below the SortMergeJoinExec.
  withSQLConf(SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key -> "true") {
    val df = sql(
      s"""
         |${selectWithMergeJoinHint("i", "p")}
         |i.id, i.name
         |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id
         |""".stripMargin)

    checkAnswer(df, Seq(Row(1, "aa"), Row(2, "bb")))

    val plan = df.queryExecution.executedPlan
    val groupPartitions = collectGroupPartitions(plan)
    assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan")
    // Every group must hold at most one input partition (non-coalescing case).
    assert(groupPartitions.forall(_.groupedPartitions.forall(_._2.size <= 1)),
      "expected non-coalescing GroupPartitionsExec")
    val smjs = collect(plan) { case j: SortMergeJoinExec => j }
    assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan")
    smjs.foreach { smj =>
      val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s })
      assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering passes through " +
        "non-coalescing GroupPartitions")
    }
  }
}

test("SPARK-56241: GroupPartitionsExec coalescing derives ordering from key expressions, " +
  "no pre-join SortExec needed before SortMergeJoin") {
  // Key 1 appears in two partitions on each side, forcing GroupPartitionsExec to coalesce
  // those partitions into one group.
  val items_partitions = Array(identity("id"))
  createTable(items, itemsColumns, items_partitions)
  sql(s"INSERT INTO testcat.ns.$items VALUES " +
    "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " +
    "(1, 'ab', 11.0, cast('2021-06-01' as timestamp)), " +
    "(2, 'bb', 20.0, cast('2021-01-01' as timestamp))")

  val purchases_partitions = Array(identity("item_id"))
  createTable(purchases, purchasesColumns, purchases_partitions)
  sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
    "(1, 100.0, cast('2021-01-01' as timestamp)), " +
    "(1, 110.0, cast('2021-06-01' as timestamp)), " +
    "(2, 200.0, cast('2021-01-01' as timestamp))")

  // With both configs on, GroupPartitionsExec retains the key-derived outputOrdering after
  // coalescing. EnsureRequirements checks outputOrdering directly, so no SortExec should be
  // inserted below the SortMergeJoinExec.
  withSQLConf(
    SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key -> "true",
    SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.key -> "true") {
    val df = sql(
      s"""
         |${selectWithMergeJoinHint("i", "p")}
         |i.id, i.name
         |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id
         |""".stripMargin)

    // id=1 has two rows on each side, so the join emits 2x2 = 4 rows for that key.
    checkAnswer(df, Seq(
      Row(1, "aa"), Row(1, "aa"), Row(1, "ab"), Row(1, "ab"),
      Row(2, "bb")))

    val plan = df.queryExecution.executedPlan
    val groupPartitions = collectGroupPartitions(plan)
    assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan")
    // At least one group must actually merge multiple input partitions.
    assert(groupPartitions.exists(_.groupedPartitions.exists(_._2.size > 1)),
      "expected coalescing GroupPartitionsExec")
    val smjs = collect(plan) { case j: SortMergeJoinExec => j }
    assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan")
    smjs.foreach { smj =>
      val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s })
      assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering is derived " +
        "from coalesced partition key")
    }
  }
}
}
Loading