Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/strands/models/bedrock.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,12 +743,10 @@ async def stream(
"""

def callback(event: StreamEvent | None = None) -> None:
loop.call_soon_threadsafe(queue.put_nowait, event)
if event is None:
return
asyncio.run_coroutine_threadsafe(queue.put(event), loop).result()

loop = asyncio.get_event_loop()
queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue()
queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue(maxsize=1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will actually want something along the lines of:

async def stream(self, ...) -> AsyncGenerator[StreamEvent, None]:
    ready = threading.Event()
    ready.set()

    def callback(event: StreamEvent | None = None) -> None:
        ready.wait(timeout=0.1)  # can configure different timeouts
        ready.clear()
        loop.call_soon_threadsafe(queue.put_nowait, event)

    loop = asyncio.get_event_loop()
    queue: asyncio.Queue[StreamEvent | None] = asyncio.Queue()

    thread = asyncio.to_thread(self._stream, callback, ...)
    task = asyncio.create_task(thread)

    while True:
        event = await queue.get()
        if event is None:
            break
        yield event
        ready.set()

    await task

The advantages are:

  • Because of timeout, producer thread (_stream) always finishes, no orphaned threads that are blocked if consumer thread (stream) dies.
  • Graceful degradation to batching model chunks instead of deadlock when consumer is slow.
  • No run_coroutine_threadsafe overhead per chunk. call_soon_threadsafe is more lightweight.
  • Same smooth streaming when consumer is keeping up.


# Handle backward compatibility: if system_prompt is provided but system_prompt_content is None
if system_prompt and system_prompt_content is None:
Expand Down
33 changes: 33 additions & 0 deletions tests/strands/models/test_bedrock.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import copy
import logging
import os
Expand Down Expand Up @@ -660,6 +661,38 @@ async def test_stream(bedrock_client, model, messages, tool_spec, model_id, addi
bedrock_client.converse_stream.assert_called_once_with(**request)


@pytest.mark.asyncio
async def test_stream_delivers_chunks_individually(bedrock_client, model, messages):
    """Verify that stream yields each chunk as it arrives rather than in batches.

    Regression test for https://github.com/strands-agents/sdk-python/issues/1523.
    Even when the Bedrock stream emits chunks in a burst (no delay between them),
    the consumer must receive them one at a time, not all at once.
    """
    chunks = [f"chunk_{i}" for i in range(10)]
    bedrock_client.converse_stream.return_value = {"stream": chunks}

    # Record the queue depth observed immediately after every get(); any
    # nonzero depth means the producer outran the consumer and chunks piled up.
    queue_depths = []
    unpatched_get = asyncio.Queue.get

    async def instrumented_get(self):
        item = await unpatched_get(self)
        queue_depths.append(self.qsize())
        return item

    with unittest.mock.patch.object(asyncio.Queue, "get", instrumented_get):
        received = [event async for event in model.stream(messages)]

    assert received == chunks
    assert not any(queue_depths), (
        f"Chunks were batched! Queue depths after each get(): {queue_depths}. "
        f"Expected all zeros for individual delivery."
    )


@pytest.mark.asyncio
async def test_stream_with_system_prompt_content(bedrock_client, model, messages, alist):
"""Test stream method with system_prompt_content parameter."""
Expand Down