Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
186 changes: 100 additions & 86 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

from anthropic import AsyncStream
from anthropic.types import RawMessageStreamEvent


class _RecordedUsage:
output_tokens: int = 0
Expand Down Expand Up @@ -389,6 +392,98 @@
span.__exit__(None, None, None)


def _patch_streaming_response_iterator(
result: "AsyncStream[RawMessageStreamEvent]",
span: "sentry_sdk.tracing.Span",
integration: "AnthropicIntegration",
) -> None:
"""
Responsible for closing the `gen_ai.chat` span and setting attributes acquired during response consumption.
"""
old_iterator = result._iterator

def new_iterator() -> "Iterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

if str(type(result._iterator)) == "<class 'async_generator'>":
result._iterator = new_iterator_async()
else:
result._iterator = new_iterator()


def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "Any":
integration = kwargs.pop("integration")
if integration is None:
Expand All @@ -415,6 +510,11 @@

result = yield f, args, kwargs

is_streaming_response = kwargs.get("stream", False)
if is_streaming_response:
_patch_streaming_response_iterator(result, span, integration)
return result

Check warning on line 516 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: find-bugs

Unprotected _iterator access can crash user applications when stream=True response lacks _iterator attribute

The refactored code accesses `result._iterator` directly (lines 403, 481) without verifying the attribute exists, and the call to `_patch_streaming_response_iterator` (line 515) is not wrapped in `capture_internal_exceptions()`. Previously, the streaming path was inside `capture_internal_exceptions()` with `hasattr(result, "_iterator")` check. If `stream=True` is passed but the response object doesn't have `_iterator` (e.g., API version mismatch or unexpected response type), an unhandled `AttributeError` will propagate to the user's application instead of being gracefully logged.

Check warning on line 516 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: code-review

Streaming response handling no longer protected by capture_internal_exceptions

The streaming path now checks `kwargs.get("stream", False)` and calls `_patch_streaming_response_iterator` before entering the `capture_internal_exceptions()` block. Previously, the streaming detection (`hasattr(result, "_iterator")`) was inside the exception handling block. If `stream=True` is passed but the result unexpectedly lacks `_iterator` (e.g., API behavior change, error responses), an `AttributeError` will propagate instead of being gracefully handled, potentially breaking the instrumented code path.
Comment on lines +513 to +516
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Streaming response handling no longer protected by capture_internal_exceptions

The streaming path now checks kwargs.get("stream", False) and calls _patch_streaming_response_iterator before entering the capture_internal_exceptions() block. Previously, the streaming detection (hasattr(result, "_iterator")) was inside the exception handling block. If stream=True is passed but the result unexpectedly lacks _iterator (e.g., API behavior change, error responses), an AttributeError will propagate instead of being gracefully handled, potentially breaking the instrumented code path.

Verification

Read anthropic.py lines 510-520 to verify the new streaming check is outside capture_internal_exceptions(). The old code had elif hasattr(result, "_iterator"): inside the with capture_internal_exceptions(): block per the diff context. The new code at lines 513-516 executes before entering the block at line 518.

Suggested fix: Wrap the streaming response handling in capture_internal_exceptions to maintain defensive error handling

Suggested change
is_streaming_response = kwargs.get("stream", False)
if is_streaming_response:
_patch_streaming_response_iterator(result, span, integration)
return result
with capture_internal_exceptions():
is_streaming_response = kwargs.get("stream", False)
if is_streaming_response:
_patch_streaming_response_iterator(result, span, integration)
return result

Identified by Warden code-review · TGC-FD8


with capture_internal_exceptions():
if hasattr(result, "content"):
(
Expand Down Expand Up @@ -444,92 +544,6 @@
content_blocks=content_blocks,
finish_span=True,
)

# Streaming response
elif hasattr(result, "_iterator"):
old_iterator = result._iterator

def new_iterator() -> "Iterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

if str(type(result._iterator)) == "<class 'async_generator'>":
result._iterator = new_iterator_async()
else:
result._iterator = new_iterator()

else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)
Expand Down
Loading