feat: Add in-process context propagation for OpenTelemetry agent execution #699
vietk wants to merge 1 commit into a2aproject:main
…ution

Introduce a context propagation mechanism when an agent execution is requested. The context propagation wraps the agent execution in a Runnable through an implementation of `AgentExecutionContextPropagator`.

- An OpenTelemetry implementation of `AgentExecutionContextPropagator` is provided in the opentelemetry module and is responsible for propagating the OpenTelemetry context between the server thread and the agent execution thread. This is needed to ensure well-formed traces with a valid parent span.
- The default implementation of `AgentExecutionContextPropagator` does not propagate any context and can be used when the opentelemetry feature is not enabled.
- Any further implementation could carry more than the OpenTelemetry context, e.g. MDC context, security context, etc.

Refs: a2aproject#698
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request establishes a foundational mechanism for in-process context propagation during agent execution, primarily to support OpenTelemetry tracing. By introducing a new …
Code Review
The pull request introduces a context propagation mechanism for OpenTelemetry agent execution, which is crucial for maintaining trace continuity across asynchronous operations. It provides a default no-op implementation and an OpenTelemetry-specific one, allowing for flexible integration. The changes correctly integrate the new AgentExecutionContextPropagator into the DefaultRequestHandler and include relevant integration tests. While the core functionality is well-implemented, there are several areas in the test code that could be improved for robustness, maintainability, and reliability, such as replacing Thread.sleep() with Awaitility, handling Optional values more safely, and reducing code duplication.
tracer.spanBuilder("SimpleAgentExecutor.execute")
        .setAttribute("user.input.length", userText.length())
        .startSpan()
        .end();
// Echo it back
String response = "Echo: " + userText;
emitter.complete(A2A.toAgentMessage(response));

The current OpenTelemetry span creation (startSpan().end()) closes the span immediately. As a result, the span neither captures the duration of the execute method nor becomes the current span for work done inside it. For accurate tracing and robust span management, start the span, make it current with a try-with-resources block on span.makeCurrent(), and end it in a finally block so that it is closed even if an exception is thrown.
io.opentelemetry.api.trace.Span span = tracer.spanBuilder("SimpleAgentExecutor.execute")
        .setAttribute("user.input.length", userText.length())
        .startSpan();
try (io.opentelemetry.context.Scope scope = span.makeCurrent()) {
    // Echo it back
    String response = "Echo: " + userText;
    emitter.complete(A2A.toAgentMessage(response));
} finally {
    span.end();
}

        .build();
client.sendMessage(testMessage);

Thread.sleep(5000);

Using Thread.sleep() in tests can lead to flakiness and unnecessarily slow down test execution. It's more robust to use a waiting mechanism like Awaitility to wait for a specific condition to be met (e.g., spans being available in the exporter) rather than relying on arbitrary delays. This improves test reliability and performance.
- Thread.sleep(5000);
+ await().atMost(5, SECONDS).until(() -> !getSpans().isEmpty());

Thread.sleep(5000);

List<Map<String, Object>> spans = getSpans();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.
- var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
+ var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().orElseThrow(() -> new AssertionError("Server span not found"));

List<Map<String, Object>> spans = getSpans();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.
- var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();
+ var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().orElseThrow(() -> new AssertionError("Agent span not found"));

client.sendMessage(testMessage);

Thread.sleep(5000);

Using Thread.sleep() in tests can lead to flakiness and unnecessarily slow down test execution. It's more robust to use a waiting mechanism like Awaitility to wait for a specific condition to be met (e.g., spans being available in the exporter) rather than relying on arbitrary delays. This improves test reliability and performance.
+ await().atMost(5, SECONDS).until(() -> !getSpans().isEmpty());

List<Map<String, Object>> spans = getSpans();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.
- var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();
+ var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().orElseThrow(() -> new AssertionError("Server span not found"));

List<Map<String, Object>> spans = getSpans();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.
+ var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().orElseThrow(() -> new AssertionError("Agent span not found"));

@Test
void testOpenTelemetryContextIsPropagatedWhenSendMessageStreamIsCalled() throws Exception {
    Message testMessage = Message.builder()
            .role(Message.Role.ROLE_USER)
            .parts(Collections.singletonList(new TextPart("test message")))
            .contextId("context-1234")
            .messageId("message-1234")
            .build();
    client.sendMessage(testMessage);

    Thread.sleep(5000);

    List<Map<String, Object>> spans = getSpans();
    var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
    var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();

    String serverTraceId = serverSpan.get("traceId").toString();
    String serverSpanId = serverSpan.get("spanId").toString();
    String agentTraceId = agentSpan.get("traceId").toString();
    String agentParentSpanId = agentSpan.get("parentSpanId").toString();

    assertEquals(serverSpanId, agentParentSpanId);
    assertEquals(serverTraceId, agentTraceId);
}

The test methods testOpenTelemetryContextIsPropagatedWhenSendMessageIsCalled and testOpenTelemetryContextIsPropagatedWhenSendMessageStreamIsCalled contain significant code duplication. Extracting the common logic into a private helper method would improve maintainability, readability, and reduce the chance of errors when updating test logic.
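
One possible shape for such a helper is sketched below, using only the JDK. The class name `SpanAssertions` and its method names are hypothetical, and span kinds are compared as plain strings so the sketch compiles without the OpenTelemetry API on the classpath; the real test would presumably keep using `SpanKind` and JUnit's `assertEquals`.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper shared by both test methods; not part of the PR. */
final class SpanAssertions {

    /** Returns the first span whose "kind" entry matches, or fails with a descriptive message. */
    static Map<String, Object> firstSpanOfKind(List<Map<String, Object>> spans, String kind) {
        return spans.stream()
                .filter(span -> kind.equals(String.valueOf(span.get("kind"))))
                .findFirst()
                .orElseThrow(() -> new AssertionError(kind + " span not found"));
    }

    /** Fails unless the INTERNAL (agent) span is a child of the SERVER span in the same trace. */
    static void assertAgentSpanIsChildOfServerSpan(List<Map<String, Object>> spans) {
        Map<String, Object> serverSpan = firstSpanOfKind(spans, "SERVER");
        Map<String, Object> agentSpan = firstSpanOfKind(spans, "INTERNAL");
        requireEqual(serverSpan.get("spanId"), agentSpan.get("parentSpanId"), "parent span id");
        requireEqual(serverSpan.get("traceId"), agentSpan.get("traceId"), "trace id");
    }

    // Compares the string forms, mirroring the toString() calls in the original test.
    private static void requireEqual(Object expected, Object actual, String what) {
        if (!String.valueOf(expected).equals(String.valueOf(actual))) {
            throw new AssertionError(what + " mismatch: expected " + expected + " but was " + actual);
        }
    }
}
```

With a helper like this, each test method would reduce to sending its message, waiting for the spans, and calling `assertAgentSpanIsChildOfServerSpan(getSpans())`.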
@ApplicationScoped
@Alternative
@Priority(1)
public class OtelAgentExecutionContextPropagator implements AgentExecutionContextPropagator {

Do we really need this? I think we can always wrap the context if you add the opentelemetry API dependency to the project, no?

Yep, but because OpenTelemetry is provided as an extra dependency, I was forced to use an SPI to implement it externally.
Adopting OpenTelemetry directly in the server is more of a refactoring of the project, which might not be suitable for a contribution.

I think (but I need to finish some other stuff to check) that this is overkill and the best bet is to produce a ManagedExecutorService instead of the default @internal ExecutorService
Description
Introduce a context propagation mechanism when an agent execution is requested.
The context propagation wraps the agent execution in a Runnable that can propagate a 'Context'.

- An OpenTelemetry implementation of AgentExecutionContextPropagator is provided in the opentelemetry module and is responsible for propagating the OpenTelemetry context between the server thread and the agent execution thread. This is needed to ensure well-formed traces with a valid parent span.
- The default implementation of AgentExecutionContextPropagator does not propagate any context and can be used when the opentelemetry feature is not enabled.

Fixes #698
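
As a rough illustration of the wrapping described above, the sketch below uses only the JDK. The interface name `ContextPropagator`, both implementations, and the `ThreadLocal` that stands in for the OpenTelemetry context are hypothetical and are not the classes added by this PR. An OpenTelemetry-backed variant would presumably capture `Context.current()` on the server thread and make it current on the agent thread instead.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical shape of the SPI: wraps a task so it runs with the caller's context. */
interface ContextPropagator {
    Runnable wrap(Runnable task);
}

/** Default behaviour: the task is returned unchanged, so no context is carried over. */
final class NoopPropagator implements ContextPropagator {
    @Override
    public Runnable wrap(Runnable task) {
        return task;
    }
}

/** Stand-in for the OpenTelemetry variant; a ThreadLocal plays the role of the tracing context. */
final class ThreadLocalPropagator implements ContextPropagator {
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    @Override
    public Runnable wrap(Runnable task) {
        String captured = TRACE_ID.get(); // read on the submitting (server) thread
        return () -> {
            String previous = TRACE_ID.get();
            TRACE_ID.set(captured); // install on the executing (agent) thread
            try {
                task.run();
            } finally {
                // Restore the worker thread's previous state.
                if (previous == null) {
                    TRACE_ID.remove();
                } else {
                    TRACE_ID.set(previous);
                }
            }
        };
    }
}

final class ContextPropagationSketch {
    /** Runs a wrapped task on a worker thread and returns the trace id visible there. */
    static String traceIdSeenByWorker(ContextPropagator propagator) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        ThreadLocalPropagator.TRACE_ID.set("trace-1234");
        try {
            String[] seen = new String[1];
            Runnable task = propagator.wrap(() -> seen[0] = ThreadLocalPropagator.TRACE_ID.get());
            executor.submit(task).get(5, TimeUnit.SECONDS);
            return seen[0];
        } catch (Exception e) {
            throw new IllegalStateException("worker task failed", e);
        } finally {
            ThreadLocalPropagator.TRACE_ID.remove();
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(traceIdSeenByWorker(new NoopPropagator()));        // prints "null"
        System.out.println(traceIdSeenByWorker(new ThreadLocalPropagator())); // prints "trace-1234"
    }
}
```

The no-op variant loses the value on the worker thread, which corresponds to the missing parent span that this PR addresses; the wrapping variant carries it across.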