From 5f099248459f7f3121897f28dea77fb1719eeb14 Mon Sep 17 00:00:00 2001
From: Irving Popovetsky
Date: Thu, 1 Jan 2026 10:30:59 -0800
Subject: [PATCH 1/3] fix(anthropic): move beta AsyncMessages.stream to sync wrapper

The beta streaming API methods (beta.messages.AsyncMessages.stream) were
incorrectly placed in WRAPPED_AMETHODS, causing them to be wrapped with an
async wrapper that awaits the result. However, stream() returns an async
context manager, not a coroutine, so it should use the sync wrapper like
the non-beta version.

This fixes the error:
RuntimeWarning: coroutine '_awrap' was never awaited
TypeError: 'coroutine' object does not support the asynchronous context manager protocol

Applies the same fix from PR #3220 to the beta API endpoints that were missed.

Fixes #3178

Signed-off-by: Irving Popovetsky
---
 .../instrumentation/anthropic/__init__.py | 24 +++++++++----------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py
index e4ac944ef5..38c6e49e90 100644
--- a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py
+++ b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py
@@ -97,6 +97,12 @@
         "method": "stream",
         "span_name": "anthropic.chat",
     },
+    {
+        "package": "anthropic.resources.beta.messages.messages",
+        "object": "AsyncMessages",
+        "method": "stream",
+        "span_name": "anthropic.chat",
+    },
     # Beta API methods (Bedrock SDK)
     {
         "package": "anthropic.lib.bedrock._beta_messages",
@@ -110,6 +116,12 @@
         "method": "stream",
         "span_name": "anthropic.chat",
     },
+    {
+        "package": "anthropic.lib.bedrock._beta_messages",
+        "object": "AsyncMessages",
+        "method": "stream",
+        "span_name": "anthropic.chat",
+    },
 ]
 
 WRAPPED_AMETHODS = [
@@ -132,12 +144,6 @@
         "method": "create",
         "span_name": "anthropic.chat",
     },
-    {
-        "package": "anthropic.resources.beta.messages.messages",
-        "object": "AsyncMessages",
-        "method": "stream",
-        "span_name": "anthropic.chat",
-    },
     # Beta API async methods (Bedrock SDK)
     {
         "package": "anthropic.lib.bedrock._beta_messages",
@@ -145,12 +151,6 @@
         "method": "create",
         "span_name": "anthropic.chat",
     },
-    {
-        "package": "anthropic.lib.bedrock._beta_messages",
-        "object": "AsyncMessages",
-        "method": "stream",
-        "span_name": "anthropic.chat",
-    },
 ]

From c26aee31f016e9854e5681f62e093ace493f917e Mon Sep 17 00:00:00 2001
From: Irving Popovetsky
Date: Thu, 1 Jan 2026 10:35:44 -0800
Subject: [PATCH 2/3] add test file

---
 .../tests/test_beta_streaming.py | 208 ++++++++++++++++++
 1 file changed, 208 insertions(+)
 create mode 100644 packages/opentelemetry-instrumentation-anthropic/tests/test_beta_streaming.py

diff --git a/packages/opentelemetry-instrumentation-anthropic/tests/test_beta_streaming.py b/packages/opentelemetry-instrumentation-anthropic/tests/test_beta_streaming.py
new file mode 100644
index 0000000000..18386b7d2b
--- /dev/null
+++ b/packages/opentelemetry-instrumentation-anthropic/tests/test_beta_streaming.py
@@ -0,0 +1,208 @@
+"""Tests for beta streaming API.
+
+These tests verify that the OpenTelemetry instrumentation works correctly with
+the beta streaming API (client.beta.messages.stream()), which was the subject
+of a fix that moved AsyncMessages.stream from WRAPPED_AMETHODS to WRAPPED_METHODS.
+
+The beta streaming API returns an async context manager, not a coroutine, so it
+needs the sync wrapper (not the async wrapper that would await it).
+
+Related:
+- Issue #3178: https://github.com/traceloop/openllmetry/issues/3178
+- PR #3220: https://github.com/traceloop/openllmetry/pull/3220 (fixed non-beta)
+"""
+
+import pytest
+from opentelemetry.semconv._incubating.attributes import (
+    gen_ai_attributes as GenAIAttributes,
+)
+from opentelemetry.semconv_ai import SpanAttributes
+
+
+@pytest.mark.vcr
+def test_anthropic_beta_message_stream_manager_legacy(
+    instrument_legacy, anthropic_client, span_exporter, log_exporter, reader
+):
+    """Test sync beta streaming with legacy attributes."""
+    response_content = ""
+    with anthropic_client.beta.messages.stream(
+        max_tokens=1024,
+        messages=[
+            {
+                "role": "user",
+                "content": "Tell me a very short joke about OpenTelemetry",
+            }
+        ],
+        model="claude-3-5-haiku-20241022",
+    ) as stream:
+        for event in stream:
+            if event.type == "content_block_delta" and event.delta.type == "text_delta":
+                response_content += event.delta.text
+
+    spans = span_exporter.get_finished_spans()
+    assert [span.name for span in spans] == [
+        "anthropic.chat",
+    ]
+    anthropic_span = spans[0]
+    assert (
+        anthropic_span.attributes[f"{GenAIAttributes.GEN_AI_PROMPT}.0.content"]
+        == "Tell me a very short joke about OpenTelemetry"
+    )
+    assert (anthropic_span.attributes[f"{GenAIAttributes.GEN_AI_PROMPT}.0.role"]) == "user"
+    assert (
+        anthropic_span.attributes.get(f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content")
+        == response_content
+    )
+    assert (
+        anthropic_span.attributes.get(f"{GenAIAttributes.GEN_AI_COMPLETION}.0.role")
+        == "assistant"
+    )
+    assert anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] >= 1
+    assert (
+        anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS]
+        + anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS]
+        == anthropic_span.attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS]
+    )
+
+    logs = log_exporter.get_finished_logs()
+    assert len(logs) == 0, (
+        "Assert that it doesn't emit logs when use_legacy_attributes is True"
+    )
+
+
+@pytest.mark.vcr
+@pytest.mark.asyncio
+async def test_async_anthropic_beta_message_stream_manager_legacy(
+    instrument_legacy, async_anthropic_client, span_exporter, log_exporter, reader
+):
+    """Test async beta streaming with legacy attributes.
+
+    This is the main test case for the fix. Before the fix, this would fail with:
+    RuntimeWarning: coroutine '_awrap' was never awaited
+    TypeError: 'coroutine' object does not support the asynchronous context manager protocol
+
+    The fix moves beta.messages.AsyncMessages.stream from WRAPPED_AMETHODS to
+    WRAPPED_METHODS, using the sync wrapper instead of async wrapper.
+ """ + response_content = "" + async with async_anthropic_client.beta.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Tell me a very short joke about OpenTelemetry", + } + ], + model="claude-3-5-haiku-20241022", + ) as stream: + async for event in stream: + if event.type == "content_block_delta" and event.delta.type == "text_delta": + response_content += event.delta.text + + spans = span_exporter.get_finished_spans() + assert [span.name for span in spans] == [ + "anthropic.chat", + ] + anthropic_span = spans[0] + assert ( + anthropic_span.attributes[f"{GenAIAttributes.GEN_AI_PROMPT}.0.content"] + == "Tell me a very short joke about OpenTelemetry" + ) + assert (anthropic_span.attributes[f"{GenAIAttributes.GEN_AI_PROMPT}.0.role"]) == "user" + assert ( + anthropic_span.attributes.get(f"{GenAIAttributes.GEN_AI_COMPLETION}.0.content") + == response_content + ) + assert ( + anthropic_span.attributes.get(f"{GenAIAttributes.GEN_AI_COMPLETION}.0.role") + == "assistant" + ) + assert anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] >= 1 + assert ( + anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS] + + anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] + == anthropic_span.attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] + ) + + logs = log_exporter.get_finished_logs() + assert len(logs) == 0, ( + "Assert that it doesn't emit logs when use_legacy_attributes is True" + ) + + +@pytest.mark.vcr +@pytest.mark.asyncio +async def test_async_anthropic_beta_message_stream_manager_with_events_with_content( + instrument_with_content, async_anthropic_client, span_exporter, log_exporter, reader +): + """Test async beta streaming with events and content logging enabled.""" + response_content = "" + async with async_anthropic_client.beta.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Tell me a very short joke about OpenTelemetry", + } + ], + model="claude-3-5-haiku-20241022", + ) as stream: + async for event in stream: + if event.type == "content_block_delta" and event.delta.type == "text_delta": + response_content += event.delta.text + + spans = span_exporter.get_finished_spans() + assert [span.name for span in spans] == [ + "anthropic.chat", + ] + anthropic_span = spans[0] + assert anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] >= 1 + assert ( + anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS] + + anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] + == anthropic_span.attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] + ) + + logs = log_exporter.get_finished_logs() + assert len(logs) == 2 + + +@pytest.mark.vcr +@pytest.mark.asyncio +async def test_async_anthropic_beta_message_stream_manager_with_events_with_no_content( + instrument_with_no_content, + async_anthropic_client, + span_exporter, + log_exporter, + reader, +): + """Test async beta streaming with events but content logging disabled.""" + response_content = "" + async with async_anthropic_client.beta.messages.stream( + max_tokens=1024, + messages=[ + { + "role": "user", + "content": "Tell me a very short joke about OpenTelemetry", + } + ], + model="claude-3-5-haiku-20241022", + ) as stream: + async for event in stream: + if event.type == "content_block_delta" and event.delta.type == "text_delta": + response_content += event.delta.text + + spans = span_exporter.get_finished_spans() + assert [span.name for span in spans] == [ + "anthropic.chat", + ] + anthropic_span = 
spans[0] + assert anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] >= 1 + assert ( + anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_OUTPUT_TOKENS] + + anthropic_span.attributes[GenAIAttributes.GEN_AI_USAGE_INPUT_TOKENS] + == anthropic_span.attributes[SpanAttributes.LLM_USAGE_TOTAL_TOKENS] + ) + + logs = log_exporter.get_finished_logs() + assert len(logs) == 2 From d95bb46c9050174a5026f22414dd1050f31cccfe Mon Sep 17 00:00:00 2001 From: Irving Popovetsky Date: Thu, 1 Jan 2026 10:44:54 -0800 Subject: [PATCH 3/3] address nitpick feedback, add comments --- .../opentelemetry/instrumentation/anthropic/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py index 38c6e49e90..081e02cc3f 100644 --- a/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py +++ b/packages/opentelemetry-instrumentation-anthropic/opentelemetry/instrumentation/anthropic/__init__.py @@ -85,6 +85,8 @@ "span_name": "anthropic.chat", }, # Beta API methods (regular Anthropic SDK) + # Note: AsyncMessages.stream returns an async context manager (not a coroutine), + # so it uses the sync wrapper like the non-beta version above { "package": "anthropic.resources.beta.messages.messages", "object": "Messages", @@ -104,6 +106,8 @@ "span_name": "anthropic.chat", }, # Beta API methods (Bedrock SDK) + # Note: AsyncMessages.stream returns an async context manager (not a coroutine), + # so it uses the sync wrapper like the non-beta version above { "package": "anthropic.lib.bedrock._beta_messages", "object": "Messages",
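
Illustrative sketch (not part of the patches above): the snippet below shows, under simplified assumptions, why the AsyncMessages.stream entries belong in WRAPPED_METHODS rather than WRAPPED_AMETHODS. The sync_wrap and async_wrap helpers are hypothetical stand-ins for the instrumentation's real wrappers, and stream() only mimics the relevant shape of AsyncMessages.stream: a call whose return value is an async context manager, not a coroutine.

import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def stream():
    # Stand-in for AsyncMessages.stream(): calling it returns an async
    # context manager rather than a coroutine.
    yield ["message_start", "content_block_delta", "message_stop"]


def sync_wrap(wrapped):
    # Simplified stand-in for the sync-style wrapper: it passes the return
    # value straight through, so the caller still receives the async
    # context manager and can use it with `async with`.
    def wrapper(*args, **kwargs):
        return wrapped(*args, **kwargs)

    return wrapper


def async_wrap(wrapped):
    # Simplified stand-in for the async wrapper: awaiting assumes `wrapped`
    # returns a coroutine, which stream() does not.
    async def wrapper(*args, **kwargs):
        return await wrapped(*args, **kwargs)

    return wrapper


async def main():
    # Works: the sync wrapper hands back the context manager unchanged.
    async with sync_wrap(stream)() as events:
        print(events)

    # Breaks: async_wrap(stream)() is itself a coroutine object, so `async with`
    # raises "TypeError: 'coroutine' object does not support the asynchronous
    # context manager protocol", and the never-awaited wrapper coroutine later
    # triggers the same kind of RuntimeWarning quoted in the commit message.
    try:
        async with async_wrap(stream)() as events:
            print(events)
    except TypeError as exc:
        print(exc)


asyncio.run(main())

In other words, the wrapper for stream() must return the context manager untouched and let the caller drive it with `async with`, which is exactly what moving these entries to WRAPPED_METHODS achieves.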