From 7792233832e95dfe1ae93b04d91bd7507c37cc8d Mon Sep 17 00:00:00 2001 From: Google Team Member Date: Thu, 15 Jan 2026 02:14:55 -0800 Subject: [PATCH] feat: refactor remote A2A agent to use A2A SDK client PiperOrigin-RevId: 856564808 --- .../java/com/google/adk/a2a/A2AClient.java | 136 --------- .../com/google/adk/a2a/RemoteA2AAgent.java | 261 ++++++++---------- .../adk/a2a/converters/EventConverter.java | 33 --- contrib/samples/a2a_basic/A2AAgent.java | 45 +-- contrib/samples/a2a_basic/A2AAgentRun.java | 78 +++--- 5 files changed, 171 insertions(+), 382 deletions(-) delete mode 100644 a2a/src/main/java/com/google/adk/a2a/A2AClient.java diff --git a/a2a/src/main/java/com/google/adk/a2a/A2AClient.java b/a2a/src/main/java/com/google/adk/a2a/A2AClient.java deleted file mode 100644 index 8bd87d4d1..000000000 --- a/a2a/src/main/java/com/google/adk/a2a/A2AClient.java +++ /dev/null @@ -1,136 +0,0 @@ -package com.google.adk.a2a; - -import com.fasterxml.jackson.annotation.JsonAutoDetect; -import com.fasterxml.jackson.annotation.PropertyAccessor; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.json.JsonMapper; -import com.google.common.base.Preconditions; -import io.a2a.client.http.A2AHttpClient; -import io.a2a.client.http.A2AHttpResponse; -import io.a2a.client.http.JdkA2AHttpClient; -import io.a2a.spec.AgentCard; -import io.a2a.spec.SendMessageRequest; -import io.a2a.spec.SendMessageResponse; -import io.reactivex.rxjava3.core.Flowable; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URL; -import java.util.Map; -import org.jspecify.annotations.Nullable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A thin HTTP client for interacting with an A2A-compliant agent endpoint. - * - *

