diff --git a/impl/a2a/pom.xml b/impl/a2a/pom.xml
new file mode 100644
index 000000000..91a8fac99
--- /dev/null
+++ b/impl/a2a/pom.xml
@@ -0,0 +1,30 @@
+
+ 4.0.0
+
+ io.serverlessworkflow
+ serverlessworkflow-impl
+ 8.0.0-SNAPSHOT
+
+ serverlessworkflow-impl-a2a
+ Serverless Workflow :: Impl :: A2A
+
+
+ io.serverlessworkflow
+ serverlessworkflow-impl-core
+
+
+ org.a2aproject.sdk
+ a2a-java-sdk-client
+
+
+ org.junit.jupiter
+ junit-jupiter-engine
+ test
+
+
+ org.junit.jupiter
+ junit-jupiter-params
+ test
+
+
+
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExceptionHandler.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExceptionHandler.java
new file mode 100644
index 000000000..2ccdcdda2
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExceptionHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowPosition;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+class A2AExceptionHandler implements Consumer {
+
+ private final CompletableFuture future;
+ private final WorkflowPosition position;
+
+ A2AExceptionHandler(CompletableFuture future, WorkflowPosition position) {
+ this.future = future;
+ this.position = position;
+ }
+
+ @Override
+ public void accept(Throwable ex) {
+ future.completeExceptionally(A2AUtils.workflowException(position, ex));
+ }
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutor.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutor.java
new file mode 100644
index 000000000..1727cb059
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutor.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowValueResolver;
+import io.serverlessworkflow.impl.executors.CallableTask;
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.a2aproject.sdk.client.Client;
+import org.a2aproject.sdk.client.config.ClientConfig;
+import org.a2aproject.sdk.client.http.A2ACardResolver;
+import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport;
+import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfig;
+import org.a2aproject.sdk.spec.A2AClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class A2AExecutor implements CallableTask {
+
+ private final WorkflowValueResolver uriSupplier;
+ private final A2ARequestDispatcher dispatcher;
+ private final Optional>> mapResolver;
+
+ private static final Logger logger = LoggerFactory.getLogger(A2AExecutor.class);
+
+ public A2AExecutor(
+ WorkflowValueResolver uriSupplier,
+ A2ARequestDispatcher dispatcher,
+ Optional>> mapResolver) {
+ this.uriSupplier = uriSupplier;
+ this.dispatcher = dispatcher;
+ this.mapResolver = mapResolver;
+ }
+
+ @Override
+ public CompletableFuture apply(
+ WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
+ URI uri = uriSupplier.apply(workflowContext, taskContext, input);
+
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return A2ACardResolver.builder()
+ .baseUrl(uri.resolve("/").toString())
+ .agentCardPath(uri.getPath())
+ .build()
+ .getAgentCard();
+ } catch (A2AClientException ex) {
+ throw A2AUtils.workflowException(taskContext.position(), ex);
+ }
+ },
+ workflowContext.definition().application().executorService())
+ .thenCompose(
+ agentCard -> {
+ logger.debug("Agent card is {}", agentCard);
+ try {
+ return dispatcher.apply(
+ agentCard,
+ Client.builder(agentCard)
+ .clientConfig(new ClientConfig.Builder().build())
+ .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig())
+ .build(),
+ mapResolver
+ .map(m -> m.apply(workflowContext, taskContext, input))
+ .orElse(Map.of()),
+ workflowContext,
+ taskContext);
+ } catch (A2AClientException ex) {
+ throw A2AUtils.workflowException(taskContext.position(), ex);
+ }
+ });
+ }
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutorBuilder.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutorBuilder.java
new file mode 100644
index 000000000..b49e37e88
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutorBuilder.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import io.serverlessworkflow.api.types.A2AArguments;
+import io.serverlessworkflow.api.types.CallA2A;
+import io.serverlessworkflow.api.types.Parameters;
+import io.serverlessworkflow.api.types.TaskBase;
+import io.serverlessworkflow.api.types.WithA2AParameters;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowMutablePosition;
+import io.serverlessworkflow.impl.WorkflowUtils;
+import io.serverlessworkflow.impl.WorkflowValueResolver;
+import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
+import io.serverlessworkflow.impl.executors.CallableTaskFactory;
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+
+public class A2AExecutorBuilder implements CallableTaskBuilder {
+
+ @Override
+ public boolean accept(Class extends TaskBase> clazz) {
+ return CallA2A.class.equals(clazz);
+ }
+
+ @Override
+ public CallableTaskFactory init(
+ CallA2A task, WorkflowDefinition definition, WorkflowMutablePosition position) {
+ A2AArguments args = task.getWith();
+
+ WorkflowValueResolver uriSupplier;
+ if (args.getServer() != null) {
+ uriSupplier = definition.resourceLoader().uriSupplier(args.getServer());
+ } else if (args.getAgentCard() != null) {
+ uriSupplier = definition.resourceLoader().uriSupplier(args.getAgentCard().getEndpoint());
+ } else {
+ throw new IllegalArgumentException("Neither server nor agent card is set for task: " + task);
+ }
+
+ A2ARequestDispatcher dispatcher =
+ switch (args.getMethod()) {
+ case MESSAGE_SEND ->
+ new MessageDispatcher(
+ (workflowContext, taskContext, completableFuture) ->
+ new MessageSendConsumer(workflowContext.definition(), completableFuture));
+ case MESSAGE_STREAM ->
+ new MessageDispatcher(
+ (workflowContext, taskContext, completableFuture) ->
+ new MessageStreamConsumer(
+ workflowContext.definition(), completableFuture, taskContext.position()));
+ case TASKS_LIST -> new ListTaskDispatcher();
+ case TASKS_GET -> new GetTaskDispatcher();
+ case TASKS_CANCEL -> new CancelTaskDispatcher();
+ // TODO handle missing cases
+ case AGENT_GET_AUTHENTICATED_EXTENDED_CARD,
+ TASKS_PUSH_NOTIFICATION_CONFIG_DELETE,
+ TASKS_PUSH_NOTIFICATION_CONFIG_GET,
+ TASKS_PUSH_NOTIFICATION_CONFIG_LIST,
+ TASKS_PUSH_NOTIFICATION_CONFIG_SET,
+ TASKS_RESUBSCRIBE ->
+ throw new UnsupportedOperationException("Unimplemented case: " + args.getMethod());
+ };
+
+ Parameters parameters = args.getParameters();
+ Optional>> mapResolver;
+ if (parameters == null) {
+ mapResolver = Optional.empty();
+ } else {
+ WithA2AParameters a2aParameters = parameters.getWithA2AParameters();
+ mapResolver =
+ Optional.of(
+ WorkflowUtils.buildMapResolver(
+ definition.application(),
+ parameters.getString(),
+ a2aParameters != null ? a2aParameters.getAdditionalProperties() : null));
+ }
+ return () -> new A2AExecutor(uriSupplier, dispatcher, mapResolver);
+ }
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2ARequestDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2ARequestDispatcher.java
new file mode 100644
index 000000000..6e8c91b17
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2ARequestDispatcher.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.a2aproject.sdk.client.Client;
+import org.a2aproject.sdk.spec.AgentCard;
+
+@FunctionalInterface
+interface A2ARequestDispatcher {
+ CompletableFuture apply(
+ AgentCard agentCard,
+ Client client,
+ Map parameters,
+ WorkflowContext workflowContext,
+ TaskContext taskContext);
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AUtils.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AUtils.java
new file mode 100644
index 000000000..75c844350
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AUtils.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowError;
+import io.serverlessworkflow.impl.WorkflowException;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowModelFactory;
+import io.serverlessworkflow.impl.WorkflowPosition;
+import io.serverlessworkflow.types.Errors;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+import org.a2aproject.sdk.spec.Message;
+import org.a2aproject.sdk.spec.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class A2AUtils {
+
+ static final String TASK_ID = "taskId";
+
+ private static final Logger logger = LoggerFactory.getLogger(A2AUtils.class);
+
+ static T param(Map map, String key, Class instanceClass) {
+ Object obj = map.get(key);
+ return isInstanceOrThrow(
+ obj,
+ instanceClass,
+ () -> "value " + obj + " for key " + key + " is not an instance of type " + instanceClass);
+ }
+
+ static void paramThen(
+ Map map, String key, Class instanceClass, Consumer consumer) {
+ isInstanceThen(map.get(key), instanceClass, consumer);
+ }
+
+ static void isInstanceThen(Object obj, Class instanceClass, Consumer consumer) {
+ isInstance(obj, instanceClass).ifPresent(consumer);
+ }
+
+ static T optionalParam(
+ Map map, String key, Class instanceClass, Supplier defaultValue) {
+ return isInstanceOrDefault(map.get(key), instanceClass, defaultValue);
+ }
+
+ static > T enumParam(
+ Map map, String key, Class instanceClass, T defaultValue) {
+ Object obj = map.get(key);
+
+ if (instanceClass.isInstance(obj)) {
+ return instanceClass.cast(obj);
+ } else if (String.class.isInstance(obj)) {
+ return Enum.valueOf(instanceClass, String.class.cast(obj));
+ } else {
+ return defaultValue;
+ }
+ }
+
+ static T optionalParam(Map map, String key, Class instanceClass) {
+ return isInstanceOrDefault(map.get(key), instanceClass, () -> null);
+ }
+
+ static T isInstanceOrThrow(Object obj, Class instanceClass, Supplier message) {
+ return isInstance(obj, instanceClass)
+ .orElseThrow(() -> new IllegalArgumentException(message.get()));
+ }
+
+ static T isInstanceOrDefault(Object obj, Class instanceClass, Supplier defaultValue) {
+ return isInstance(obj, instanceClass)
+ .orElseGet(
+ () -> {
+ if (obj != null) {
+ logger.warn(
+ "Object {} is expected to be of class {} but it is of class {}. Using provided default",
+ obj,
+ instanceClass.getName(),
+ obj.getClass().getName());
+ }
+ return defaultValue.get();
+ });
+ }
+
+ private static Optional isInstance(Object obj, Class instanceClass) {
+ if (instanceClass.isInstance(obj)) {
+ return Optional.of(instanceClass.cast(obj));
+ } else if (Instant.class.isAssignableFrom(instanceClass) && String.class.isInstance(obj)) {
+ return Optional.of(instanceClass.cast(Instant.parse(String.class.cast(obj))));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ static WorkflowModel fromTask(WorkflowModelFactory factory, Task task) {
+ return factory.fromOther(task);
+ }
+
+ static WorkflowModel fromTask(WorkflowContext context, Task task) {
+ return fromTask(context.definition().application().modelFactory(), task);
+ }
+
+ static WorkflowModel fromMessage(WorkflowModelFactory factory, Message message) {
+ return factory.fromOther(message);
+ }
+
+ static WorkflowError.Builder workflowError(WorkflowPosition position) {
+ return WorkflowError.error(Errors.RUNTIME.toString(), Errors.RUNTIME.status())
+ .instance(position.jsonPointer());
+ }
+
+ static WorkflowException workflowException(WorkflowPosition position, Throwable ex) {
+ return new WorkflowException(
+ A2AUtils.workflowError(position)
+ .title(ex.getMessage())
+ .details(WorkflowError.getStackTrace(ex))
+ .build(),
+ ex);
+ }
+
+ private A2AUtils() {}
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/CancelTaskDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/CancelTaskDispatcher.java
new file mode 100644
index 000000000..eb27c43b8
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/CancelTaskDispatcher.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.param;
+
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.a2aproject.sdk.client.Client;
+import org.a2aproject.sdk.spec.AgentCard;
+import org.a2aproject.sdk.spec.CancelTaskParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class CancelTaskDispatcher implements A2ARequestDispatcher {
+
+ private static final Logger logger = LoggerFactory.getLogger(CancelTaskDispatcher.class);
+
+ @Override
+ public CompletableFuture apply(
+ AgentCard agentCard,
+ Client client,
+ Map parameters,
+ WorkflowContext workflowContext,
+ TaskContext taskContext) {
+ String taskId = param(parameters, A2AUtils.TASK_ID, String.class);
+ logger.debug("Cancelling task {}", taskId);
+ return CompletableFuture.completedFuture(
+ A2AUtils.fromTask(workflowContext, client.cancelTask(new CancelTaskParams(taskId))));
+ }
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/GetTaskDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/GetTaskDispatcher.java
new file mode 100644
index 000000000..0b0526678
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/GetTaskDispatcher.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.param;
+
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.a2aproject.sdk.client.Client;
+import org.a2aproject.sdk.spec.AgentCard;
+import org.a2aproject.sdk.spec.TaskQueryParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class GetTaskDispatcher implements A2ARequestDispatcher {
+
+ private static final Logger logger = LoggerFactory.getLogger(GetTaskDispatcher.class);
+
+ @Override
+ public CompletableFuture apply(
+ AgentCard agentCard,
+ Client client,
+ Map parameters,
+ WorkflowContext workflowContext,
+ TaskContext taskContext) {
+ String taskId = param(parameters, A2AUtils.TASK_ID, String.class);
+ logger.debug("Getting information of task {}", taskId);
+ return CompletableFuture.completedFuture(
+ A2AUtils.fromTask(workflowContext, client.getTask(new TaskQueryParams(taskId))));
+ }
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/ListTaskDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/ListTaskDispatcher.java
new file mode 100644
index 000000000..8294b42c2
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/ListTaskDispatcher.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.enumParam;
+import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.optionalParam;
+
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowModelCollection;
+import io.serverlessworkflow.impl.WorkflowModelFactory;
+import java.time.Instant;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.a2aproject.sdk.client.Client;
+import org.a2aproject.sdk.jsonrpc.common.wrappers.ListTasksResult;
+import org.a2aproject.sdk.spec.AgentCard;
+import org.a2aproject.sdk.spec.ListTasksParams;
+import org.a2aproject.sdk.spec.TaskState;
+
+class ListTaskDispatcher implements A2ARequestDispatcher {
+
+ @Override
+ public CompletableFuture apply(
+ AgentCard agentCard,
+ Client client,
+ Map parameters,
+ WorkflowContext workflowContext,
+ TaskContext taskContext) {
+ ListTasksResult tasks =
+ client.listTasks(
+ new ListTasksParams(
+ optionalParam(parameters, "contextId", String.class),
+ enumParam(parameters, "status", TaskState.class, null),
+ optionalParam(parameters, "pageSize", Integer.class),
+ optionalParam(parameters, "pageToken", String.class),
+ optionalParam(parameters, "historyLength", Integer.class),
+ optionalParam(parameters, "statusTimestampAfter", Instant.class),
+ optionalParam(parameters, "includeArtifacts", Boolean.class),
+ optionalParam(
+ parameters,
+ "tenant",
+ String.class,
+ () ->
+ Optional.ofNullable(
+ agentCard.supportedInterfaces().iterator().next().tenant())
+ .orElse(""))));
+
+ WorkflowModelFactory factory = workflowContext.definition().application().modelFactory();
+ WorkflowModelCollection model = factory.createCollection();
+ tasks.tasks().forEach(t -> model.add(A2AUtils.fromTask(factory, t)));
+ return CompletableFuture.completedFuture(model);
+ }
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumer.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumer.java
new file mode 100644
index 000000000..2bc06aa0f
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumer.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowModelFactory;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+import org.a2aproject.sdk.client.ClientEvent;
+import org.a2aproject.sdk.spec.AgentCard;
+
+abstract class MessageConsumer implements BiConsumer {
+
+ protected final CompletableFuture completableFuture;
+ protected final WorkflowModelFactory factory;
+
+ public MessageConsumer(
+ WorkflowDefinition definition, CompletableFuture completableFuture) {
+ this.completableFuture = completableFuture;
+ factory = definition.application().modelFactory();
+ }
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumerFactory.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumerFactory.java
new file mode 100644
index 000000000..88b58e9df
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumerFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+@FunctionalInterface
+interface MessageConsumerFactory {
+
+ MessageConsumer buildConsumer(
+ WorkflowContext workflowContext,
+ TaskContext taskContext,
+ CompletableFuture completableFuture);
+
+ default Consumer buildExceptionHandler(
+ WorkflowContext workflowContext,
+ TaskContext taskContext,
+ CompletableFuture completableFuture) {
+ return new A2AExceptionHandler(completableFuture, taskContext.position());
+ }
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageDispatcher.java
new file mode 100644
index 000000000..385b9932c
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageDispatcher.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.isInstanceOrThrow;
+import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.optionalParam;
+import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.param;
+
+import io.serverlessworkflow.impl.TaskContext;
+import io.serverlessworkflow.impl.WorkflowContext;
+import io.serverlessworkflow.impl.WorkflowModel;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.a2aproject.sdk.client.Client;
+import org.a2aproject.sdk.spec.AgentCard;
+import org.a2aproject.sdk.spec.DataPart;
+import org.a2aproject.sdk.spec.Message;
+import org.a2aproject.sdk.spec.Message.Role;
+import org.a2aproject.sdk.spec.Part;
+import org.a2aproject.sdk.spec.TextPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class MessageDispatcher implements A2ARequestDispatcher {
+
+ private static final Logger logger = LoggerFactory.getLogger(MessageDispatcher.class);
+ private final MessageConsumerFactory consumerFactory;
+
+ public MessageDispatcher(MessageConsumerFactory consumerFactory) {
+ this.consumerFactory = consumerFactory;
+ }
+
+ @Override
+ public CompletableFuture apply(
+ AgentCard agentCard,
+ Client client,
+ Map parameters,
+ WorkflowContext workflowContext,
+ TaskContext taskContext) {
+ CompletableFuture future = new CompletableFuture<>();
+ MessageConsumer consumer = consumerFactory.buildConsumer(workflowContext, taskContext, future);
+ Message message = buildMessage(parameters);
+ logger.debug("Sending message {}", message);
+ client.sendMessage(
+ message,
+ List.of(consumer),
+ consumerFactory.buildExceptionHandler(workflowContext, taskContext, future));
+ return future;
+ }
+
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ private Message buildMessage(Map parameters) {
+
+ Map message = param(parameters, "message", Map.class);
+ Message.Builder messageBuilder = Message.builder();
+
+ Collection items = param(message, "parts", Collection.class);
+ List> parts = new ArrayList<>();
+ for (Object item : items) {
+ Map part = isInstanceOrThrow(item, Map.class, () -> "One item of parts is not an object");
+ String kind = optionalParam(part, "kind", String.class, () -> "text");
+ parts.add(
+ switch (kind) {
+ case TextPart.TEXT -> new TextPart(param(part, TextPart.TEXT, String.class));
+ case DataPart.DATA -> new DataPart(param(part, DataPart.DATA, Object.class));
+ default -> throw new UnsupportedOperationException("Unimplemented kind: " + kind);
+ });
+ }
+ messageBuilder.parts(parts);
+ A2AUtils.paramThen(message, "messageId", String.class, messageBuilder::messageId);
+ A2AUtils.paramThen(message, "contextId", String.class, messageBuilder::contextId);
+
+ messageBuilder.role(A2AUtils.enumParam(message, "role", Role.class, Role.ROLE_USER));
+ return messageBuilder.build();
+ }
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageSendConsumer.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageSendConsumer.java
new file mode 100644
index 000000000..5ac792bd2
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageSendConsumer.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowModel;
+import java.util.concurrent.CompletableFuture;
+import org.a2aproject.sdk.client.ClientEvent;
+import org.a2aproject.sdk.client.MessageEvent;
+import org.a2aproject.sdk.client.TaskEvent;
+import org.a2aproject.sdk.spec.AgentCard;
+
+class MessageSendConsumer extends MessageConsumer {
+
+ public MessageSendConsumer(
+ WorkflowDefinition definition, CompletableFuture completableFuture) {
+ super(definition, completableFuture);
+ }
+
+ @Override
+ public void accept(ClientEvent event, AgentCard card) {
+ if (event instanceof MessageEvent resp) {
+ completableFuture.complete(A2AUtils.fromMessage(factory, resp.getMessage()));
+ } else if (event instanceof TaskEvent resp) {
+ completableFuture.complete(A2AUtils.fromTask(factory, resp.getTask()));
+ }
+ }
+}
diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageStreamConsumer.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageStreamConsumer.java
new file mode 100644
index 000000000..bd5e386e6
--- /dev/null
+++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageStreamConsumer.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.executors.a2a;
+
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowException;
+import io.serverlessworkflow.impl.WorkflowModel;
+import io.serverlessworkflow.impl.WorkflowPosition;
+import java.util.concurrent.CompletableFuture;
+import org.a2aproject.sdk.client.ClientEvent;
+import org.a2aproject.sdk.client.MessageEvent;
+import org.a2aproject.sdk.client.TaskEvent;
+import org.a2aproject.sdk.client.TaskUpdateEvent;
+import org.a2aproject.sdk.spec.AgentCard;
+import org.a2aproject.sdk.spec.Task;
+
+class MessageStreamConsumer extends MessageConsumer {
+
+ private final WorkflowPosition position;
+
+ public MessageStreamConsumer(
+ WorkflowDefinition definition,
+ CompletableFuture completableFuture,
+ WorkflowPosition position) {
+ super(definition, completableFuture);
+ this.position = position;
+ }
+
+ @Override
+ public void accept(ClientEvent event, AgentCard card) {
+ if (event instanceof MessageEvent resp) {
+ completableFuture.complete(A2AUtils.fromMessage(factory, resp.getMessage()));
+ } else if (event instanceof TaskUpdateEvent resp) {
+ checkTaskCompletion(resp.getTask());
+ } else if (event instanceof TaskEvent resp) {
+ checkTaskCompletion(resp.getTask());
+ }
+ }
+
+ private void checkTaskCompletion(Task task) {
+ switch (task.status().state()) {
+ case TASK_STATE_REJECTED, TASK_STATE_FAILED, TASK_STATE_CANCELED:
+ completableFuture.completeExceptionally(exception(task));
+ break;
+ case TASK_STATE_COMPLETED:
+ completableFuture.complete(A2AUtils.fromTask(factory, task));
+ break;
+ default:
+ // do nothing
+ }
+ }
+
+ private WorkflowException exception(Task task) {
+ return new WorkflowException(
+ A2AUtils.workflowError(position)
+ .title(task.status().state().toString())
+ .details(task.history().toString())
+ .build());
+ }
+}
diff --git a/impl/a2a/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder b/impl/a2a/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder
new file mode 100644
index 000000000..bc458e2b9
--- /dev/null
+++ b/impl/a2a/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder
@@ -0,0 +1 @@
+io.serverlessworkflow.impl.executors.a2a.A2AExecutorBuilder
diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java
index bd0d84d51..ee2c4a6c5 100644
--- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java
+++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java
@@ -83,16 +83,20 @@ private static WorkflowError error(Throwable cause, WorkflowPosition position) {
}
private static WorkflowError commonError(Throwable cause, WorkflowPosition position) {
+ return new WorkflowError(
+ cause.getClass().getTypeName(),
+ 500,
+ position == null ? null : position.jsonPointer(),
+ cause.getMessage(),
+ getStackTrace(cause));
+ }
+
+ public static String getStackTrace(Throwable cause) {
StringWriter stackTrace = new StringWriter();
try (PrintWriter writer = new PrintWriter(stackTrace)) {
cause.printStackTrace(writer);
- return new WorkflowError(
- cause.getClass().getTypeName(),
- 500,
- position == null ? null : position.jsonPointer(),
- cause.getMessage(),
- stackTrace.toString());
}
+ return stackTrace.toString();
}
@Deprecated
diff --git a/impl/pom.xml b/impl/pom.xml
index 72737c6db..8c7b1249d 100644
--- a/impl/pom.xml
+++ b/impl/pom.xml
@@ -17,6 +17,8 @@
9.2.1
3.7.1
25.0.3
+ 1.0.0.Final
+ 2.14.0
@@ -123,6 +125,11 @@
serverlessworkflow-impl-grpc
${project.version}
+
+ io.serverlessworkflow
+ serverlessworkflow-impl-a2a
+ ${project.version}
+
net.thisptr
jackson-jq
@@ -191,6 +198,16 @@
polyglot
${version.org.graalvm.polyglot}
+
+ org.a2aproject.sdk
+ a2a-java-sdk-client
+ ${version.org.a2aproject.sdk}
+
+
+ com.google.code.gson
+ gson
+ ${version.com.google.code.gson}
+
@@ -213,5 +230,6 @@
python
grpc
openapi-jackson
+ a2a
diff --git a/impl/test/pom.xml b/impl/test/pom.xml
index 49ad1d893..165ef1a98 100644
--- a/impl/test/pom.xml
+++ b/impl/test/pom.xml
@@ -42,6 +42,10 @@
io.serverlessworkflow
serverlessworkflow-impl-script-python
+
+ io.serverlessworkflow
+ serverlessworkflow-impl-a2a
+
org.glassfish.jersey.media
jersey-media-json-jackson
diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/A2AExecutorTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/A2AExecutorTest.java
new file mode 100644
index 000000000..136083cbc
--- /dev/null
+++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/A2AExecutorTest.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2020-Present The Serverless Workflow Specification Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.serverlessworkflow.impl.test;
+
+import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.serverlessworkflow.impl.WorkflowApplication;
+import io.serverlessworkflow.impl.WorkflowDefinition;
+import io.serverlessworkflow.impl.WorkflowException;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import okhttp3.mockwebserver.MockResponse;
+import okhttp3.mockwebserver.MockWebServer;
+import okio.Buffer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class A2AExecutorTest {
+
+ private MockWebServer apiServer;
+
+ @BeforeEach
+ public void setUp() throws IOException {
+ apiServer = new MockWebServer();
+ apiServer.start(11111);
+ mockAgentCard();
+ }
+
+ @AfterEach
+ public void tearDown() throws IOException {
+ apiServer.close();
+ }
+
+ @Test
+ void testSendMessageMessage()
+ throws IOException, InterruptedException, ExecutionException, TimeoutException {
+ apiServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setHeader("Content-Type", "application/json")
+ .setBody(
+ "{\"jsonrpc\":\"2.0\",\"id\":\"14fc4dbc-989e-4f5b-a1bf-da25a7a2c10c\",\"result\":{\"message\":{\"messageId\":\"8545ebb6-2d8a-4676-8698-932f36c47e90\",\"contextId\":\"028f609d-c842-4851-afeb-5e61bd6dc3d1\",\"taskId\":\"4bfdadfc-3295-4019-b78f-e1681c86d6e9\",\"role\":\"ROLE_AGENT\",\"parts\":[{\"text\":\"Hello World\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[],\"referenceTaskIds\":[]}}}"));
+ try (WorkflowApplication appl = WorkflowApplication.builder().build()) {
+ WorkflowDefinition def =
+ appl.workflowDefinition(
+ readWorkflowFromClasspath("workflows-samples/a2a/a2a-hello-world.yaml"));
+ assertThat(def.instance().start().get(1, TimeUnit.SECONDS).asJavaObject())
+ .isEqualTo("Hello World");
+ }
+ }
+
+ private static final String SSE_STREAM =
+ "data: {\"jsonrpc\": \"2.0\",\"id\": \"363422be-b0f9-4692-a24d-278670e7c7f1\",\"result\":%s}\nid: %d\n\n";
+
+ private String getStreamBody(String json, int streamId) {
+ return String.format(SSE_STREAM, json, streamId);
+ }
+
+ @Test
+ void testSendMessageTask()
+ throws IOException, InterruptedException, ExecutionException, TimeoutException {
+ apiServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setHeader("Content-Type", "text/event-stream")
+ .setBody(
+ getStreamBody(
+ "{\"task\":{"
+ + " \"id\": \"363422be-b0f9-4692-a24d-278670e7c7f1\","
+ + " \"contextId\": \"c295ea44-7543-4f78-b524-7a38915ad6e4\","
+ + " \"status\": {"
+ + " \"state\": \"TASK_STATE_COMPLETED\""
+ + " },"
+ + " \"artifacts\": ["
+ + " {"
+ + " \"artifactId\": \"9b6934dd-37e3-4eb1-8766-962efaab63a1\","
+ + " \"name\": \"joke\","
+ + " \"parts\": ["
+ + " {"
+ + " \"text\": \"Why did the chicken cross the road? To get to the other side!\""
+ + " }"
+ + " ]"
+ + " }"
+ + " ],"
+ + " \"history\": ["
+ + " {"
+ + " \"role\": \"ROLE_USER\","
+ + " \"parts\": ["
+ + " {"
+ + " \"text\": \"tell me a joke\""
+ + " }"
+ + " ],"
+ + " \"messageId\": \"9229e770-767c-417b-a0b0-f0741243c589\","
+ + " \"taskId\": \"363422be-b0f9-4692-a24d-278670e7c7f1\","
+ + " \"contextId\": \"c295ea44-7543-4f78-b524-7a38915ad6e4\""
+ + " }"
+ + " ],"
+ + " \"metadata\": {}"
+ + " }}",
+ 0)));
+
+ try (WorkflowApplication appl = WorkflowApplication.builder().build()) {
+ WorkflowDefinition def =
+ appl.workflowDefinition(
+ readWorkflowFromClasspath("workflows-samples/a2a/a2a-tell-joke.yaml"));
+ assertThat(def.instance().start().get(1, TimeUnit.SECONDS).asJavaObject())
+ .isEqualTo("Why did the chicken cross the road? To get to the other side!");
+ }
+ }
+
+ @Test
+ void testSendMessageStream()
+ throws IOException, InterruptedException, ExecutionException, TimeoutException {
+
+ Buffer buffer = new Buffer();
+ buffer.writeUtf8(
+ getStreamBody(
+ "{\"task\":{\"id\":\"1ae7e733-129b-4454-b0d2-0811d945e7bb\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"status\":{\"state\":\"TASK_STATE_SUBMITTED\",\"timestamp\":\"2026-06-23T16:23:42.315374250Z\"},\"artifacts\":[],\"history\":[{\"messageId\":\"9229e770-767c-417b-a0b0-f0741243c589\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"taskId\":\"1ae7e733-129b-4454-b0d2-0811d945e7bb\",\"role\":\"ROLE_USER\",\"parts\":[{\"text\":\"why are we here?\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[],\"referenceTaskIds\":[]}],\"metadata\":{}}}",
+ 0));
+ buffer.writeUtf8(
+ getStreamBody(
+ "{\"artifactUpdate\":{\"taskId\":\"1ae7e733-129b-4454-b0d2-0811d945e7bb\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"artifact\":{\"artifactId\":\"4ad59044-7d52-454c-84b9-f9d49594abed\",\"name\":\"\",\"description\":\"\",\"parts\":[{\"text\":\"After some time thinking about your complex question, I feel emptiness and decide to close the task without answering\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[]},\"append\":false,\"lastChunk\":false,\"metadata\":{}}}",
+ 1));
+ buffer.writeUtf8(
+ getStreamBody(
+ "{\"statusUpdate\":{\"taskId\":\"1ae7e733-129b-4454-b0d2-0811d945e7bb\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"status\":{\"state\":\"TASK_STATE_COMPLETED\",\"timestamp\":\"2026-06-23T16:23:42.315784582Z\"},\"metadata\":{}}}",
+ 2));
+ apiServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setHeader("Content-Type", "text/event-stream")
+ .setBody(buffer)
+ .setBodyDelay(100, TimeUnit.MILLISECONDS));
+
+ try (WorkflowApplication appl = WorkflowApplication.builder().build()) {
+ WorkflowDefinition def =
+ appl.workflowDefinition(
+ readWorkflowFromClasspath("workflows-samples/a2a/a2a-life-meaning.yaml"));
+ assertThat(def.instance().start().get(1, TimeUnit.SECONDS).asJavaObject())
+ .isEqualTo(
+ "After some time thinking about your complex question, I feel emptiness and decide to close the task without answering");
+ }
+ }
+
+ @Test
+ void testListGetCancelTask()
+ throws IOException, InterruptedException, ExecutionException, TimeoutException {
+
+ try (WorkflowApplication appl = WorkflowApplication.builder().build()) {
+ Buffer buffer = new Buffer();
+ buffer.writeUtf8(
+ getStreamBody(
+ "{\"task\":{\"id\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"status\":{\"state\":\"TASK_STATE_SUBMITTED\",\"timestamp\":\"2026-06-23T16:23:42.315374250Z\"},\"artifacts\":[],\"history\":[{\"messageId\":\"9229e770-767c-417b-a0b0-f0741243c589\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"taskId\":\"1ae7e733-129b-4454-b0d2-0811d945e7bb\",\"role\":\"ROLE_USER\",\"parts\":[{\"text\":\"why are we here?\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[],\"referenceTaskIds\":[]}],\"metadata\":{}}}",
+ 0));
+ buffer.writeUtf8(
+ getStreamBody(
+ "{\"artifactUpdate\":{\"taskId\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"53a10b19-3ef7-4818-8365-2a84160c06ff\",\"artifact\":{\"artifactId\":\"4ad59044-7d52-454c-84b9-f9d49594abed\",\"name\":\"\",\"description\":\"\",\"parts\":[{\"text\":\"After some time thinking about your complex question, I feel emptiness and decide to close the task without answering\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[]},\"append\":false,\"lastChunk\":false,\"metadata\":{}}}",
+ 1));
+ buffer.writeUtf8(
+ getStreamBody(
+ "{\"statusUpdate\":{\"taskId\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"status\":{\"state\":\"TASK_STATE_CANCELED\",\"timestamp\":\"2026-06-24T12:11:49.591113597Z\"},\"metadata\":{}}}",
+ 2));
+ apiServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setHeader("Content-Type", "text/event-stream")
+ .setBody(buffer));
+ mockAgentCard();
+ apiServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setHeader("Content-Type", "application/json")
+ .setBody(
+ "{\"jsonrpc\":\"2.0\",\"id\":\"a8758715-2018-4f19-b78b-38b484089153\",\"result\":{\"tasks\":[{\"id\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"status\":{\"state\":\"TASK_STATE_SUBMITTED\",\"timestamp\":\"2026-06-24T12:11:49.468232286Z\"},\"artifacts\":[],\"history\":[],\"metadata\":{}}],\"nextPageToken\":\"\",\"pageSize\":1,\"totalSize\":1}}"));
+
+ mockAgentCard();
+ apiServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setHeader("Content-Type", "application/json")
+ .setBody(
+ "{\"jsonrpc\":\"2.0\",\"id\":\"d5dbc8de-130e-49d1-af88-370551f0ed69\",\"result\":{\"id\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"status\":{\"state\":\"TASK_STATE_SUBMITTED\",\"timestamp\":\"2026-06-24T12:11:49.468232286Z\"},\"artifacts\":[{\"artifactId\":\"4febb066-3096-43aa-a8e5-ec2e3cabfe8a\",\"name\":\"\",\"description\":\"\",\"parts\":[{\"text\":\"After some time thinking about your complex question, I feel emptiness and decide to close the task without answering\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[]}],\"history\":[{\"messageId\":\"9229e770-767c-417b-a0b0-f0741243c589\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"taskId\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"role\":\"ROLE_USER\",\"parts\":[{\"text\":\"why are we here?\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[],\"referenceTaskIds\":[]}],\"metadata\":{}}}"));
+
+ mockAgentCard();
+ apiServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setHeader("Content-Type", "application/json")
+ .setBody(
+ "{\"jsonrpc\":\"2.0\",\"id\":\"9c46108c-daae-4eff-af61-a8d42cfd923f\",\"result\":{\"id\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"status\":{\"state\":\"TASK_STATE_CANCELED\",\"timestamp\":\"2026-06-24T12:11:49.591113597Z\"},\"artifacts\":[{\"artifactId\":\"4febb066-3096-43aa-a8e5-ec2e3cabfe8a\",\"name\":\"\",\"description\":\"\",\"parts\":[{\"text\":\"After some time thinking about your complex question, I feel emptiness and decide to close the task without answering\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[]}],\"history\":[{\"messageId\":\"9229e770-767c-417b-a0b0-f0741243c589\",\"contextId\":\"d0a5bd21-2e32-4b4f-a454-2d769a895710\",\"taskId\":\"12253522-b561-4d7a-8fd7-d3a71e465f67\",\"role\":\"ROLE_USER\",\"parts\":[{\"text\":\"why are we here?\",\"metadata\":{},\"filename\":\"\",\"mediaType\":\"\"}],\"metadata\":{},\"extensions\":[],\"referenceTaskIds\":[]}],\"metadata\":{}}}"));
+
+ WorkflowDefinition taskDef =
+ appl.workflowDefinition(
+ readWorkflowFromClasspath("workflows-samples/a2a/a2a-life-meaning.yaml"));
+ WorkflowDefinition handlerDef =
+ appl.workflowDefinition(
+ readWorkflowFromClasspath("workflows-samples/a2a/a2a-task-handler.yaml"));
+ assertThatThrownBy(() -> taskDef.instance().start().join())
+ .hasCauseInstanceOf(WorkflowException.class);
+ assertThat(handlerDef.instance().start().join().asMap().orElseThrow().get("id"))
+ .isEqualTo("12253522-b561-4d7a-8fd7-d3a71e465f67");
+ }
+ }
+
+ private void mockAgentCard() {
+ apiServer.enqueue(
+ new MockResponse()
+ .setResponseCode(200)
+ .setHeader("Content-Type", "application/json")
+ .setBody(
+ "{\"name\":\"Hello World Agent\",\"description\":\"Just a hello world agent\",\"version\":\"1.0.0\",\"documentationUrl\":\"http://example.com/docs\",\"capabilities\":{\"streaming\":true,\"pushNotifications\":true,\"extendedAgentCard\":false},\"defaultInputModes\":[\"text\"],\"defaultOutputModes\":[\"text\"],\"skills\":[{\"id\":\"hello_world\",\"name\":\"Returns hello world\",\"description\":\"just returns hello world\",\"tags\":[\"hello world\"],\"examples\":[\"hi\",\"hello world\"]}],\"supportedInterfaces\":[{\"protocolBinding\":\"JSONRPC\",\"url\":\"http://localhost:11111\",\"protocolVersion\":\"1.0\"}],\"preferredTransport\":\"JSONRPC\"}"));
+ }
+}
diff --git a/impl/test/src/test/resources/workflows-samples/a2a/a2a-hello-world.yaml b/impl/test/src/test/resources/workflows-samples/a2a/a2a-hello-world.yaml
new file mode 100644
index 000000000..f8d1da00d
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/a2a/a2a-hello-world.yaml
@@ -0,0 +1,19 @@
+document:
+ dsl: '1.0.3'
+ namespace: test
+ name: a2a-hello-world
+ version: '0.1.0'
+do:
+ - sayHello:
+ call: a2a
+ with:
+ method: message/send
+ agentCard:
+ endpoint: http://localhost:11111
+ parameters:
+ message:
+ parts:
+ - kind: text
+ text: Hello Agent!
+ output:
+ as: .parts | map(.text) | join(" ")
\ No newline at end of file
diff --git a/impl/test/src/test/resources/workflows-samples/a2a/a2a-life-meaning.yaml b/impl/test/src/test/resources/workflows-samples/a2a/a2a-life-meaning.yaml
new file mode 100644
index 000000000..8ca9e1617
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/a2a/a2a-life-meaning.yaml
@@ -0,0 +1,21 @@
+document:
+ dsl: '1.0.3'
+ namespace: test
+ name: a2a-life-meaning
+ version: '0.1.0'
+do:
+ - complexQuestion:
+ call: a2a
+ with:
+ method: message/stream
+ agentCard:
+ endpoint: http://localhost:11111
+ parameters:
+ message:
+ messageId: 9229e770-767c-417b-a0b0-f0741243c589
+ parts:
+ - kind: text
+ text: why are we here?
+ output:
+ as: .artifacts[] | .parts | map (.text) | join (" ")
+
\ No newline at end of file
diff --git a/impl/test/src/test/resources/workflows-samples/a2a/a2a-task-handler.yaml b/impl/test/src/test/resources/workflows-samples/a2a/a2a-task-handler.yaml
new file mode 100644
index 000000000..37647aaa3
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/a2a/a2a-task-handler.yaml
@@ -0,0 +1,30 @@
+document:
+ dsl: '1.0.3'
+ namespace: test
+ name: a2a-task-cancel
+ version: '0.1.0'
+do:
+ - listTasks:
+ call: a2a
+ with:
+ method: tasks/list
+ agentCard:
+ endpoint: http://localhost:11111
+ output:
+ as: .[0]
+ - getTasks:
+ call: a2a
+ with:
+ method: tasks/get
+ agentCard:
+ endpoint: http://localhost:11111
+ parameters:
+ taskId: ${.id}
+ - cancelTasks:
+ call: a2a
+ with:
+ method: tasks/cancel
+ agentCard:
+ endpoint: http://localhost:11111
+ parameters:
+ taskId: ${.id}
\ No newline at end of file
diff --git a/impl/test/src/test/resources/workflows-samples/a2a/a2a-tell-joke.yaml b/impl/test/src/test/resources/workflows-samples/a2a/a2a-tell-joke.yaml
new file mode 100644
index 000000000..cb4c5c790
--- /dev/null
+++ b/impl/test/src/test/resources/workflows-samples/a2a/a2a-tell-joke.yaml
@@ -0,0 +1,21 @@
+document:
+ dsl: '1.0.3'
+ namespace: test
+ name: a2a-tell-joke
+ version: '0.1.0'
+do:
+ - tellJoke:
+ call: a2a
+ with:
+ method: message/send
+ agentCard:
+ endpoint: http://localhost:11111
+ parameters:
+ message:
+ messageId: 9229e770-767c-417b-a0b0-f0741243c589
+ parts:
+ - kind: text
+ text: tell me a joke
+ output:
+ as: .artifacts[] | .parts | map (.text) | join (" ")
+
\ No newline at end of file