-
Notifications
You must be signed in to change notification settings - Fork 129
feat: Add in-process context propagation for OpenTelemetry agent execution #699
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -236,6 +236,56 @@ void testSpanAttributes() throws Exception { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| @Test | ||||||
| void testOpenTelemetryContextIsPropagatedWhenSendMessageIsCalled() throws Exception { | ||||||
| Message testMessage = Message.builder() | ||||||
| .role(Message.Role.ROLE_USER) | ||||||
| .parts(Collections.singletonList(new TextPart("test message"))) | ||||||
| .contextId("context-1234") | ||||||
| .messageId("message-1234") | ||||||
| .build(); | ||||||
| client.sendMessage(testMessage); | ||||||
|
|
||||||
| Thread.sleep(5000); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using
Suggested change
|
||||||
|
|
||||||
| List<Map<String, Object>> spans = getSpans(); | ||||||
| var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get(); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Directly calling
Suggested change
|
||||||
| var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get(); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Directly calling
Suggested change
|
||||||
|
|
||||||
| String serverTraceId = serverSpan.get("traceId").toString(); | ||||||
| String serverSpanId = serverSpan.get("spanId").toString(); | ||||||
| String agentTraceId = agentSpan.get("traceId").toString(); | ||||||
| String agentParentSpanId = agentSpan.get("parentSpanId").toString(); | ||||||
|
|
||||||
| assertEquals(serverSpanId, agentParentSpanId); | ||||||
| assertEquals(serverTraceId, agentTraceId); | ||||||
| } | ||||||
|
|
||||||
| @Test | ||||||
| void testOpenTelemetryContextIsPropagatedWhenSendMessageStreamIsCalled() throws Exception { | ||||||
| Message testMessage = Message.builder() | ||||||
| .role(Message.Role.ROLE_USER) | ||||||
| .parts(Collections.singletonList(new TextPart("test message"))) | ||||||
| .contextId("context-1234") | ||||||
| .messageId("message-1234") | ||||||
| .build(); | ||||||
| client.sendMessage(testMessage); | ||||||
|
|
||||||
| Thread.sleep(5000); | ||||||
|
|
||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using
Suggested change
|
||||||
| List<Map<String, Object>> spans = getSpans(); | ||||||
| var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get(); | ||||||
| var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get(); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Directly calling
Suggested change
|
||||||
|
|
||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Directly calling
Suggested change
|
||||||
| String serverTraceId = serverSpan.get("traceId").toString(); | ||||||
| String serverSpanId = serverSpan.get("spanId").toString(); | ||||||
| String agentTraceId = agentSpan.get("traceId").toString(); | ||||||
| String agentParentSpanId = agentSpan.get("parentSpanId").toString(); | ||||||
|
|
||||||
| assertEquals(serverSpanId, agentParentSpanId); | ||||||
| assertEquals(serverTraceId, agentTraceId); | ||||||
| } | ||||||
|
Comment on lines
+264
to
+287
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test methods |
||||||
|
|
||||||
| protected void saveTaskInTaskStore(Task task) throws Exception { | ||||||
| HttpClient httpClient = HttpClient.newBuilder() | ||||||
| .version(HttpClient.Version.HTTP_2) | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| package io.a2a.extras.opentelemetry; | ||
|
|
||
| import io.a2a.server.spi.AgentExecutionContextPropagator; | ||
| import io.opentelemetry.context.Context; | ||
| import jakarta.annotation.Priority; | ||
| import jakarta.enterprise.context.ApplicationScoped; | ||
| import jakarta.enterprise.inject.Alternative; | ||
|
|
||
| /** | ||
| * An implementation of AgentExecutionContextPropagator that uses OpenTelemetry's Context to propagate the execution | ||
| * context. This allows for proper context propagation across threads and asynchronous operations when using | ||
| * OpenTelemetry for tracing. The implementation simply delegates the wrapping OpenTelemetry's context propagation | ||
| * mechanism. | ||
| * | ||
| * @see Context#wrap(Runnable) | ||
| */ | ||
| @ApplicationScoped | ||
| @Alternative | ||
| @Priority(1) | ||
| public class OtelAgentExecutionContextPropagator implements AgentExecutionContextPropagator { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really need this? I think we can always wrap the context if you add the opentelemetry API dependency to the project, no?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep but because opentelemetry is provided as an extra dependency I was forced to use a SPI to implement it externally. |
||
| @Override | ||
| public Runnable wrap(Runnable runnable) { | ||
| return Context.current().wrap(runnable); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,46 @@ | ||
| package io.a2a.extras.opentelemetry; | ||
|
|
||
| import io.opentelemetry.context.Context; | ||
| import io.opentelemetry.context.ContextKey; | ||
| import io.opentelemetry.context.Scope; | ||
| import org.junit.jupiter.api.AfterEach; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| import java.util.concurrent.CompletableFuture; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
|
|
||
| class OtelAgentExecutionContextPropagatorTest { | ||
|
|
||
| ContextKey<String> exampleContextKey = ContextKey.named("test-key"); | ||
| Scope scope = null; | ||
|
|
||
| @BeforeEach | ||
| void setup() { | ||
| Context context = Context.current().with(exampleContextKey, "test-value"); | ||
| scope = context.makeCurrent(); | ||
| } | ||
|
|
||
| @Test | ||
| void testOtelAgentExecutionContextPropagation() throws InterruptedException { | ||
|
|
||
| OtelAgentExecutionContextPropagator propagator = new OtelAgentExecutionContextPropagator(); | ||
| Runnable wrappedContext = | ||
| propagator.wrap(() -> assertEquals("test-value", Context.current().get(exampleContextKey))); | ||
|
|
||
| ExecutorService executorService = Executors.newSingleThreadExecutor(); | ||
| CompletableFuture<Void> future = CompletableFuture.runAsync(wrappedContext, executorService); | ||
| future.join(); | ||
| executorService.shutdown(); | ||
| executorService.awaitTermination(5, TimeUnit.SECONDS); | ||
| } | ||
|
|
||
| @AfterEach | ||
| void tearDown() { | ||
| scope.close(); | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,13 @@ | ||
| package io.a2a.server.context; | ||
|
|
||
| import io.a2a.server.spi.AgentExecutionContextPropagator; | ||
| import jakarta.enterprise.context.ApplicationScoped; | ||
|
|
||
| @ApplicationScoped | ||
| public class DefaultAgentExecutionContextPropagator implements AgentExecutionContextPropagator { | ||
|
|
||
| @Override | ||
| public Runnable wrap(Runnable runnable) { | ||
| return runnable; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,14 @@ | ||
| package io.a2a.server.spi; | ||
|
|
||
| /** | ||
| * Interface for propagating the agent execution context across different threads or asynchronous operations. | ||
| * Implementations can wrap Runnable tasks to ensure that the context is properly propagated. | ||
| */ | ||
| public interface AgentExecutionContextPropagator { | ||
|
|
||
| /** | ||
| * Wraps a Runnable task to propagate the agent execution context when the task is executed. | ||
| * @param runnable the Runnable task to wrap | ||
| */ | ||
| Runnable wrap(Runnable runnable); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| package io.a2a.server.context; | ||
|
|
||
| import org.junit.jupiter.api.Assertions; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| class DefaultAgentExecutionContextPropagatorTest { | ||
|
|
||
| @Test | ||
| void testDefaultAgentExecutionPropagation() { | ||
| DefaultAgentExecutionContextPropagator propagator = new DefaultAgentExecutionContextPropagator(); | ||
| Runnable runnable = () -> { | ||
| }; | ||
| Assertions.assertSame(runnable, propagator.wrap(runnable)); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The current OpenTelemetry span creation (
startSpan().end()) immediately closes the span. This means it won't accurately capture the duration of theexecutemethod or be properly parented if exceptions occur within the method. For accurate tracing and robust span management, it's best practice to use atry-with-resourcesblock withSpan.current().makeCurrent()to ensure the span encompasses the entire operation and is correctly ended, even in the presence of exceptions.