diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 22f5b3f6c7928..5639b6bbfbf4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2165,6 +2165,32 @@ object SQLConf { .booleanConf .createWithDefault(false) + val V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED = + buildConf("spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled") + .doc("When enabled, Spark derives output ordering from the partition key expressions of " + + "a V2 data source that reports a KeyedPartitioning but does not report explicit ordering " + + "via SupportsReportOrdering. Within a single partition all rows share the same key " + + s"value, so the data is trivially sorted by those expressions. Requires " + + s"${V2_BUCKETING_ENABLED.key} to be enabled.") + .version("4.2.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .booleanConf + .createWithDefault(false) + + val V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED = + buildConf("spark.sql.sources.v2.bucketing.preserveKeyOrderingOnCoalesce.enabled") + .doc("When enabled, Spark preserves sort orders over partition key expressions when " + + "GroupPartitionsExec coalesces multiple input partitions into one output partition. " + + "Because all merged partitions share the same partition key value, sort orders over " + + "those key expressions remain valid after the merge. This applies to both key-derived " + + "ordering (from SupportsReportOrdering) and ordering derived from " + + s"${V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key}. 
Requires " + + s"${V2_BUCKETING_ENABLED.key} to be enabled.") + .version("4.2.0") + .withBindingPolicy(ConfigBindingPolicy.SESSION) + .booleanConf + .createWithDefault(false) + val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.sources.bucketing.maxBuckets") .doc("The maximum number of buckets allowed.") .version("2.4.0") @@ -7731,6 +7757,12 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def v2BucketingAllowSorting: Boolean = getConf(SQLConf.V2_BUCKETING_SORTING_ENABLED) + def v2BucketingPartitionKeyOrderingEnabled: Boolean = + getConf(SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED) + + def v2BucketingPreserveKeyOrderingOnCoalesceEnabled: Boolean = + getConf(SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED) + def dataFrameSelfJoinAutoResolveAmbiguity: Boolean = getConf(DATAFRAME_SELF_JOIN_AUTO_RESOLVE_AMBIGUITY) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala index 877e65341c1c8..a1a6c6e022482 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Expression, RowOrdering, SortOrder} +import org.apache.spark.sql.catalyst.expressions.{Ascending, Expression, RowOrdering, SortOrder} import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.catalyst.plans.physical.KeyedPartitioning import org.apache.spark.sql.catalyst.util.truncatedString @@ -104,11 +104,21 @@ trait DataSourceV2ScanExecBase extends LeafExecNode { } /** - * Returns the output ordering from the data source if 
available, otherwise falls back - * to the default (no ordering). This allows data sources to report their natural ordering - * through `SupportsReportOrdering`. + * Returns the output ordering for this scan. When the source reports ordering via + * `SupportsReportOrdering`, that ordering is returned as-is. Otherwise, when the output + * partitioning is a `KeyedPartitioning` and + * `spark.sql.sources.v2.bucketing.partitionKeyOrdering.enabled` is on, each partition + * contains rows where the key expressions evaluate to a single constant value, so the data + * is trivially sorted by those expressions within the partition. */ - override def outputOrdering: Seq[SortOrder] = ordering.getOrElse(super.outputOrdering) + override def outputOrdering: Seq[SortOrder] = { + (ordering, outputPartitioning) match { + case (Some(o), _) => o + case (_, k: KeyedPartitioning) if conf.v2BucketingPartitionKeyOrderingEnabled => + k.expressions.map(SortOrder(_, Ascending)) + case _ => Seq.empty + } + } override def supportsColumnar: Boolean = { scan.columnarSupportMode() match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala index 7ed394df8b300..81981c29b2b31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/GroupPartitionsExec.scala @@ -184,11 +184,34 @@ case class GroupPartitionsExec( copy(child = newChild) override def outputOrdering: Seq[SortOrder] = { - // when multiple partitions are grouped together, ordering inside partitions is not preserved if (groupedPartitions.forall(_._2.size <= 1)) { + // No coalescing: each output partition is exactly one input partition. 
The child's + // within-partition ordering is fully preserved (including any key-derived ordering that + // `DataSourceV2ScanExecBase` already prepended). child.outputOrdering } else { - super.outputOrdering + // Coalescing: multiple input partitions are merged into one output partition. The child's + // within-partition ordering is lost due to concatenation -- for example, if two input + // partitions both share key=A and hold rows (A,1),(A,3) and (A,2),(A,5) respectively (each + // sorted ascending by the data column), concatenating them yields (A,1),(A,3),(A,2),(A,5) + // which is no longer sorted by the data column. Only sort orders over partition key + // expressions remain valid -- they evaluate to the same value (A) in every merged partition. + outputPartitioning match { + case p: Partitioning with Expression + if reducers.isEmpty && conf.v2BucketingPreserveKeyOrderingOnCoalesceEnabled => + // Without reducers all merged partitions share the same original key value, so the key + // expressions remain constant within the output partition. The child's outputOrdering + // should already be in sync with the partitioning (either reported by the source or + // derived from it in DataSourceV2ScanExecBase), so we only need to keep the sort orders + // whose expression is a partition key expression -- all others are lost by concatenation. + val keyedPartitionings = p.collect { case k: KeyedPartitioning => k } + val keyExprs = ExpressionSet(keyedPartitionings.flatMap(_.expressions)) + child.outputOrdering.filter(order => keyExprs.contains(order.child)) + case _ => + // With reducers, merged partitions share only the reduced key, not the original key + // expressions, which can take different values within the output partition. 
+ super.outputOrdering + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala index 688196b47502e..44cb3a23cfa7c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala @@ -22,14 +22,14 @@ import java.util.Collections import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{DataFrame, ExplainSuiteHelper, Row} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Literal, TransformExpression} +import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, Literal, TransformExpression} import org.apache.spark.sql.catalyst.plans.physical import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog} import org.apache.spark.sql.connector.catalog.functions._ import org.apache.spark.sql.connector.distributions.Distributions import org.apache.spark.sql.connector.expressions._ import org.apache.spark.sql.connector.expressions.Expressions._ -import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SparkPlan} +import org.apache.spark.sql.execution.{ExtendedMode, FormattedMode, RDDScanExec, SimpleMode, SortExec, SparkPlan} import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanRelation, GroupPartitionsExec} import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleExchangeLike} import org.apache.spark.sql.execution.joins.SortMergeJoinExec @@ -3568,4 +3568,126 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase with } } } + + test("SPARK-56241: scan with KeyedPartitioning reports key-derived outputOrdering") { + val items_partitions = Array(identity("id")) + 
createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(3, 'cc', 30.0, cast('2021-01-01' as timestamp)), " + + "(1, 'aa', 10.0, cast('2022-01-01' as timestamp)), " + + "(2, 'bb', 20.0, cast('2022-01-01' as timestamp))") + + val df = sql(s"SELECT id, name FROM testcat.ns.$items") + val plan = df.queryExecution.executedPlan + val scans = collectScans(plan) + assert(scans.size === 1) + // With the config disabled (default), ordering derivation is suppressed. + assert(scans.head.outputOrdering.isEmpty) + // When enabled, the scan derives an ascending sort on the partition key `id`. + // identity transforms are unwrapped to AttributeReferences by V2ExpressionUtils. + withSQLConf(SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key -> "true") { + val scansEnabled = collectScans(df.queryExecution.executedPlan) + assert(scansEnabled.size === 1) + val ordering = scansEnabled.head.outputOrdering + assert(ordering.length === 1) + assert(ordering.head.direction === Ascending) + val keyExpr = ordering.head.child + assert(keyExpr.isInstanceOf[AttributeReference]) + assert(keyExpr.asInstanceOf[AttributeReference].name === "id") + } + } + + test("SPARK-56241: GroupPartitionsExec non-coalescing passes through child ordering, " + + "no pre-join SortExec needed before SortMergeJoin") { + // Non-identical key sets force GroupPartitionsExec to be inserted on both sides align them, + // but each group has exactly one partition — no coalescing. 
+ val items_partitions = Array(identity("id")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " + + "(2, 'bb', 20.0, cast('2021-01-01' as timestamp)), " + + "(3, 'cc', 30.0, cast('2021-01-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 100.0, cast('2021-01-01' as timestamp)), " + + "(2, 200.0, cast('2021-01-01' as timestamp))") + + // GroupPartitionsExec passes through the child's key-derived outputOrdering. + // EnsureRequirements checks outputOrdering directly so no SortExec should be inserted before + // the SMJ. + withSQLConf(SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key -> "true") { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} + |i.id, i.name + |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id + |""".stripMargin) + + checkAnswer(df, Seq(Row(1, "aa"), Row(2, "bb"))) + + val plan = df.queryExecution.executedPlan + val groupPartitions = collectGroupPartitions(plan) + assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan") + assert(groupPartitions.forall(_.groupedPartitions.forall(_._2.size <= 1)), + "expected non-coalescing GroupPartitionsExec") + val smjs = collect(plan) { case j: SortMergeJoinExec => j } + assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan") + smjs.foreach { smj => + val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s }) + assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering passes through " + + "non-coalescing GroupPartitions") + } + } + } + + test("SPARK-56241: GroupPartitionsExec coalescing derives ordering from key expressions, " + + "no pre-join SortExec needed before SortMergeJoin") { + // Duplicate key 1 on both sides causes coalescing. 
+ val items_partitions = Array(identity("id")) + createTable(items, itemsColumns, items_partitions) + sql(s"INSERT INTO testcat.ns.$items VALUES " + + "(1, 'aa', 10.0, cast('2021-01-01' as timestamp)), " + + "(1, 'ab', 11.0, cast('2021-06-01' as timestamp)), " + + "(2, 'bb', 20.0, cast('2021-01-01' as timestamp))") + + val purchases_partitions = Array(identity("item_id")) + createTable(purchases, purchasesColumns, purchases_partitions) + sql(s"INSERT INTO testcat.ns.$purchases VALUES " + + "(1, 100.0, cast('2021-01-01' as timestamp)), " + + "(1, 110.0, cast('2021-06-01' as timestamp)), " + + "(2, 200.0, cast('2021-01-01' as timestamp))") + + // GroupPartitionsExec derives outputOrdering from the key expressions after coalescing. + // EnsureRequirements checks outputOrdering directly so no SortExec should be inserted before + // the SMJ. + withSQLConf( + SQLConf.V2_BUCKETING_PARTITION_KEY_ORDERING_ENABLED.key -> "true", + SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.key -> "true") { + val df = sql( + s""" + |${selectWithMergeJoinHint("i", "p")} + |i.id, i.name + |FROM testcat.ns.$items i JOIN testcat.ns.$purchases p ON p.item_id = i.id + |""".stripMargin) + + checkAnswer(df, Seq( + Row(1, "aa"), Row(1, "aa"), Row(1, "ab"), Row(1, "ab"), + Row(2, "bb"))) + + val plan = df.queryExecution.executedPlan + val groupPartitions = collectGroupPartitions(plan) + assert(groupPartitions.nonEmpty, "expected GroupPartitionsExec in plan") + assert(groupPartitions.exists(_.groupedPartitions.exists(_._2.size > 1)), + "expected coalescing GroupPartitionsExec") + val smjs = collect(plan) { case j: SortMergeJoinExec => j } + assert(smjs.nonEmpty, "expected SortMergeJoinExec in plan") + smjs.foreach { smj => + val sorts = smj.children.flatMap(child => collect(child) { case s: SortExec => s }) + assert(sorts.isEmpty, "should not add SortExec before SMJ when ordering is derived " + + "from coalesced partition key") + } + } + } } diff --git 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, AttributeReference, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{KeyedPartitioning, PartitioningCollection}
import org.apache.spark.sql.execution.DummySparkPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.IntegerType

/**
 * Unit tests for `GroupPartitionsExec.outputOrdering`, exercising the non-coalescing
 * pass-through, the key-expression filtering applied when partitions are coalesced, and the
 * fallback to an empty ordering when reducers are present.
 */
class GroupPartitionsExecSuite extends SharedSparkSession {

  private val exprA = AttributeReference("a", IntegerType)()
  private val exprB = AttributeReference("b", IntegerType)()
  private val exprC = AttributeReference("c", IntegerType)()

  // Shorthand constructors for single- and two-column partition key rows.
  private def row(a: Int): InternalRow = InternalRow.fromSeq(Seq(a))
  private def row(a: Int, b: Int): InternalRow = InternalRow.fromSeq(Seq(a, b))

  test("SPARK-56241: non-coalescing passes through child ordering unchanged") {
    // Each partition has a distinct key -- no coalescing happens.
    val partitionKeys = Seq(row(1), row(2), row(3))
    val childOrdering = Seq(SortOrder(exprA, Ascending))
    val child = DummySparkPlan(
      outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys),
      outputOrdering = childOrdering)
    val gpe = GroupPartitionsExec(child)

    assert(gpe.groupedPartitions.forall(_._2.size <= 1), "expected non-coalescing")
    assert(gpe.outputOrdering === childOrdering)
  }

  test("SPARK-56241: coalescing without reducers keeps key-expression orders from child") {
    // Key 1 appears on partitions 0 and 2, causing coalescing.
    val partitionKeys = Seq(row(1), row(2), row(1))
    val child = DummySparkPlan(
      outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys),
      outputOrdering = Seq(SortOrder(exprA, Ascending)))
    val gpe = GroupPartitionsExec(child)

    assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
    // With the config disabled (default), key-expression filtering is skipped.
    assert(gpe.outputOrdering === Nil)
    // When enabled, the key-expression order is preserved through coalescing.
    withSQLConf(SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.key -> "true") {
      val ordering = gpe.outputOrdering
      assert(ordering.length === 1)
      assert(ordering.head.child === exprA)
      assert(ordering.head.direction === Ascending)
      assert(ordering.head.sameOrderExpressions.isEmpty)
    }
  }

  test("SPARK-56241: coalescing without reducers keeps one SortOrder per key expression") {
    // Multi-key partition: key (1,10) appears on partitions 0 and 2, causing coalescing.
    val partitionKeys = Seq(row(1, 10), row(2, 20), row(1, 10))
    val child = DummySparkPlan(
      outputPartitioning = KeyedPartitioning(Seq(exprA, exprB), partitionKeys),
      outputOrdering = Seq(SortOrder(exprA, Ascending), SortOrder(exprB, Ascending)))
    val gpe = GroupPartitionsExec(child)

    assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
    assert(gpe.outputOrdering === Nil)
    withSQLConf(SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.key -> "true") {
      val ordering = gpe.outputOrdering
      assert(ordering.length === 2)
      assert(ordering.head.child === exprA)
      assert(ordering(1).child === exprB)
      assert(ordering.head.sameOrderExpressions.isEmpty)
      assert(ordering(1).sameOrderExpressions.isEmpty)
    }
  }

  test("SPARK-56241: coalescing join case preserves sameOrderExpressions from child") {
    // PartitioningCollection wraps two KeyedPartitionings (one per join side), sharing the same
    // partition keys. Key 1 coalesces partitions 0 and 2. The child (e.g. SortMergeJoinExec)
    // already carries sameOrderExpressions linking both sides' key expressions.
    val partitionKeys = Seq(row(1), row(2), row(1))
    val leftKP = KeyedPartitioning(Seq(exprA), partitionKeys)
    val rightKP = KeyedPartitioning(Seq(exprB), partitionKeys)
    val child = DummySparkPlan(
      outputPartitioning = PartitioningCollection(Seq(leftKP, rightKP)),
      outputOrdering = Seq(SortOrder(exprA, Ascending, sameOrderExpressions = Seq(exprB))))
    val gpe = GroupPartitionsExec(child)

    assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
    assert(gpe.outputOrdering === Nil)
    withSQLConf(SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.key -> "true") {
      val ordering = gpe.outputOrdering
      assert(ordering.length === 1)
      assert(ordering.head.child === exprA)
      assert(ordering.head.sameOrderExpressions === Seq(exprB))
    }
  }

  test("SPARK-56241: coalescing drops non-key sort orders from child") {
    // exprA is the partition key; exprC is a non-key sort order the child also reports
    // (e.g. a secondary sort within each partition). After coalescing, exprC ordering is lost
    // by concatenation, so only the exprA order should survive.
    val partitionKeys = Seq(row(1), row(2), row(1))
    val child = DummySparkPlan(
      outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys),
      outputOrdering = Seq(SortOrder(exprA, Ascending), SortOrder(exprC, Ascending)))
    val gpe = GroupPartitionsExec(child)

    assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
    assert(gpe.outputOrdering === Nil)
    withSQLConf(SQLConf.V2_BUCKETING_PRESERVE_KEY_ORDERING_ON_COALESCE_ENABLED.key -> "true") {
      val ordering = gpe.outputOrdering
      assert(ordering.length === 1)
      assert(ordering.head.child === exprA)
    }
  }

  test("SPARK-56241: coalescing with reducers returns empty ordering") {
    // When reducers are present, the original key expressions are not constant within the merged
    // partition, so outputOrdering falls back to the default (empty).
    val partitionKeys = Seq(row(1), row(2), row(1))
    val child = DummySparkPlan(outputPartitioning = KeyedPartitioning(Seq(exprA), partitionKeys))
    // reducers = Some(Seq(None)) -- a None element means identity reducer; what matters here is
    // that reducers.isDefined, which triggers the fallback.
    val gpe = GroupPartitionsExec(child, reducers = Some(Seq(None)))

    assert(!gpe.groupedPartitions.forall(_._2.size <= 1), "expected coalescing")
    assert(gpe.outputOrdering === Nil)
  }
}