Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
e3cc76d
fix(anthropic): Respect iterator protocol in synchronous streamed res…
alexander-alderman-webb Mar 16, 2026
7bd7ed4
version check and typing
alexander-alderman-webb Mar 16, 2026
c8a6ec3
fix tests
alexander-alderman-webb Mar 16, 2026
6f1d833
missing types
alexander-alderman-webb Mar 16, 2026
40e2bf0
.
alexander-alderman-webb Mar 16, 2026
a21406f
fix(anthropic): Respect iterator protocol in asynchronous streamed re…
alexander-alderman-webb Mar 16, 2026
a3cc18f
simplify
alexander-alderman-webb Mar 16, 2026
631e727
Merge branch 'webb/anthropic/sync-iterators' into webb/anthropic/asyn…
alexander-alderman-webb Mar 16, 2026
dd26abc
simplify
alexander-alderman-webb Mar 16, 2026
c08fd2c
fix(anthropic): Set exception info on span when applicable
alexander-alderman-webb Mar 17, 2026
336b2ee
skip error tests on old versions
alexander-alderman-webb Mar 17, 2026
f6b8909
merge master
alexander-alderman-webb Mar 17, 2026
0aeec72
update tests
alexander-alderman-webb Mar 17, 2026
005eb66
merge
alexander-alderman-webb Mar 17, 2026
c5cd959
.
alexander-alderman-webb Mar 17, 2026
f2cdd14
Merge branch 'webb/anthropic/async-iterators' into webb/anthropic/exc…
alexander-alderman-webb Mar 17, 2026
b92db6d
docstring
alexander-alderman-webb Mar 17, 2026
a956644
docstring
alexander-alderman-webb Mar 17, 2026
beb8f2c
docstrings
alexander-alderman-webb Mar 17, 2026
e84a63f
Merge branch 'webb/anthropic/sync-iterators' into webb/anthropic/asyn…
alexander-alderman-webb Mar 17, 2026
7ec95fe
docs
alexander-alderman-webb Mar 17, 2026
288a065
Merge branch 'webb/anthropic/sync-iterators' into webb/anthropic/asyn…
alexander-alderman-webb Mar 17, 2026
8e9bfab
docstrings
alexander-alderman-webb Mar 17, 2026
fab5d93
type annotation
alexander-alderman-webb Mar 17, 2026
cda41e2
review
alexander-alderman-webb Mar 17, 2026
4156446
Merge branch 'webb/anthropic/sync-iterators' into webb/anthropic/asyn…
alexander-alderman-webb Mar 17, 2026
e17e036
review
alexander-alderman-webb Mar 17, 2026
31869af
simplify
alexander-alderman-webb Mar 17, 2026
1017e9e
simplify
alexander-alderman-webb Mar 17, 2026
8897151
simplify
alexander-alderman-webb Mar 17, 2026
fcedba7
.
alexander-alderman-webb Mar 17, 2026
fdf770a
.
alexander-alderman-webb Mar 17, 2026
e2f7afc
.
alexander-alderman-webb Mar 17, 2026
170cdbf
merge
alexander-alderman-webb Mar 17, 2026
b3d9889
merge master
alexander-alderman-webb Mar 18, 2026
47fd7e4
introduce context manager
alexander-alderman-webb Mar 18, 2026
4b40b3a
return None from __exit__
alexander-alderman-webb Mar 18, 2026
8aeea86
.
alexander-alderman-webb Mar 18, 2026
ddcf52a
remove dead code
alexander-alderman-webb Mar 18, 2026
6d2507d
docstring
alexander-alderman-webb Mar 18, 2026
1bc5f86
docs
alexander-alderman-webb Mar 18, 2026
09d51bc
add comment about guaranteed streaming state
alexander-alderman-webb Mar 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 56 additions & 74 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,55 @@ class _RecordedUsage:
cache_read_input_tokens: "Optional[int]" = 0


class _StreamSpanContext:
    """
    Sets accumulated data on the stream's span and finishes the span on exit.
    Is a no-op if the stream has no span set, i.e., when the span has already been finished.
    """

    def __init__(
        self,
        stream: "Union[Stream, MessageStream, AsyncStream, AsyncMessageStream]",
        # Flag to avoid unreachable branches when the stream state is known to be initialized (stream._model, etc. are set).
        guaranteed_streaming_state: bool = False,
    ) -> None:
        self._stream = stream
        self._guaranteed_streaming_state = guaranteed_streaming_state

    def __enter__(self) -> "_StreamSpanContext":
        return self

    def __exit__(
        self,
        exc_type: "Optional[type[BaseException]]",
        exc_val: "Optional[BaseException]",
        exc_tb: "Optional[Any]",
    ) -> None:
        # Implicitly returns None (falsy) so that any in-flight exception
        # propagates to the caller instead of being suppressed.
        with capture_internal_exceptions():
            if not hasattr(self._stream, "_span"):
                # Span was already finished (e.g. by the close() patch) -- nothing to do.
                return

            try:
                # Only set output data when the streaming state is initialized;
                # otherwise attributes like _model/_usage do not exist on the stream.
                if self._guaranteed_streaming_state or hasattr(self._stream, "_model"):
                    _set_streaming_output_data(
                        span=self._stream._span,
                        integration=self._stream._integration,
                        model=self._stream._model,
                        usage=self._stream._usage,
                        content_blocks=self._stream._content_blocks,
                        response_id=self._stream._response_id,
                        finish_reason=self._stream._finish_reason,
                    )
            finally:
                # Always finish and detach the span -- even if setting output
                # data raised -- so the span is never leaked half-open.
                self._stream._span.__exit__(exc_type, exc_val, exc_tb)
                del self._stream._span


