Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Identifiable composite nodes #56

Merged
merged 5 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions api/README.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import io.knotx.fragments.engine.FragmentEventContext;
import io.knotx.fragments.engine.Task;
import io.knotx.fragments.engine.graph.ActionNode;
import io.knotx.fragments.engine.graph.SingleNode;
import io.knotx.fragments.engine.graph.CompositeNode;
import io.knotx.fragments.engine.graph.Node;
import io.knotx.fragments.handler.action.ActionProvider;
Expand Down Expand Up @@ -73,7 +73,7 @@ private Node buildActionNode(GraphNodeOptions options, Map<String, Node> edges)
ActionNodeConfigOptions config = new ActionNodeConfigOptions(options.getNode().getConfig());
Action action = actionProvider.get(config.getAction()).orElseThrow(
() -> new GraphConfigurationException("No provider for action " + config.getAction()));
return new ActionNode(config.getAction(), toRxFunction(action), edges);
return new SingleNode(config.getAction(), toRxFunction(action), edges);
}

private Node buildCompositeNode(GraphNodeOptions options, Map<String, Node> edges) {
Expand All @@ -82,7 +82,7 @@ private Node buildCompositeNode(GraphNodeOptions options, Map<String, Node> edge
List<Node> nodes = config.getSubtasks().stream()
.map(this::initGraphRootNode)
.collect(Collectors.toList());
return new CompositeNode(nodes, edges.get(SUCCESS_TRANSITION), edges.get(ERROR_TRANSITION));
return new CompositeNode("composite", nodes, edges.get(SUCCESS_TRANSITION), edges.get(ERROR_TRANSITION));
tomaszmichalak marked this conversation as resolved.
Show resolved Hide resolved
}

private Function<FragmentContext, Single<FragmentResult>> toRxFunction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package io.knotx.fragments.task;

import static io.knotx.fragments.engine.graph.CompositeNode.COMPOSITE_NODE_ID;
import static io.knotx.fragments.handler.api.domain.FragmentResult.ERROR_TRANSITION;
import static io.knotx.fragments.handler.api.domain.FragmentResult.SUCCESS_TRANSITION;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -30,7 +29,7 @@
import io.knotx.fragments.engine.FragmentEvent;
import io.knotx.fragments.engine.FragmentEventContext;
import io.knotx.fragments.engine.Task;
import io.knotx.fragments.engine.graph.ActionNode;
import io.knotx.fragments.engine.graph.SingleNode;
import io.knotx.fragments.engine.graph.CompositeNode;
import io.knotx.fragments.engine.graph.Node;
import io.knotx.fragments.handler.action.ActionProvider;
Expand Down Expand Up @@ -62,6 +61,7 @@ class ConfigurationTaskProviderTest {

private static final Map<String, GraphNodeOptions> NO_TRANSITIONS = Collections.emptyMap();
private static final String TASK_NAME = "task";
private static final String COMPOSITE_NODE_ID = "composite";
private static final FragmentEventContext SAMPLE_FRAGMENT_EVENT =
new FragmentEventContext(new FragmentEvent(new Fragment("type",
new JsonObject().put(FragmentsHandlerOptions.DEFAULT_TASK_KEY, TASK_NAME), "body")),
Expand Down Expand Up @@ -124,7 +124,7 @@ void expectSingleActionNodeGraph() {
assertEquals(TASK_NAME, task.getName());
assertTrue(task.getRootNode().isPresent());
Node rootNode = task.getRootNode().get();
assertTrue(rootNode instanceof ActionNode);
assertTrue(rootNode instanceof SingleNode);
assertEquals("simpleAction", rootNode.getId());
assertFalse(rootNode.next(SUCCESS_TRANSITION).isPresent());
}
Expand All @@ -148,12 +148,12 @@ void expectActionNodesGraphWithTransition() {

assertTrue(task.getRootNode().isPresent());
Node rootNode = task.getRootNode().get();
assertTrue(rootNode instanceof ActionNode);
assertTrue(rootNode instanceof SingleNode);
assertEquals("actionA", rootNode.getId());
Optional<Node> customNode = rootNode.next("customTransition");
assertTrue(customNode.isPresent());
assertTrue(customNode.get() instanceof ActionNode);
ActionNode customSingleNode = (ActionNode) customNode.get();
assertTrue(customNode.get() instanceof SingleNode);
SingleNode customSingleNode = (SingleNode) customNode.get();
assertEquals("actionB", customSingleNode.getId());
}

Expand All @@ -176,14 +176,14 @@ void expectSingleCompositeNodeGraphWithNoEdges() {
assertTrue(task.getRootNode().isPresent());
Node rootNode = task.getRootNode().get();
assertTrue(rootNode instanceof CompositeNode);
assertEquals(COMPOSITE_NODE_ID, rootNode.getId());
assertEquals("composite", rootNode.getId());
tomaszmichalak marked this conversation as resolved.
Show resolved Hide resolved
assertFalse(rootNode.next(SUCCESS_TRANSITION).isPresent());
assertFalse(rootNode.next(ERROR_TRANSITION).isPresent());

CompositeNode compositeRootNode = (CompositeNode) rootNode;
assertEquals(1, compositeRootNode.getNodes().size());
Node node = compositeRootNode.getNodes().get(0);
assertTrue(node instanceof ActionNode);
assertTrue(node instanceof SingleNode);
assertEquals("simpleAction", node.getId());
}

Expand All @@ -209,11 +209,11 @@ void expectCompositeNodeWithSingleNodeOnSuccessGraph() {
assertTrue(task.getRootNode().isPresent());
Node rootNode = task.getRootNode().get();
assertTrue(rootNode instanceof CompositeNode);
assertEquals(COMPOSITE_NODE_ID, rootNode.getId());
assertEquals("composite", rootNode.getId());
tomaszmichalak marked this conversation as resolved.
Show resolved Hide resolved
Optional<Node> onSuccess = rootNode.next(SUCCESS_TRANSITION);
assertTrue(onSuccess.isPresent());
Node onSuccessNode = onSuccess.get();
assertTrue(onSuccessNode instanceof ActionNode);
assertTrue(onSuccessNode instanceof SingleNode);
assertEquals("lastAction", onSuccessNode.getId());
}

Expand All @@ -238,11 +238,11 @@ void expectCompositeNodeWithSingleNodeOnErrorGraph() {
assertTrue(task.getRootNode().isPresent());
Node rootNode = task.getRootNode().get();
assertTrue(rootNode instanceof CompositeNode);
assertEquals(COMPOSITE_NODE_ID, rootNode.getId());
assertEquals("composite", rootNode.getId());
tomaszmichalak marked this conversation as resolved.
Show resolved Hide resolved
Optional<Node> onError = rootNode.next(ERROR_TRANSITION);
assertTrue(onError.isPresent());
Node onErrorNode = onError.get();
assertTrue(onErrorNode instanceof ActionNode);
assertTrue(onErrorNode instanceof SingleNode);
assertEquals("fallbackAction", onErrorNode.getId());
}

Expand Down Expand Up @@ -291,7 +291,7 @@ void expectNestedCompositeNodesGraph() {
assertTrue(task.getRootNode().isPresent());
Node rootNode = task.getRootNode().get();
assertTrue(rootNode instanceof CompositeNode);
assertEquals(COMPOSITE_NODE_ID, rootNode.getId());
assertEquals("composite", rootNode.getId());
tomaszmichalak marked this conversation as resolved.
Show resolved Hide resolved

CompositeNode compositeRootNode = (CompositeNode) rootNode;
assertEquals(1, compositeRootNode.getNodes().size());
Expand All @@ -302,7 +302,7 @@ void expectNestedCompositeNodesGraph() {

assertEquals(1, compositeChildNode.getNodes().size());
Node node = compositeChildNode.getNodes().get(0);
assertTrue(node instanceof ActionNode);
assertTrue(node instanceof SingleNode);
assertEquals("simpleAction", node.getId());
}

Expand Down
64 changes: 36 additions & 28 deletions handler/engine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,56 @@
Fragments Engine is a reactive asynchronous map-reduce implementation, enjoying the benefits of Reactive Extensions,
that evaluates each Fragment independently using a `Task` definition. `Task` specifies a directed graph of Nodes,
allowing to transform Fragment into the new one.
```
F -> F', T
```
- `F` - Fragment to transform
- `F'` - the modified Fragment
- `T` represents Transition, a text value, that defines the next Node from the graph

## How does it work
Any *Fragment* can define its processing path - a **Task** (which is a **directed graph** of **Nodes**).
A **Task** specifies the nodes through which Fragments will be routed by the Task Engine.
Each Node may define possible *outgoing edges* - **Transitions**.

Additionally, a **Node** can do one of the following:
- define a **single** [Action](https://github.com/Knotx/knotx-fragments/tree/master/handler/api#action)
that will be applied to Fragment (called [Action Node](#action-node)),
- define **many**/**parallel** Actions that are applied to Fragment (called [Composite Node](#composite-node)) .

### Action Node
Action that is applied on the node is a transformation function
`java.util.function.Function<FragmentContext, Single<FragmentResult>>` that transforms one Fragment
into another.

Part of the `FragmentResult` is a *Transition* that defines the next *Node* in the graph that the Fragment
should visit. If there is no transition defined, default `_success` value is used.
Action Node transformation may return any *Transition*, but all the transitions but `_success` must be
configured. If there is no **path** configured for the transition, the following logic is applied:
- if the *Transition* equals `_success` (default value), graph processing finishes
- otherwise "Unsupported Transition" error occurs.
### Node
The node responsibility can be described as:
> Graph node gets a fragment, processes it and responds with Transition. So a node is the function
>`F -> (F', T)` where `F` is the Fragment, `F'` is a modified Fragment and `T` is the Transition.

The node definition is abstract. It allows to define simple processing nodes but also more complex
structures such as a list of subgraphs.

There are two **node** types:
- **simple nodes** that are simple operations that do some fragment modifications (called [Single Node](#single-node)),
- **parallel complex nodes** that represent a list of subgraphs (called [Composite Node](#composite-node)).

### Single Node
The node represents a single operation that transforms one Fragment into another.

This operation can respond with **custom** transitions.

### Composite Node
This Node may consist of other Composite Nodes or Action Nodes or a mix of both.
It enables parallel processing of independent Actions (e.g. calling two external data sources).
This node may consist of other Composite Nodes or Single Nodes or a mix of both.
It enables parallel processing of independent nodes (e.g. calling two external data sources).
Composite Node may define only two transitions:
- `_success` - the default one, means that operation ends without any exception
- `_error` - when operation throws an exception

> Important note!
> Action Nodes inside the Composite Node may only modify the Fragment's payload and should not modify the Fragment's body.
> This is because Actions are executed in parallel and the output of modifying a single Fragment's body in parallel
> may differ between different executions.
> Single Nodes inside the Composite Node may only modify the Fragment's payload and should not modify the Fragment's body.

### Transition
A directed graph consists of nodes and edges. Edges are called transitions. Transition is a simple text.
tomaszmichalak marked this conversation as resolved.
Show resolved Hide resolved

The pre-defined transitions are:
- `_success` - the default one, indicates that operation completes successfully (no exception)
- `_error` - means that operation has throw an exception

There are two important rules to remember:
> If a node responds with *_success* transition, but the transition is not configured, then
tomaszmichalak marked this conversation as resolved.
Show resolved Hide resolved
>processing is finished.
tomaszmichalak marked this conversation as resolved.
Show resolved Hide resolved

> If a node responds with *_error* transition, but the transition is not configured, then an
tomaszmichalak marked this conversation as resolved.
Show resolved Hide resolved
>exception is returned.

> If a node responds with a not configured transition, the "Unsupported Transition" error occurs.

## Node states
#### States

![Node with exits](assets/images/graph_node.png)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
package io.knotx.fragments.engine;

import io.knotx.fragments.engine.graph.ActionNode;
import io.knotx.fragments.engine.graph.NodeType;
import io.knotx.fragments.engine.graph.SingleNode;
import io.knotx.fragments.engine.graph.CompositeNode;
import io.knotx.fragments.engine.graph.Node;
import io.knotx.fragments.handler.api.domain.FragmentResult;
Expand Down Expand Up @@ -59,14 +60,14 @@ private Single<TaskExecutionContext> processTask(TaskExecutionContext context, N
}

private Single<FragmentResult> getResult(TaskExecutionContext context) {
return context.getCurrentNode().isComposite()
return NodeType.COMPOSITE.equals(context.getCurrentNode().getType())
tomaszmichalak marked this conversation as resolved.
Show resolved Hide resolved
? mapReduce(context)
: execute(context);
}

private Single<FragmentResult> execute(TaskExecutionContext context) {
return Single.just(context.getCurrentNode())
.map(ActionNode.class::cast)
.map(SingleNode.class::cast)
.observeOn(RxHelper.blockingScheduler(vertx))
.flatMap(gn -> gn.doAction(context.fragmentContextInstance()))
.doOnSuccess(fr -> context.handleSuccess(fr.getTransition()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@

public class CompositeNode implements Node {

public static final String COMPOSITE_NODE_ID = "composite";
private final String id;
private final List<Node> nodes;
private final Node onSuccess;
private final Node onError;

public CompositeNode(List<Node> nodes, Node onSuccess, Node onError) {
public CompositeNode(String id, List<Node> nodes, Node onSuccess, Node onError) {
this.id = id;
this.nodes = nodes;
this.onSuccess = onSuccess;
this.onError = onError;
}

@Override
public String getId() {
return COMPOSITE_NODE_ID;
return id;
}

@Override
Expand All @@ -51,8 +52,8 @@ public Optional<Node> next(String transition) {
}

@Override
public boolean isComposite() {
return true;
public NodeType getType() {
return NodeType.COMPOSITE;
}

public List<Node> getNodes() {
Expand All @@ -62,7 +63,8 @@ public List<Node> getNodes() {
@Override
public String toString() {
return "CompositeNode{" +
"nodes=" + nodes +
"id='" + id + '\'' +
", nodes=" + nodes +
", onSuccess=" + onSuccess +
", onError=" + onError +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ public interface Node {

Optional<Node> next(String transition);

boolean isComposite();
NodeType getType();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (C) 2019 Knot.x Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.knotx.fragments.engine.graph;

public enum NodeType {
SINGLE,
COMPOSITE
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@
import java.util.Optional;
import java.util.function.Function;

public class ActionNode implements Node {
public class SingleNode implements Node {

private String id;

private Function<FragmentContext, Single<FragmentResult>> action;

private Map<String, Node> transitions;

public ActionNode(String id, Function<FragmentContext, Single<FragmentResult>> action) {
public SingleNode(String id, Function<FragmentContext, Single<FragmentResult>> action) {
this(id, action, null);
}

public ActionNode(String id, Function<FragmentContext, Single<FragmentResult>> action,
public SingleNode(String id, Function<FragmentContext, Single<FragmentResult>> action,
Map<String, Node> transitions) {
this.id = id;
this.action = action;
Expand All @@ -52,8 +52,8 @@ public Optional<Node> next(String transition) {
}

@Override
public boolean isComposite() {
return false;
public NodeType getType() {
return NodeType.SINGLE;
}

public Single<FragmentResult> doAction(FragmentContext fragmentContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import io.knotx.fragments.api.Fragment;
import io.knotx.fragments.engine.graph.ActionNode;
import io.knotx.fragments.engine.graph.SingleNode;
import io.knotx.fragments.handler.api.domain.FragmentContext;
import io.knotx.fragments.handler.api.domain.FragmentResult;
import io.knotx.server.api.context.ClientRequest;
Expand Down Expand Up @@ -86,7 +86,7 @@ void expectParallelEvaluationStrategy(VertxTestContext testContext, Vertx vertx)
}

private FragmentEventContextTaskAware initFragmentEventContextTaskAware() {
ActionNode graphNode = new ActionNode("id", BLOCKING_OPERATION,
SingleNode graphNode = new SingleNode("id", BLOCKING_OPERATION,
Collections.emptyMap());
Fragment fragment = new Fragment("snippet", new JsonObject(), "some body");

Expand Down
Loading