From 6132d3a5d6ecc1faee04c4945b8138ae38e3d0a1 Mon Sep 17 00:00:00 2001 From: Dmitrii Tikhomirov Date: Fri, 5 Dec 2025 16:53:50 -0800 Subject: [PATCH] added experimental support of Flexible Processes Signed-off-by: Dmitrii Tikhomirov --- experimental/fluent/flex-processes/README.md | 192 +++ experimental/fluent/flex-processes/pom.xml | 75 + .../fluent/processes/Activity.java | 150 ++ .../fluent/processes/FlexibleProcess.java | 95 ++ .../processes/FlexibleProcessCallTask.java | 32 + .../processes/dsl/FlexibleProcessDSL.java | 219 +++ .../internal/FlexibleProcessExecutor.java | 67 + .../FlexibleProcessExecutorBuilder.java | 46 + .../internal/FlexibleProcessManager.java | 85 + ...orkflow.impl.executors.CallableTaskBuilder | 1 + .../fluent/processes/FlexibleProcessTest.java | 1455 +++++++++++++++++ experimental/fluent/pom.xml | 1 + .../fluent/spec/BaseTaskItemListBuilder.java | 2 +- 13 files changed, 2419 insertions(+), 1 deletion(-) create mode 100644 experimental/fluent/flex-processes/README.md create mode 100644 experimental/fluent/flex-processes/pom.xml create mode 100644 experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/Activity.java create mode 100644 experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/FlexibleProcess.java create mode 100644 experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/FlexibleProcessCallTask.java create mode 100644 experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/dsl/FlexibleProcessDSL.java create mode 100644 experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessExecutor.java create mode 100644 experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessExecutorBuilder.java create mode 100644 experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessManager.java create mode 100644 experimental/fluent/flex-processes/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder create mode 100644 experimental/fluent/flex-processes/src/test/java/io/serverlessworkflow/fluent/processes/FlexibleProcessTest.java diff --git a/experimental/fluent/flex-processes/README.md b/experimental/fluent/flex-processes/README.md new file mode 100644 index 00000000..924b4b60 --- /dev/null +++ b/experimental/fluent/flex-processes/README.md @@ -0,0 +1,192 @@ +# Flexible Process — Dynamic, Condition-Driven Task Execution for Serverless Workflow + +`FlexibleProcess` is an extension for **Serverless Workflow** that enables execution of a set +of activities where: + +- the **order is not predefined**, +- activities run only when their **entry conditions** are satisfied, +- execution continues until an **exit condition** evaluates to true, +- activities can be **repeatable** or **single-run**, +- a process may contain **many activities** working together like a rule engine. + +This allows modeling dynamic, state-driven workflows not possible in linear step-based systems. + +--- + +## Why Flexible Process? + +Traditional workflow engines follow a static sequence of tasks. +However, real business logic often requires: + +- executing a task only when certain data becomes available, +- skipping tasks dynamically, +- repeating tasks until a business rule is met, +- enabling tasks conditionally, +- mixing multiple independent “microsteps” that coordinate through shared state, + +`FlexibleProcess` provides exactly this capability. + +--- + +## Core Concepts + +### **Activity** + +An `Activity` is a small executable unit consisting of: + +- **task** — the business logic (Java function, service call, etc.) +- **entryCondition** — predicate that decides whether this activity should run +- **isRepeatable** — whether the activity may run more than once +- **postAction** — (optional) callback after execution +- **executed flag** (internal, per-run) + +Example: + +```java +Activity increment = + Activity.builder() + .callTask(consumer(map -> { + int counter = (int) map.get("counter"); + map.put("counter", counter + 1); + }, Map.class)) + .entryCondition(model -> true) + .isRepeatable(true) + .build(); +``` + +# FlexibleProcess + +A **Flexible Process** is a group of activities executed in a loop: all activities check their entry conditions each iteration, those that match execute, loop stops when the `exitCondition` is met or when `maxAttempts` is reached. + +```java +FlexibleProcess process = FlexibleProcess.builder() + .exitCondition(model -> (int) model.asMap().get().get("counter") >= 3) + .activities(increment) + .build(); +``` + +You can attach: +- `onProcessFinished` +- `onException` +- `maxAttempts` + +## Running a Simple Flexible Process + +### 1. Define an activity + +```java +Activity increment = Activity.builder() + .callTask(consumer(map -> { + int value = (int) map.get("value"); + map.put("value", value + 1); + }, Map.class)) + .entryCondition(model -> true) + .isRepeatable(true) + .build(); +``` + +### 2. Define exit condition + +```java +Predicate done = model -> + (int) model.asMap().get().get("value") >= 5; +``` + +### 3. Build the FlexibleProcess + +```java +FlexibleProcess flexible = FlexibleProcess.builder() + .exitCondition(done) + .activities(increment) + .maxAttempts(100) + .build(); +``` + +### 4. Insert into workflow + +```java + Workflow workflow = + FuncWorkflowBuilder.workflow("flex-demo") + .tasks(process("flex_proc", flexible)) + .build(); +``` + +### 5. Execute + +```java +try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Map result = app.workflowDefinition(workflow) + .instance(Map.of("value", 0)) + .start() + .get() + .asMap() + .orElseThrow(); + + System.out.println(result); // prints {value=5} +} +``` + +## Advanced Example: Multi-Activity Dynamic Flow + +```java +Activity doubler = Activity.builder() + .callTask(consumer(map -> { + int v = (int) map.get("value"); + map.put("value", v * 2); + }, Map.class)) + .entryCondition(model -> + ((int) model.asMap().get().get("value")) < 50) + .isRepeatable(true) + .build(); + +Activity subtract = Activity.builder() + .callTask(consumer(map -> { + int v = (int) map.get("value"); + map.put("value", v - 5); + }, Map.class)) + .entryCondition(model -> + ((int) model.asMap().get().get("value")) >= 50) + .isRepeatable(true) + .build(); + +FlexibleProcess process = FlexibleProcess.builder() + .exitCondition(model -> + (int) model.asMap().get().get("value") < 10) + .activities(doubler, subtract) + .build(); +``` + +This process: +- Doubles value until it reaches 50 +- Then repeatedly subtracts 5 +- Stops when result < 10 + +A perfect example of **non-linear, state-driven execution**. + +## Use Cases + +### Rule-Based Decision Logic +- document validation +- scoring & risk models +- conditional enrichment +- fraud detection pipelines + +### Iterative / Loop-Based Processing +- stabilization loops +- retries with state transitions +- multi-step data transformations + +## Execution Model + +```mermaid +flowchart TD + A[Start Flexible Process] --> B[Clone Activities
new Activity instances] + B --> C{Exit Condition Met?} + C -- Yes --> Z[Stop] + C -- No --> D[Evaluate entry conditions for all activities] + D --> E[Execute all matching activities] + E --> F[Mark executed non-repeatable activities] + F --> G{Any activity executed?} + G -- No --> Z[Stop: no further progress possible] + G -- Yes --> C +``` \ No newline at end of file diff --git a/experimental/fluent/flex-processes/pom.xml b/experimental/fluent/flex-processes/pom.xml new file mode 100644 index 00000000..c4365e15 --- /dev/null +++ b/experimental/fluent/flex-processes/pom.xml @@ -0,0 +1,75 @@ + + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-experimental-fluent + 8.0.0-SNAPSHOT + + + Serverless Workflow :: Experimental :: Fluent :: Flexible Processes + flex-processes + + + + io.serverlessworkflow + serverlessworkflow-experimental-types + + + io.serverlessworkflow + serverlessworkflow-experimental-fluent-func + + + io.serverlessworkflow + serverlessworkflow-experimental-model + + + io.serverlessworkflow + serverlessworkflow-impl-jq + + + io.serverlessworkflow + serverlessworkflow-experimental-lambda + + + org.slf4j + slf4j-simple + test + + + org.junit.jupiter + junit-jupiter-api + test + + + org.mockito + mockito-core + test + + + org.assertj + assertj-core + test + + + + + + + maven-jar-plugin + ${version.jar.plugin} + + + + + test-jar + + + + + + + + \ No newline at end of file diff --git a/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/Activity.java b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/Activity.java new file mode 100644 index 00000000..063e2eaa --- /dev/null +++ b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/Activity.java @@ -0,0 +1,150 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.processes; + +import io.serverlessworkflow.api.types.CallTask; +import io.serverlessworkflow.api.types.Task; +import io.serverlessworkflow.api.types.func.CallJava; +import io.serverlessworkflow.api.types.func.CallTaskJava; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.Objects; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.function.Predicate; + +public class Activity { + + private final boolean isRepeatable; + private final Task task; + private final Predicate entryCondition; + private final String name; + private Consumer postAction; + private boolean executed; + + Activity( + Task task, + String name, + Predicate entryCondition, + Consumer postAction, + boolean isRepeatable) { + this.task = task; + this.name = name; + this.entryCondition = entryCondition; + this.postAction = postAction; + this.isRepeatable = isRepeatable; + } + + public Task getTask() { + return task; + } + + public Predicate getEntryCondition() { + return entryCondition; + } + + public Consumer getPostAction() { + return postAction; + } + + public String getName() { + return name; + } + + public boolean isRepeatable() { + return isRepeatable; + } + + public boolean isExecuted() { + return executed; + } + + public void setExecuted() { + this.executed = true; + } + + @Override + public boolean equals(Object object) { + if (this == object) return true; + if (object == null || getClass() != object.getClass()) return false; + Activity activity = (Activity) object; + return Objects.equals(task, activity.task) + && Objects.equals(entryCondition, activity.entryCondition) + && Objects.equals(postAction, activity.postAction); + } + + public static ActivityBuilder builder() { + return new ActivityBuilder(); + } + + public static class ActivityBuilder { + private Task task; + private String name; + private Predicate entryCondition; + private Consumer postAction; + private boolean isRepeatable; + + private ActivityBuilder() {} + + public ActivityBuilder callTask(CallTask task) { + this.task = new Task().withCallTask(task); + return this; + } + + public ActivityBuilder callTask(CallJava callJava) { + return callTask(new CallTaskJava(callJava)); + } + + public ActivityBuilder name(String name) { + this.name = name; + return this; + } + + public ActivityBuilder entryCondition(Predicate entryCondition) { + this.entryCondition = entryCondition; + return this; + } + + public ActivityBuilder postAction(Consumer postAction) { + this.postAction = postAction; + return this; + } + + public ActivityBuilder isRepeatable(boolean isRepeatable) { + this.isRepeatable = isRepeatable; + return this; + } + + public Activity build() { + return new Activity( + Objects.requireNonNull(task, "Task must be provided"), + name != null ? name : "activity-" + UUID.randomUUID(), + Objects.requireNonNull(entryCondition, "Entry condition must be provided"), + postAction, + isRepeatable); + } + } + + // Activity keeps track of its own execution state, so we need to create a new instance when used + // In other words, an Activity is a prototype + Activity newInstance() { + return new Activity(this.task, name, this.entryCondition, this.postAction, this.isRepeatable); + } + + @Override + public int hashCode() { + return Objects.hash(task, name, entryCondition, postAction); + } +} diff --git a/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/FlexibleProcess.java b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/FlexibleProcess.java new file mode 100644 index 00000000..9940e7e3 --- /dev/null +++ b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/FlexibleProcess.java @@ -0,0 +1,95 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.processes; + +import io.serverlessworkflow.api.types.Task; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.Objects; +import java.util.function.Predicate; + +public class FlexibleProcess extends TaskBase { + + private Predicate exitCondition; + private Activity[] activities; + + private int maxAttempts = 1024; + + private FlexibleProcess() {} + + public Predicate getExitCondition() { + return exitCondition; + } + + public Activity[] getActivities() { + Activity[] result = new Activity[activities.length]; + for (int i = 0; i < activities.length; i++) { + result[i] = activities[i].newInstance(); + } + return result; + } + + public int getMaxAttempts() { + return maxAttempts; + } + + public Task asTask() { + Task task = new Task(); + task.setCallTask(new FlexibleProcessCallTask(this)); + return task; + } + + public static FlexibleProcessBuilder builder() { + return new FlexibleProcessBuilder(); + } + + public static class FlexibleProcessBuilder { + private Predicate exitCondition; + private Activity[] activities; + private Integer maxAttempts; + + public FlexibleProcessBuilder exitCondition(Predicate exitCondition) { + this.exitCondition = exitCondition; + return this; + } + + public FlexibleProcessBuilder activities(Activity... activities) { + this.activities = activities; + return this; + } + + public FlexibleProcessBuilder maxAttempts(Integer maxAttempts) { + Objects.requireNonNull(maxAttempts, "maxAttempts cannot be null"); + if (maxAttempts < 1) { + throw new IllegalArgumentException("maxAttempts must be greater than 0"); + } + this.maxAttempts = maxAttempts; + return this; + } + + public FlexibleProcess build() { + FlexibleProcess flexibleProcess = new FlexibleProcess(); + flexibleProcess.exitCondition = + Objects.requireNonNull(this.exitCondition, "Exit condition must be provided"); + flexibleProcess.activities = + Objects.requireNonNull(this.activities, "Activities must be provided"); + if (this.maxAttempts != null) { + flexibleProcess.maxAttempts = this.maxAttempts; + } + return flexibleProcess; + } + } +} diff --git a/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/FlexibleProcessCallTask.java b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/FlexibleProcessCallTask.java new file mode 100644 index 00000000..9a1e75b7 --- /dev/null +++ b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/FlexibleProcessCallTask.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.processes; + +import io.serverlessworkflow.api.types.CallTask; + +public class FlexibleProcessCallTask extends CallTask { + + private final FlexibleProcess process; + + FlexibleProcessCallTask(FlexibleProcess process) { + this.process = process; + } + + @Override + public Object get() { + return process; + } +} diff --git a/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/dsl/FlexibleProcessDSL.java b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/dsl/FlexibleProcessDSL.java new file mode 100644 index 00000000..9bdc0d2a --- /dev/null +++ b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/dsl/FlexibleProcessDSL.java @@ -0,0 +1,219 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.processes.dsl; + +import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer; +import io.serverlessworkflow.fluent.processes.Activity; +import io.serverlessworkflow.fluent.processes.FlexibleProcess; +import java.util.Objects; +import java.util.UUID; +import java.util.function.Consumer; + +/** + * DSL helper class providing fluent API methods for creating and configuring FlexibleProcess and + * Activity instances in workflows. + * + *

This class offers convenient static methods to build workflow components with minimal + * boilerplate code, supporting both explicit and lambda-based configuration. + */ +public class FlexibleProcessDSL { + + /** + * Creates a FuncTaskConfigurer for a FlexibleProcess with an auto-generated unique name. + * + *

This is a convenience method that generates a random UUID-based name for the process task. + * Use this when the task name is not important for your workflow logic. + * + *

Example usage: + * + *

{@code
+   * FlexibleProcess myProcess = FlexibleProcess.builder()
+   *     .exitCondition(model -> ...)
+   *     .activities(activity1, activity2)
+   *     .build();
+   *
+   * Workflow workflow = FuncWorkflowBuilder.workflow("my-workflow")
+   *     .tasks(process(myProcess))
+   *     .build();
+   * }
+ * + * @param process the FlexibleProcess to be executed as a workflow task + * @return a FuncTaskConfigurer that adds the process as a task with a generated name + * @throws NullPointerException if process is null + * @see #process(String, FlexibleProcess) + */ + public static FuncTaskConfigurer process(FlexibleProcess process) { + return process(UUID.randomUUID().toString(), process); + } + + /** + * Creates a FuncTaskConfigurer for a FlexibleProcess with a specified name. + * + *

This method allows you to assign a meaningful name to the process task, which can be useful + * for debugging, monitoring, or when you need to reference this specific task in your workflow + * logic. + * + *

Example usage: + * + *

{@code
+   * FlexibleProcess validationProcess = FlexibleProcess.builder()
+   *     .exitCondition(model -> (boolean) model.asMap().get().get("validated"))
+   *     .activities(validateActivity)
+   *     .build();
+   *
+   * Workflow workflow = FuncWorkflowBuilder.workflow("order-processing")
+   *     .tasks(
+   *         process("validate-order", validationProcess),
+   *         process("fulfill-order", fulfillmentProcess)
+   *     )
+   *     .build();
+   * }
+ * + * @param name the name to assign to this process task (must not be null) + * @param process the FlexibleProcess to be executed as a workflow task + * @return a FuncTaskConfigurer that adds the named process as a task + * @throws NullPointerException if either name or process is null + */ + public static FuncTaskConfigurer process(String name, FlexibleProcess process) { + Objects.requireNonNull(process, "FlexibleProcess cannot be null"); + return list -> list.addTaskItem(new TaskItem(name, process.asTask())); + } + + /** + * Creates a FuncTaskConfigurer for a FlexibleProcess using a builder lambda with an + * auto-generated name. + * + *

This method provides a fluent inline way to configure a FlexibleProcess without explicitly + * calling the builder pattern. The builder instance is created automatically and passed to the + * consumer for configuration. + * + *

Example usage: + * + *

{@code
+   * Workflow workflow = FuncWorkflowBuilder.workflow("my-workflow")
+   *     .tasks(
+   *         process(builder -> builder
+   *             .exitCondition(model -> (int) model.asMap().get().get("count") >= 5)
+   *             .activities(incrementActivity)
+   *             .maxAttempts(10)
+   *         )
+   *     )
+   *     .build();
+   * }
+ * + * @param builder a consumer that configures the FlexibleProcess.FlexibleProcessBuilder + * @return a FuncTaskConfigurer that adds the configured process as a task with a generated name + * @throws NullPointerException if builder is null + * @see #process(String, Consumer) + */ + public static FuncTaskConfigurer process( + Consumer builder) { + FlexibleProcess.FlexibleProcessBuilder builderInstance = FlexibleProcess.builder(); + builder.accept(builderInstance); + return process(builderInstance.build()); + } + + /** + * Creates a FuncTaskConfigurer for a FlexibleProcess using a builder lambda with a specified + * name. + * + *

This method combines the benefits of named tasks with inline builder configuration, allowing + * you to create and configure a FlexibleProcess in a single fluent expression. + * + *

Example usage: + * + *

{@code
+   * Workflow workflow = FuncWorkflowBuilder.workflow("order-processing")
+   *     .tasks(
+   *         process("validation", builder -> builder
+   *             .exitCondition(model -> (boolean) model.asMap().get().get("validated"))
+   *             .activities(validationActivity)
+   *             .onProcessFinished((status, model) ->
+   *                 System.out.println("Validation finished with status: " + status))
+   *         ),
+   *         process("fulfillment", builder -> builder
+   *             .exitCondition(model -> (boolean) model.asMap().get().get("fulfilled"))
+   *             .activities(fulfillmentActivity)
+   *         )
+   *     )
+   *     .build();
+   * }
+ * + * @param name the name to assign to this process task (must not be null) + * @param builder a consumer that configures the FlexibleProcess.FlexibleProcessBuilder + * @return a FuncTaskConfigurer that adds the configured named process as a task + * @throws NullPointerException if either name or builder is null + * @see #process(Consumer) + */ + public static FuncTaskConfigurer process( + String name, Consumer builder) { + FlexibleProcess.FlexibleProcessBuilder builderInstance = FlexibleProcess.builder(); + builder.accept(builderInstance); + return process(name, builderInstance.build()); + } + + /** + * Creates an Activity using a builder lambda for inline configuration. + * + *

This method provides a fluent way to define activities without explicitly using the + * Activity.builder() pattern. The builder instance is created and configured through the provided + * consumer lambda. + * + *

Example usage: + * + *

{@code
+   * Activity incrementer = activity(builder -> builder
+   *     .name("increment-counter")
+   *     .callTask(consumer(map -> {
+   *         int value = (int) map.get("counter");
+   *         map.put("counter", value + 1);
+   *     }, Map.class))
+   *     .entryCondition(model -> true)
+   *     .isRepeatable(true)
+   * );
+   *
+   * // Use in a FlexibleProcess
+   * FlexibleProcess process = FlexibleProcess.builder()
+   *     .exitCondition(model -> (int) model.asMap().get().get("counter") >= 10)
+   *     .activities(incrementer)
+   *     .build();
+   * }
+ * + *

This is particularly useful when defining activities inline within a FlexibleProcess: + * + *

{@code
+   * FlexibleProcess process = FlexibleProcess.builder()
+   *     .exitCondition(done)
+   *     .activities(
+   *         activity(b -> b.callTask(task1).entryCondition(cond1).isRepeatable(true)),
+   *         activity(b -> b.callTask(task2).entryCondition(cond2).isRepeatable(false))
+   *     )
+   *     .build();
+   * }
+ * + * @param builder a consumer that configures the Activity.ActivityBuilder + * @return a fully configured Activity instance + * @throws NullPointerException if builder is null + * @throws NullPointerException if required fields (task, entryCondition) are not set in the + * builder + */ + public static Activity activity(Consumer builder) { + Activity.ActivityBuilder builderInstance = Activity.builder(); + builder.accept(builderInstance); + return builderInstance.build(); + } +} diff --git a/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessExecutor.java b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessExecutor.java new file mode 100644 index 00000000..dbb36c00 --- /dev/null +++ b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessExecutor.java @@ -0,0 +1,67 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.processes.internal; + +import io.serverlessworkflow.fluent.processes.Activity; +import io.serverlessworkflow.fluent.processes.FlexibleProcess; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.TaskExecutor; +import io.serverlessworkflow.impl.executors.TaskExecutorBuilder; +import io.serverlessworkflow.impl.executors.TaskExecutorFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +public class FlexibleProcessExecutor implements CallableTask { + + private final WorkflowDefinition definition; + private final FlexibleProcess flexibleProcess; + + public FlexibleProcessExecutor(FlexibleProcess flexibleProcess, WorkflowDefinition definition) { + this.flexibleProcess = flexibleProcess; + this.definition = definition; + } + + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + return CompletableFuture.supplyAsync( + () -> { + WorkflowMutablePosition position = definition.application().positionFactory().get(); + position.addProperty(UUID.randomUUID().toString()); + TaskExecutorFactory factory = definition.application().taskFactory(); + Map> executors = new HashMap<>(); + + for (Activity activity : flexibleProcess.getActivities()) { + TaskExecutorBuilder builder = + factory.getTaskExecutor(position, activity.getTask(), definition); + TaskExecutor executor = builder.build(); + executors.put(activity, executor); + } + + FlexibleProcessManager manager = new FlexibleProcessManager(flexibleProcess, executors); + manager.run(workflowContext, Optional.of(taskContext), input); + return input; + }); + } +} diff --git a/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessExecutorBuilder.java b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessExecutorBuilder.java new file mode 100644 index 00000000..362b699e --- /dev/null +++ b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessExecutorBuilder.java @@ -0,0 +1,46 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.processes.internal; + +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.fluent.processes.FlexibleProcess; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.executors.CallableTask; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; + +public class FlexibleProcessExecutorBuilder implements CallableTaskBuilder { + + private FlexibleProcess task; + private WorkflowDefinition definition; + + @Override + public void init( + FlexibleProcess task, WorkflowDefinition definition, WorkflowMutablePosition position) { + this.task = task; + this.definition = definition; + } + + @Override + public CallableTask build() { + return new FlexibleProcessExecutor(task, definition); + } + + @Override + public boolean accept(Class clazz) { + return clazz.equals(FlexibleProcess.class); + } +} diff --git a/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessManager.java b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessManager.java new file mode 100644 index 00000000..a7a10bb5 --- /dev/null +++ b/experimental/fluent/flex-processes/src/main/java/io/serverlessworkflow/fluent/processes/internal/FlexibleProcessManager.java @@ -0,0 +1,85 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.processes.internal; + +import io.serverlessworkflow.fluent.processes.Activity; +import io.serverlessworkflow.fluent.processes.FlexibleProcess; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.executors.TaskExecutor; +import java.util.LinkedList; +import java.util.Map; +import java.util.Optional; +import java.util.Queue; +import java.util.stream.Collectors; + +public class FlexibleProcessManager { + + private final Map> executors; + private final FlexibleProcess flexibleProcess; + + public FlexibleProcessManager( + FlexibleProcess flexibleProcess, Map> executors) { + this.flexibleProcess = flexibleProcess; + this.executors = executors; + } + + public void run( + WorkflowContext workflowContext, Optional parentContext, WorkflowModel input) { + boolean exit = flexibleProcess.getExitCondition().test(input); + int counter = flexibleProcess.getMaxAttempts(); + while (!exit) { + Map> availableExecutors = getExecutors(input); + if (availableExecutors.isEmpty()) { + return; + } + Queue>> executorQueue = + new LinkedList<>(availableExecutors.entrySet()); + while (!executorQueue.isEmpty()) { + Map.Entry> entry = executorQueue.poll(); + Activity activity = entry.getKey(); + TaskExecutor executor = entry.getValue(); + try { + executor + .apply(workflowContext, parentContext, input) + .join(); // blocking, because we run flexible process one by one + if (activity.getPostAction() != null) { + activity.getPostAction().accept(input); + } + activity.setExecuted(); + } catch (Exception e) { + throw new RuntimeException("Error executing activity: " + activity.getName(), e); + } + exit = flexibleProcess.getExitCondition().test(input); + if (exit) { + break; + } + } + counter--; + if (counter <= 0) { + break; + } + } + } + + private Map> getExecutors(WorkflowModel input) { + return executors.entrySet().stream() + .filter(activity -> activity.getKey().isRepeatable() || !activity.getKey().isExecuted()) + .filter(e -> e.getKey().getEntryCondition().test(input)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } +} diff --git a/experimental/fluent/flex-processes/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder b/experimental/fluent/flex-processes/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder new file mode 100644 index 00000000..25ff58a1 --- /dev/null +++ b/experimental/fluent/flex-processes/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder @@ -0,0 +1 @@ +io.serverlessworkflow.fluent.processes.internal.FlexibleProcessExecutorBuilder diff --git a/experimental/fluent/flex-processes/src/test/java/io/serverlessworkflow/fluent/processes/FlexibleProcessTest.java b/experimental/fluent/flex-processes/src/test/java/io/serverlessworkflow/fluent/processes/FlexibleProcessTest.java new file mode 100644 index 00000000..894ef1e8 --- /dev/null +++ b/experimental/fluent/flex-processes/src/test/java/io/serverlessworkflow/fluent/processes/FlexibleProcessTest.java @@ -0,0 +1,1455 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.processes; + +import static io.serverlessworkflow.api.types.func.CallJava.consumer; +import static io.serverlessworkflow.fluent.processes.dsl.FlexibleProcessDSL.activity; +import static io.serverlessworkflow.fluent.processes.dsl.FlexibleProcessDSL.process; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.serverlessworkflow.api.types.Document; +import io.serverlessworkflow.api.types.TaskItem; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import org.junit.jupiter.api.Test; + +public class FlexibleProcessTest { + + @Test + void testJavaFunction() throws InterruptedException, ExecutionException { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + + Workflow workflow = + FuncWorkflowBuilder.workflow("testJavaCall") + .tasks( + process( + pb -> + pb.exitCondition( + workflowModel -> { + Integer counter = + (Integer) workflowModel.asMap().get().get("counter"); + return counter >= 3; + }) + .activities( + activity( + ab -> + ab.callTask( + consumer( + map -> { + Integer counter = + (Integer) map.get("counter"); + counter++; + map.put("counter", counter); + }, + Map.class)) + .entryCondition(wm -> true) + .isRepeatable(true))))) + .build(); + + Map result = + app.workflowDefinition(workflow) + .instance(Map.of("counter", 0)) + .start() + .get() + .asMap() + .orElseThrow(); + + assertEquals(3, result.get("counter")); + } + } + + @Test + public void testFluentWorkflow() throws InterruptedException, ExecutionException { + FlexibleProcess flexibleProcess = + FlexibleProcess.builder() + .exitCondition( + workflowModel -> { + Integer counter = (Integer) workflowModel.asMap().get().get("counter"); + return counter >= 3; + }) + .activities( + activity( + builder -> + builder + .callTask( + consumer( + map -> { + Integer counter = (Integer) map.get("counter"); + counter++; + map.put("counter", counter); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true))) + .build(); + + Workflow workflow = + FuncWorkflowBuilder.workflow("step-emit-export") + .tasks( + process("flexible_process", flexibleProcess), + process("flexible_process", flexibleProcess)) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Map result = + app.workflowDefinition(workflow) + .instance(Map.of("counter", 0)) + .start() + .get() + .asMap() + .orElseThrow(); + + assertEquals(3, result.get("counter")); + } + } + + @Test + void testMaxIterations() throws InterruptedException, ExecutionException { + AtomicInteger attemptCounter1 = new AtomicInteger(0); + AtomicInteger attemptCounter2 = new AtomicInteger(0); + + Activity incrementer1 = + Activity.builder() + .callTask(consumer(map -> attemptCounter1.incrementAndGet(), Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + + Activity incrementer2 = + Activity.builder() + .callTask(consumer(map -> attemptCounter2.incrementAndGet(), Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testMultipleActivities") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(workflowModel -> false) + .activities(incrementer1, incrementer2) + .maxAttempts(5) + .build() + .asTask()))); + + app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); + } + assertEquals(5, attemptCounter1.get()); + assertEquals(5, attemptCounter2.get()); + } + + @Test + void testConditionalActivityExecution() throws InterruptedException, ExecutionException { + Activity evenIncrementer = + Activity.builder() + .callTask( + consumer( + map -> { + Integer counter = (Integer) map.get("counter"); + map.put("counter", counter + 2); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Integer counter = (Integer) workflowModel.asMap().get().get("counter"); + return counter % 2 == 0; + }) + .isRepeatable(true) + .build(); + + Activity oddIncrementer = + Activity.builder() + .callTask( + consumer( + map -> { + Integer counter = (Integer) map.get("counter"); + map.put("counter", counter + 1); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Integer counter = (Integer) workflowModel.asMap().get().get("counter"); + return counter % 2 != 0; + }) + .isRepeatable(true) + .build(); + + Predicate exceeds10 = + workflowModel -> { + Integer counter = (Integer) workflowModel.asMap().get().get("counter"); + return counter >= 10; + }; + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testConditional") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(exceeds10) + .activities(evenIncrementer, oddIncrementer) + .build() + .asTask()))); + + Map result = + app.workflowDefinition(workflow) + .instance(Map.of("counter", 0)) + .start() + .get() + .asMap() + .orElseThrow(); + + assertTrue((Integer) result.get("counter") >= 10); + } + } + + @Test + void testWorkflowWithMultipleFields() throws InterruptedException, ExecutionException { + Activity calculator = + Activity.builder() + .callTask( + consumer( + map -> { + Integer a = (Integer) map.get("a"); + Integer b = (Integer) map.get("b"); + map.put("sum", a + b); + map.put("a", a + 1); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + + Predicate sumExceeds20 = + workflowModel -> { + Map data = workflowModel.asMap().get(); + Integer sum = (Integer) data.getOrDefault("sum", 0); + return sum > 20; + }; + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testMultipleFields") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(sumExceeds20) + .activities(calculator) + .build() + .asTask()))); + + Map result = + app.workflowDefinition(workflow) + .instance(Map.of("a", 5, "b", 10)) + .start() + .get() + .asMap() + .orElseThrow(); + + assertTrue((Integer) result.get("sum") > 20); + assertTrue((Integer) result.get("a") > 5); + assertEquals(10, result.get("b")); + } + } + + @Test + void testImmediateExitCondition() throws InterruptedException, ExecutionException { + Activity shouldNotRun = + Activity.builder() + .callTask( + consumer( + map -> { + map.put("executed", true); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + + Predicate alwaysTrue = workflowModel -> true; + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testImmediateExit") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(alwaysTrue) + .activities(shouldNotRun) + .build() + .asTask()))); + + Map result = + app.workflowDefinition(workflow) + .instance(Map.of("counter", 0)) + .start() + .get() + .asMap() + .orElseThrow(); + + assertFalse(result.containsKey("executed")); + assertEquals(0, result.get("counter")); + } + } + + @Test + void testChainedFlexibleProcesses() throws InterruptedException, ExecutionException { + Activity multiplier = + Activity.builder() + .callTask( + consumer( + map -> { + Integer value = (Integer) map.get("value"); + map.put("value", value * 2); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + + Activity subtractor = + Activity.builder() + .callTask( + consumer( + map -> { + Integer value = (Integer) map.get("value"); + map.put("value", value - 5); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + + Predicate exceeds50 = + workflowModel -> { + Integer value = (Integer) workflowModel.asMap().get().get("value"); + return value >= 50; + }; + + Predicate below10 = + workflowModel -> { + Integer value = (Integer) workflowModel.asMap().get().get("value"); + return value < 10; + }; + + FlexibleProcess growthProcess = + FlexibleProcess.builder().exitCondition(exceeds50).activities(multiplier).build(); + + FlexibleProcess shrinkProcess = + FlexibleProcess.builder().exitCondition(below10).activities(subtractor).build(); + + Workflow workflow = + FuncWorkflowBuilder.workflow("chained-processes") + .tasks(process("growth", growthProcess), process("shrink", shrinkProcess)) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Map result = + app.workflowDefinition(workflow) + .instance(Map.of("value", 5)) + .start() + .get() + .asMap() + .orElseThrow(); + + assertTrue((Integer) result.get("value") < 10); + } + } + + @Test + void testNoActivitiesMatch() throws InterruptedException, ExecutionException { + Activity neverRuns = + Activity.builder() + .callTask( + consumer( + map -> { + map.put("ran", true); + }, + Map.class)) + .entryCondition(workflowModel -> false) // Never matches + .isRepeatable(true) + .build(); + + Predicate maxIterations = + workflowModel -> { + Integer iterations = (Integer) workflowModel.asMap().get().getOrDefault("iterations", 0); + return iterations >= 5; + }; + + Activity iterationCounter = + Activity.builder() + .callTask( + consumer( + map -> { + Integer iterations = (Integer) map.getOrDefault("iterations", 0); + map.put("iterations", iterations + 1); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document().withNamespace("test").withName("testNoMatch").withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(maxIterations) + .activities(neverRuns, iterationCounter) + .build() + .asTask()))); + + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); + + assertFalse(result.containsKey("ran")); + assertEquals(5, result.get("iterations")); + } + } + + @Test + void testComplexBusinessLogic() throws InterruptedException, ExecutionException { + Activity validator = + Activity.builder() + .callTask( + consumer( + map -> { + Integer score = (Integer) map.get("score"); + if (score < 0) { + map.put("score", 0); + } + map.put("validated", true); + }, + Map.class)) + .entryCondition( + workflowModel -> { + return !workflowModel.asMap().get().containsKey("validated"); + }) + .isRepeatable(false) + .build(); + + Activity scoreBooster = + Activity.builder() + .callTask( + consumer( + map -> { + Integer score = (Integer) map.get("score"); + Integer boost = (Integer) map.getOrDefault("boost", 5); + map.put("score", score + boost); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Boolean validated = (Boolean) workflowModel.asMap().get().get("validated"); + return validated != null && validated; + }) + .isRepeatable(true) + .build(); + + Predicate targetReached = + workflowModel -> { + Integer score = (Integer) workflowModel.asMap().get().get("score"); + Integer target = (Integer) workflowModel.asMap().get().get("target"); + return score >= target; + }; + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testBusinessLogic") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(targetReached) + .activities(validator, scoreBooster) + .build() + .asTask()))); + + Map result = + app.workflowDefinition(workflow) + .instance(Map.of("score", -10, "target", 30, "boost", 7)) + .start() + .get() + .asMap() + .orElseThrow(); + + assertEquals(true, result.get("validated")); + assertTrue((Integer) result.get("score") >= 30); + } + } + + @Test + void testLargeNumberOfActivities() throws InterruptedException, ExecutionException { + Activity[] activities = new Activity[10]; + for (int i = 0; i < activities.length; i++) { + final int index = i; + activities[i] = + Activity.builder() + .callTask( + consumer( + map -> { + Integer value = (Integer) map.getOrDefault("value", 0); + map.put("value", value + index + 1); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + } + + Predicate valueAtLeast200 = + workflowModel -> { + Integer value = (Integer) workflowModel.asMap().get().getOrDefault("value", 0); + return value >= 200; + }; + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testLargeNumberOfActivities") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(valueAtLeast200) + .activities(activities) + .build() + .asTask()))); + + Map result = + app.workflowDefinition(workflow) + .instance(Map.of("value", 0)) + .start() + .get() + .asMap() + .orElseThrow(); + + Integer value = (Integer) result.get("value"); + assertTrue(value >= 200, "Expected value >= 200, got " + value); + } + } + + @Test + void testStageDrivenActivitiesFlow() throws InterruptedException, ExecutionException { + Activity[] activities = new Activity[5]; + for (int i = 0; i < activities.length; i++) { + final int index = i; + activities[i] = + Activity.builder() + .callTask( + consumer( + map -> { + Integer stage = (Integer) map.getOrDefault("stage", 0); + map.put("stage", stage + 1); + String executed = (String) map.getOrDefault("executed", ""); + map.put("executed", executed + index); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Integer stage = (Integer) workflowModel.asMap().get().getOrDefault("stage", 0); + return stage == index; + }) + .isRepeatable(true) + .build(); + } + + Predicate allStagesCompleted = + workflowModel -> { + Integer stage = (Integer) workflowModel.asMap().get().getOrDefault("stage", 0); + return stage >= 5; + }; + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testStageDrivenActivitiesFlow") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(allStagesCompleted) + .activities(activities) + .build() + .asTask()))); + + Map result = + app.workflowDefinition(workflow) + .instance(Map.of("stage", 0)) + .start() + .get() + .asMap() + .orElseThrow(); + + Integer finalStage = (Integer) result.get("stage"); + String executed = (String) result.get("executed"); + + assertEquals(5, finalStage); + assertEquals("01234", executed); + } + } + + @Test + void testMultipleRepeatableActivitiesWithMaxAttempts() + throws InterruptedException, ExecutionException { + + AtomicInteger[] attemptCounters = new AtomicInteger[3]; + for (int i = 0; i < attemptCounters.length; i++) { + attemptCounters[i] = new AtomicInteger(0); + } + + Activity[] activities = new Activity[attemptCounters.length]; + for (int i = 0; i < activities.length; i++) { + final AtomicInteger counter = attemptCounters[i]; + activities[i] = + Activity.builder() + .callTask(consumer(map -> counter.incrementAndGet(), Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + } + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testMultipleRepeatableActivitiesWithMaxAttempts") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(workflowModel -> false) + .activities(activities) + .maxAttempts(7) + .build() + .asTask()))); + + app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); + } + + for (int i = 0; i < attemptCounters.length; i++) { + assertEquals( + 7, + attemptCounters[i].get(), + "Activity " + i + " expected 7 attempts, got " + attemptCounters[i].get()); + } + } + + @Test + void testDependentActivitiesWithFlagsChain() throws InterruptedException, ExecutionException { + Activity loadCustomer = + Activity.builder() + .callTask( + consumer( + map -> { + map.put("customerLoaded", true); + map.putIfAbsent("balance", 0); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Map data = workflowModel.asMap().get(); + return data.get("customerLoaded") == null; + }) + .isRepeatable(true) + .build(); + + Activity applyWelcomeBonus = + Activity.builder() + .callTask( + consumer( + map -> { + Integer balance = (Integer) map.getOrDefault("balance", 0); + map.put("balance", balance + 100); + map.put("bonusApplied", true); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Map data = workflowModel.asMap().get(); + Boolean customerLoaded = (Boolean) data.get("customerLoaded"); + Boolean bonusApplied = (Boolean) data.get("bonusApplied"); + return Boolean.TRUE.equals(customerLoaded) && bonusApplied == null; + }) + .isRepeatable(true) + .build(); + + Activity chargeFee = + Activity.builder() + .callTask( + consumer( + map -> { + Integer balance = (Integer) map.getOrDefault("balance", 0); + map.put("balance", balance - 10); + map.put("feeCharged", true); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Map data = workflowModel.asMap().get(); + Boolean bonusApplied = (Boolean) data.get("bonusApplied"); + Boolean feeCharged = (Boolean) data.get("feeCharged"); + return Boolean.TRUE.equals(bonusApplied) && feeCharged == null; + }) + .isRepeatable(true) + .build(); + + Predicate accountingFinished = + workflowModel -> { + Map data = workflowModel.asMap().get(); + Boolean feeCharged = (Boolean) data.get("feeCharged"); + return Boolean.TRUE.equals(feeCharged); + }; + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testDependentActivitiesWithFlagsChain") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(accountingFinished) + .activities(loadCustomer, applyWelcomeBonus, chargeFee) + .build() + .asTask()))); + + Map result = + app.workflowDefinition(workflow) + .instance(Map.of()) // никаких полей изначально + .start() + .get() + .asMap() + .orElseThrow(); + + assertEquals(true, result.get("customerLoaded")); + assertEquals(true, result.get("bonusApplied")); + assertEquals(true, result.get("feeCharged")); + // 0 + 100 - 10 = 90 + assertEquals(90, result.get("balance")); + } + } + + @Test + void testProducerConsumerDependency() throws InterruptedException, ExecutionException { + Activity producer = + Activity.builder() + .callTask( + consumer( + map -> { + Integer produced = (Integer) map.getOrDefault("produced", 0); + Integer queue = (Integer) map.getOrDefault("queue", 0); + map.put("produced", produced + 1); + map.put("queue", queue + 1); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + + Activity consumerActivity = + Activity.builder() + .callTask( + consumer( + map -> { + Integer queue = (Integer) map.getOrDefault("queue", 0); + Integer processed = (Integer) map.getOrDefault("processed", 0); + if (queue > 0) { + map.put("queue", queue - 1); + map.put("processed", processed + 1); + } + }, + Map.class)) + .entryCondition( + workflowModel -> { + Integer queue = (Integer) workflowModel.asMap().get().getOrDefault("queue", 0); + return queue > 0; + }) + .isRepeatable(true) + .build(); + + Activity completionMarker = + Activity.builder() + .callTask( + consumer( + map -> { + map.put("done", true); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Integer processed = + (Integer) workflowModel.asMap().get().getOrDefault("processed", 0); + Boolean done = (Boolean) workflowModel.asMap().get().get("done"); + return processed >= 5 && done == null; + }) + .isRepeatable(true) + .build(); + + Predicate producerConsumerFinished = + workflowModel -> { + Boolean done = (Boolean) workflowModel.asMap().get().get("done"); + return Boolean.TRUE.equals(done); + }; + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testProducerConsumerDependency") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(producerConsumerFinished) + .activities(producer, consumerActivity, completionMarker) + .build() + .asTask()))); + + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); + + Integer produced = (Integer) result.getOrDefault("produced", 0); + Integer processed = (Integer) result.getOrDefault("processed", 0); + Integer queue = (Integer) result.getOrDefault("queue", 0); + + assertTrue(processed >= 5); + assertTrue(produced >= processed); + assertTrue(queue >= 0); + assertEquals(true, result.get("done")); + } + } + + @Test + void testCascadingDependenciesMultipleFields() throws InterruptedException, ExecutionException { + Activity prepareInput = + Activity.builder() + .callTask( + consumer( + map -> { + map.putIfAbsent("x", 2); + map.putIfAbsent("y", 3); + map.put("prepared", true); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Map data = workflowModel.asMap().get(); + return data.get("prepared") == null; + }) + .isRepeatable(true) + .build(); + + Activity computeIntermediate = + Activity.builder() + .callTask( + consumer( + map -> { + Integer x = (Integer) map.get("x"); + Integer y = (Integer) map.get("y"); + map.put("z", x * y); + map.put("intermediateComputed", true); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Map data = workflowModel.asMap().get(); + Boolean prepared = (Boolean) data.get("prepared"); + Boolean intermediateComputed = (Boolean) data.get("intermediateComputed"); + return Boolean.TRUE.equals(prepared) && intermediateComputed == null; + }) + .isRepeatable(true) + .build(); + + Activity finalizeResult = + Activity.builder() + .callTask( + consumer( + map -> { + Integer z = (Integer) map.get("z"); + map.put("result", z + 10); + map.put("finalized", true); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Map data = workflowModel.asMap().get(); + Boolean intermediateComputed = (Boolean) data.get("intermediateComputed"); + Boolean finalized = (Boolean) data.get("finalized"); + return Boolean.TRUE.equals(intermediateComputed) && finalized == null; + }) + .isRepeatable(true) + .build(); + + Predicate cascadingFinished = + workflowModel -> { + Boolean finalized = (Boolean) workflowModel.asMap().get().get("finalized"); + return Boolean.TRUE.equals(finalized); + }; + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testCascadingDependenciesMultipleFields") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(cascadingFinished) + .activities(prepareInput, computeIntermediate, finalizeResult) + .build() + .asTask()))); + + Map result = + app.workflowDefinition(workflow) + .instance(Map.of()) // пустой вход + .start() + .get() + .asMap() + .orElseThrow(); + + assertEquals(true, result.get("prepared")); + assertEquals(true, result.get("intermediateComputed")); + assertEquals(true, result.get("finalized")); + + // x = 2, y = 3, z = 6, result = 16 + assertEquals(2, result.get("x")); + assertEquals(3, result.get("y")); + assertEquals(6, result.get("z")); + assertEquals(16, result.get("result")); + } + } + + @Test + void testActivityEnablesAndThenDisablesAnother() throws InterruptedException, ExecutionException { + + AtomicInteger togglerExecutions = new AtomicInteger(0); + AtomicInteger workerExecutions = new AtomicInteger(0); + + Activity toggler = + Activity.builder() + .callTask( + consumer( + map -> { + togglerExecutions.incrementAndGet(); + Boolean enabled = (Boolean) map.get("workerEnabled"); + map.put("workerEnabled", enabled == null ? Boolean.TRUE : !enabled); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + + Activity worker = + Activity.builder() + .callTask( + consumer( + map -> { + workerExecutions.incrementAndGet(); + Integer processed = (Integer) map.getOrDefault("processed", 0); + map.put("processed", processed + 1); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Map data = workflowModel.asMap().get(); + Boolean enabled = (Boolean) data.get("workerEnabled"); + return Boolean.TRUE.equals(enabled); + }) + .isRepeatable(true) + .build(); + + Predicate toggleScenarioFinished = + workflowModel -> { + Integer processed = (Integer) workflowModel.asMap().get().getOrDefault("processed", 0); + return processed >= 3; + }; + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testActivityEnablesAndThenDisablesAnother") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(toggleScenarioFinished) + .activities(toggler, worker) + .maxAttempts(20) + .build() + .asTask()))); + + Map result = + app.workflowDefinition(workflow).instance(Map.of()).start().get().asMap().orElseThrow(); + + Integer processed = (Integer) result.getOrDefault("processed", 0); + assertTrue(processed >= 3); + assertTrue(togglerExecutions.get() >= workerExecutions.get()); + } + } + + @Test + void testChainedFlexibleProcessesSelectiveActivities() + throws InterruptedException, ExecutionException { + Activity classifier = + Activity.builder() + .callTask( + consumer( + map -> { + Integer amount = (Integer) map.get("amount"); + String risk = amount != null && amount >= 100 ? "HIGH" : "LOW"; + map.put("risk", risk); + map.put("classified", true); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(false) + .build(); + + Predicate classificationDone = + workflowModel -> { + Map data = workflowModel.asMap().get(); + return Boolean.TRUE.equals(data.get("classified")); + }; + + FlexibleProcess classificationProcess = + FlexibleProcess.builder().exitCondition(classificationDone).activities(classifier).build(); + + Activity highRiskHandler = + Activity.builder() + .callTask( + consumer( + map -> { + map.put("highHandled", true); + map.put("handled", true); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Map data = workflowModel.asMap().get(); + return "HIGH".equals(data.get("risk")) && data.get("handled") == null; + }) + .isRepeatable(false) + .build(); + + Activity lowRiskHandler = + Activity.builder() + .callTask( + consumer( + map -> { + map.put("lowHandled", true); + map.put("handled", true); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Map data = workflowModel.asMap().get(); + return "LOW".equals(data.get("risk")) && data.get("handled") == null; + }) + .isRepeatable(false) + .build(); + + Predicate handlingFinished = + workflowModel -> { + Map data = workflowModel.asMap().get(); + return Boolean.TRUE.equals(data.get("handled")); + }; + + FlexibleProcess handlingProcess = + FlexibleProcess.builder() + .exitCondition(handlingFinished) + .activities(highRiskHandler, lowRiskHandler) + .build(); + + Workflow workflow = + FuncWorkflowBuilder.workflow("chained-classifier-handler") + .tasks(process("classify", classificationProcess), process("handle", handlingProcess)) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + + Map highResult = + app.workflowDefinition(workflow) + .instance(Map.of("amount", 200)) + .start() + .get() + .asMap() + .orElseThrow(); + + assertEquals("HIGH", highResult.get("risk")); + assertEquals(true, highResult.get("classified")); + assertEquals(true, highResult.get("handled")); + assertEquals(true, highResult.get("highHandled")); + assertFalse(highResult.containsKey("lowHandled")); + + Map lowResult = + app.workflowDefinition(workflow) + .instance(Map.of("amount", 50)) + .start() + .get() + .asMap() + .orElseThrow(); + + assertEquals("LOW", lowResult.get("risk")); + assertEquals(true, lowResult.get("classified")); + assertEquals(true, lowResult.get("handled")); + assertEquals(true, lowResult.get("lowHandled")); + assertFalse(lowResult.containsKey("highHandled")); + } + } + + @Test + void testNonRepeatableActivityResetsBetweenInstancesOfSameDefinition() + throws InterruptedException, ExecutionException { + + Activity oneShot = + Activity.builder() + .callTask( + consumer( + map -> { + Integer counter = (Integer) map.getOrDefault("counter", 0); + map.put("counter", counter + 1); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(false) + .build(); + + Predicate counterReachedOne = + workflowModel -> { + Integer counter = (Integer) workflowModel.asMap().get().getOrDefault("counter", 0); + return counter >= 1; + }; + + Workflow workflow = + new Workflow() + .withDocument( + new Document() + .withNamespace("test") + .withName("testNonRepeatableResetsBetweenInstances") + .withVersion("1.0")) + .withDo( + List.of( + new TaskItem( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(counterReachedOne) + .activities(oneShot) + .build() + .asTask()))); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + var definition = app.workflowDefinition(workflow); + Map result1 = + definition.instance(Map.of("counter", 0)).start().get().asMap().orElseThrow(); + + assertEquals( + 1, result1.get("counter"), "Non-repeatable activity must run once in first instance"); + + Map result2 = + definition.instance(Map.of("counter", 0)).start().get().asMap().orElseThrow(); + + assertEquals( + 1, + result2.get("counter"), + "Non-repeatable activity must also run once in second instance"); + } + } + + @Test + void testNonRepeatableActivityResetsBetweenWorkflowDefinitions() + throws InterruptedException, ExecutionException { + + Activity oneShot = + Activity.builder() + .callTask( + consumer( + map -> { + Integer counter = (Integer) map.getOrDefault("counter", 0); + map.put("counter", counter + 1); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(false) + .build(); + + Predicate counterReachedOne = + workflowModel -> { + Integer counter = (Integer) workflowModel.asMap().get().getOrDefault("counter", 0); + return counter >= 1; + }; + + FlexibleProcess process = + FlexibleProcess.builder().exitCondition(counterReachedOne).activities(oneShot).build(); + + Workflow workflow = + FuncWorkflowBuilder.workflow("test-non-repeatable-resets-between-definitions") + .tasks(process("flexible_process", process)) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + + Map result1 = + app.workflowDefinition(workflow) + .instance(Map.of("counter", 0)) + .start() + .get() + .asMap() + .orElseThrow(); + + assertEquals(1, result1.get("counter")); + + Map result2 = + app.workflowDefinition(workflow) + .instance(Map.of("counter", 0)) + .start() + .get() + .asMap() + .orElseThrow(); + + assertEquals(1, result2.get("counter")); + } + } + + @Test + void testMaxAttemptsResetsBetweenInstances() throws InterruptedException, ExecutionException { + Activity iterationCounter = + Activity.builder() + .callTask( + consumer( + map -> { + Integer iterations = (Integer) map.getOrDefault("iterations", 0); + map.put("iterations", iterations + 1); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(true) + .build(); + + Workflow workflow = + FuncWorkflowBuilder.workflow("testMaxAttemptsResetsBetweenInstances") + .tasks( + process( + "flexible_process", + FlexibleProcess.builder() + .exitCondition(workflowModel -> false) + .activities(iterationCounter) + .maxAttempts(3) + .build())) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + + var definition = app.workflowDefinition(workflow); + + Map result1 = + definition.instance(Map.of()).start().get().asMap().orElseThrow(); + + assertEquals(3, result1.get("iterations"), "First instance should run exactly 3 attempts"); + + Map result2 = + definition.instance(Map.of()).start().get().asMap().orElseThrow(); + + assertEquals( + 3, result2.get("iterations"), "Second instance should also run exactly 3 attempts"); + } + } + + @Test + void testChainedClassifierDoesNotLeakActivityStateBetweenInstances() + throws InterruptedException, ExecutionException { + + Activity classifier = + Activity.builder() + .callTask( + consumer( + map -> { + Integer amount = (Integer) map.get("amount"); + String risk = amount != null && amount >= 100 ? "HIGH" : "LOW"; + map.put("risk", risk); + map.put("classified", true); + }, + Map.class)) + .entryCondition(workflowModel -> true) + .isRepeatable(false) + .build(); + + Predicate classificationDone = + workflowModel -> { + Map data = workflowModel.asMap().get(); + return Boolean.TRUE.equals(data.get("classified")); + }; + + FlexibleProcess classificationProcess = + FlexibleProcess.builder().exitCondition(classificationDone).activities(classifier).build(); + + Activity highRiskHandler = + Activity.builder() + .callTask( + consumer( + map -> { + map.put("highHandled", true); + map.put("handled", true); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Map data = workflowModel.asMap().get(); + return "HIGH".equals(data.get("risk")) && data.get("handled") == null; + }) + .isRepeatable(false) + .build(); + + Activity lowRiskHandler = + Activity.builder() + .callTask( + consumer( + map -> { + map.put("lowHandled", true); + map.put("handled", true); + }, + Map.class)) + .entryCondition( + workflowModel -> { + Map data = workflowModel.asMap().get(); + return "LOW".equals(data.get("risk")) && data.get("handled") == null; + }) + .isRepeatable(false) + .build(); + + Predicate handlingFinished = + workflowModel -> { + Map data = workflowModel.asMap().get(); + return Boolean.TRUE.equals(data.get("handled")); + }; + + FlexibleProcess handlingProcess = + FlexibleProcess.builder() + .exitCondition(handlingFinished) + .activities(highRiskHandler, lowRiskHandler) + .build(); + + Workflow workflow = + FuncWorkflowBuilder.workflow("chained-classifier-handler-nonleaking") + .tasks(process("classify", classificationProcess), process("handle", handlingProcess)) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + var definition = app.workflowDefinition(workflow); + + Map highResult = + definition.instance(Map.of("amount", 200)).start().get().asMap().orElseThrow(); + + assertEquals("HIGH", highResult.get("risk")); + assertEquals(true, highResult.get("classified")); + assertEquals(true, highResult.get("handled")); + assertEquals(true, highResult.get("highHandled")); + assertFalse(highResult.containsKey("lowHandled")); + + Map lowResult = + definition.instance(Map.of("amount", 50)).start().get().asMap().orElseThrow(); + + assertEquals("LOW", lowResult.get("risk")); + assertEquals(true, lowResult.get("classified")); + assertEquals(true, lowResult.get("handled")); + assertEquals(true, lowResult.get("lowHandled")); + assertFalse(lowResult.containsKey("highHandled")); + } + } +} diff --git a/experimental/fluent/pom.xml b/experimental/fluent/pom.xml index 71969d9e..d3ac5723 100644 --- a/experimental/fluent/pom.xml +++ b/experimental/fluent/pom.xml @@ -67,5 +67,6 @@ func agentic agentic-langchain4j + flex-processes \ No newline at end of file diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java index d6d2e229..bb3c140c 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/BaseTaskItemListBuilder.java @@ -53,7 +53,7 @@ protected final List mutableList() { return this.list; } - protected final SELF addTaskItem(TaskItem taskItem) { + public final SELF addTaskItem(TaskItem taskItem) { Objects.requireNonNull(taskItem, "taskItem must not be null"); list.add(taskItem); return self();