Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 47 additions & 13 deletions a2a/src/main/java/com/google/adk/a2a/A2ASendMessageExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import com.google.adk.a2a.converters.ResponseConverter;
import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.RunConfig;
import com.google.adk.artifacts.InMemoryArtifactService;
import com.google.adk.artifacts.BaseArtifactService;
import com.google.adk.events.Event;
import com.google.adk.memory.InMemoryMemoryService;
import com.google.adk.memory.BaseMemoryService;
import com.google.adk.runner.Runner;
import com.google.adk.sessions.InMemorySessionService;
import com.google.adk.sessions.BaseSessionService;
import com.google.adk.sessions.Session;
import com.google.common.collect.ImmutableList;
import com.google.genai.types.Content;
Expand Down Expand Up @@ -51,29 +51,63 @@ Single<ImmutableList<Event>> execute(
String invocationId);
}

private final InMemorySessionService sessionService;
private final BaseSessionService sessionService;
private final String appName;
@Nullable private final Runner runner;
@Nullable private final Duration agentTimeout;
private static final RunConfig DEFAULT_RUN_CONFIG =
RunConfig.builder().setStreamingMode(RunConfig.StreamingMode.NONE).setMaxLlmCalls(20).build();

public A2ASendMessageExecutor(InMemorySessionService sessionService, String appName) {
public A2ASendMessageExecutor(BaseSessionService sessionService, String appName) {
this.sessionService = sessionService;
this.appName = appName;
this.runner = null;
this.agentTimeout = null;
}

public A2ASendMessageExecutor(BaseAgent agent, String appName, Duration agentTimeout) {
InMemorySessionService sessionService = new InMemorySessionService();
/**
* Creates an A2A send message executor with explicit service dependencies.
*
* <p>This constructor requires all service implementations to be provided explicitly, enabling
* flexible deployment configurations (e.g., persistent sessions, distributed artifacts).
*
* <p><strong>Note:</strong> In version 0.5.1, the constructor signature changed to require
* explicit service injection. Previously, services were created internally as in-memory
* implementations.
*
* <p><strong>For Spring Boot applications:</strong> Use {@link
* com.google.adk.webservice.A2ARemoteConfiguration} which automatically provides service beans
* with sensible defaults. Direct instantiation is typically only needed for custom frameworks or
* testing.
*
* <p>Example usage:
*
* <pre>{@code
* A2ASendMessageExecutor executor = new A2ASendMessageExecutor(
* myAgent,
* "my-app",
* Duration.ofSeconds(30),
* new InMemorySessionService(), // or DatabaseSessionService for persistence
* new InMemoryArtifactService(), // or S3ArtifactService for distributed storage
* new InMemoryMemoryService()); // or RedisMemoryService for shared state
* }</pre>
*
* @param agent the agent to execute when processing messages
* @param appName the application name used for session identification
* @param agentTimeout maximum duration to wait for agent execution before timing out
* @param sessionService service for managing conversation sessions (required, non-null)
* @param artifactService service for storing and retrieving artifacts (required, non-null)
* @param memoryService service for managing agent memory/state (required, non-null)
*/
public A2ASendMessageExecutor(
BaseAgent agent,
String appName,
Duration agentTimeout,
BaseSessionService sessionService,
BaseArtifactService artifactService,
BaseMemoryService memoryService) {
Runner runnerInstance =
new Runner(
agent,
appName,
new InMemoryArtifactService(),
sessionService,
new InMemoryMemoryService());
new Runner(agent, appName, artifactService, sessionService, memoryService);
this.sessionService = sessionService;
this.appName = appName;
this.runner = runnerInstance;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package com.google.adk.a2a;

import static com.google.common.truth.Truth.assertThat;

import com.google.adk.agents.BaseAgent;
import com.google.adk.agents.InvocationContext;
import com.google.adk.artifacts.InMemoryArtifactService;
import com.google.adk.events.Event;
import com.google.adk.memory.InMemoryMemoryService;
import com.google.adk.sessions.InMemorySessionService;
import com.google.common.collect.ImmutableList;
import com.google.genai.types.Content;
import com.google.genai.types.Part;
import io.a2a.spec.Message;
import io.a2a.spec.TextPart;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class A2ASendMessageExecutorAdvancedTest {

private InMemorySessionService sessionService;

@Test
public void execute_withCustomStrategy_usesStrategy() {
InMemorySessionService sessionService = new InMemorySessionService();

A2ASendMessageExecutor executor = new A2ASendMessageExecutor(sessionService, "test-app");

A2ASendMessageExecutor.AgentExecutionStrategy customStrategy =
(userId, sessionId, userContent, runConfig, invocationId) -> {
Event customEvent =
Event.builder()
.id(UUID.randomUUID().toString())
.invocationId(invocationId)
.author("agent")
.content(
Content.builder()
.role("model")
.parts(
ImmutableList.of(
Part.builder().text("Custom strategy response").build()))
.build())
.build();
return Single.just(ImmutableList.of(customEvent));
};

Message request =
new Message.Builder()
.messageId("msg-1")
.contextId("ctx-1")
.role(Message.Role.USER)
.parts(List.of(new TextPart("Test")))
.build();

Message response = executor.execute(request, customStrategy).blockingGet();

assertThat(response).isNotNull();
assertThat(response.getParts()).isNotEmpty();
assertThat(((TextPart) response.getParts().get(0)).getText())
.contains("Custom strategy response");
}

private A2ASendMessageExecutor createExecutorWithAgent() {
BaseAgent agent = createSimpleAgent();
sessionService = new InMemorySessionService();
return new A2ASendMessageExecutor(
agent,
"test-app",
Duration.ofSeconds(30),
sessionService,
new InMemoryArtifactService(),
new InMemoryMemoryService());
}

@Test
public void execute_withNullMessage_generatesDefaultContext() {
A2ASendMessageExecutor executor = createExecutorWithAgent();

Message response = executor.execute(null).blockingGet();

assertThat(response).isNotNull();
assertThat(response.getContextId()).isNotNull();
assertThat(response.getContextId()).isNotEmpty();
}

@Test
public void execute_withEmptyContextId_generatesNewContext() {
A2ASendMessageExecutor executor = createExecutorWithAgent();

Message request =
new Message.Builder()
.messageId("msg-1")
.role(Message.Role.USER)
.parts(List.of(new TextPart("Test")))
.build();

Message response = executor.execute(request).blockingGet();

assertThat(response).isNotNull();
assertThat(response.getContextId()).isNotNull();
assertThat(response.getContextId()).isNotEmpty();
}

@Test
public void execute_withProvidedContextId_preservesContext() {
A2ASendMessageExecutor executor = createExecutorWithAgent();

String contextId = "my-custom-context";
Message request =
new Message.Builder()
.messageId("msg-1")
.contextId(contextId)
.role(Message.Role.USER)
.parts(List.of(new TextPart("Test")))
.build();

Message response = executor.execute(request).blockingGet();

assertThat(response).isNotNull();
assertThat(response.getContextId()).isEqualTo(contextId);
}

@Test
public void execute_multipleRequests_maintainsSession() {
A2ASendMessageExecutor executor = createExecutorWithAgent();

String contextId = "persistent-context";

Message request1 =
new Message.Builder()
.messageId("msg-1")
.contextId(contextId)
.role(Message.Role.USER)
.parts(List.of(new TextPart("First message")))
.build();

Message response1 = executor.execute(request1).blockingGet();
assertThat(response1.getContextId()).isEqualTo(contextId);

Message request2 =
new Message.Builder()
.messageId("msg-2")
.contextId(contextId)
.role(Message.Role.USER)
.parts(List.of(new TextPart("Second message")))
.build();

Message response2 = executor.execute(request2).blockingGet();
assertThat(response2.getContextId()).isEqualTo(contextId);
}

@Test
public void execute_withoutRunnerConfig_throwsException() {
InMemorySessionService sessionService = new InMemorySessionService();

A2ASendMessageExecutor executor = new A2ASendMessageExecutor(sessionService, "test-app");

Message request =
new Message.Builder()
.messageId("msg-1")
.contextId("ctx-1")
.role(Message.Role.USER)
.parts(List.of(new TextPart("Test")))
.build();

try {
executor.execute(request).blockingGet();
assertThat(false).isTrue();
} catch (IllegalStateException e) {
assertThat(e.getMessage()).contains("Runner-based handle invoked without configured runner");
}
}

@Test
public void execute_errorInStrategy_returnsErrorResponse() {
InMemorySessionService sessionService = new InMemorySessionService();

A2ASendMessageExecutor executor = new A2ASendMessageExecutor(sessionService, "test-app");

A2ASendMessageExecutor.AgentExecutionStrategy failingStrategy =
(userId, sessionId, userContent, runConfig, invocationId) -> {
return Single.error(new RuntimeException("Strategy failed"));
};

Message request =
new Message.Builder()
.messageId("msg-1")
.contextId("ctx-1")
.role(Message.Role.USER)
.parts(List.of(new TextPart("Test")))
.build();

Message response = executor.execute(request, failingStrategy).blockingGet();

assertThat(response).isNotNull();
assertThat(response.getParts()).isNotEmpty();
assertThat(((TextPart) response.getParts().get(0)).getText()).contains("Error:");
assertThat(((TextPart) response.getParts().get(0)).getText()).contains("Strategy failed");
}

private BaseAgent createSimpleAgent() {
return new BaseAgent("test", "test agent", ImmutableList.of(), null, null) {
@Override
protected Flowable<Event> runAsyncImpl(InvocationContext ctx) {
return Flowable.just(
Event.builder()
.content(
Content.builder()
.role("model")
.parts(
ImmutableList.of(
com.google.genai.types.Part.builder().text("Response").build()))
.build())
.build());
}

@Override
protected Flowable<Event> runLiveImpl(InvocationContext ctx) {
return Flowable.empty();
}
};
}
}
Loading