From 4cf7e5ae1a9e6bb709ee0af266125daff6a1b73c Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 27 Feb 2026 10:51:45 +0100 Subject: [PATCH 1/3] ref(anthropic): Factor out streamed result handling --- sentry_sdk/integrations/anthropic.py | 184 ++++++++++++++------------- 1 file changed, 98 insertions(+), 86 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index eca9e8bd3e..ada57aebd7 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -49,6 +49,9 @@ from sentry_sdk.tracing import Span from sentry_sdk._types import TextPart + from anthropic import AsyncStream + from anthropic.types import RawMessageStreamEvent + class _RecordedUsage: output_tokens: int = 0 @@ -389,6 +392,96 @@ def _set_output_data( span.__exit__(None, None, None) +def _set_streaming_output_data( + result: "AsyncStream[RawMessageStreamEvent]", + span: "sentry_sdk.tracing.Span", +): + integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) + + old_iterator = result._iterator + + def new_iterator() -> "Iterator[MessageStreamEvent]": + model = None + usage = _RecordedUsage() + content_blocks: "list[str]" = [] + + for event in old_iterator: + ( + model, + usage, + content_blocks, + ) = _collect_ai_data( + event, + model, + usage, + content_blocks, + ) + yield event + + # Anthropic's input_tokens excludes cached/cache_write tokens. + # Normalize to total input tokens for correct cost calculations. 
+ total_input = ( + usage.input_tokens + + (usage.cache_read_input_tokens or 0) + + (usage.cache_write_input_tokens or 0) + ) + + _set_output_data( + span=span, + integration=integration, + model=model, + input_tokens=total_input, + output_tokens=usage.output_tokens, + cache_read_input_tokens=usage.cache_read_input_tokens, + cache_write_input_tokens=usage.cache_write_input_tokens, + content_blocks=[{"text": "".join(content_blocks), "type": "text"}], + finish_span=True, + ) + + async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]": + model = None + usage = _RecordedUsage() + content_blocks: "list[str]" = [] + + async for event in old_iterator: + ( + model, + usage, + content_blocks, + ) = _collect_ai_data( + event, + model, + usage, + content_blocks, + ) + yield event + + # Anthropic's input_tokens excludes cached/cache_write tokens. + # Normalize to total input tokens for correct cost calculations. + total_input = ( + usage.input_tokens + + (usage.cache_read_input_tokens or 0) + + (usage.cache_write_input_tokens or 0) + ) + + _set_output_data( + span=span, + integration=integration, + model=model, + input_tokens=total_input, + output_tokens=usage.output_tokens, + cache_read_input_tokens=usage.cache_read_input_tokens, + cache_write_input_tokens=usage.cache_write_input_tokens, + content_blocks=[{"text": "".join(content_blocks), "type": "text"}], + finish_span=True, + ) + + if str(type(result._iterator)) == "<class 'async_generator'>": + result._iterator = new_iterator_async() + else: + result._iterator = new_iterator() + + def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any": integration = kwargs.pop("integration") if integration is None: @@ -415,6 +508,11 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A result = yield f, args, kwargs + is_streaming_response = kwargs.get("stream", False) + if is_streaming_response: + _set_streaming_output_data(result, span) + return result + with capture_internal_exceptions(): 
if hasattr(result, "content"): ( @@ -444,92 +542,6 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A content_blocks=content_blocks, finish_span=True, ) - - # Streaming response - elif hasattr(result, "_iterator"): - old_iterator = result._iterator - - def new_iterator() -> "Iterator[MessageStreamEvent]": - model = None - usage = _RecordedUsage() - content_blocks: "list[str]" = [] - - for event in old_iterator: - ( - model, - usage, - content_blocks, - ) = _collect_ai_data( - event, - model, - usage, - content_blocks, - ) - yield event - - # Anthropic's input_tokens excludes cached/cache_write tokens. - # Normalize to total input tokens for correct cost calculations. - total_input = ( - usage.input_tokens - + (usage.cache_read_input_tokens or 0) - + (usage.cache_write_input_tokens or 0) - ) - - _set_output_data( - span=span, - integration=integration, - model=model, - input_tokens=total_input, - output_tokens=usage.output_tokens, - cache_read_input_tokens=usage.cache_read_input_tokens, - cache_write_input_tokens=usage.cache_write_input_tokens, - content_blocks=[{"text": "".join(content_blocks), "type": "text"}], - finish_span=True, - ) - - async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]": - model = None - usage = _RecordedUsage() - content_blocks: "list[str]" = [] - - async for event in old_iterator: - ( - model, - usage, - content_blocks, - ) = _collect_ai_data( - event, - model, - usage, - content_blocks, - ) - yield event - - # Anthropic's input_tokens excludes cached/cache_write tokens. - # Normalize to total input tokens for correct cost calculations. 
- total_input = ( - usage.input_tokens - + (usage.cache_read_input_tokens or 0) - + (usage.cache_write_input_tokens or 0) - ) - - _set_output_data( - span=span, - integration=integration, - model=model, - input_tokens=total_input, - output_tokens=usage.output_tokens, - cache_read_input_tokens=usage.cache_read_input_tokens, - cache_write_input_tokens=usage.cache_write_input_tokens, - content_blocks=[{"text": "".join(content_blocks), "type": "text"}], - finish_span=True, - ) - - if str(type(result._iterator)) == "<class 'async_generator'>": - result._iterator = new_iterator_async() - else: - result._iterator = new_iterator() - else: span.set_data("unknown_response", True) span.__exit__(None, None, None) From 6cdab1673211cae9482d3ba93b50ca76dabb8865 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 27 Feb 2026 13:29:09 +0100 Subject: [PATCH 2/3] . --- sentry_sdk/integrations/anthropic.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index ada57aebd7..7379d54311 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -392,10 +392,13 @@ def _set_output_data( span.__exit__(None, None, None) -def _set_streaming_output_data( +def _patch_streaming_response_iterator( result: "AsyncStream[RawMessageStreamEvent]", span: "sentry_sdk.tracing.Span", ): + """ + Responsible for closing the `gen_ai.chat` span and setting attributes acquired during response consumption. 
+ """ integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) old_iterator = result._iterator @@ -510,7 +513,7 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A is_streaming_response = kwargs.get("stream", False) if is_streaming_response: - _set_streaming_output_data(result, span) + _patch_streaming_response_iterator(result, span) return result with capture_internal_exceptions(): From db5dbb921f70944c46d65dbfe870fad4a8cae565 Mon Sep 17 00:00:00 2001 From: Alexander Alderman Webb Date: Fri, 27 Feb 2026 13:31:22 +0100 Subject: [PATCH 3/3] . --- sentry_sdk/integrations/anthropic.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sentry_sdk/integrations/anthropic.py b/sentry_sdk/integrations/anthropic.py index 7379d54311..16e14c3500 100644 --- a/sentry_sdk/integrations/anthropic.py +++ b/sentry_sdk/integrations/anthropic.py @@ -395,12 +395,11 @@ def _set_output_data( def _patch_streaming_response_iterator( result: "AsyncStream[RawMessageStreamEvent]", span: "sentry_sdk.tracing.Span", -): + integration: "AnthropicIntegration", +) -> None: """ Responsible for closing the `gen_ai.chat` span and setting attributes acquired during response consumption. """ - integration = sentry_sdk.get_client().get_integration(AnthropicIntegration) - old_iterator = result._iterator def new_iterator() -> "Iterator[MessageStreamEvent]": @@ -513,7 +512,7 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A is_streaming_response = kwargs.get("stream", False) if is_streaming_response: - _patch_streaming_response_iterator(result, span) + _patch_streaming_response_iterator(result, span, integration) return result with capture_internal_exceptions():