Skip to content

Commit

Permalink
Api start changes
Browse files Browse the repository at this point in the history
Differentiated between Pending and Started cases, being more adapted to
the spec.
  • Loading branch information
fjtirado committed Jan 10, 2025
1 parent 96000ad commit 05bbc2f
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ static WorkflowDefinition of(WorkflowApplication application, Workflow workflow,
application, workflow, application.resourceLoaderFactory().getResourceLoader(path));
}

public WorkflowInstance execute(Object input) {
public WorkflowInstance instance(Object input) {
return new WorkflowInstance(this, JsonUtils.fromValue(input));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,25 @@ public class WorkflowInstance {
private final String id;
private final JsonNode input;

private final Instant startedAt;
private WorkflowContext workflowContext;
private WorkflowDefinition definition;
private Instant startedAt;
private Instant completedAt;
private volatile JsonNode output;
private CompletableFuture<JsonNode> completableFuture;
private final WorkflowContext workflowContext;

WorkflowInstance(WorkflowDefinition definition, JsonNode input) {
this.id = definition.idFactory().get();
this.input = input;
this.definition = definition;
this.status = new AtomicReference<>(WorkflowStatus.PENDING);
definition.inputSchemaValidator().ifPresent(v -> v.validate(input));
}

public CompletableFuture<JsonNode> start() {
this.startedAt = Instant.now();
this.workflowContext = new WorkflowContext(definition, this);
this.status = new AtomicReference<>(WorkflowStatus.RUNNING);
this.status.set(WorkflowStatus.RUNNING);
this.completableFuture =
TaskExecutorHelper.processTaskList(
definition.startTask(),
Expand All @@ -49,18 +57,20 @@ public class WorkflowInstance {
.map(f -> f.apply(workflowContext, null, input))
.orElse(input))
.thenApply(this::whenCompleted);
return completableFuture;
}

private JsonNode whenCompleted(JsonNode node) {
JsonNode model =
output =
workflowContext
.definition()
.outputFilter()
.map(f -> f.apply(workflowContext, null, node))
.orElse(node);
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(model));
workflowContext.definition().outputSchemaValidator().ifPresent(v -> v.validate(output));
status.compareAndSet(WorkflowStatus.RUNNING, WorkflowStatus.COMPLETED);
return model;
completedAt = Instant.now();
return output;
}

public String id() {
Expand All @@ -71,6 +81,10 @@ public Instant startedAt() {
return startedAt;
}

public Instant completedAt() {
return completedAt;
}

public JsonNode input() {
return input;
}
Expand All @@ -83,11 +97,11 @@ public void status(WorkflowStatus state) {
this.status.set(state);
}

public CompletableFuture<Object> output() {
return outputAsJsonNode().thenApply(JsonUtils::toJavaValue);
public Object output() {
return JsonUtils.toJavaValue(outputAsJsonNode());
}

public CompletableFuture<JsonNode> outputAsJsonNode() {
return completableFuture.thenApply(this::whenCompleted);
public JsonNode outputAsJsonNode() {
return output;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.serverlessworkflow.impl.json.JsonUtils;
import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -59,29 +58,25 @@ private static Stream<Arguments> provideParameters() {
args(
"switch-then-string.yaml",
Map.of("orderType", "electronic"),
o ->
assertThat(o.output().join())
.isEqualTo(Map.of("validate", true, "status", "fulfilled"))),
o -> assertThat(o).isEqualTo(Map.of("validate", true, "status", "fulfilled"))),
args(
"switch-then-string.yaml",
Map.of("orderType", "physical"),
o ->
assertThat(o.output().join())
assertThat(o)
.isEqualTo(Map.of("inventory", "clear", "items", 1, "address", "Elmer St"))),
args(
"switch-then-string.yaml",
Map.of("orderType", "unknown"),
o ->
assertThat(o.output().join())
.isEqualTo(Map.of("log", "warn", "message", "something's wrong"))),
o -> assertThat(o).isEqualTo(Map.of("log", "warn", "message", "something's wrong"))),
args(
"for-sum.yaml",
Map.of("input", Arrays.asList(1, 2, 3)),
o -> assertThat(o.output().join()).isEqualTo(6)),
o -> assertThat(o).isEqualTo(6)),
args(
"for-collect.yaml",
Map.of("input", Arrays.asList(1, 2, 3)),
o -> assertThat(o.output().join()).isEqualTo(Map.of("output", Arrays.asList(2, 4, 6)))),
o -> assertThat(o).isEqualTo(Map.of("output", Arrays.asList(2, 4, 6)))),
args(
"simple-expression.yaml",
Map.of("input", Arrays.asList(1, 2, 3)),
Expand All @@ -97,16 +92,25 @@ private static Stream<Arguments> provideParameters() {
args(
"fork.yaml",
Map.of(),
o ->
assertThat(((ObjectNode) o.outputAsJsonNode().join()).get("patientId").asText())
.isIn("John", "Smith")),
args("fork-no-compete.yaml", Map.of(), WorkflowDefinitionTest::checkNotCompeteOuput));
o -> assertThat(((Map<String, Object>) o).get("patientId")).isIn("John", "Smith")),
argsJson("fork-no-compete.yaml", Map.of(), WorkflowDefinitionTest::checkNotCompeteOuput));
}

private static Arguments args(
String fileName, Map<String, Object> input, Consumer<WorkflowInstance> instance) {
String fileName, Map<String, Object> input, Consumer<Object> instance) {
return Arguments.of(
fileName,
(Consumer<WorkflowDefinition>)
d ->
instance.accept(
d.instance(input).start().thenApply(JsonUtils::toJavaValue).join()));
}

private static Arguments argsJson(
String fileName, Map<String, Object> input, Consumer<JsonNode> instance) {
return Arguments.of(
fileName, (Consumer<WorkflowDefinition>) d -> instance.accept(d.execute(input)));
fileName,
(Consumer<WorkflowDefinition>) d -> instance.accept(d.instance(input).start().join()));
}

private static <T extends Throwable> Arguments args(
Expand All @@ -117,8 +121,7 @@ private static <T extends Throwable> Arguments args(
d ->
checkWorkflowException(
catchThrowableOfType(
CompletionException.class,
() -> d.execute(Map.of()).outputAsJsonNode().join()),
CompletionException.class, () -> d.instance(Map.of()).start().join()),
consumer,
clazz));
}
Expand All @@ -129,8 +132,7 @@ private static <T extends Throwable> void checkWorkflowException(
consumer.accept(clazz.cast(ex.getCause()));
}

