From fa5e9fc67f4b6a63d7fe1923cd8e88549e34e6e6 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 29 May 2026 16:40:24 +0000 Subject: [PATCH 1/2] [SPARK-56032][SQL] Skip FilterExec subexpression elimination codegen when there is no common subexpression ### What changes were proposed in this pull request? `FilterExec` whole-stage codegen takes the subexpression-elimination (CSE) path whenever `subexpressionEliminationEnabled && otherPreds.nonEmpty`, regardless of whether any common subexpression actually exists. That path emits an `inputVarsEvalCode` prologue at the top of the per-row loop that eagerly evaluates every input column referenced by `otherPreds` (required so eliminated subexpressions can be materialized into shared variables). When there is nothing to eliminate, this prologue provides no benefit but still defeats the short-circuiting the non-CSE path gets from loading columns lazily, just before the predicate that needs them. This PR gates the CSE path on whether `otherPreds` actually contain a common subexpression (`hasCommonSubexpressions`, using the same `EquivalentExpressions` detection and `output` binding as the CSE codegen). When there is none, it falls back to the non-CSE `generatePredicateCode`, which loads columns lazily and preserves short-circuiting. ### Why are the changes needed? For a filter with no common subexpression but multiple conjuncts over different columns (e.g. `q_int BETWEEN ... AND (decimal_a BETWEEN ... OR decimal_b BETWEEN ...)`), the eager prologue decodes the decimal columns for every row, including rows a cheaper earlier predicate would have rejected. Decoding a high-precision decimal allocates a BigInteger/BigDecimal per call, so this is pure waste and shows up as a measurable performance regression versus the lazy non-CSE path. ### Does this PR introduce _any_ user-facing change? No. This is a codegen-only change; query results are unchanged. ### How was this patch tested? New unit test in `WholeStageCodegenSuite` asserting that, for a filter with no common subexpression, the CSE-enabled generated code is identical to the CSE-disabled code (i.e. it falls back to the lazy, short-circuiting path). The existing `FilterExec` CSE tests, which use genuine common subexpressions, still exercise the CSE path and pass. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude (Claude Code) Co-authored-by: Isaac --- .../execution/basicPhysicalOperators.scala | 21 +++++++++- .../execution/WholeStageCodegenSuite.scala | 38 +++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 92cf3f59d575b..27d4c946638d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -242,6 +242,24 @@ case class FilterExec(condition: Expression, child: SparkPlan) // The columns that will filtered out by `IsNotNull` could be considered as not nullable. private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId) + // Whether `otherPreds` contain any common subexpression worth eliminating. The CSE codegen + // path eagerly evaluates every `otherPreds` input column at the top of the row loop (the + // `inputVarsEvalCode` prologue), which is required so the eliminated subexpressions can be + // materialized into shared variables. That eager evaluation defeats the short-circuiting the + // non-CSE path gets from loading columns lazily, just before the predicate that needs them. + // When there is no common subexpression to eliminate, the CSE path provides no benefit but + // still pays the eager-decode cost (e.g. decoding a decimal column for rows a cheaper earlier + // predicate would have rejected), so we fall back to the non-CSE `generatePredicateCode`. + // This uses the same binding (`output`) and detection (`getCommonSubexpressions`) as the CSE + // codegen below, so it agrees exactly with whether that path would find anything to eliminate. + private lazy val hasCommonSubexpressions: Boolean = { + val equivalentExpressions = new EquivalentExpressions + otherPreds.foreach { p => + equivalentExpressions.addExprTree(BindReferences.bindReference(p, output)) + } + equivalentExpressions.getCommonSubexpressions.nonEmpty + } + // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate // all the variables at the beginning to take advantage of short circuiting. override def usedInputs: AttributeSet = AttributeSet.empty @@ -292,7 +310,8 @@ case class FilterExec(condition: Expression, child: SparkPlan) // for free, since the IsNotNull check fires before the CSE precompute keyed // off the same reference. val (prologueCode, predicateCode) = - if (conf.subexpressionEliminationEnabled && otherPreds.nonEmpty) { + if (conf.subexpressionEliminationEnabled && otherPreds.nonEmpty && + hasCommonSubexpressions) { // Pre-evaluate input variables before CSE analysis: CSE clears // ctx.currentVars[i].code as a side effect; without this pre-evaluation, Janino // fails when otherPreds reference the same input columns that CSE already diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index e5b9e7016841e..8f0ec0ffd6f1d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -1186,4 +1186,42 @@ class WholeStageCodegenSuite extends SharedSparkSession } } } + + test("SPARK-56032: FilterExec skips CSE codegen when there is no common subexpression") { + // When otherPreds share no common subexpression, the CSE codegen path provides no benefit + // but would still eagerly evaluate every referenced input column at the top of the row loop + // (the inputVarsEvalCode prologue), defeating the lazy, short-circuiting column loads of the + // non-CSE path. Verify that with CSE enabled we fall back to the exact same generated code as + // with CSE disabled, so no column is decoded for rows an earlier predicate would reject. + val schema = StructType(Seq( + StructField("a", IntegerType, nullable = true), + StructField("b", IntegerType, nullable = true))) + val data = spark.sparkContext.parallelize(Seq( + Row(1, 5), Row(null, 3), Row(4, null), Row(5, 6), Row(7, 8), Row(2, 3))) + val expected = Seq(Row(5, 6), Row(7, 8)) + + def filterCode(cseEnabled: Boolean): String = { + withSQLConf( + SQLConf.SUBEXPRESSION_ELIMINATION_ENABLED.key -> cseEnabled.toString, + SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "true", + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + val df = spark.createDataFrame(data, schema) + // `a > 4` and `b > 4` reference different columns and share no subexpression. + val filtered = df.where("a IS NOT NULL AND a > 4 AND b > 4") + val plan = filtered.queryExecution.executedPlan + assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]), + "Filter should be in whole-stage codegen") + checkAnswer(filtered, expected) + codegenString(plan) + } + } + + // Each `createDataFrame` mints fresh attribute exprIds (e.g. `a#16`), which appear in the + // plan-tree header of the codegen dump but not in the generated Java. Normalize them away so + // the comparison reflects the generated code, not the id counter. + def normalize(code: String): String = code.replaceAll("#\\d+", "#") + assert(normalize(filterCode(cseEnabled = true)) == normalize(filterCode(cseEnabled = false)), + "With no common subexpression, CSE-enabled FilterExec codegen should be identical to " + + "CSE-disabled codegen (i.e. fall back to the lazy, short-circuiting non-CSE path)") + } } From b35db911bf08fc24a70674214bfbfa97ea5c4f56 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Fri, 29 May 2026 17:01:40 +0000 Subject: [PATCH 2/2] Build EquivalentExpressions once for the CSE gate Reuse a single EquivalentExpressions analysis between the has-common-subexpression gate and the CSE codegen, instead of building it twice. Add a subexpressionEliminationForWholeStageCodegen overload that takes a pre-built EquivalentExpressions, and have FilterExec hold the bound predicates and their CSE analysis as @transient lazy vals (driver-only codegen state; EquivalentExpressions is not serializable). Co-authored-by: Isaac --- .../expressions/codegen/CodeGenerator.scala | 16 +++++-- .../execution/basicPhysicalOperators.scala | 42 +++++++++++-------- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 330116e592482..f83152225933b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -1150,14 +1150,22 @@ class CodegenContext extends Logging { * evaluation, we can look for generated subexpressions and do replacement. */ def subexpressionEliminationForWholeStageCodegen(expressions: Seq[Expression]): SubExprCodes = { - // Create a clear EquivalentExpressions and SubExprEliminationState mapping + // Create a clear EquivalentExpressions and compute the common subexpressions. val equivalentExpressions: EquivalentExpressions = new EquivalentExpressions + expressions.foreach(equivalentExpressions.addExprTree(_)) + subexpressionEliminationForWholeStageCodegen(equivalentExpressions) + } + + /** + * Same as above, but takes a pre-built [[EquivalentExpressions]]. A caller that has already + * analyzed the expressions (e.g. to decide whether any common subexpression exists) can reuse + * that analysis here instead of rebuilding it. + */ + def subexpressionEliminationForWholeStageCodegen( + equivalentExpressions: EquivalentExpressions): SubExprCodes = { val localSubExprEliminationExprsForNonSplit = mutable.HashMap.empty[ExpressionEquals, SubExprEliminationState] - // Add each expression tree and compute the common subexpressions. - expressions.foreach(equivalentExpressions.addExprTree(_)) - // Get all the expressions that appear at least twice and set up the state for subexpression // elimination. val commonExprs = equivalentExpressions.getCommonSubexpressions diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 27d4c946638d1..8d183f915e8ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -242,22 +242,20 @@ case class FilterExec(condition: Expression, child: SparkPlan) // The columns that will filtered out by `IsNotNull` could be considered as not nullable. private val notNullAttributes = notNullPreds.flatMap(_.references).distinct.map(_.exprId) - // Whether `otherPreds` contain any common subexpression worth eliminating. The CSE codegen - // path eagerly evaluates every `otherPreds` input column at the top of the row loop (the - // `inputVarsEvalCode` prologue), which is required so the eliminated subexpressions can be - // materialized into shared variables. That eager evaluation defeats the short-circuiting the - // non-CSE path gets from loading columns lazily, just before the predicate that needs them. - // When there is no common subexpression to eliminate, the CSE path provides no benefit but - // still pays the eager-decode cost (e.g. decoding a decimal column for rows a cheaper earlier - // predicate would have rejected), so we fall back to the non-CSE `generatePredicateCode`. - // This uses the same binding (`output`) and detection (`getCommonSubexpressions`) as the CSE - // codegen below, so it agrees exactly with whether that path would find anything to eliminate. - private lazy val hasCommonSubexpressions: Boolean = { + // `otherPreds` bound against this operator's `output`, shared between the CSE gate in + // `doConsume` and the CSE codegen itself. Codegen-only derived state, so `@transient`: it is + // computed on the driver during code generation and never accessed on executors. + @transient private lazy val boundOtherPreds: Seq[Expression] = + otherPreds.map(BindReferences.bindReference(_, output)) + + // CSE analysis of `boundOtherPreds`, built once and reused. `doConsume` consults it to decide + // whether any common subexpression is worth eliminating; when one is, the same analysis is + // handed to `subexpressionEliminationForWholeStageCodegen` rather than rebuilt. `@transient` + // because `EquivalentExpressions` is not serializable (and this is driver-only codegen state). + @transient private lazy val otherPredsEquivalentExpressions: EquivalentExpressions = { val equivalentExpressions = new EquivalentExpressions - otherPreds.foreach { p => - equivalentExpressions.addExprTree(BindReferences.bindReference(p, output)) - } - equivalentExpressions.getCommonSubexpressions.nonEmpty + boundOtherPreds.foreach(equivalentExpressions.addExprTree(_)) + equivalentExpressions } // Mark this as empty. We'll evaluate the input during doConsume(). We don't want to evaluate @@ -309,9 +307,17 @@ case class FilterExec(condition: Expression, child: SparkPlan) // without consulting `isNull_X`. The (b) interleaving gives us that ordering // for free, since the IsNotNull check fires before the CSE precompute keyed // off the same reference. + // Only take the CSE path when there is actually a common subexpression to eliminate. That + // path emits the `inputVarsEvalCode` prologue below, which eagerly evaluates every + // `otherPreds` input column at the top of the row loop -- required so eliminated + // subexpressions can be materialized into shared variables, but it defeats the + // short-circuiting the non-CSE path gets from loading columns lazily, just before the + // predicate that needs them. With no common subexpression the prologue is pure overhead + // (e.g. decoding a decimal column for rows a cheaper earlier predicate would reject), so we + // fall back to `generatePredicateCode`. val (prologueCode, predicateCode) = if (conf.subexpressionEliminationEnabled && otherPreds.nonEmpty && - hasCommonSubexpressions) { + otherPredsEquivalentExpressions.getCommonSubexpressions.nonEmpty) { // Pre-evaluate input variables before CSE analysis: CSE clears // ctx.currentVars[i].code as a side effect; without this pre-evaluation, Janino // fails when otherPreds reference the same input columns that CSE already @@ -320,8 +326,8 @@ case class FilterExec(condition: Expression, child: SparkPlan) val inputVarsEvalCode = evaluateRequiredVariables( child.output, input, otherPredInputAttrs) - val boundOtherPreds = otherPreds.map(BindReferences.bindReference(_, output)) - val subExprs = ctx.subexpressionEliminationForWholeStageCodegen(boundOtherPreds) + val subExprs = + ctx.subexpressionEliminationForWholeStageCodegen(otherPredsEquivalentExpressions) // Group CSE states by the index of the first otherPred that references them. // `evaluateSubExprEliminationState` recursively emits each state's children