Skip to content

Conversation

@zjxxzjwang
Copy link
Contributor

@zjxxzjwang zjxxzjwang commented Dec 30, 2025

Motivation

1、A message is split into chunk0, chunk1, and chunk2. When chunk0 is unexpectedly lost, the current code logic will endlessly deliver chunk1 and chunk2 to redeliveryMessages, causing the entire subscription to become blocked.

2、When a chunk message corresponds to a producer whose availablePermits remain below zero for an extended period, that chunk message will also be endlessly delivered to the “redeliveryMessages” queue, similarly causing the entire subscription to become blocked.

Modifications

1、In the event of chunk0 being unexpectedly lost, chunk1 and chunk2 messages are directly acknowledged and logged without blocking the entire subscription.

2、When sending each chunk of the same message, the availablePermits of the consumer corresponding to that chunk message are no longer checked, ensuring normal delivery.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

…ocked when a chunk message with an ID of zero did not exist.
@github-actions
Copy link

@zjxxzjwang Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes a critical issue where subscriptions become blocked when chunk messages have problems: either when chunk0 is missing, or when a consumer's available permits remain negative. The fix acknowledges and logs orphaned non-first chunks instead of endlessly retrying them, and removes permit checking for subsequent chunks after the first chunk to prevent permit-related blocking.

Key changes:

  • Modified chunk message handling to directly acknowledge non-first chunks when chunk0 is missing
  • Removed permit validation for chunk messages after the first chunk to prevent blocking when permits are exhausted
  • Added subscription parameter to SharedConsumerAssignor to enable acknowledgment of orphaned chunks

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 7 comments.

File Description
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java Core fix: inline chunk handling logic, add subscription parameter for acknowledgment, remove getConsumerForUuid method, and handle missing chunk0 scenario
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java Pass subscription to SharedConsumerAssignor constructor
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java Pass subscription to SharedConsumerAssignor constructor
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java Update test to accommodate new constructor signature with null subscription

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

entryAndMetadataList.clear();
replayQueue.clear();
assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add);
assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, null);
Copy link

Copilot AI Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test passes null for the subscription parameter, which means the new logic for handling missing chunk0 (lines 97-108 in SharedConsumerAssignor.java) is not covered by existing tests. Consider adding test cases that:

  1. Verify non-first chunks are properly acknowledged when chunk0 is missing
  2. Test the behavior when subscription is a PulsarCompactorSubscription
  3. Ensure that acknowledged chunks don't get dispatched to consumers

Copilot uses AI. Check for mistakes.
/**
* The assigner to assign entries to the proper {@link Consumer} in the shared subscription.
*/

Copy link

Copilot AI Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is an unnecessary blank line between the Javadoc comment and the class annotations. The annotations should immediately follow the Javadoc without a blank line in between.

Suggested change

Copilot uses AI. Check for mistakes.

final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits);
if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
// The last chunk is received, we should remove the uuid
Copy link

Copilot AI Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment states "we should remove the uuid" but should clarify "we should remove the uuid from the cache" for better clarity.

Suggested change
// The last chunk is received, we should remove the uuid
// The last chunk is received, we should remove the uuid from the cache

Copilot uses AI. Check for mistakes.
Comment on lines +106 to +108
entryAndMetadata.release();
}
}
Copy link

Copilot AI Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When subscription is null or the subscription is a PulsarCompactorSubscription, non-first chunks without a cached consumer are not acknowledged or released (the if block at lines 97-108 doesn't execute acknowledgment). However, the code still proceeds to cache the UUID mapping (line 111) and add the message to consumerToEntries (line 120). This means subsequent chunks with the same UUID will be delivered to this consumer, even though chunk0 was never received. This partially defeats the purpose of the fix, as the subscription will still receive incomplete chunked messages in these cases.

Suggested change
entryAndMetadata.release();
}
}
entryAndMetadata.release();
} else {
// For compactor subscriptions, just release the entry without ack
entryAndMetadata.release();
}
} else {
// No subscription available, just release the entry
entryAndMetadata.release();
}
// Do not cache UUID mapping or deliver this non-first chunk
continue;

Copilot uses AI. Check for mistakes.
Comment on lines +98 to +103
log.warn("[{}][{}] Skip the message because of it not the first chunk."
+ " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
// Directly ack the message
if (!(subscription instanceof PulsarCompactorSubscription)) {
Copy link

Copilot AI Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message uses double concatenation with a plus sign outside the format arguments, which is unconventional and harder to read. The message should be formatted as a single string template or the concatenation should be inside the curly braces if necessary for readability.

Suggested change
log.warn("[{}][{}] Skip the message because of it not the first chunk."
+ " Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
// Directly ack the message
if (!(subscription instanceof PulsarCompactorSubscription)) {
log.warn("[{}][{}] Skip the message because of it not the first chunk. Position: {}, UUID: {}, ChunkId: {}, NumChunksFromMsg: {}",
subscription.getTopicName(), subscription.getName(), entryAndMetadata.getPosition(),
metadata.getUuid(), metadata.getChunkId(), metadata.getNumChunksFromMsg());
// Directly ack the message
if (!(subscription instanceof PulsarCompactorSubscription)) {
if (!(subscription instanceof PulsarCompactorSubscription)) {

Copilot uses AI. Check for mistakes.
continue;
if (metadata.getChunkId() != 0) {
if (subscription != null) {
log.warn("[{}][{}] Skip the message because of it not the first chunk."
Copy link

Copilot AI Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message contains a grammatical error. "because of it not the first chunk" should be "because it is not the first chunk".

Suggested change
log.warn("[{}][{}] Skip the message because of it not the first chunk."
log.warn("[{}][{}] Skip the message because it is not the first chunk."

Copilot uses AI. Check for mistakes.
Comment on lines +103 to 120
if (!(subscription instanceof PulsarCompactorSubscription)) {
subscription.acknowledgeMessage(Collections.singletonList(
entryAndMetadata.getPosition()), AckType.Individual, Collections.emptyMap());
entryAndMetadata.release();
}
}
}
consumerForUuid = consumer;
uuidToConsumer.put(uuid, consumerForUuid);
}

final int permits = consumerToPermits.computeIfAbsent(consumerForUuid, Consumer::getAvailablePermits);
if (metadata.getChunkId() == metadata.getNumChunksFromMsg() - 1) {
// The last chunk is received, we should remove the uuid
uuidToConsumer.remove(uuid);
}

consumerToEntries.computeIfAbsent(consumerForUuid, __ -> new ArrayList<>()).add(entryAndMetadata);
Copy link

Copilot AI Jan 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a non-first chunk is acknowledged and released (lines 104-106), the message should not continue to be processed. However, the code proceeds to add this already-released message to consumerToEntries (line 120) and cache its UUID mapping (line 111). This will cause the consumer to receive a message whose buffer has already been released, leading to potential reference counting errors or access to freed memory. After acknowledging and releasing the message, the code should continue to the next iteration of the loop without further processing.

Copilot uses AI. Check for mistakes.
@lhotari
Copy link
Member

lhotari commented Jan 2, 2026

@zjxxzjwang please resolve the merge conflict

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants