
feat: Add in-process context propagation for OpenTelemetry agent execution #699

Open
vietk wants to merge 1 commit into a2aproject:main from vietk:feature/otel_in_process_context_propagation

vietk commented Mar 4, 2026

Description

Introduce a context propagation mechanism when an agent execution is requested.
The context propagation wraps the agent execution in a Runnable that can propagate a 'Context'.

  • An OpenTelemetry implementation of AgentExecutionContextPropagator is provided in the opentelemetry module and is responsible for propagating the OpenTelemetry context between the server thread and the agent execution thread. This is needed to ensure well-formed traces with a valid parent span.
  • The default implementation of AgentExecutionContextPropagator does not propagate any context and can be used when the opentelemetry feature is not enabled.
  • Any further implementation could carry more than the OpenTelemetry context, e.g. MDC context, security context, etc.
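
To make the wrapping idea concrete, here is a minimal, dependency-free sketch. It assumes the SPI is a single `Runnable wrap(Runnable)` method, as the description suggests. `NoOpPropagator`, `ThreadLocalPropagator`, `REQUEST_ID`, and `PropagationSketch` are hypothetical names used only for illustration; the `ThreadLocal` stands in for whatever context (trace, MDC, security) a real implementation would carry, and none of this is the code from the PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Assumed shape of the SPI: a single method that decorates the agent task.
interface AgentExecutionContextPropagator {
    Runnable wrap(Runnable runnable);
}

// Hypothetical no-op variant: returns the task untouched, so nothing is propagated.
final class NoOpPropagator implements AgentExecutionContextPropagator {
    @Override
    public Runnable wrap(Runnable runnable) {
        return runnable;
    }
}

// Hypothetical variant that carries one ThreadLocal value across threads.
final class ThreadLocalPropagator implements AgentExecutionContextPropagator {
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    @Override
    public Runnable wrap(Runnable runnable) {
        // Captured on the thread that calls wrap (the server thread).
        final String captured = REQUEST_ID.get();
        return () -> {
            // Installed on the thread that runs the task (the agent thread).
            String previous = REQUEST_ID.get();
            REQUEST_ID.set(captured);
            try {
                runnable.run();
            } finally {
                // Restore the worker thread's previous state so pooled threads do not leak context.
                if (previous == null) {
                    REQUEST_ID.remove();
                } else {
                    REQUEST_ID.set(previous);
                }
            }
        };
    }
}

final class PropagationSketch {
    // Runs a wrapped task on a separate thread and returns the REQUEST_ID observed there.
    static String observeOnWorker(AgentExecutionContextPropagator propagator) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            String[] seen = new String[1];
            Runnable task = () -> seen[0] = ThreadLocalPropagator.REQUEST_ID.get();
            executor.submit(propagator.wrap(task)).get(5, TimeUnit.SECONDS);
            return seen[0];
        } catch (Exception e) {
            throw new IllegalStateException("worker task failed", e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The point of the sketch is the capture-then-restore pattern: the value is read when `wrap` is called and installed only for the duration of the task on the executing thread.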

Fixes #698

…ution

Introduce a context propagation mechanism when an agent execution is requested.
The context propagation wraps the agent execution in a Runnable via an
implementation of `AgentExecutionContextPropagator`.
- An OpenTelemetry implementation of `AgentExecutionContextPropagator` is
provided in the opentelemetry module and is responsible for propagating
the OpenTelemetry context between the server thread and the agent execution thread.
This is needed to ensure well-formed traces with a valid parent span.
- The default implementation of `AgentExecutionContextPropagator` does not propagate any context and can be used when the opentelemetry feature is not enabled.
- Any further implementation could carry more than the OpenTelemetry context, e.g. MDC context, security context, etc.

Refs: a2aproject#698
@gemini-code-assist
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request establishes a mechanism for in-process context propagation during agent execution, primarily to support OpenTelemetry tracing. By introducing a new AgentExecutionContextPropagator interface and its OpenTelemetry-specific implementation, it ensures that tracing contexts are carried across thread boundaries when agent tasks are processed. This keeps agent spans attached to the correct parent span, which improves the observability of agent-driven workflows. A default no-op implementation is also included for environments that do not use OpenTelemetry.

Highlights

  • Context Propagation Interface: Introduced the AgentExecutionContextPropagator interface, defining a standard mechanism for propagating execution context across different threads or asynchronous operations within agent execution.
  • OpenTelemetry Integration: Implemented OtelAgentExecutionContextPropagator which leverages OpenTelemetry's Context.wrap to ensure OpenTelemetry trace contexts are correctly propagated between the server thread and agent execution threads, crucial for well-formed distributed traces.
  • Default No-Op Implementation: Provided DefaultAgentExecutionContextPropagator as a no-operation implementation, serving as a fallback when OpenTelemetry or other specific context propagation features are not enabled.
  • Agent Execution Wrapping: Integrated the AgentExecutionContextPropagator into DefaultRequestHandler to wrap agent execution Runnable tasks, ensuring that the chosen context propagation mechanism is applied automatically.
  • Enhanced Tracing in SimpleAgentExecutor: Updated SimpleAgentExecutor to inject an OpenTelemetry Tracer and create spans, demonstrating how agent executions can be instrumented for tracing.
  • Context Propagation Verification: Added new integration tests to OpenTelemetryA2ABaseTest to explicitly verify that OpenTelemetry trace contexts are correctly propagated and linked between server and agent spans.

Changelog

  • extras/opentelemetry/integration-tests/src/main/java/io/a2a/extras/opentelemetry/it/SimpleAgentExecutor.java
    • Added Tracer dependency and constructor injection.
    • Created a new OpenTelemetry span for the execute method with an attribute for user input length.
  • extras/opentelemetry/integration-tests/src/test/java/io/a2a/extras/opentelemetry/it/OpenTelemetryA2ABaseTest.java
    • Added testOpenTelemetryContextIsPropagatedWhenSendMessageIsCalled to verify context propagation for sendMessage.
    • Added testOpenTelemetryContextIsPropagatedWhenSendMessageStreamIsCalled to verify context propagation for sendMessageStream.
  • extras/opentelemetry/server/src/main/java/io/a2a/extras/opentelemetry/OtelAgentExecutionContextPropagator.java
    • Added new class OtelAgentExecutionContextPropagator implementing AgentExecutionContextPropagator.
    • Implemented wrap method using io.opentelemetry.context.Context.current().wrap(runnable) for OpenTelemetry context propagation.
  • extras/opentelemetry/server/src/test/java/io/a2a/extras/opentelemetry/OtelAgentExecutionContextPropagatorTest.java
    • Added new unit test for OtelAgentExecutionContextPropagator to confirm correct context propagation across threads.
  • server-common/src/main/java/io/a2a/server/context/DefaultAgentExecutionContextPropagator.java
    • Added new class DefaultAgentExecutionContextPropagator implementing AgentExecutionContextPropagator.
    • Provided a no-op wrap method that returns the original Runnable.
  • server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java
    • Imported AgentExecutionContextPropagator and DefaultAgentExecutionContextPropagator.
    • Added AgentExecutionContextPropagator as a dependency and injected it via the constructor.
    • Modified the create static method to instantiate DefaultAgentExecutionContextPropagator.
    • Wrapped the agent execution Runnable with the injected AgentExecutionContextPropagator.
  • server-common/src/main/java/io/a2a/server/spi/AgentExecutionContextPropagator.java
    • Added new interface AgentExecutionContextPropagator with a single wrap method.
  • server-common/src/test/java/io/a2a/server/context/DefaultAgentExecutionContextPropagatorTest.java
    • Added new unit test for DefaultAgentExecutionContextPropagator to confirm its no-op behavior.

Contributor

gemini-code-assist bot left a comment

Code Review

The pull request introduces a context propagation mechanism for OpenTelemetry agent execution, which is crucial for maintaining trace continuity across asynchronous operations. It provides a default no-op implementation and an OpenTelemetry-specific one, allowing for flexible integration. The changes correctly integrate the new AgentExecutionContextPropagator into the DefaultRequestHandler and include relevant integration tests. While the core functionality is well-implemented, there are several areas in the test code that could be improved for robustness, maintainability, and reliability, such as replacing Thread.sleep() with Awaitility, handling Optional values more safely, and reducing code duplication.

Comment on lines +39 to 45
tracer.spanBuilder("SimpleAgentExecutor.execute")
.setAttribute("user.input.length", userText.length())
.startSpan()
.end();
// Echo it back
String response = "Echo: " + userText;
emitter.complete(A2A.toAgentMessage(response));
Contributor

high

The current OpenTelemetry span creation (startSpan().end()) ends the span immediately. As a result, the span does not capture the duration of the execute method, and it is never the current span while the work runs, so spans created during execution are not parented to it. For accurate tracing, make the span current with span.makeCurrent() in a try-with-resources block and end it in a finally block, so that it covers the entire operation and is ended even when an exception is thrown.

        io.opentelemetry.api.trace.Span span = tracer.spanBuilder("SimpleAgentExecutor.execute")
                .setAttribute("user.input.length", userText.length())
                .startSpan();
        try (io.opentelemetry.context.Scope scope = span.makeCurrent()) {
            // Echo it back
            String response = "Echo: " + userText;
            emitter.complete(A2A.toAgentMessage(response));
        } finally {
            span.end();
        }

.build();
client.sendMessage(testMessage);

Thread.sleep(5000);
Contributor

high

Using Thread.sleep() in tests can lead to flakiness and unnecessarily slow down test execution. It's more robust to use a waiting mechanism like Awaitility to wait for a specific condition to be met (e.g., spans being available in the exporter) rather than relying on arbitrary delays. This improves test reliability and performance.

Suggested change
Thread.sleep(5000);
await().atMost(5, SECONDS).until(() -> !getSpans().isEmpty());
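
For readers without Awaitility on the classpath, the same idea (poll a condition up to a deadline instead of sleeping for a fixed time) can be sketched with the standard library alone. `Polling` and `waitUntil` are hypothetical names, not part of the PR or of Awaitility. Note also that a condition such as `!getSpans().isEmpty()` can become true as soon as a single span has been exported, so a stricter condition that checks for both the server span and the agent span may be a safer choice here.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class Polling {
    // Polls `condition` every `interval` until it holds or `timeout` elapses.
    // Returns true if the condition held, false on timeout or interruption.
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In a test, a call along the lines of `Polling.waitUntil(() -> spansContainBothKinds(), Duration.ofSeconds(5), Duration.ofMillis(100))` would return as soon as the spans arrive instead of always paying the full five seconds; `spansContainBothKinds` is again a hypothetical helper.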

Thread.sleep(5000);

List<Map<String, Object>> spans = getSpans();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
Contributor

high

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.

Suggested change
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().orElseThrow(() -> new AssertionError("Server span not found"));


List<Map<String, Object>> spans = getSpans();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();
Contributor

high

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.

Suggested change
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().orElseThrow(() -> new AssertionError("Agent span not found"));

client.sendMessage(testMessage);

Thread.sleep(5000);

Contributor

high

Using Thread.sleep() in tests can lead to flakiness and unnecessarily slow down test execution. It's more robust to use a waiting mechanism like Awaitility to wait for a specific condition to be met (e.g., spans being available in the exporter) rather than relying on arbitrary delays. This improves test reliability and performance.

Suggested change
await().atMost(5, SECONDS).until(() -> !getSpans().isEmpty());


List<Map<String, Object>> spans = getSpans();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();
Contributor

high

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.

Suggested change
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().orElseThrow(() -> new AssertionError("Server span not found"));

List<Map<String, Object>> spans = getSpans();
var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();

Contributor

high

Directly calling .get() on an Optional without first checking isPresent() can lead to a NoSuchElementException if no matching span is found. It's safer to use orElseThrow() with a descriptive message to handle the absence of a value gracefully, making the test more robust.

Suggested change
var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().orElseThrow(() -> new AssertionError("Agent span not found"));

Comment on lines +264 to +287
    @Test
    void testOpenTelemetryContextIsPropagatedWhenSendMessageStreamIsCalled() throws Exception {
        Message testMessage = Message.builder()
                .role(Message.Role.ROLE_USER)
                .parts(Collections.singletonList(new TextPart("test message")))
                .contextId("context-1234")
                .messageId("message-1234")
                .build();
        client.sendMessage(testMessage);

        Thread.sleep(5000);

        List<Map<String, Object>> spans = getSpans();
        var serverSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.SERVER).findFirst().get();
        var agentSpan = spans.stream().filter(span -> SpanKind.valueOf(span.get("kind").toString()) == SpanKind.INTERNAL).findFirst().get();

        String serverTraceId = serverSpan.get("traceId").toString();
        String serverSpanId = serverSpan.get("spanId").toString();
        String agentTraceId = agentSpan.get("traceId").toString();
        String agentParentSpanId = agentSpan.get("parentSpanId").toString();

        assertEquals(serverSpanId, agentParentSpanId);
        assertEquals(serverTraceId, agentTraceId);
    }
Contributor

medium

The test methods testOpenTelemetryContextIsPropagatedWhenSendMessageIsCalled and testOpenTelemetryContextIsPropagatedWhenSendMessageStreamIsCalled contain significant code duplication. Extracting the common logic into a private helper method would improve maintainability, readability, and reduce the chance of errors when updating test logic.
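
One possible shape for such a helper is sketched below, using only the standard library. It assumes, as the tests above suggest, that each exported span is a `Map<String, Object>` with `kind`, `traceId`, `spanId`, and `parentSpanId` entries. `SpanAssertions` and its method names are hypothetical, and kinds are compared as plain strings to keep the sketch free of the OpenTelemetry dependency.

```java
import java.util.List;
import java.util.Map;

final class SpanAssertions {
    // Returns the first span whose "kind" entry matches, or fails with a descriptive message.
    static Map<String, Object> firstSpanOfKind(List<Map<String, Object>> spans, String kind) {
        return spans.stream()
                .filter(span -> kind.equals(String.valueOf(span.get("kind"))))
                .findFirst()
                .orElseThrow(() -> new AssertionError(kind + " span not found"));
    }

    // Checks that the agent (INTERNAL) span is in the server span's trace and is its direct child.
    static void assertAgentSpanIsChildOfServerSpan(List<Map<String, Object>> spans) {
        Map<String, Object> serverSpan = firstSpanOfKind(spans, "SERVER");
        Map<String, Object> agentSpan = firstSpanOfKind(spans, "INTERNAL");

        String serverTraceId = String.valueOf(serverSpan.get("traceId"));
        String agentTraceId = String.valueOf(agentSpan.get("traceId"));
        if (!serverTraceId.equals(agentTraceId)) {
            throw new AssertionError("trace ids differ: " + serverTraceId + " vs " + agentTraceId);
        }

        String serverSpanId = String.valueOf(serverSpan.get("spanId"));
        String agentParentSpanId = String.valueOf(agentSpan.get("parentSpanId"));
        if (!serverSpanId.equals(agentParentSpanId)) {
            throw new AssertionError("agent span parent is " + agentParentSpanId + ", expected " + serverSpanId);
        }
    }
}
```

With a helper like this, each test would reduce to sending its message, waiting for the spans, and calling `assertAgentSpanIsChildOfServerSpan(getSpans())`.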

@ApplicationScoped
@Alternative
@Priority(1)
public class OtelAgentExecutionContextPropagator implements AgentExecutionContextPropagator {

Do we really need this? I think we can always wrap the context if you add the opentelemetry API dependency to the project, no?

Author

Yep, but because OpenTelemetry is provided as an extra dependency, I was forced to use an SPI to implement it externally.
Adopting OpenTelemetry directly in the server is more of a project refactoring, which might not be suitable for a contribution.

Collaborator

ehsavoie commented Mar 4, 2026

I think (but I need to finish some other stuff to check) that this is overkill and the best bet is to produce a ManagedExecutorService instead of the default @internal ExecutorService


Development

Successfully merging this pull request may close these issues.

[Feat]: Propagate open telemetry context to agent execution for consolidated tracing

3 participants