Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.a2a.server.tasks.AgentEmitter;
import io.a2a.spec.A2AError;
import io.a2a.spec.TextPart;
import io.opentelemetry.api.trace.Tracer;
import jakarta.enterprise.context.ApplicationScoped;

/**
Expand All @@ -15,6 +16,12 @@
@ApplicationScoped
public class SimpleAgentExecutor implements AgentExecutor {

private final Tracer tracer;

public SimpleAgentExecutor(Tracer tracer) {
this.tracer = tracer;
}

@Override
public void execute(RequestContext context, AgentEmitter emitter) throws A2AError {
// If task doesn't exist, create it
Expand All @@ -29,6 +36,10 @@ public void execute(RequestContext context, AgentEmitter emitter) throws A2AErro
.findFirst()
.orElse("");

tracer.spanBuilder("SimpleAgentExecutor.execute")
.setAttribute("user.input.length", userText.length())
.startSpan()
.end();
// Echo it back
String response = "Echo: " + userText;
emitter.complete(A2A.toAgentMessage(response));
Comment on lines +39 to 45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current OpenTelemetry span creation (startSpan().end()) immediately closes the span. This means it won't accurately capture the duration of the execute method or be properly parented if exceptions occur within the method. For accurate tracing and robust span management, it's best practice to use a try-with-resources block with Span.current().makeCurrent() to ensure the span encompasses the entire operation and is correctly ended, even in the presence of exceptions.

        io.opentelemetry.api.trace.Span span = tracer.spanBuilder("SimpleAgentExecutor.execute")
                .setAttribute("user.input.length", userText.length())
                .startSpan();
        try (io.opentelemetry.context.Scope scope = span.makeCurrent()) {
            // Echo it back
            String response = "Echo: " + userText;
            emitter.complete(A2A.toAgentMessage(response));
        } finally {
            span.end();
        }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,56 @@ void testSpanAttributes() throws Exception {
}
}

@Test
void testOpenTelemetryContextIsPropagatedWhenSendMessageIsCalled() throws Exception {
Message testMessage = Message.builder()
.role(Message.Role.ROLE_USER)
.parts(Collections.singletonList(new TextPart("test message")))
.contextId("context-1234")
.messageId("message-1234")
.build();
client.sendMessage(testMessage);

Thread.sleep(5000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using Thread.sleep() in tests can lead to flakiness and unnecessarily slow down test execution. It's more robust to use a waiting mechanism like Awaitility to wait for a specific condition to be met (e.g., spans being available in the exporter) rather than relying on arbitrary delays. This improves test reliability and performance.

Suggested change
Thread.sleep(5000);
await().atMost(5, SECONDS).until(() -> !getSpans().isEmpty());


List<Map<String, Object>> spans = getSpans();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.

Suggested change
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().orElseThrow(() -> new AssertionError("Server span not found"));

var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.

Suggested change
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().orElseThrow(() -> new AssertionError("Agent span not found"));


String serverTraceId = serverSpan.get("traceId").toString();
String serverSpanId = serverSpan.get("spanId").toString();
String agentTraceId = agentSpan.get("traceId").toString();
String agentParentSpanId = agentSpan.get("parentSpanId").toString();

assertEquals(serverSpanId, agentParentSpanId);
assertEquals(serverTraceId, agentTraceId);
}

@Test
void testOpenTelemetryContextIsPropagatedWhenSendMessageStreamIsCalled() throws Exception {
Message testMessage = Message.builder()
.role(Message.Role.ROLE_USER)
.parts(Collections.singletonList(new TextPart("test message")))
.contextId("context-1234")
.messageId("message-1234")
.build();
client.sendMessage(testMessage);

Thread.sleep(5000);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Using Thread.sleep() in tests can lead to flakiness and unnecessarily slow down test execution. It's more robust to use a waiting mechanism like Awaitility to wait for a specific condition to be met (e.g., spans being available in the exporter) rather than relying on arbitrary delays. This improves test reliability and performance.

Suggested change
await().atMost(5, SECONDS).until(() -> !getSpans().isEmpty());

List<Map<String, Object>> spans = getSpans();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.

Suggested change
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().orElseThrow(() -> new AssertionError("Server span not found"));


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.

Suggested change
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().orElseThrow(() -> new AssertionError("Agent span not found"));

String serverTraceId = serverSpan.get("traceId").toString();
String serverSpanId = serverSpan.get("spanId").toString();
String agentTraceId = agentSpan.get("traceId").toString();
String agentParentSpanId = agentSpan.get("parentSpanId").toString();

assertEquals(serverSpanId, agentParentSpanId);
assertEquals(serverTraceId, agentTraceId);
}
Comment on lines +264 to +287
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The test methods testOpenTelemetryContextIsPropagatedWhenSendMessageIsCalled and testOpenTelemetryContextIsPropagatedWhenSendMessageStreamIsCalled contain significant code duplication. Extracting the common logic into a private helper method would improve maintainability, readability, and reduce the chance of errors when updating test logic.


protected void saveTaskInTaskStore(Task task) throws Exception {
HttpClient httpClient = HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.a2a.extras.opentelemetry;

import io.a2a.server.spi.AgentExecutionContextPropagator;
import io.opentelemetry.context.Context;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Alternative;

/**
* An implementation of AgentExecutionContextPropagator that uses OpenTelemetry's Context to propagate the execution
* context. This allows for proper context propagation across threads and asynchronous operations when using
* OpenTelemetry for tracing. The implementation simply delegates the wrapping OpenTelemetry's context propagation
* mechanism.
*
* @see Context#wrap(Runnable)
*/
@ApplicationScoped
@Alternative
@Priority(1)
public class OtelAgentExecutionContextPropagator implements AgentExecutionContextPropagator {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this? I think we can always wrap the context if you add the opentelemetry API dependency to the project, no?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep but because opentelemetry is provided as an extra dependency I was forced to use a SPI to implement it externally.
Adopting opentelemetry directly in the server is more a refactoring of the project that might not be suitable for a contribution

@Override
public Runnable wrap(Runnable runnable) {
return Context.current().wrap(runnable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.a2a.extras.opentelemetry;

import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.context.Scope;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;

class OtelAgentExecutionContextPropagatorTest {

ContextKey<String> exampleContextKey = ContextKey.named("test-key");
Scope scope = null;

@BeforeEach
void setup() {
Context context = Context.current().with(exampleContextKey, "test-value");
scope = context.makeCurrent();
}

@Test
void testOtelAgentExecutionContextPropagation() throws InterruptedException {

OtelAgentExecutionContextPropagator propagator = new OtelAgentExecutionContextPropagator();
Runnable wrappedContext =
propagator.wrap(() -> assertEquals("test-value", Context.current().get(exampleContextKey)));

ExecutorService executorService = Executors.newSingleThreadExecutor();
CompletableFuture<Void> future = CompletableFuture.runAsync(wrappedContext, executorService);
future.join();
executorService.shutdown();
executorService.awaitTermination(5, TimeUnit.SECONDS);
}

@AfterEach
void tearDown() {
scope.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.a2a.server.context;

import io.a2a.server.spi.AgentExecutionContextPropagator;
import jakarta.enterprise.context.ApplicationScoped;

@ApplicationScoped
public class DefaultAgentExecutionContextPropagator implements AgentExecutionContextPropagator {

@Override
public Runnable wrap(Runnable runnable) {
return runnable;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import io.a2a.server.context.DefaultAgentExecutionContextPropagator;
import io.a2a.server.spi.AgentExecutionContextPropagator;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -225,6 +227,7 @@ public class DefaultRequestHandler implements RequestHandler {
private PushNotificationConfigStore pushConfigStore;
private MainEventBusProcessor mainEventBusProcessor;
private Supplier<RequestContext.Builder> requestContextBuilder;
private AgentExecutionContextPropagator context;

private final ConcurrentMap<String, CompletableFuture<Void>> runningAgents = new ConcurrentHashMap<>();

Expand All @@ -248,14 +251,16 @@ protected DefaultRequestHandler() {
this.requestContextBuilder = null;
this.executor = null;
this.eventConsumerExecutor = null;
this.context = null;
}

@Inject
public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore,
QueueManager queueManager, PushNotificationConfigStore pushConfigStore,
MainEventBusProcessor mainEventBusProcessor,
@Internal Executor executor,
@EventConsumerExecutor Executor eventConsumerExecutor) {
@EventConsumerExecutor Executor eventConsumerExecutor,
AgentExecutionContextPropagator context) {
this.agentExecutor = agentExecutor;
this.taskStore = taskStore;
this.queueManager = queueManager;
Expand All @@ -268,6 +273,7 @@ public DefaultRequestHandler(AgentExecutor agentExecutor, TaskStore taskStore,
// I am unsure about the correct scope.
// Also reworked to make a Supplier since otherwise the builder gets polluted with wrong tasks
this.requestContextBuilder = () -> new SimpleRequestContextBuilder(taskStore, false);
this.context = context;
}

@PostConstruct
Expand All @@ -288,7 +294,7 @@ public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStor
Executor executor, Executor eventConsumerExecutor) {
DefaultRequestHandler handler =
new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore,
mainEventBusProcessor, executor, eventConsumerExecutor);
mainEventBusProcessor, executor, eventConsumerExecutor, new DefaultAgentExecutionContextPropagator());
handler.agentCompletionTimeoutSeconds = 5;
handler.consumptionCompletionTimeoutSeconds = 2;

Expand Down Expand Up @@ -883,9 +889,8 @@ private boolean shouldAddPushInfo(MessageSendParams params) {
private EnhancedRunnable registerAndExecuteAgentAsync(String taskId, RequestContext requestContext, EventQueue queue, EnhancedRunnable.DoneCallback doneCallback) {
LOGGER.debug("Registering agent execution for task {}, runningAgents.size() before: {}", taskId, runningAgents.size());
logThreadStats("AGENT START");
EnhancedRunnable runnable = new EnhancedRunnable() {
@Override
public void run() {

Runnable wrappedRunnable = context.wrap(() -> {
LOGGER.debug("Agent execution starting for task {}", taskId);
AgentEmitter emitter = new AgentEmitter(requestContext, queue);
try {
Expand All @@ -909,6 +914,11 @@ public void run() {
LOGGER.debug("Agent execution completed for task {}", taskId);
// The consumer (running on the Vert.x worker thread) handles queue lifecycle.
// This avoids blocking agent-executor threads waiting for worker threads.
});
EnhancedRunnable runnable = new EnhancedRunnable() {
@Override
public void run() {
wrappedRunnable.run();
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.a2a.server.spi;

/**
* Interface for propagating the agent execution context across different threads or asynchronous operations.
* Implementations can wrap Runnable tasks to ensure that the context is properly propagated.
*/
public interface AgentExecutionContextPropagator {

/**
* Wraps a Runnable task to propagate the agent execution context when the task is executed.
* @param runnable the Runnable task to wrap
*/
Runnable wrap(Runnable runnable);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.a2a.server.context;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class DefaultAgentExecutionContextPropagatorTest {

@Test
void testDefaultAgentExecutionPropagation() {
DefaultAgentExecutionContextPropagator propagator = new DefaultAgentExecutionContextPropagator();
Runnable runnable = () -> {
};
Assertions.assertSame(runnable, propagator.wrap(runnable));
}
}
Loading