**EXPERIMENTAL:** Subject to change, rename, or removal in any future patch release. Do not - * use in production code. - */ -public final class A2AClient { - - private static final Logger logger = LoggerFactory.getLogger(A2AClient.class); - private static final String JSON_CONTENT_TYPE = "application/json"; - private static final String DEFAULT_SEND_PATH = "/v1/message:send"; - - private final AgentCard agentCard; - private final A2AHttpClient httpClient; - private final ObjectMapper objectMapper; - private final Map defaultHeaders; - - public A2AClient(AgentCard agentCard) { - this(agentCard, new JdkA2AHttpClient(), Map.of()); - } - - public A2AClient( - AgentCard agentCard, A2AHttpClient httpClient, Map defaultHeaders) { - this.agentCard = Preconditions.checkNotNull(agentCard, "agentCard"); - this.httpClient = Preconditions.checkNotNull(httpClient, "httpClient"); - this.objectMapper = - JsonMapper.builder() - .findAndAddModules() - .visibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY) - .build(); - this.defaultHeaders = defaultHeaders == null ? Map.of() : Map.copyOf(defaultHeaders); - } - - public AgentCard getAgentCard() { - return agentCard; - } - - public String getUrl() { - return agentCard.url(); - } - - /** - * Sends a JSON-RPC message to the remote A2A agent and converts the response into the canonical - * {@link SendMessageResponse} model. - */ - public Flowable sendMessage(SendMessageRequest request) { - return Flowable.fromCallable(() -> executeSendMessage(request)); - } - - private SendMessageResponse executeSendMessage(SendMessageRequest request) throws IOException { - Preconditions.checkNotNull(request, "request"); - String payload = serializeRequest(request); - String endpoint = resolveSendMessageEndpoint(agentCard.url()); - - A2AHttpClient.PostBuilder builder = - httpClient.createPost().url(endpoint).addHeader("Content-Type", JSON_CONTENT_TYPE); - defaultHeaders.forEach(builder::addHeader); - builder.body(payload); - - A2AHttpResponse response; - try { - response = builder.post(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException("Interrupted while sending A2A sendMessage request", e); - } - - if (!response.success()) { - String responseBody = response.body(); - logger.warn( - "A2A sendMessage request failed with status {} and body {}", - response.status(), - responseBody); - throw new IOException("A2A sendMessage request failed with HTTP status " + response.status()); - } - - return deserializeResponse(response.body()); - } - - private String serializeRequest(SendMessageRequest request) throws JsonProcessingException { - return objectMapper.writeValueAsString(request); - } - - private SendMessageResponse deserializeResponse(String body) throws JsonProcessingException { - return objectMapper.readValue(body, SendMessageResponse.class); - } - - private static String resolveSendMessageEndpoint(String baseUrl) { - if (baseUrl == null || baseUrl.isEmpty()) { - throw new IllegalArgumentException("Agent card URL cannot be null or empty"); - } - if (baseUrl.endsWith("/")) { - return baseUrl.substring(0, baseUrl.length() - 1) + DEFAULT_SEND_PATH; - } - return baseUrl + DEFAULT_SEND_PATH; - } - - public static @Nullable String extractHostAndPort(String urlString) { - try { - URL url = URI.create(urlString).toURL(); - String host = url.getHost(); - int port = url.getPort(); - if (port != -1) { - return host + ":" + port; - } - return host; - } catch (MalformedURLException | IllegalArgumentException e) { - logger.warn("Invalid URL when extracting host and port", e); - return null; - } - } -} diff --git a/a2a/src/main/java/com/google/adk/a2a/RemoteA2AAgent.java b/a2a/src/main/java/com/google/adk/a2a/RemoteA2AAgent.java index 0701d7422..5e6e341d7 100644 --- a/a2a/src/main/java/com/google/adk/a2a/RemoteA2AAgent.java +++ b/a2a/src/main/java/com/google/adk/a2a/RemoteA2AAgent.java @@ -1,28 +1,31 @@ package com.google.adk.a2a; -import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.base.Strings.nullToEmpty; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.adk.a2a.common.A2AClientError; import com.google.adk.a2a.converters.EventConverter; import com.google.adk.a2a.converters.ResponseConverter; import com.google.adk.agents.BaseAgent; import com.google.adk.agents.Callbacks; import com.google.adk.agents.InvocationContext; import com.google.adk.events.Event; -import com.google.common.collect.ImmutableMap; -import com.google.common.io.Resources; +import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.CanIgnoreReturnValue; +import io.a2a.client.Client; +import io.a2a.client.ClientEvent; +import io.a2a.client.TaskEvent; +import io.a2a.client.TaskUpdateEvent; +import io.a2a.spec.A2AClientException; import io.a2a.spec.AgentCard; import io.a2a.spec.Message; -import io.a2a.spec.MessageSendParams; -import io.a2a.spec.SendMessageRequest; +import io.a2a.spec.TaskState; +import io.reactivex.rxjava3.core.BackpressureStrategy; import io.reactivex.rxjava3.core.Flowable; -import java.io.IOException; -import java.net.URL; +import io.reactivex.rxjava3.core.FlowableEmitter; import java.util.List; -import java.util.Map; import java.util.Optional; -import org.jspecify.annotations.Nullable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,21 +55,10 @@ public class RemoteA2AAgent extends BaseAgent { private static final Logger logger = LoggerFactory.getLogger(RemoteA2AAgent.class); - private Optional agentCard; - private final Optional agentCardSource; - private Optional a2aClient; - private Optional rpcUrl = Optional.empty(); + private final AgentCard agentCard; + private final Client a2aClient; private String description; - private boolean isResolved = false; - - public RemoteA2AAgent() { - // Initialize with empty values - will be configured later - super("", "", null, null, null); - this.agentCard = Optional.empty(); - this.agentCardSource = Optional.empty(); - this.a2aClient = Optional.empty(); - this.description = ""; - } + private final boolean streaming; // Internal constructor used by builder private RemoteA2AAgent(Builder builder) { @@ -77,32 +69,29 @@ private RemoteA2AAgent(Builder builder) { builder.beforeAgentCallback, builder.afterAgentCallback); - if (builder.agentCardOrSource == null) { - throw new IllegalArgumentException("agentCardOrSource cannot be null"); + if (builder.a2aClient == null) { + throw new IllegalArgumentException("a2aClient cannot be null"); } - if (builder.agentCardOrSource instanceof AgentCard) { - this.agentCard = Optional.of((AgentCard) builder.agentCardOrSource); - this.agentCardSource = Optional.empty(); - this.description = builder.description; - // If builder description is empty, use the one from AgentCard - if (this.description.isEmpty() && this.agentCard.get().description() != null) { - this.description = this.agentCard.get().description(); - } - } else if (builder.agentCardOrSource instanceof String) { - this.agentCard = Optional.empty(); - String source = (String) builder.agentCardOrSource; - if (source.trim().isEmpty()) { - throw new IllegalArgumentException("agentCard string cannot be empty"); - } - this.agentCardSource = Optional.of(source.trim()); + this.a2aClient = builder.a2aClient; + if (builder.agentCard != null) { + this.agentCard = builder.agentCard; } else { - throw new TypeError( - "agentCard must be AgentCard, URL string, or file path string, got " - + builder.agentCardOrSource.getClass()); + try { + this.agentCard = this.a2aClient.getAgentCard(); + } catch (A2AClientException e) { + throw new AgentCardResolutionError("Failed to resolve agent card", e); + } } - - this.a2aClient = builder.a2aClient; + if (this.agentCard == null) { + throw new IllegalArgumentException("agentCard cannot be null"); + } + this.description = nullToEmpty(builder.description); + // If builder description is empty, use the one from AgentCard + if (this.description.isEmpty() && this.agentCard.description() != null) { + this.description = this.agentCard.description(); + } + this.streaming = this.agentCard.capabilities().streaming(); } public static Builder builder() { @@ -112,9 +101,9 @@ public static Builder builder() { /** Builder for {@link RemoteA2AAgent}. */ public static class Builder { private String name; - private Object agentCardOrSource; + private AgentCard agentCard; + private Client a2aClient; private String description = ""; - private Optional a2aClient = Optional.empty(); private List subAgents; private List beforeAgentCallback; private List afterAgentCallback; @@ -126,8 +115,8 @@ public Builder name(String name) { } @CanIgnoreReturnValue - public Builder agentCardOrSource(Object agentCardOrSource) { - this.agentCardOrSource = agentCardOrSource; + public Builder agentCard(AgentCard agentCard) { + this.agentCard = agentCard; return this; } @@ -137,12 +126,6 @@ public Builder description(String description) { return this; } - @CanIgnoreReturnValue - public Builder a2aClient(@Nullable A2AClient a2aClient) { - this.a2aClient = Optional.ofNullable(a2aClient); - return this; - } - @CanIgnoreReturnValue public Builder subAgents(List subAgents) { this.subAgents = subAgents; @@ -161,73 +144,19 @@ public Builder afterAgentCallback(List afterAgentC return this; } - public RemoteA2AAgent build() { - return new RemoteA2AAgent(this); - } - } - - public Optional rpcUrl() { - return rpcUrl; - } - - private void ensureResolved() { - // This method is similar to getClientFromAgentCardUrl in the A2A Java SDK. It is called at - // runtime not constructor time. - if (isResolved) { - return; - } - - try { - // Resolve agent card if needed - if (agentCard.isEmpty()) { - if (agentCardSource.isPresent()) { - String source = agentCardSource.get(); - this.agentCard = Optional.of(resolveAgentCard(source)); - } else { - // This case should not happen based on constructor logic - } - } - - // Set RPC URL - this.rpcUrl = Optional.of(this.agentCard.get().url()); - - // Update description if empty - if (this.description == null && this.agentCard.get().description() != null) { - this.description = this.agentCard.get().description(); - } - - if (this.a2aClient.isEmpty() && this.agentCard.isPresent()) { - this.a2aClient = Optional.of(new A2AClient(this.agentCard.get())); - } - this.isResolved = true; - - } catch (Exception e) { - throw new AgentCardResolutionError( - "Failed to initialize remote A2A agent " + name() + ": " + e, e); + @CanIgnoreReturnValue + public Builder a2aClient(Client a2aClient) { + this.a2aClient = a2aClient; + return this; } - } - private AgentCard resolveAgentCard(String source) throws IOException { - ObjectMapper objectMapper = new ObjectMapper(); - try { - URL resourceUrl = Resources.getResource(source); - agentCard = Optional.of(objectMapper.readValue(resourceUrl, AgentCard.class)); - return agentCard.get(); - } catch (IllegalArgumentException e) { - throw new IOException( - "Failed to find AgentCard resource: " - + source - + ". Check if the resource exists and is included in the build.", - e); - } catch (Exception e) { - throw new IOException("Failed to load AgentCard from resource: " + source, e); + public RemoteA2AAgent build() { + return new RemoteA2AAgent(this); } } @Override protected Flowable runAsyncImpl(InvocationContext invocationContext) { - ensureResolved(); - // Construct A2A Message from the last ADK event List sessionEvents = invocationContext.session().events(); @@ -244,51 +173,77 @@ protected Flowable runAsyncImpl(InvocationContext invocationContext) { } Message originalMessage = a2aMessageOpt.get(); - String sessionId = invocationContext.session().id(); - String inboundContextId = originalMessage.getContextId(); - if (!isNullOrEmpty(inboundContextId) && !sessionId.equals(inboundContextId)) { - logger.warn("Inbound context id differs from active session; using session id instead."); + return Flowable.create( + emitter -> { + FlowableEmitter flowableEmitter = emitter.serialize(); + AtomicBoolean done = new AtomicBoolean(false); + ImmutableList> consumers = + ImmutableList.of( + (event, unused) -> + handleClientEvent(event, flowableEmitter, invocationContext, done)); + a2aClient.sendMessage( + originalMessage, consumers, e -> handleClientError(e, flowableEmitter, done), null); + }, + BackpressureStrategy.BUFFER); + } + + private void handleClientError(Throwable e, FlowableEmitter emitter, AtomicBoolean done) { + // Mark the flow as done if it is already cancelled. + done.compareAndSet(false, emitter.isCancelled()); + + // If the flow is already done, stop processing and exit the consumer. + if (done.get()) { + return; + } + // If the error is raised, complete the flow with an error. + if (!done.getAndSet(true)) { + emitter.tryOnError(new A2AClientError("Failed to communicate with the remote agent", e)); + } + } + + private void handleClientEvent( + ClientEvent clientEvent, + FlowableEmitter emitter, + InvocationContext invocationContext, + AtomicBoolean done) { + // Mark the flow as done if it is already cancelled. + done.compareAndSet(false, emitter.isCancelled()); + + // If the flow is already done, stop processing and exit the consumer. + if (done.get()) { + return; } - Message a2aMessage = new Message.Builder(originalMessage).contextId(sessionId).build(); - - Map metadata = - originalMessage.getMetadata() == null ? ImmutableMap.of() : originalMessage.getMetadata(); - - MessageSendParams params = new MessageSendParams(a2aMessage, null, metadata); - SendMessageRequest request = new SendMessageRequest(invocationContext.invocationId(), params); - - return a2aClient - .get() - .sendMessage(request) - .flatMap( - response -> { - List events = - ResponseConverter.sendMessageResponseToEvents( - response, - invocationContext.invocationId(), - invocationContext.branch().orElse(null)); - - if (events.isEmpty()) { - logger.warn("No events converted from A2A response"); - // Return a default event to indicate the agent executed - return Flowable.just( - Event.builder() - .author(name()) - .invocationId(invocationContext.invocationId()) - .branch(invocationContext.branch().orElse(null)) - .build()); - } - - return Flowable.fromIterable(events); - }); + Optional event = ResponseConverter.clientEventToEvent(clientEvent, invocationContext); + if (event.isPresent()) { + emitter.onNext(event.get()); + } + + // For non-streaming communication, complete the flow; for streaming, wait until the client + // marks the completion. + if (isCompleted(clientEvent) || !streaming) { + // Only complete the flow once. + if (!done.getAndSet(true)) { + emitter.onComplete(); + } + } + } + + private static boolean isCompleted(ClientEvent event) { + TaskState executionState = TaskState.UNKNOWN; + if (event instanceof TaskEvent taskEvent) { + executionState = taskEvent.getTask().getStatus().state(); + } else if (event instanceof TaskUpdateEvent updateEvent) { + executionState = updateEvent.getTask().getStatus().state(); + } + return executionState.equals(TaskState.COMPLETED); } @Override protected Flowable runLiveImpl(InvocationContext invocationContext) { throw new UnsupportedOperationException( - "_run_live_impl for " + getClass() + " via A2A is not implemented."); + "runLiveImpl for " + getClass() + " via A2A is not implemented."); } /** Exception thrown when the agent card cannot be resolved. */ diff --git a/a2a/src/main/java/com/google/adk/a2a/converters/EventConverter.java b/a2a/src/main/java/com/google/adk/a2a/converters/EventConverter.java index d112fccce..cd8bcefb0 100644 --- a/a2a/src/main/java/com/google/adk/a2a/converters/EventConverter.java +++ b/a2a/src/main/java/com/google/adk/a2a/converters/EventConverter.java @@ -37,39 +37,6 @@ public enum AggregationMode { EXTERNAL_HANDOFF } - public static Optional convertEventToA2AMessage(Event event) { - if (event == null) { - logger.warn("Cannot convert null event to A2A message."); - return Optional.empty(); - } - - List> a2aParts = new ArrayList<>(); - Optional contentOpt = event.content(); - - if (contentOpt.isPresent() && contentOpt.get().parts().isPresent()) { - for (Part part : contentOpt.get().parts().get()) { - PartConverter.fromGenaiPart(part).ifPresent(a2aParts::add); - } - } - - if (a2aParts.isEmpty()) { - logger.warn("No convertible content found in event."); - return Optional.empty(); - } - - Message.Builder builder = - new Message.Builder() - .messageId(event.id() != null ? event.id() : UUID.randomUUID().toString()) - .parts(a2aParts) - .role(event.author().equals("user") ? Message.Role.USER : Message.Role.AGENT); - event - .content() - .flatMap(Content::role) - .ifPresent( - role -> builder.role(role.equals("user") ? Message.Role.USER : Message.Role.AGENT)); - return Optional.of(builder.build()); - } - public static Optional convertEventsToA2AMessage(InvocationContext context) { return convertEventsToA2AMessage(context, AggregationMode.AS_IS); } diff --git a/contrib/samples/a2a_basic/A2AAgent.java b/contrib/samples/a2a_basic/A2AAgent.java index 1c684e188..fa4932cf9 100644 --- a/contrib/samples/a2a_basic/A2AAgent.java +++ b/contrib/samples/a2a_basic/A2AAgent.java @@ -1,6 +1,8 @@ package com.example.a2a_basic; -import com.google.adk.a2a.A2AClient; +import java.util.ArrayList; +import java.util.Random; + import com.google.adk.a2a.RemoteA2AAgent; import com.google.adk.agents.BaseAgent; import com.google.adk.agents.LlmAgent; @@ -8,12 +10,14 @@ import com.google.adk.tools.ToolContext; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; + +import io.a2a.client.Client; +import io.a2a.client.config.ClientConfig; +import io.a2a.client.http.A2ACardResolver; import io.a2a.client.http.JdkA2AHttpClient; -import io.a2a.spec.AgentCapabilities; +import io.a2a.client.transport.jsonrpc.JSONRPCTransport; +import io.a2a.client.transport.jsonrpc.JSONRPCTransportConfig; import io.a2a.spec.AgentCard; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; /** Provides local roll logic plus a remote A2A agent for the demo. */ public final class A2AAgent { @@ -60,24 +64,23 @@ public static LlmAgent createRootAgent(String primeAgentBaseUrl) { } private static BaseAgent createRemoteAgent(String primeAgentBaseUrl) { - AgentCapabilities capabilities = new AgentCapabilities.Builder().build(); - AgentCard agentCard = - new AgentCard.Builder() - .name("prime_agent") - .description("Stub agent metadata used for third-party A2A demo") - .url(primeAgentBaseUrl) - .version("1.0.0") - .capabilities(capabilities) - .defaultInputModes(List.of("text")) - .defaultOutputModes(List.of("text")) - .skills(List.of()) - .security(List.of()) + String agentCardUrl = primeAgentBaseUrl + "/.well-known/agent-card.json"; + AgentCard publicAgentCard = + new A2ACardResolver(new JdkA2AHttpClient(), primeAgentBaseUrl, agentCardUrl).getAgentCard(); + + Client a2aClient = + Client.builder(publicAgentCard) + .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig()) + .clientConfig( + new ClientConfig.Builder() + .setStreaming(publicAgentCard.capabilities().streaming()) + .build()) .build(); - A2AClient client = new A2AClient(agentCard, new JdkA2AHttpClient(), /* defaultHeaders= */ null); + return RemoteA2AAgent.builder() - .name(agentCard.name()) - .agentCardOrSource(agentCard) - .a2aClient(client) + .name(publicAgentCard.name()) + .a2aClient(a2aClient) + .agentCard(publicAgentCard) .build(); } diff --git a/contrib/samples/a2a_basic/A2AAgentRun.java b/contrib/samples/a2a_basic/A2AAgentRun.java index 406a8dcbc..12d515f62 100644 --- a/contrib/samples/a2a_basic/A2AAgentRun.java +++ b/contrib/samples/a2a_basic/A2AAgentRun.java @@ -1,21 +1,22 @@ package com.example.a2a_basic; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + import com.google.adk.agents.BaseAgent; -import com.google.adk.agents.LlmAgent; import com.google.adk.agents.RunConfig; import com.google.adk.artifacts.InMemoryArtifactService; import com.google.adk.events.Event; import com.google.adk.runner.Runner; import com.google.adk.sessions.InMemorySessionService; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; import com.google.genai.types.Content; import com.google.genai.types.Part; + import io.reactivex.rxjava3.core.Flowable; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; /** Main class to demonstrate running the A2A agent with sequential inputs. */ public final class A2AAgentRun { @@ -38,7 +39,7 @@ public A2AAgentRun(BaseAgent agent) { sessionService.createSession(appName, userId, initialState, sessionId).blockingGet(); } - private List run(String prompt) { + private Flowable run(String prompt) { System.out.println("\n--------------------------------------------------"); System.out.println("You> " + prompt); Content userMessage = @@ -49,45 +50,44 @@ private List run(String prompt) { return processRunRequest(userMessage); } - private List processRunRequest(Content inputContent) { + private Flowable processRunRequest(Content inputContent) { RunConfig runConfig = RunConfig.builder().build(); - Flowable eventStream = - this.runner.runAsync(this.userId, this.sessionId, inputContent, runConfig); - List agentEvents = Lists.newArrayList(eventStream.blockingIterable()); - System.out.println("Agent>"); - for (Event event : agentEvents) { - if (event.content().isPresent() && event.content().get().parts().isPresent()) { - event - .content() - .get() - .parts() - .get() - .forEach( - part -> { - if (part.text().isPresent()) { - System.out.println(" Text: " + part.text().get().stripTrailing()); - } - }); - } - if (event.actions() != null && event.actions().transferToAgent().isPresent()) { - System.out.println(" Actions: transferTo=" + event.actions().transferToAgent().get()); - } - System.out.println(" Raw Event: " + event); + return this.runner.runAsync(this.userId, this.sessionId, inputContent, runConfig); + } + + private static void printOutEvent(Event event) { + if (event.content().isPresent() && event.content().get().parts().isPresent()) { + event + .content() + .get() + .parts() + .get() + .forEach( + part -> { + if (part.text().isPresent()) { + System.out.println(" Text: " + part.text().get().stripTrailing()); + } + }); } - return agentEvents; + if (event.actions() != null && event.actions().transferToAgent().isPresent()) { + System.out.println(" Actions: transferTo=" + event.actions().transferToAgent().get()); + } + System.out.println(" Raw Event: " + event); } public static void main(String[] args) { - String primeAgentUrl = args.length > 0 ? args[0] : "http://localhost:9876/a2a/prime_agent"; - LlmAgent agent = A2AAgent.createRootAgent(primeAgentUrl); + String primeAgentUrl = args.length > 0 ? args[0] : "http://localhost:8081/a2a/remote/v1"; + BaseAgent agent = A2AAgent.createRootAgent(primeAgentUrl); A2AAgentRun a2aRun = new A2AAgentRun(agent); - // First user input - System.out.println("Running turn 1"); - a2aRun.run("Roll a dice of 6 sides."); + List events = + a2aRun.run("Roll a dice of 6 sides.").toList().timeout(90, TimeUnit.SECONDS).blockingGet(); + + events.forEach(A2AAgentRun::printOutEvent); + + events = + a2aRun.run("Is this a prime number?").toList().timeout(90, TimeUnit.SECONDS).blockingGet(); - // Follow-up input triggers the remote prime agent so the A2A request is logged. - System.out.println("Running turn 2"); - a2aRun.run("Is this number a prime number?"); + events.forEach(A2AAgentRun::printOutEvent); } }