From 1294c46bf829c8d614ddc1ba11155741547995fd Mon Sep 17 00:00:00 2001 From: Kevin VIET Date: Wed, 4 Mar 2026 13:15:42 +0100 Subject: [PATCH] feat: Add in-process context propagation for OpenTelemetry agent execution Introduce a context propagation mechanism when an agent execution is requested. The context propagation wraps the agent execution in a Runnable and an implementation of `AgentExecutionContextPropagator`. - An OpenTelemetry implementation of `AgentExecutionContextPropagator` is provided in the opentelemetry module and is responsible to propagate the OpenTelemetry context between threads server thread and agent execution thread. This is needed to ensure well formed traces with a valid parent span. - The default implementation of `AgentExecutionContextPropagator` does not propagate any context and can be used when the opentelemetry feature is not enabled. - Any further implementation could carry more than the OpenTelemetry context, e.g. MDC context, security context, etc. Refs: #698 --- .../opentelemetry/it/SimpleAgentExecutor.java | 11 ++++ .../it/OpenTelemetryA2ABaseTest.java | 50 +++++++++++++++++++ .../OtelAgentExecutionContextPropagator.java | 25 ++++++++++ ...elAgentExecutionContextPropagatorTest.java | 46 +++++++++++++++++ ...efaultAgentExecutionContextPropagator.java | 13 +++++ .../DefaultRequestHandler.java | 20 ++++++-- .../spi/AgentExecutionContextPropagator.java | 14 ++++++ ...ltAgentExecutionContextPropagatorTest.java | 15 ++++++ 8 files changed, 189 insertions(+), 5 deletions(-) create mode 100644 extras/opentelemetry/server/src/main/java/io/a2a/extras/opentelemetry/OtelAgentExecutionContextPropagator.java create mode 100644 extras/opentelemetry/server/src/test/java/io/a2a/extras/opentelemetry/OtelAgentExecutionContextPropagatorTest.java create mode 100644 server-common/src/main/java/io/a2a/server/context/DefaultAgentExecutionContextPropagator.java create mode 100644 server-common/src/main/java/io/a2a/server/spi/AgentExecutionContextPropagator.java create mode 100644 server-common/src/test/java/io/a2a/server/context/DefaultAgentExecutionContextPropagatorTest.java diff --git a/extras/opentelemetry/integration-tests/src/main/java/io/a2a/extras/opentelemetry/it/SimpleAgentExecutor.java b/extras/opentelemetry/integration-tests/src/main/java/io/a2a/extras/opentelemetry/it/SimpleAgentExecutor.java index 6d33dd89c..228f1cb97 100644 --- a/extras/opentelemetry/integration-tests/src/main/java/io/a2a/extras/opentelemetry/it/SimpleAgentExecutor.java +++ b/extras/opentelemetry/integration-tests/src/main/java/io/a2a/extras/opentelemetry/it/SimpleAgentExecutor.java @@ -6,6 +6,7 @@ import io.a2a.server.tasks.AgentEmitter; import io.a2a.spec.A2AError; import io.a2a.spec.TextPart; +import io.opentelemetry.api.trace.Tracer; import jakarta.enterprise.context.ApplicationScoped; /** @@ -15,6 +16,12 @@ @ApplicationScoped public class SimpleAgentExecutor implements AgentExecutor { + private final Tracer tracer; + + public SimpleAgentExecutor(Tracer tracer) { + this.tracer = tracer; + } + @Override public void execute(RequestContext context, AgentEmitter emitter) throws A2AError { // If task doesn't exist, create it @@ -29,6 +36,10 @@ public void execute(RequestContext context, AgentEmitter emitter) throws A2AErro .findFirst() .orElse(""); + tracer.spanBuilder("SimpleAgentExecutor.execute") + .setAttribute("user.input.length", userText.length()) + .startSpan() + .end(); // Echo it back String response = "Echo: " + userText; emitter.complete(A2A.toAgentMessage(response)); diff --git a/extras/opentelemetry/integration-tests/src/test/java/io/a2a/extras/opentelemetry/it/OpenTelemetryA2ABaseTest.java b/extras/opentelemetry/integration-tests/src/test/java/io/a2a/extras/opentelemetry/it/OpenTelemetryA2ABaseTest.java index 50c8c82ed..029b86f90 100644 --- a/extras/opentelemetry/integration-tests/src/test/java/io/a2a/extras/opentelemetry/it/OpenTelemetryA2ABaseTest.java +++ b/extras/opentelemetry/integration-tests/src/test/java/io/a2a/extras/opentelemetry/it/OpenTelemetryA2ABaseTest.java @@ -236,6 +236,56 @@ void testSpanAttributes() throws Exception { } } + @Test + void testOpenTelemetryContextIsPropagatedWhenSendMessageIsCalled() throws Exception { + Message testMessage = Message.builder() + .role(Message.Role.ROLE_USER) + .parts(Collections.singletonList(new TextPart("test message"))) + .contextId("context-1234") + .messageId("message-1234") + .build(); + client.sendMessage(testMessage); + + Thread.sleep(5000); + + List> spans = getSpans(); + var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get(); + var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get(); + + String serverTraceId = serverSpan.get("traceId").toString(); + String serverSpanId = serverSpan.get("spanId").toString(); + String agentTraceId = agentSpan.get("traceId").toString(); + String agentParentSpanId = agentSpan.get("parentSpanId").toString(); + + assertEquals(serverSpanId, agentParentSpanId); + assertEquals(serverTraceId, agentTraceId); + } + + @Test + void testOpenTelemetryContextIsPropagatedWhenSendMessageStreamIsCalled() throws Exception { + Message testMessage = Message.builder() + .role(Message.Role.ROLE_USER) + .parts(Collections.singletonList(new TextPart("test message"))) + .contextId("context-1234") + .messageId("message-1234") + .build(); + client.sendMessage(testMessage); + + Thread.sleep(5000); + + List> spans = getSpans(); + var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get(); + var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get(); + + String serverTraceId = serverSpan.get("traceId").toString(); + String serverSpanId = serverSpan.get("spanId").toString(); + String agentTraceId = agentSpan.get("traceId").toString(); + String agentParentSpanId = agentSpan.get("parentSpanId").toString(); + + assertEquals(serverSpanId, agentParentSpanId); + assertEquals(serverTraceId, agentTraceId); + } + protected void saveTaskInTaskStore(Task task) throws Exception { HttpClient httpClient = HttpClient.newBuilder() .version(HttpClient.Version.HTTP_2) diff --git a/extras/opentelemetry/server/src/main/java/io/a2a/extras/opentelemetry/OtelAgentExecutionContextPropagator.java b/extras/opentelemetry/server/src/main/java/io/a2a/extras/opentelemetry/OtelAgentExecutionContextPropagator.java new file mode 100644 index 000000000..046499352 --- /dev/null +++ b/extras/opentelemetry/server/src/main/java/io/a2a/extras/opentelemetry/OtelAgentExecutionContextPropagator.java @@ -0,0 +1,25 @@ +package io.a2a.extras.opentelemetry; + +import io.a2a.server.spi.AgentExecutionContextPropagator; +import io.opentelemetry.context.Context; +import jakarta.annotation.Priority; +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.enterprise.inject.Alternative; + +/** + * An implementation of AgentExecutionContextPropagator that uses OpenTelemetry's Context to propagate the execution + * context. This allows for proper context propagation across threads and asynchronous operations when using + * OpenTelemetry for tracing. The implementation simply delegates the wrapping OpenTelemetry's context propagation + * mechanism. + * + * @see Context#wrap(Runnable) + */ +@ApplicationScoped +@Alternative +@Priority(1) +public class OtelAgentExecutionContextPropagator implements AgentExecutionContextPropagator { + @Override + public Runnable wrap(Runnable runnable) { + return Context.current().wrap(runnable); + } +} diff --git a/extras/opentelemetry/server/src/test/java/io/a2a/extras/opentelemetry/OtelAgentExecutionContextPropagatorTest.java b/extras/opentelemetry/server/src/test/java/io/a2a/extras/opentelemetry/OtelAgentExecutionContextPropagatorTest.java new file mode 100644 index 000000000..d83a7d825 --- /dev/null +++ b/extras/opentelemetry/server/src/test/java/io/a2a/extras/opentelemetry/OtelAgentExecutionContextPropagatorTest.java @@ -0,0 +1,46 @@ +package io.a2a.extras.opentelemetry; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.ContextKey; +import io.opentelemetry.context.Scope; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +class OtelAgentExecutionContextPropagatorTest { + + ContextKey exampleContextKey = ContextKey.named("test-key"); + Scope scope = null; + + @BeforeEach + void setup() { + Context context = Context.current().with(exampleContextKey, "test-value"); + scope = context.makeCurrent(); + } + + @Test + void testOtelAgentExecutionContextPropagation() throws InterruptedException { + + OtelAgentExecutionContextPropagator propagator = new OtelAgentExecutionContextPropagator(); + Runnable wrappedContext = + propagator.wrap(() -> assertEquals("test-value", Context.current().get(exampleContextKey))); + + ExecutorService executorService = Executors.newSingleThreadExecutor(); + CompletableFuture future = CompletableFuture.runAsync(wrappedContext, executorService); + future.join(); + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.SECONDS); + } + + @AfterEach + void tearDown() { + scope.close(); + } +} \ No newline at end of file diff --git a/server-common/src/main/java/io/a2a/server/context/DefaultAgentExecutionContextPropagator.java b/server-common/src/main/java/io/a2a/server/context/DefaultAgentExecutionContextPropagator.java new file mode 100644 index 000000000..205d67a66 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/context/DefaultAgentExecutionContextPropagator.java @@ -0,0 +1,13 @@ +package io.a2a.server.context; + +import io.a2a.server.spi.AgentExecutionContextPropagator; +import jakarta.enterprise.context.ApplicationScoped; + +@ApplicationScoped +public class DefaultAgentExecutionContextPropagator implements AgentExecutionContextPropagator { + + @Override + public Runnable wrap(Runnable runnable) { + return runnable; + } +} diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 2023fc7ac..3b1070980 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -20,6 +20,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import io.a2a.server.context.DefaultAgentExecutionContextPropagator; +import io.a2a.server.spi.AgentExecutionContextPropagator; import jakarta.annotation.PostConstruct; import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; @@ -225,6 +227,7 @@ public class DefaultRequestHandler implements RequestHandler { private PushNotificationConfigStore pushConfigStore; private MainEventBusProcessor mainEventBusProcessor; private Supplier requestContextBuilder; + private AgentExecutionContextPropagator context; private final ConcurrentMap> runningAgents = new ConcurrentHashMap<>(); @@ -248,6 +251,7 @@ protected DefaultRequestHandler() { this.requestContextBuilder = null; this.executor = null; this.eventConsumerExecutor = null; + this.context = null; } @Inject @@ -255,7 +259,8 @@ public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore, QueueManager queueManager, PushNotificationConfigStore pushConfigStore, MainEventBusProcessor mainEventBusProcessor, @Internal Executor executor, - @EventConsumerExecutor Executor eventConsumerExecutor) { + @EventConsumerExecutor Executor eventConsumerExecutor, + AgentExecutionContextPropagator context) { this.agentExecutor = agentExecutor; this.taskStore = taskStore; this.queueManager = queueManager; @@ -268,6 +273,7 @@ public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore, // I am unsure about the correct scope. // Also reworked to make a Supplier since otherwise the builder gets polluted with wrong tasks this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false); + this.context = context; } @PostConstruct @@ -288,7 +294,7 @@ public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStor Executor executor, Executor eventConsumerExecutor) { DefaultRequestHandler handler = new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, - mainEventBusProcessor, executor, eventConsumerExecutor); + mainEventBusProcessor, executor, eventConsumerExecutor, new DefaultAgentExecutionContextPropagator()); handler.agentCompletionTimeoutSeconds = 5; handler.consumptionCompletionTimeoutSeconds = 2; @@ -883,9 +889,8 @@ private boolean shouldAddPushInfo(MessageSendParams params) { private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestContext requestContext, EventQueue queue, EnhancedRunnable.DoneCallback doneCallback) { LOGGER.debug("Registering agent execution for task {}, runningAgents.size() before: {}", taskId, runningAgents.size()); logThreadStats("AGENT START"); - EnhancedRunnable runnable = new EnhancedRunnable() { - @Override - public void run() { + + Runnable wrappedRunnable = context.wrap(() -> { LOGGER.debug("Agent execution starting for task {}", taskId); AgentEmitter emitter = new AgentEmitter(requestContext, queue); try { @@ -909,6 +914,11 @@ public void run() { LOGGER.debug("Agent execution completed for task {}", taskId); // The consumer (running on the Vert.x worker thread) handles queue lifecycle. // This avoids blocking agent-executor threads waiting for worker threads. + }); + EnhancedRunnable runnable = new EnhancedRunnable() { + @Override + public void run() { + wrappedRunnable.run(); } }; diff --git a/server-common/src/main/java/io/a2a/server/spi/AgentExecutionContextPropagator.java b/server-common/src/main/java/io/a2a/server/spi/AgentExecutionContextPropagator.java new file mode 100644 index 000000000..38c8f896c --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/spi/AgentExecutionContextPropagator.java @@ -0,0 +1,14 @@ +package io.a2a.server.spi; + +/** + * Interface for propagating the agent execution context across different threads or asynchronous operations. + * Implementations can wrap Runnable tasks to ensure that the context is properly propagated. + */ +public interface AgentExecutionContextPropagator { + + /** + * Wraps a Runnable task to propagate the agent execution context when the task is executed. + * @param runnable the Runnable task to wrap + */ + Runnable wrap(Runnable runnable); +} diff --git a/server-common/src/test/java/io/a2a/server/context/DefaultAgentExecutionContextPropagatorTest.java b/server-common/src/test/java/io/a2a/server/context/DefaultAgentExecutionContextPropagatorTest.java new file mode 100644 index 000000000..d61101732 --- /dev/null +++ b/server-common/src/test/java/io/a2a/server/context/DefaultAgentExecutionContextPropagatorTest.java @@ -0,0 +1,15 @@ +package io.a2a.server.context; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class DefaultAgentExecutionContextPropagatorTest { + + @Test + void testDefaultAgentExecutionPropagation() { + DefaultAgentExecutionContextPropagator propagator = new DefaultAgentExecutionContextPropagator(); + Runnable runnable = () -> { + }; + Assertions.assertSame(runnable, propagator.wrap(runnable)); + } +} \ No newline at end of file