
[SPARK-56315][SQL] Pre-aggregate before Expand to reduce data amplification for multiple COUNT(DISTINCT) #55130

Open

LuciferYang wants to merge 6 commits into apache:master from LuciferYang:optimize-expand

Conversation

@LuciferYang
Contributor

What changes were proposed in this pull request?

Add an optimizer rule OptimizeExpand that inserts a de-duplication aggregate before the Expand operator produced by RewriteDistinctAggregates.

Queries with multiple COUNT(DISTINCT) on different columns are rewritten by RewriteDistinctAggregates into a plan that duplicates each input row N times via an Expand operator, where N is the number of distinct aggregate groups. This data amplification becomes a bottleneck as N grows.

This rule transforms the plan from:

Aggregate(outer) -> Aggregate(inner) -> Expand(Nx) -> child

to:

Aggregate(outer) -> Aggregate(inner) -> Expand(Nx) -> Aggregate(keys + all distinct cols) -> child

The inserted pre-aggregation eliminates duplicate rows before the Expand applies its N-fold amplification, significantly reducing shuffled data volume.
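The effect of the inserted pre-aggregate can be sketched outside Spark. The following is a minimal Python model, not the actual rule; the row shapes and counts are made up for illustration:

```python
# Minimal model of the plan rewrite above (illustrative only, not Spark code).
# Expand emits one copy of every input row per distinct aggregate group; the
# inserted pre-aggregate de-duplicates on (grouping keys + all distinct
# columns) first, so Expand amplifies far fewer rows.

def expand(rows, n_groups):
    # One output row per (input row, distinct aggregate group) pair.
    return [row + (gid,) for row in rows for gid in range(n_groups)]

# 100 input rows shaped (key, a, b); only 6 distinct combinations exist.
rows = [("k1", i % 3, i % 2) for i in range(100)]

baseline = expand(rows, 2)             # Expand sees all 100 input rows
pre_aggregated = sorted(set(rows))     # dedup on (key, a, b): 6 rows
optimized = expand(pre_aggregated, 2)  # Expand sees only 6 rows

print(len(baseline), len(optimized))   # 200 12

# The distinct values that feed COUNT(DISTINCT) are unchanged by the dedup.
assert {a for _, a, _ in pre_aggregated} == {a for _, a, _ in rows}
assert {b for _, _, b in pre_aggregated} == {b for _, _, b in rows}
```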

The optimization is controlled by spark.sql.optimizer.optimizeExpandRatio (default -1 = disabled, minimum 2). It only applies when:

  • The Expand projection count >= the configured threshold
  • All aggregates are pure distinct (no non-distinct aggregates like SUM(value))
  • No FILTER clauses on distinct aggregates

Why are the changes needed?

For queries like SELECT key, COUNT(DISTINCT a), COUNT(DISTINCT b), COUNT(DISTINCT c) FROM t GROUP BY key, the Expand operator duplicates each row 3x; with 6 distinct aggregates it is 6x. The pre-aggregation step eliminates redundant rows before this amplification, reducing the Expand output from O(N * input_rows) rows to O(N * distinct_groups) rows.
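A quick back-of-envelope calculation makes the row-count difference concrete. The figures below are hypothetical, chosen only to illustrate the arithmetic; they are not from the benchmark:

```python
# Hypothetical shuffle volumes for N = 3 distinct aggregates.
input_rows = 10_000_000
distinct_groups = 50_000              # distinct (key, a, b, c) combinations
n = 3                                 # number of COUNT(DISTINCT) aggregates

baseline_rows = n * input_rows        # rows Expand emits without the rule
optimized_rows = n * distinct_groups  # rows Expand emits after pre-aggregation

print(baseline_rows, optimized_rows, baseline_rows // optimized_rows)
# 30000000 150000 200
```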

Benchmark results

Pure distinct aggregates (6 COUNT(DISTINCT), JDK 17):

| Scenario | Baseline | Optimized | Speedup |
|---|---|---|---|
| 1K groups, moderate cardinality | 8276 ms | 847 ms | 9.8x |
| 100K groups, moderate cardinality | 13842 ms | 3624 ms | 3.8x |
| 1K groups, low cardinality (5 vals) | 7902 ms | 770 ms | 10.7x |
| No grouping key | 5560 ms | 370 ms | 22.4x |

Scaling with number of distinct aggregates (JDK 17):

| Distinct count | Baseline | Optimized | Speedup |
|---|---|---|---|
| 2 | 2181 ms | 660 ms | 3.3x |
| 4 | 4872 ms | 794 ms | 6.1x |
| 6 | 8259 ms | 836 ms | 9.9x |
| 8 | 12642 ms | 1603 ms | 7.9x |

Mixed aggregates (with SUM): no regression — baseline and optimized are within noise.

Results are consistent across JDK 17, 21, and 25.

Does this PR introduce any user-facing change?

No. The optimization is disabled by default (spark.sql.optimizer.optimizeExpandRatio = -1). When enabled, query results are unchanged — only execution performance is affected.

How was this patch tested?

Add new tests:

  • Unit tests in OptimizeExpandSuite (plan-level rule tests)
  • End-to-end tests in OptimizeExpandQuerySuite (result correctness)
  • Benchmark in ExpandBenchmark with results for JDK 17, 21, and 25

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

```scala
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.internal.SQLConf

class OptimizeExpandSuite extends PlanTest {
```
Contributor
Consider a test for COUNT(DISTINCT col1 + col2). The pre-aggregate groups by leaf attributes (col1, col2) not the expression, so dedup is less effective but still correct. Worth documenting.

Contributor Author

Thanks for the suggestion! Added tests in the latest commit:

Plan-level test (OptimizeExpandSuite): verifies that for COUNT(DISTINCT col1 + col2), the pre-aggregate groups by leaf attributes (key, col1, col2, col3) rather than (key, col1+col2, col3). The comment documents the trade-off — dedup is less effective (more groups than strictly necessary) but correctness is preserved.

Correctness test (OptimizeExpandQuerySuite): uses coprime moduli (col1 = id % 7, col2 = id % 11) to generate data where 77 unique (col1, col2) pairs map to only 17 unique col1 + col2 values, ensuring the "less effective dedup" scenario is actually exercised. Verifies optimized results match the non-optimized baseline.
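The coprime-moduli construction is easy to sanity-check in isolation. This is a standalone sketch, independent of the Spark test suites:

```python
# col1 = id % 7, col2 = id % 11; since 7 and 11 are coprime, ids 0..76
# produce all 77 distinct (col1, col2) pairs (Chinese remainder theorem),
# while col1 + col2 collapses to just the 17 values 0..16.
pairs = {(i % 7, i % 11) for i in range(77)}
sums = {c1 + c2 for c1, c2 in pairs}

print(len(pairs), len(sums))  # 77 17
```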

Address review feedback: document and test that the pre-aggregate groups
by leaf attributes (col1, col2) rather than the expression (col1 + col2),
making dedup less effective but still correct.
