[fix](be) Preserve agg hash shuffle after non-hash exchange #63766
BiteTheDDDDt wants to merge 4 commits
### What problem does this PR solve?

Issue Number: None

Related PR: apache#63529, apache#62438

Problem Summary: PR apache#63529 preserved local hash shuffle for serial merge aggregation, but the condition only covered serial children. A non-finalize merge aggregation can also receive input from a child that requires a non-hash local exchange, such as the adaptive passthrough exchange used by nested loop join after apache#62438. In that case identical group keys can be split across local aggregation instances, and a later count distinct aggregation may count duplicate partial groups. This change preserves local hash shuffle for merge aggregation when the child breaks local key distribution through either serial execution or a non-hash local exchange.

### Release note

Fix wrong results for merge aggregation after non-hash local exchange.

### Check List (For Author)

- Test: Unit Test
    - `./run-be-ut.sh --run --filter=AggOperatorRequiredDistributionTest.*`
- Behavior changed: Yes. Merge aggregation now keeps local hash shuffle when its child requires a non-hash local exchange.
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
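To make the failure mode in the Problem Summary concrete, here is a minimal sketch of how summed per-instance group counts go wrong once identical keys land on different local aggregation instances. Everything in it is an illustrative assumption: `route_round_robin` merely stands in for a non-hash local exchange (it is not how the adaptive passthrough exchange actually routes rows), and `summed_distinct` is a hypothetical model of a later stage that trusts each key to have reached exactly one instance.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <unordered_set>
#include <vector>

using Key = int;

// Stand-in for a non-hash local exchange: the target instance depends on the
// row position, not on the key, so equal keys can land on different instances.
inline std::size_t route_round_robin(std::size_t row_index, std::size_t instances) {
    return row_index % instances;
}

// Hash routing: equal keys always reach the same instance.
inline std::size_t route_by_hash(Key key, std::size_t instances) {
    return std::hash<Key>{}(key) % instances;
}

// Each instance deduplicates the keys it receives; the result is the sum of
// the per-instance group counts, which is only correct if no key is shared
// between instances.
inline std::size_t summed_distinct(const std::vector<Key>& rows, std::size_t instances,
                                   bool use_hash) {
    std::vector<std::unordered_set<Key>> groups(instances);
    for (std::size_t i = 0; i < rows.size(); ++i) {
        const std::size_t target = use_hash ? route_by_hash(rows[i], instances)
                                            : route_round_robin(i, instances);
        groups[target].insert(rows[i]);
    }
    std::size_t total = 0;
    for (const auto& g : groups) {
        total += g.size();
    }
    return total;
}
```

With rows `{1, 1, 2, 2, 3, 3}` and two instances, hash routing yields 3 groups in total, while the round-robin stand-in places every key on both instances and the summed count becomes 6.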
run buildall

/review
Pull request overview
This PR fixes a correctness edge case in BE pipeline local-exchange planning: when a non-finalize merge aggregation follows a child operator that requires a non-hash local exchange (e.g. ADAPTIVE_PASSTHROUGH from nested loop join), the local key distribution can be broken and identical group keys may be split across local agg instances, causing wrong results (notably for later DISTINCT-related aggregation stages).
Changes:
- Extend the “don’t skip local hash shuffle” condition for merge aggregations from “serial child” to “child that breaks local key distribution” (serial or requires a non-hash local exchange).
- Add a helper to detect whether a child’s required distribution implies a non-hash local exchange that would break key distribution.
- Add a BE unit test covering the `ADAPTIVE_PASSTHROUGH` child case to ensure `HASH_SHUFFLE` is required.
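The condition described in the changes above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the code in `aggregation_sink_operator.h`: `ExchangeKind`, `ChildInfo`, and all function names are hypothetical stand-ins, and the model assumes `Noop` means the child asks for no local exchange at all.

```cpp
#include <cassert>

// Hypothetical stand-in for the exchange types named in this PR.
enum class ExchangeKind { Noop, Passthrough, AdaptivePassthrough, HashShuffle, BucketHashShuffle };

// Hypothetical summary of what the aggregation knows about its child.
struct ChildInfo {
    bool is_serial;         // child runs as a single serial instance
    ExchangeKind required;  // distribution the child itself requires
};

inline bool is_hash(ExchangeKind kind) {
    return kind == ExchangeKind::HashShuffle || kind == ExchangeKind::BucketHashShuffle;
}

// True when the child needs a local exchange that does not route by key.
inline bool requires_non_hash_local_exchange(ExchangeKind kind) {
    return kind != ExchangeKind::Noop && !is_hash(kind);
}

// Either cause means equal keys are no longer guaranteed to share an instance.
inline bool child_breaks_local_key_distribution(const ChildInfo& child) {
    return child.is_serial || requires_non_hash_local_exchange(child.required);
}

// Distribution a merge aggregation would require in this model. Skipping the
// local hash shuffle is allowed only while the child keeps keys together.
inline ExchangeKind merge_agg_required(const ChildInfo& child, bool may_skip_local_shuffle) {
    if (may_skip_local_shuffle && !child_breaks_local_key_distribution(child)) {
        return ExchangeKind::Noop;
    }
    return ExchangeKind::HashShuffle;
}
```

In this model a child requiring `AdaptivePassthrough` forces `HashShuffle` even when skipping would otherwise be permitted, which mirrors the case the new unit test is said to cover.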
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| be/src/exec/operator/aggregation_sink_operator.h | Refines merge-agg distribution requirement logic to preserve hash shuffle when a child requires non-hash local exchange (or is serial). |
| be/test/exec/operator/agg_operator_test.cpp | Adds a unit test validating merge-agg requires HASH_SHUFFLE after a child with ADAPTIVE_PASSTHROUGH distribution. |
Review result: no blocking issues found in the changed code.
Critical checkpoint conclusions:
- Goal/test: The PR preserves aggregation hash shuffle when a merge aggregation follows a child that requires a non-hash local exchange. The added unit test covers the new ADAPTIVE_PASSTHROUGH case at the required_data_distribution level.
- Scope: The change is small and focused on AggSinkOperatorX distribution selection plus one targeted unit test.
- Concurrency/lifecycle: No new shared mutable state, threads, dependencies, or non-trivial lifecycle management are introduced.
- Configuration/compatibility: No new configuration, storage format, protocol, or rolling-upgrade compatibility concerns found.
- Parallel paths: The modified path is the relevant non-streaming aggregation sink distribution path; no distinct parallel BE path appeared to require the same exact change in this PR.
- Conditional logic: The new condition preserves the existing serial-child behavior and extends it to child-required non-hash local exchanges; this matches the local-exchange planning flow inspected.
- Test coverage: Coverage is targeted but limited to BE unit validation of distribution selection, not a full SQL regression. This is a residual coverage limitation, not a blocking defect from the reviewed diff.
- Observability/transactions/data writes: Not applicable; no new transaction, persistence, data-write, or observability-sensitive path is introduced.
- Performance: The added child distribution check is lightweight and only occurs during pipeline distribution planning; no hot-path performance issue found.
User focus response: no additional user-provided review focus was present.
Verification: Attempted ./run-be-ut.sh --run --filter=AggOperatorRequiredDistributionTest.*, but the command timed out after 120s while initializing/building third-party submodules, before the targeted test could complete.
TPC-H: Total hot run time: 31690 ms
TPC-DS: Total hot run time: 171410 ms
### What problem does this PR solve?

Issue Number: None

Related PR: apache#63529, apache#62438

Problem Summary: A hash-distributed aggregation can receive input after a non-hash local exchange, such as the adaptive passthrough exchanges planned around nested loop joins. The downstream aggregation still relies on key distribution for distinct aggregation, but the non-hash local exchange can split identical keys across local aggregation instances. This caused wrong count distinct results. This change lets local exchange sources expose their actual exchange distribution and lets aggregations force a new local hash shuffle when their child breaks local key distribution.

### Release note

Fix wrong results for distinct aggregation after non-hash local exchange.

### Check List (For Author)

- Test: Regression test / Unit Test / Manual test
    - `./run-be-ut.sh --run --filter=AggOperatorRequiredDistributionTest.*`
    - `./run-regression-test.sh --run -d query_p0/join -s test_agg_after_nested_loop_join_local_exchange`
    - Manual reproduction from `output/ddl.txt` returned `10 5070261`
- Behavior changed: Yes. Aggregation now preserves required hash distribution when its child local exchange uses a non-hash distribution.
- Does this need documentation: No

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
run buildall

TPC-H: Total hot run time: 31608 ms
TPC-DS: Total hot run time: 171113 ms

run buildall

TPC-H: Total hot run time: 32261 ms
TPC-DS: Total hot run time: 173240 ms
### What problem does this PR solve?

Issue Number: None

Related PR: #63529, #62438

Problem Summary:

`enable_local_exchange_before_agg=false` allows first-phase aggregation to skip the local hash exchange before agg for performance. This is only correct when the input still preserves local key distribution.

After #62438, nested loop join and other operators may introduce non-hash local exchanges such as `ADAPTIVE_PASSTHROUGH`. Those exchanges can split rows with the same group/distinct key across local pipeline tasks. If agg still skips the hash local exchange, partial aggregation states for the same key are built in different tasks and a later `COUNT(DISTINCT ...)` can over-count. The reproduced query in `output/ddl.txt` returned wrong counts such as `18`/`20` instead of `10`.

This PR preserves correctness while keeping the knob usable:

- `enable_local_exchange_before_agg=false` only when the child preserves local key distribution.
- `AggSinkOperatorX`, `StreamingAggOperatorX`, and `DistinctStreamingAggOperatorX`.
- `Pipeline::need_to_local_exchange()` also handles the case where the current pipeline source is a non-hash `LocalExchangeSource` but the downstream target requires hash distribution, so inherited hash-ish pipeline state cannot incorrectly suppress the required local exchange.

### Release note

Fix wrong results for aggregation after non-hash local exchange when `enable_local_exchange_before_agg` is disabled.

### Check List (For Author)

- `build-support/clang-format.sh --quiet && build-support/check-format.sh --quiet`
- `./run-be-ut.sh --run --filter=StreamingAggOperatorTest.require_hash_shuffle_after_non_hash_local_exchange:DistinctStreamingAggOperatorTest.require_hash_shuffle_after_non_hash_local_exchange:AggOperatorRequiredDistributionTest.*`
- `./build.sh --be`
- `mysql -h127.0.0.1 -P9333 -uroot -D regression_test --batch --raw --skip-column-names < output/ddl.txt` returned `10 5070261`
- `./run-regression-test.sh --run -d query_p0/join -s test_agg_after_nested_loop_join_local_exchange`
- `build-support/run-clang-tidy.sh` attempted; remaining failures are existing/toolchain noise from `jni-util.h` static assertions, old header warnings, test include resolution, and pre-existing operator complexity/redundant-init diagnostics.
- `enable_local_exchange_before_agg=false`.
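The `Pipeline::need_to_local_exchange()` behavior described in the Problem Summary can be pictured with the sketch below. It is a deliberately reduced model, not the actual Doris function: `PipelineState`, its fields, and `need_hash_local_exchange` are hypothetical names, and the model only answers the question for a downstream target that requires hash distribution.

```cpp
#include <cassert>

// Hypothetical stand-in for the exchange types named in this PR.
enum class ExchangeKind { Noop, Passthrough, AdaptivePassthrough, HashShuffle, BucketHashShuffle };

inline bool is_hash(ExchangeKind kind) {
    return kind == ExchangeKind::HashShuffle || kind == ExchangeKind::BucketHashShuffle;
}

// Hypothetical view of the pipeline state consulted during planning.
struct PipelineState {
    bool source_is_local_exchange;     // pipeline starts at a local exchange source
    ExchangeKind source_exchange;      // distribution that source actually used
    bool inherited_hash_distribution;  // hash-ish state carried over from upstream
};

// Decide whether a new local hash exchange is needed before a target that
// requires hash distribution.
inline bool need_hash_local_exchange(const PipelineState& pipeline) {
    // A non-hash local exchange source has already redistributed rows without
    // regard to keys, so inherited state must not suppress the exchange.
    if (pipeline.source_is_local_exchange && !is_hash(pipeline.source_exchange)) {
        return true;
    }
    // Otherwise the exchange can be skipped only if hash distribution is
    // already in place.
    return !pipeline.inherited_hash_distribution;
}
```

The first branch is the point of the sketch: checking the source's actual exchange type before the inherited state is what prevents a stale "already hash-distributed" flag from hiding the split introduced by the non-hash exchange.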