private static void checkNotCompeteOuput(WorkflowInstance instance) {
JsonNode out = instance.outputAsJsonNode().join();
private static void checkNotCompeteOuput(JsonNode out) {
assertThat(out).isInstanceOf(ArrayNode.class);
assertThat(out).hasSize(2);
ArrayNode array = (ArrayNode) out;
Expand All @@ -156,8 +158,8 @@ private static void checkWorkflowException(WorkflowException ex) {
assertThat(ex.getWorflowError().instance()).isEqualTo("do/0/notImplemented");
}

private static void checkSpecialKeywords(WorkflowInstance obj) {
Map<String, Object> result = (Map<String, Object>) obj.output().join();
private static void checkSpecialKeywords(Object obj) {
Map<String, Object> result = (Map<String, Object>) obj;
assertThat(Instant.ofEpochMilli((long) result.get("startedAt")))
.isAfterOrEqualTo(before)
.isBeforeOrEqualTo(Instant.now());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowableOfType;

import io.serverlessworkflow.impl.json.JsonUtils;
import java.io.IOException;
import java.util.Map;
import java.util.stream.Stream;
Expand All @@ -44,8 +45,9 @@ void testWorkflowExecution(String fileName, Object input, Condition<Object> cond
throws IOException {
assertThat(
appl.workflowDefinition(readWorkflowFromClasspath(fileName))
.execute(input)
.output()
.instance(input)
.start()
.thenApply(JsonUtils::toJavaValue)
.join())
.is(condition);
}
Expand All @@ -60,7 +62,7 @@ void testWrongSchema(String fileName) {
IllegalArgumentException exception =
catchThrowableOfType(
IllegalArgumentException.class,
() -> appl.workflowDefinition(readWorkflowFromClasspath(fileName)).execute(Map.of()));
() -> appl.workflowDefinition(readWorkflowFromClasspath(fileName)).instance(Map.of()));
assertThat(exception)
.isNotNull()
.hasMessageContaining("There are JsonSchema validation errors");
Expand Down

0 comments on commit 05bbc2f

Please sign in to comment.