Skip to content

Fix poison pill deadlock: Make retry heap unbounded#2014

Open
the-mann wants to merge 57 commits intoenable-multithreaded-logging-by-defaultfrom
fix/poison-pill-deadlock
Open

Fix poison pill deadlock: Make retry heap unbounded#2014
the-mann wants to merge 57 commits intoenable-multithreaded-logging-by-defaultfrom
fix/poison-pill-deadlock

Conversation

@the-mann
Copy link
Contributor

@the-mann the-mann commented Feb 11, 2026

Summary

Fix deadlock when failing log groups exceed concurrency limit by making the retry heap unbounded.

Problem

With bounded retry heap (size = concurrency):

  • Retry heap size = concurrency (e.g., 2)
  • When failing log groups (10) > heap size (2)
  • Heap fills with failed batches
  • Workers block trying to push more failed batches
  • System deadlocks - no progress possible
  • Allowed log groups get starved

Solution

Make retry heap unbounded:

  • Remove maxSize constraint and semaphore
  • Push() is now non-blocking
  • Failed batches queue up without blocking workers
  • Allowed log groups continue publishing normally

Changes

  • Remove maxSize and semaphore from retryHeap struct
  • Make Push() non-blocking (no semaphore wait)
  • Remove semaphore release from PopReady()
  • Replace sync.Cond halt/resume with channel-based approach + mutex to prevent shutdown deadlock
  • Add TestQueueStopWhileHalted to verify no shutdown deadlock
  • Add state callback tests for retry, expiry, and shutdown scenarios
  • Add poison_pill_test.go with comprehensive poison pill scenario tests
  • Clean up test assertions and remove redundant tests

Test Results

Before Fix: Test deadlocked after 30s with all goroutines blocked

After Fix: ✅ Test PASSES

Allowed success=5 (all batches published)
Denied attempts=110
Heap size=28 (grew beyond concurrency limit of 2)
No deadlock or starvation

Integration Test

Integration test run: https://github.com/aws/amazon-cloudwatch-agent/actions/runs/21973935546

New integration test added in amazon-cloudwatch-agent-test:

  • Test directory: test/cloudwatchlogs_concurrency
  • Config: concurrency=2, force_flush_interval=5s
  • Validates: 1 allowed + 10 denied log groups
  • Expected: Allowed log group continues publishing despite denied groups

Manual Memory Test

1. Memory Stabilizes, Not Growing Unbounded ✅

  • Initial spike: +1.6 MB in first 3 minutes (161s)
  • Stabilization: Memory drops to +1.1 MB and remains stable
  • Final 2 minutes: No growth despite 5,000 more events

2. Garbage Collection Active

Memory actually decreased from 125,244 KB (161s) to 124,816 KB (247s), showing Go's garbage collector is reclaiming memory from processed batches.

3. Retry Heap Bounded by Target Count

With 10 denied log groups:

  • Each has 1 queue → max 1 batch in retry heap per target
  • Circuit breaker halts queues after failure → prevents new batch creation
  • Retry heap stabilizes at ~10 batches (one per denied log group)
  • Memory growth is from batch metadata, not unbounded accumulation

4. No Memory Leak

  • 0.9% growth over 5 minutes with 29,000 events
  • Stable memory in final 2 minutes despite continued event ingestion
  • Well below threshold: <1% growth vs 50% leak threshold

Conclusion

✅ No memory leak: 0.9% growth over 5 minutes with 29,000 events
✅ Memory stabilizes: Peaks at 1.6 MB, drops to 1.1 MB, remains stable
✅ Garbage collection works: Memory decreases after peak
✅ Retry heap bounded: Limited by number of failing targets (~10 batches)
✅ Production ready: Safe for long-running deployments with persistent failures

Related PRs

agarakan and others added 30 commits December 30, 2025 05:00
…ock-on-failure

# Conflicts:
#	plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/batch.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/sender.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
…ock-on-failure

