Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Identifiable composite nodes #56

Merged
merged 5 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions api/README.md

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import io.knotx.fragments.engine.FragmentEventContext;
import io.knotx.fragments.engine.Task;
import io.knotx.fragments.engine.graph.ActionNode;
import io.knotx.fragments.engine.graph.SingleNode;
import io.knotx.fragments.engine.graph.CompositeNode;
import io.knotx.fragments.engine.graph.Node;
import io.knotx.fragments.handler.action.ActionProvider;
Expand Down Expand Up @@ -73,7 +73,7 @@ private Node buildActionNode(GraphNodeOptions options, Map<String, Node> edges)
ActionNodeConfigOptions config = new ActionNodeConfigOptions(options.getNode().getConfig());
Action action = actionProvider.get(config.getAction()).orElseThrow(
() -> new GraphConfigurationException("No provider for action " + config.getAction()));
return new ActionNode(config.getAction(), toRxFunction(action), edges);
return new SingleNode(config.getAction(), toRxFunction(action), edges);
}

private Node buildCompositeNode(GraphNodeOptions options, Map<String, Node> edges) {
Expand All @@ -82,7 +82,12 @@ private Node buildCompositeNode(GraphNodeOptions options, Map<String, Node> edge
List<Node> nodes = config.getSubtasks().stream()
.map(this::initGraphRootNode)
.collect(Collectors.toList());
return new CompositeNode(nodes, edges.get(SUCCESS_TRANSITION), edges.get(ERROR_TRANSITION));
return new CompositeNode(getNodeId(), nodes, edges.get(SUCCESS_TRANSITION), edges.get(ERROR_TRANSITION));
}

private String getNodeId() {
// TODO this value should be calculated based on graph, the behaviour now is not changed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we really need it here?

return "composite";
}

private Function<FragmentContext, Single<FragmentResult>> toRxFunction(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package io.knotx.fragments.task;

import static io.knotx.fragments.engine.graph.CompositeNode.COMPOSITE_NODE_ID;
import static io.knotx.fragments.handler.api.domain.FragmentResult.ERROR_TRANSITION;
import static io.knotx.fragments.handler.api.domain.FragmentResult.SUCCESS_TRANSITION;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -30,7 +29,7 @@
import io.knotx.fragments.engine.FragmentEvent;
import io.knotx.fragments.engine.FragmentEventContext;
import io.knotx.fragments.engine.Task;
import io.knotx.fragments.engine.graph.ActionNode;
import io.knotx.fragments.engine.graph.SingleNode;
import io.knotx.fragments.engine.graph.CompositeNode;
import io.knotx.fragments.engine.graph.Node;
import io.knotx.fragments.handler.action.ActionProvider;
Expand Down Expand Up @@ -62,6 +61,7 @@ class ConfigurationTaskProviderTest {

private static final Map<String, GraphNodeOptions> NO_TRANSITIONS = Collections.emptyMap();
private static final String TASK_NAME = "task";
private static final String COMPOSITE_NODE_ID = "composite";
private static final FragmentEventContext SAMPLE_FRAGMENT_EVENT =
new FragmentEventContext(new FragmentEvent(new Fragment("type",
new JsonObject().put(FragmentsHandlerOptions.DEFAULT_TASK_KEY, TASK_NAME), "body")),
Expand Down Expand Up @@ -124,7 +124,7 @@ void expectSingleActionNodeGraph() {
assertEquals(TASK_NAME, task.getName());
assertTrue(task.getRootNode().isPresent());
Node rootNode = task.getRootNode().get();
assertTrue(rootNode instanceof ActionNode);
assertTrue(rootNode instanceof SingleNode);
assertEquals("simpleAction", rootNode.getId());
assertFalse(rootNode.next(SUCCESS_TRANSITION).isPresent());
}
Expand All @@ -148,12 +148,12 @@ void expectActionNodesGraphWithTransition() {

assertTrue(task.getRootNode().isPresent());
Node rootNode = task.getRootNode().get();
assertTrue(rootNode instanceof ActionNode);
assertTrue(rootNode instanceof SingleNode);
assertEquals("actionA", rootNode.getId());
Optional<Node> customNode = rootNode.next("customTransition");
assertTrue(customNode.isPresent());
assertTrue(customNode.get() instanceof ActionNode);
ActionNode customSingleNode = (ActionNode) customNode.get();
assertTrue(customNode.get() instanceof SingleNode);
SingleNode customSingleNode = (SingleNode) customNode.get();
assertEquals("actionB", customSingleNode.getId());
}

Expand Down Expand Up @@ -183,7 +183,7 @@ void expectSingleCompositeNodeGraphWithNoEdges() {
CompositeNode compositeRootNode = (CompositeNode) rootNode;
assertEquals(1, compositeRootNode.getNodes().size());
Node node = compositeRootNode.getNodes().get(0);
assertTrue(node instanceof ActionNode);
assertTrue(node instanceof SingleNode);
assertEquals("simpleAction", node.getId());
}

Expand Down Expand Up @@ -213,7 +213,7 @@ void expectCompositeNodeWithSingleNodeOnSuccessGraph() {
Optional<Node> onSuccess = rootNode.next(SUCCESS_TRANSITION);
assertTrue(onSuccess.isPresent());
Node onSuccessNode = onSuccess.get();
assertTrue(onSuccessNode instanceof ActionNode);
assertTrue(onSuccessNode instanceof SingleNode);
assertEquals("lastAction", onSuccessNode.getId());
}

Expand Down Expand Up @@ -242,7 +242,7 @@ void expectCompositeNodeWithSingleNodeOnErrorGraph() {
Optional<Node> onError = rootNode.next(ERROR_TRANSITION);
assertTrue(onError.isPresent());
Node onErrorNode = onError.get();
assertTrue(onErrorNode instanceof ActionNode);
assertTrue(onErrorNode instanceof SingleNode);
assertEquals("fallbackAction", onErrorNode.getId());
}

Expand Down Expand Up @@ -302,7 +302,7 @@ void expectNestedCompositeNodesGraph() {

assertEquals(1, compositeChildNode.getNodes().size());
Node node = compositeChildNode.getNodes().get(0);
assertTrue(node instanceof ActionNode);
assertTrue(node instanceof SingleNode);
assertEquals("simpleAction", node.getId());
}

Expand All @@ -314,4 +314,4 @@ private Task getTask(GraphNodeOptions graph) {
private List<GraphNodeOptions> subTasks(GraphNodeOptions... nodes) {
return Arrays.asList(nodes);
}
}
}
126 changes: 75 additions & 51 deletions handler/engine/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,86 +2,110 @@
Fragments Engine is a reactive asynchronous map-reduce implementation, enjoying the benefits of Reactive Extensions,
that evaluates each Fragment independently using a `Task` definition. `Task` specifies a directed graph of Nodes,
allowing to transform Fragment into the new one.
```
F -> F', T
```
- `F` - Fragment to transform
- `F'` - the modified Fragment
- `T` represents Transition, a text value, that defines the next Node from the graph

## How does it work
Any *Fragment* can define its processing path - a **Task** (which is a **directed graph** of **Nodes**).
A **Task** specifies the nodes through which Fragments will be routed by the Task Engine.
Each Node may define possible *outgoing edges* - **Transitions**.

Additionally, a **Node** can do one of the following:
- define a **single** [Action](https://github.com/Knotx/knotx-fragments/tree/master/handler/api#action)
that will be applied to Fragment (called [Action Node](#action-node)),
- define **many**/**parallel** Actions that are applied to Fragment (called [Composite Node](#composite-node)) .

### Action Node
Action that is applied on the node is a transformation function
`java.util.function.Function<FragmentContext, Single<FragmentResult>>` that transforms one Fragment
into another.

Part of the `FragmentResult` is a *Transition* that defines the next *Node* in the graph that the Fragment
should visit. If there is no transition defined, default `_success` value is used.
Action Node transformation may return any *Transition*, but all the transitions but `_success` must be
configured. If there is no **path** configured for the transition, the following logic is applied:
- if the *Transition* equals `_success` (default value), graph processing finishes
- otherwise "Unsupported Transition" error occurs.
### Node
The node responsibility can be described as:
> Graph node gets a fragment, processes it and responds with Transition. So a node is the function
>`F -> (F', T)` where `F` is the Fragment, `F'` is a modified Fragment and `T` is the Transition.

The node definition is abstract. It allows to define simple processing nodes but also more complex
structures such as a list of subgraphs.

There are two **node** types:
- **single nodes** that are simple operations that do some fragments modifications (called [Single Node](#single-node)),
- **parallel complex nodes** that are lists of subgraphs (called [Composite Node](#composite-node)).

### Single Node
A node represents a single operation that transforms one Fragment into another. The operation can
produce multiple **custom** transitions that indicate various business decisions.

The example of this node is calling an authentication RESTful API. The node implements a communication
logic and reacts to different API responses such as HTTP 200/401/404 status codes.
Each status code may represent various decisions such as a successful authentication, a user not
found or even expired password. Those responses can be easily converted into custom transitions.

### Composite Node
This Node may consist of other Composite Nodes or Action Nodes or a mix of both.
It enables parallel processing of independent Actions (e.g. calling two external data sources).
Composite Node may define only two transitions:
A node defines a list of subgrahs to evaluate. It may consist of other Composite Nodes or Single Nodes
or a mix of both. It enables parallel processing of independent nodes/subgraphs (e.g. calling two
external independent data sources).

Composite Node may respond with only two default transitions:
- `_success` - the default one, means that operation ends without any exception
- `_error` - when operation throws an exception

> Important note!
> Action Nodes inside the Composite Node may only modify the Fragment's payload and should not modify the Fragment's body.
> This is because Actions are executed in parallel and the output of modifying a single Fragment's body in parallel
> may differ between different executions.
> Single Nodes inside the Composite Node may only modify the Fragment's payload and should not modify
>the Fragment's body.

### Transition
A directed graph consists of nodes and edges. Edges are called transitions. Transition is identified by a string.

The pre-defined transitions are:
- `_success` - the default one, indicates that operation completes successfully (no exception)
- `_error` - means that operation has throw an exception

## Node states
There are two important rules to remember:
> If a node responds with *_success* transition, but the `_success` transition is not configured, then
>processing of the graph/subgraph is finished.

> If a node responds with *_error* transition, but the `_error` transition is not configured, then an
>exception is returned.

> If a node responds with a not configured transition, the "Unsupported Transition" error occurs.

### Fragment's status
During fragment's processing, a fragment's status is calculated. Each node responds with a transition.
Fragments Engine validates node responses and set one of the fragment's statuses:
- `unprocessed`
- `success`
- `failure`

The engine accepts a list of fragments to process and responds with a list of processed fragments
containing fragment's data, the processing status and log. The decision what should happen when some
fragment's statuses are `failure` is not taken in the engine.

Let's see the example below to understand when the fragment's status is `success` or `failure`.

![Node with exits](assets/images/graph_node.png)

If a node does not declare a `_success` transition, processing is finished and Task Engine responds with
`SUCCESS` status.
> The *A* node declares two transitions: `_success` and `_error`. If the processing of the *A* node
>finishes correctly, it responds with the `_success` transition and then the *B* node will continue
>processing.

> If the `B` node completes successfully, it ends fragment processing with the `SUCCESS` status.
> Otherwise, it returns the `_error` transition and the fragment's status is `FAILURE`.

Let's see the example above. *Node A* declares two transitions: `_success` and `_error`.
If the transformation logic defined in *Node A* ends correctly, then the `_success` transition
is set by default (unless *Node A* has set a custom transition) and *Node B* will continue processing.
If *Node B* ends correctly then Task Engine responds with the `SUCCESS` status. Otherwise, the `_error`
transition is set, *Node B* does not declare it so the `FAILURE` state is returned.
If the transformation logic from *Node A* raises an exception, the `_error` transition is set and
*Node C* continues processing. *Node C* may end correctly, and then the entire processing is marked
with the `SUCCESS` state.
> If the processing of the `A` node throws an exception, then the `_error` transition is set, and
>the `C` node continues processing.

The images below illustrates the above rules.

`SUCCESS` states:
`SUCCESS` statuses:

* *Node A* and *Node B* ends correctly:
* `A` and `B` ends correctly

![Node with exits](assets/images/a_next_b.png)
![A and B ends correctly](assets/images/a_success_b_success.png)

* *Node A* raises an exception, *Node B* ends correctly
* `A` raises an exception (or responds with `_error`), then `B` ends correctly

![Node with exits](assets/images/a_error_c.png)
![A ends with error, C ends correctly](assets/images/a_error_c_success.png)

`FAILURE` states:
`FAILURE` status:

* *Node A* ends correctly and *Node B* raises exception:
* `A` ends correctly, however `B` raises an exception (or responds with `_error`)

![Node with exits](assets/images/a_next_b_error.png)
![Node with exits](assets/images/a_success_b_error.png)

* *Node A* and *Node C* raises exceptions:
* `A` and `C` raise exceptions (or respond with `_error`)

![Node with exits](assets/images/a_error_c_error.png)

A node can also declare its own exits (transitions) but then we need to configure them in a graph.
Otherwise, if the custom transition is set but is not declared, then the `FAILURE` status is returned:
* `A` node can also respond with its custom transitions. Then we have to configure them in a graph.
Otherwise, if the `custom` transition is set but is not declared, then the `FAILURE` status is returned

![Node with exits](assets/images/a_custom.png)
![Node with exits](assets/images/a_custom_no_configuration.png)
Binary file removed handler/engine/assets/images/a_custom.png
Binary file not shown.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file removed handler/engine/assets/images/a_error_c.png
Binary file not shown.
Binary file modified handler/engine/assets/images/a_error_c_error.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file removed handler/engine/assets/images/a_next_b.png
Binary file not shown.
Binary file removed handler/engine/assets/images/a_next_b_error.png
Binary file not shown.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file modified handler/engine/assets/images/graph_node.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
*/
package io.knotx.fragments.engine;

import io.knotx.fragments.engine.graph.ActionNode;
import io.knotx.fragments.engine.graph.NodeType;
import io.knotx.fragments.engine.graph.SingleNode;
import io.knotx.fragments.engine.graph.CompositeNode;
import io.knotx.fragments.engine.graph.Node;
import io.knotx.fragments.handler.api.domain.FragmentResult;
Expand Down Expand Up @@ -59,14 +60,14 @@ private Single<TaskExecutionContext> processTask(TaskExecutionContext context, N
}

private Single<FragmentResult> getResult(TaskExecutionContext context) {
return context.getCurrentNode().isComposite()
return NodeType.COMPOSITE == context.getCurrentNode().getType()
? mapReduce(context)
: execute(context);
}

private Single<FragmentResult> execute(TaskExecutionContext context) {
return Single.just(context.getCurrentNode())
.map(ActionNode.class::cast)
.map(SingleNode.class::cast)
.observeOn(RxHelper.blockingScheduler(vertx))
.flatMap(gn -> gn.doAction(context.fragmentContextInstance()))
.doOnSuccess(fr -> context.handleSuccess(fr.getTransition()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,21 @@

public class CompositeNode implements Node {

public static final String COMPOSITE_NODE_ID = "composite";
private final String id;
private final List<Node> nodes;
private final Node onSuccess;
private final Node onError;

public CompositeNode(List<Node> nodes, Node onSuccess, Node onError) {
public CompositeNode(String id, List<Node> nodes, Node onSuccess, Node onError) {
this.id = id;
this.nodes = nodes;
this.onSuccess = onSuccess;
this.onError = onError;
}

@Override
public String getId() {
return COMPOSITE_NODE_ID;
return id;
}

@Override
Expand All @@ -51,8 +52,8 @@ public Optional<Node> next(String transition) {
}

@Override
public boolean isComposite() {
return true;
public NodeType getType() {
return NodeType.COMPOSITE;
}

public List<Node> getNodes() {
Expand All @@ -62,7 +63,8 @@ public List<Node> getNodes() {
@Override
public String toString() {
return "CompositeNode{" +
"nodes=" + nodes +
"id='" + id + '\'' +
", nodes=" + nodes +
", onSuccess=" + onSuccess +
", onError=" + onError +
'}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ public interface Node {

Optional<Node> next(String transition);

boolean isComposite();
NodeType getType();

}
Loading