-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][broker]Fixed an issue where the entire subscription would be blocked when a chunk message with an ID of zero did not exist. #25120
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
…ocked when a chunk message with an ID of zero did not exist.
|
@zjxxzjwang Please add the following content to your PR description and select a checkbox: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR fixes a critical issue where subscriptions become blocked when chunk messages have problems: either when chunk0 is missing, or when a consumer's available permits remain negative. The fix acknowledges and logs orphaned non-first chunks instead of endlessly retrying them, and removes permit checking for subsequent chunks after the first chunk to prevent permit-related blocking.
Key changes:
- Modified chunk message handling to directly acknowledge non-first chunks when chunk0 is missing
- Removed permit validation for chunk messages after the first chunk to prevent blocking when permits are exhausted
- Added subscription parameter to SharedConsumerAssignor to enable acknowledgment of orphaned chunks
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java | Core fix: inline chunk handling logic, add subscription parameter for acknowledgment, remove getConsumerForUuid method, and handle missing chunk0 scenario |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java | Pass subscription to SharedConsumerAssignor constructor |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java | Pass subscription to SharedConsumerAssignor constructor |
| pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java | Update test to accommodate new constructor signature with null subscription |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| entryAndMetadataList.clear(); | ||
| replayQueue.clear(); | ||
| assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add); | ||
| assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, null); |
Copilot
AI
Jan 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test passes null for the subscription parameter, which means the new logic for handling missing chunk0 (lines 97-108 in SharedConsumerAssignor.java) is not covered by existing tests. Consider adding test cases that:
- Verify non-first chunks are properly acknowledged when chunk0 is missing
- Test the behavior when subscription is a PulsarCompactorSubscription
- Ensure that acknowledged chunks don't get dispatched to consumers
| /** | ||
| * The assigner to assign entries to the proper {@link Consumer} in the shared subscription. | ||
| */ | ||
|
|
Copilot
AI
Jan 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an unnecessary blank line between the Javadoc comment and the class annotations. The annotations should immediately follow the Javadoc without a blank line in between.
|
|
||
| final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits); | ||
| if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) { | ||
| // The last chunk is received, we should remove the uuid |
Copilot
AI
Jan 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment states "we should remove the uuid" but should clarify "we should remove the uuid from the cache" for better clarity.
| // The last chunk is received, we should remove the uuid | |
| // The last chunk is received, we should remove the uuid from the cache |
| entryAndMetadata.release(); | ||
| } | ||
| } |
Copilot
AI
Jan 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When subscription is null or the subscription is a PulsarCompactorSubscription, non-first chunks without a cached consumer are not acknowledged or released (the if block at lines 97-108 doesn't execute acknowledgment). However, the code still proceeds to cache the UUID mapping (line 111) and add the message to consumerToEntries (line 120). This means subsequent chunks with the same UUID will be delivered to this consumer, even though chunk0 was never received. This partially defeats the purpose of the fix, as the subscription will still receive incomplete chunked messages in these cases.
| entryAndMetadata.release(); | |
| } | |
| } | |
| entryAndMetadata.release(); | |
| } else { | |
| // For compactor subscriptions, just release the entry without ack | |
| entryAndMetadata.release(); | |
| } | |
| } else { | |
| // No subscription available, just release the entry | |
| entryAndMetadata.release(); | |
| } | |
| // Do not cache UUID mapping or deliver this non-first chunk | |
| continue; |
| log.warn("[{}][{}] Skip the message because of it not the first chunk." | ||
| + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}", | ||
| subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(), | ||
| metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg()); | ||
| // Directly ack the message | ||
| if (!(subscription instanceof PulsarCompactorSubscription)) { |
Copilot
AI
Jan 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message uses double concatenation with a plus sign outside the format arguments, which is unconventional and harder to read. The message should be formatted as a single string template or the concatenation should be inside the curly braces if necessary for readability.
| log.warn("[{}][{}] Skip the message because of it not the first chunk." | |
| + " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}", | |
| subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(), | |
| metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg()); | |
| // Directly ack the message | |
| if (!(subscription instanceof PulsarCompactorSubscription)) { | |
| log.warn("[{}][{}] Skip the message because of it not the first chunk. Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}", | |
| subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(), | |
| metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg()); | |
| // Directly ack the message | |
| if (!(subscription instanceof PulsarCompactorSubscription)) { | |
| if (!(subscription instanceof PulsarCompactorSubscription)) { |
| continue; | ||
| if (metadata.getChunkId() != 0) { | ||
| if (subscription != null) { | ||
| log.warn("[{}][{}] Skip the message because of it not the first chunk." |
Copilot
AI
Jan 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The error message contains a grammatical error. "because of it not the first chunk" should be "because it is not the first chunk".
| log.warn("[{}][{}] Skip the message because of it not the first chunk." | |
| log.warn("[{}][{}] Skip the message because it is not the first chunk." |
| if (!(subscription instanceof PulsarCompactorSubscription)) { | ||
| subscription.acknowledgeMessage(Collections.singletonList( | ||
| entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap()); | ||
| entryAndMetadata.release(); | ||
| } | ||
| } | ||
| } | ||
| consumerForUuid = consumer; | ||
| uuidToConsumer.put(uuid, consumerForUuid); | ||
| } | ||
|
|
||
| final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits); | ||
| if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) { | ||
| // The last chunk is received, we should remove the uuid | ||
| uuidToConsumer.remove(uuid); | ||
| } | ||
|
|
||
| consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new ArrayList<>()).add(entryAndMetadata); |
Copilot
AI
Jan 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When a non-first chunk is acknowledged and released (lines 104-106), the message should not continue to be processed. However, the code proceeds to add this already-released message to consumerToEntries (line 120) and cache its UUID mapping (line 111). This will cause the consumer to receive a message whose buffer has already been released, leading to potential reference counting errors or access to freed memory. After acknowledging and releasing the message, the code should continue to the next iteration of the loop without further processing.
|
@zjxxzjwang please resolve the merge conflict |
Motivation
1、A message is split into chunk0, chunk1, and chunk2. When chunk0 is unexpectedly lost, the current code logic will endlessly deliver chunk1 and chunk2 to redeliveryMessages, causing the entire subscription to become blocked.
2、When a chunk message corresponds to a producer whose availablePermits remain below zero for an extended period, that chunk message will also be endlessly delivered to the “redeliveryMessages” queue, similarly causing the entire subscription to become blocked.
Modifications
1、In the event of chunk0 being unexpectedly lost, chunk1 and chunk2 messages are directly acknowledged and logged without blocking the entire subscription.
2、When sending each chunk of the same message, the availablePermits of the consumer corresponding to that chunk message are no longer checked, ensuring normal delivery.
Documentation
docdoc-requireddoc-not-neededdoc-complete