# Conflicts:
#	plugins/outputs/cloudwatchlogs/cloudwatchlogs.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/batch.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/batch_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pusher.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/pusher_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/queue_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/retryheap.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/sender.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/sender_test.go
…nder-block-on-failure

# Conflicts:
#	plugins/outputs/cloudwatchlogs/internal/pusher/pool_test.go
#	plugins/outputs/cloudwatchlogs/internal/pusher/retryheap_test.go
- Add mutex protection to Stop() method to prevent race conditions
- Add stopped flag checks in Push() to prevent pushing after Stop()
- Ensure Push() checks stopped flag both before and after acquiring semaphore
- Fix TestRetryHeapStopTwice to verify correct behavior
- Add TestRetryHeapProcessorExpiredBatchShouldResume to demonstrate bug
- When a batch expires after 14 days, RetryHeapProcessor calls updateState()
  but not done(), leaving circuit breaker permanently closed
- Target remains blocked forever even though bad batch was dropped
- Test currently fails, demonstrating the bug from PR comment
Verifies that startTime and expireAfter are only set once on first call
and remain unchanged on subsequent calls, ensuring the 14-day expiration
is measured from the first send attempt, not from each retry.
Concurrency is now determined by whether workerPool and retryHeap are
provided, making the explicit concurrency parameter redundant.

🤖 Assisted by AI
Add comprehensive recovery tests validating:
1. Permission granted during retry - system recovers and publishes logs
2. System restart during retry - resumes correctly with preserved metadata
3. Multiple targets - healthy targets unaffected by failing target

Tests validate circuit breaker behavior, retry heap functionality,
and proper isolation between targets during permission failures.

Addresses CWQS-3192 (P1 requirement)

🤖 Assisted by AI
Add test_os_filter and test_dir_filter inputs to allow running
specific tests on specific OS platforms. Filters use jq to filter
generated test matrices before execution.

Usage:
  -f test_os_filter=al2023 (run only on al2023)
  -f test_dir_filter=./test/cloudwatchlogs (run only cloudwatchlogs)

When filters are omitted, all tests run (default behavior).
@the-mann the-mann requested a review from a team as a code owner February 11, 2026 21:00
The retry heap is now unbounded, so maxSize is no longer used.

🤖 Assisted by AI
batch.done() already calls updateState() internally, so the explicit
call is unnecessary.

🤖 Assisted by AI
Test had no assertions and was not validating any behavior.

🤖 Assisted by AI
Variable was set but never checked in the test.

🤖 Assisted by AI
Circuit breaker should always block after exactly 1 send attempt,
not "at most 1".

🤖 Assisted by AI
The dummyBatch was not connected to the queue's circuit breaker,
so calling done() on it had no effect. Simplified test to only
verify halt behavior.

🤖 Assisted by AI
- Replace sync.Cond with channel-based halt/resume to prevent shutdown
  deadlock (waitIfHalted now selects on haltCh and stopCh)
- Add mutex to halt/resume/waitIfHalted for thread safety
- Add TestQueueStopWhileHalted to verify no shutdown deadlock
- Add TestQueueHaltResume with proper resume assertions
- Clean up verbose test comments and weak assertions
- Remove orphaned TestQueueResumeOnBatchExpiry comment

🤖 Assisted by AI
Verify state file management during retry, expiry, and shutdown:
- Successful retry persists file offsets via state callbacks
- Expired batch (14d) still persists offsets to prevent re-read
- Clean shutdown does not persist state for unprocessed batches

🤖 Assisted by AI
- Fix TestRetryHeapProcessorSendsBatch: add events to batch, verify
  PutLogEvents is called and done callback fires (was testing empty batch)
- Fix TestRetryHeapProcessorExpiredBatch: set expireAfter field so
  isExpired() actually returns true, verify done() is called
- Fix race in TestRetryHeapProcessorSendsBatch: use atomic.Bool
- Reduce TestRetryHeap_UnboundedPush sleep from 3s to 100ms