class AnthropicIntegration(Integration):
identifier = "anthropic"
origin = f"auto.ai.{identifier}"
Expand Down Expand Up @@ -446,7 +495,7 @@ def _wrap_synchronous_message_iterator(
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span unless the span has already been closed in the close() patch.
"""
try:
with _StreamSpanContext(stream, guaranteed_streaming_state=True):
for event in iterator:
# Message and content types are aliases for corresponding Raw* types, introduced in
# https://github.com/anthropics/anthropic-sdk-python/commit/bc9d11cd2addec6976c46db10b7c89a8c276101a
Expand All @@ -466,19 +515,6 @@ def _wrap_synchronous_message_iterator(

_accumulate_event_data(stream, event)
yield event
finally:
with capture_internal_exceptions():
if hasattr(stream, "_span"):
_finish_streaming_span(
span=stream._span,
integration=stream._integration,
model=stream._model,
usage=stream._usage,
content_blocks=stream._content_blocks,
response_id=stream._response_id,
finish_reason=stream._finish_reason,
)
del stream._span


async def _wrap_asynchronous_message_iterator(
Expand All @@ -489,7 +525,7 @@ async def _wrap_asynchronous_message_iterator(
Sets information received while iterating the response stream on the AI Client Span.
Responsible for closing the AI Client Span unless the span has already been closed in the close() patch.
"""
try:
with _StreamSpanContext(stream, guaranteed_streaming_state=True):
async for event in iterator:
# Message and content types are aliases for corresponding Raw* types, introduced in
# https://github.com/anthropics/anthropic-sdk-python/commit/bc9d11cd2addec6976c46db10b7c89a8c276101a
Expand All @@ -509,19 +545,6 @@ async def _wrap_asynchronous_message_iterator(

_accumulate_event_data(stream, event)
yield event
finally:
with capture_internal_exceptions():
if hasattr(stream, "_span"):
_finish_streaming_span(
span=stream._span,
integration=stream._integration,
model=stream._model,
usage=stream._usage,
content_blocks=stream._content_blocks,
response_id=stream._response_id,
finish_reason=stream._finish_reason,
)
del stream._span


def _set_output_data(
Expand All @@ -533,7 +556,6 @@ def _set_output_data(
cache_read_input_tokens: "int | None",
cache_write_input_tokens: "int | None",
content_blocks: "list[Any]",
finish_span: bool = False,
response_id: "str | None" = None,
finish_reason: "str | None" = None,
) -> None:
Expand Down Expand Up @@ -577,9 +599,6 @@ def _set_output_data(
input_tokens_cache_write=cache_write_input_tokens,
)

if finish_span:
span.__exit__(None, None, None)


def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
integration = kwargs.pop("integration")
Expand Down Expand Up @@ -658,10 +677,10 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A
cache_read_input_tokens=cache_read_input_tokens,
cache_write_input_tokens=cache_write_input_tokens,
content_blocks=content_blocks,
finish_span=True,
response_id=getattr(result, "id", None),
finish_reason=getattr(result, "stop_reason", None),
)
span.__exit__(None, None, None)
else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)
Expand Down Expand Up @@ -742,7 +761,7 @@ def _accumulate_event_data(
stream._finish_reason = finish_reason


def _finish_streaming_span(
def _set_streaming_output_data(
span: "Span",
integration: "AnthropicIntegration",
model: "Optional[str]",
Expand All @@ -752,7 +771,7 @@ def _finish_streaming_span(
finish_reason: "Optional[str]",
) -> None:
"""
Set output attributes on the AI Client Span and end the span.
Set output attributes on the AI Client Span.
"""
# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
Expand All @@ -771,7 +790,6 @@ def _finish_streaming_span(
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
response_id=response_id,
finish_reason=finish_reason,
)
Expand All @@ -785,27 +803,9 @@ def _wrap_close(
"""

def close(self: "Union[Stream, MessageStream]") -> None:
if not hasattr(self, "_span"):
return f(self)

if not hasattr(self, "_model"):
self._span.__exit__(None, None, None)
del self._span
with _StreamSpanContext(self):
return f(self)

_finish_streaming_span(
span=self._span,
integration=self._integration,
model=self._model,
usage=self._usage,
content_blocks=self._content_blocks,
response_id=self._response_id,
finish_reason=self._finish_reason,
)
del self._span

return f(self)

return close


Expand Down Expand Up @@ -855,27 +855,9 @@ def _wrap_async_close(
"""

async def close(self: "AsyncStream") -> None:
if not hasattr(self, "_span"):
return await f(self)

if not hasattr(self, "_model"):
self._span.__exit__(None, None, None)
del self._span
with _StreamSpanContext(self):
return await f(self)

_finish_streaming_span(
span=self._span,
integration=self._integration,
model=self._model,
usage=self._usage,
content_blocks=self._content_blocks,
response_id=self._response_id,
finish_reason=self._finish_reason,
)
del self._span

return await f(self)

return close


Expand Down
Loading
Loading