🤖 Assisted by AI
…Groups

TestPoisonPillScenario already covers the same scenario (10 denied +
1 allowed with low concurrency). The bounded heap no longer exists so
the 'smaller than' framing is no longer meaningful.

🤖 Assisted by AI
@the-mann the-mann force-pushed the fix/poison-pill-deadlock branch 2 times, most recently from c3d5e69 to 98bdc89 Compare February 13, 2026 17:10
Copy link
Contributor

@jefchien jefchien left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the tests could be improved, but functionally, it looks good to me.

Comment on lines +830 to +831
// Trigger resume by calling the success callback directly
queueImpl.resume()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Comment isn't quite right, but it accomplishes the same thing.

Comment on lines +824 to +828
// Add second event - should be queued but not sent due to halt
q.AddEvent(newStubLogEvent("second message", time.Now()))

// Verify only one send happened (queue is halted)
assert.Equal(t, int32(1), sendCount.Load(), "Should have only one send due to halt")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't wait long enough for the second batch to have been created and attempted to flush, so even without the halting logic this could be true. The flush interval is 10ms, so we should wait at least that amount of time before checking the sendCount and resuming.

Comment on lines +273 to +279
func stringPtr(s string) *string {
return &s
}

func int64Ptr(i int64) *int64 {
return &i
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If we need these functions, we typically use the functions provided by AWS SDK (e.g. aws.String("string")). Can see them in other tests/code in the repo.

@@ -139,7 +134,7 @@ type RetryHeapProcessor struct {
func NewRetryHeapProcessor(retryHeap RetryHeap, workerPool WorkerPool, service cloudWatchLogsService, targetManager TargetManager, logger telegraf.Logger, maxRetryDuration time.Duration, retryer *retryer.LogThrottleRetryer) *RetryHeapProcessor {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxRetryDuration was removed everywhere else. This is no longer used.


// TestRecoveryAfterSystemRestart validates that when the system restarts with
// retry ongoing, it resumes correctly by loading state and continuing retries.
func TestRecoveryAfterSystemRestart(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not seeing the restart or anything to do with the state file.

Comment on lines +121 to +134
// Process batches continuously
processorDone := make(chan struct{})
go func() {
ticker := time.NewTicker(20 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-processorDone:
return
case <-ticker.C:
processor.processReadyMessages()
}
}
}()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Isn't this what the retry heap processor is for? Is there a reason we aren't using processor.Start()?

// TestPoisonPillScenario validates that when 10 denied + 1 allowed log groups
// share a worker pool with concurrency=2, the allowed log group continues
// publishing without being starved by failed retries.
func TestPoisonPillScenario(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this only tests the retry heap? Is that intentional?


// TestRecoveryWithMultipleTargets validates that when one target has permission
// issues, other healthy targets continue publishing successfully.
func TestRecoveryWithMultipleTargets(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this testing differently than TestSingleDeniedLogGroup? Don't see which part of this is recovery.


// TestRetryHeapSuccessCallsStateCallback verifies that when a batch succeeds
// on retry through the heap, state callbacks fire to persist file offsets.
func TestRetryHeapSuccessCallsStateCallback(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one and TestRetryHeapProcessorSendsBatch are pretty similar and a bit redundant.


// TestRetryHeapProcessorExpiredBatchShouldResume verifies that expired batches
// resume the circuit breaker, preventing the target from being permanently blocked.
func TestRetryHeapProcessorExpiredBatchShouldResume(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This, TestRetryHeapProcessorExpiredBatch, and TestRetryHeapExpiryCallsStateCallback are pretty similar as well. Could have all of the assertions in a single test. Would help improve maintainability and test run time.

@the-mann the-mann force-pushed the fix/poison-pill-deadlock branch from a45d9be to 98bdc89 Compare February 19, 2026 18:24
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants

Comments