Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/explorer executor groups #655

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,21 @@ public List<WorkflowExecutor> getExecutors() {
return new WorkflowExecutor(id, host, pid, executorGroup, started, active, expires, stopped, recovered);
});
}
public List<WorkflowExecutor> getAllExecutors() {
return jdbc.query("select id, host,executor_group, pid, started, active, expires, stopped, recovered from nflow_executor "
+ " order by id asc", (rs, rowNum) -> {
int id = rs.getInt("id");
String host = rs.getString("host");
int pid = rs.getInt("pid");
String executorGroupName = rs.getString("executor_group");
var started = sqlVariants.getDateTime(rs, "started");
var active = sqlVariants.getDateTime(rs, "active");
var expires = sqlVariants.getDateTime(rs, "expires");
var stopped = sqlVariants.getDateTime(rs, "stopped");
var recovered = sqlVariants.getDateTime(rs, "recovered");
return new WorkflowExecutor(id, host, pid, executorGroupName, started, active, expires, stopped, recovered);
});
}

public void markShutdown(boolean graceful) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private String sha1(String serializedDefinition) {
}

public List<StoredWorkflowDefinition> queryStoredWorkflowDefinitions(Collection<String> types) {
String sql = "select definition from nflow_workflow_definition where " + executorInfo.getExecutorGroupCondition();
String sql = "select definition from nflow_workflow_definition " ;
MapSqlParameterSource params = new MapSqlParameterSource();
if (!isEmpty(types)) {
sql += " and type in (:types)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class WorkflowInstanceDao {
private final long workflowInstanceQueryMaxActionsDefault;
private final int workflowInstanceTypeCacheSize;
private final AtomicBoolean disableBatchUpdates = new AtomicBoolean();
private final AtomicBoolean defaultQueryAllExecutors = new AtomicBoolean(false);
AtomicInteger instanceStateTextLength = new AtomicInteger();
AtomicInteger actionStateTextLength = new AtomicInteger();
AtomicInteger stateVariableValueMaxLength = new AtomicInteger();
Expand Down Expand Up @@ -142,6 +143,7 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdb
workflowInstanceQueryMaxActions = env.getRequiredProperty("nflow.workflow.instance.query.max.actions", Long.class);
workflowInstanceQueryMaxActionsDefault = env.getRequiredProperty("nflow.workflow.instance.query.max.actions.default",
Long.class);
defaultQueryAllExecutors.set(env.getRequiredProperty("nflow.db.query_all_executors", Boolean.class));
disableBatchUpdates.set(env.getRequiredProperty("nflow.db.disable_batch_updates", Boolean.class));
if (disableBatchUpdates.get()) {
logger.info("nFlow DB batch updates are disabled (system property nflow.db.disable_batch_updates=true)");
Expand All @@ -150,6 +152,12 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdb
instanceStateTextLength.set(env.getProperty("nflow.workflow.instance.state.text.length", Integer.class, -1));
actionStateTextLength.set(env.getProperty("nflow.workflow.action.state.text.length", Integer.class, -1));
stateVariableValueMaxLength.set(env.getProperty("nflow.workflow.state.variable.value.length", Integer.class, -1));

}

protected AtomicBoolean getDefaultQueryAllExecutors() {

return defaultQueryAllExecutors;
}

private int getInstanceStateTextLength() {
Expand Down Expand Up @@ -193,7 +201,8 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) {
StringBuilder sqlb = new StringBuilder(256);
sqlb.append("with wf as (").append(insertWorkflowInstanceSql()).append(" returning id)");
Object[] instanceValues = new Object[] { instance.type, instance.priority, instance.parentWorkflowId,
instance.parentActionId, instance.businessKey, instance.externalId, executorInfo.getExecutorGroup(),
instance.parentActionId, instance.businessKey, instance.externalId,
instance.executorGroup == null ? executorInfo.getExecutorGroup() : instance.executorGroup,
instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()),
toTimestamp(instance.nextActivation), instance.signal.orElse(null) };
int pos = instanceValues.length;
Expand Down Expand Up @@ -245,7 +254,11 @@ public PreparedStatement createPreparedStatement(Connection connection) throws S
ps.setObject(p++, instance.parentActionId);
ps.setString(p++, instance.businessKey);
ps.setString(p++, instance.externalId);
ps.setString(p++, executorInfo.getExecutorGroup());
if (instance.executorGroup!=null) {
ps.setString(p++, instance.executorGroup);
} else {
ps.setString(p++, executorInfo.getExecutorGroup());
}
ps.setString(p++, instance.status.name());
ps.setString(p++, instance.state);
ps.setString(p++, abbreviate(instance.stateText, getInstanceStateTextLength()));
Expand Down Expand Up @@ -692,7 +705,21 @@ public Stream<WorkflowInstance> queryWorkflowInstancesAsStream(QueryWorkflowInst
List<String> conditions = new ArrayList<>();
MapSqlParameterSource params = new MapSqlParameterSource();
queryOptionsToSqlAndParams(query, conditions, params);
conditions.add(executorInfo.getExecutorGroupCondition());

if (!isEmpty(query.executorGroups)) {
if (query.executorGroups.size() == 1) {
conditions.add("executor_group = :executor_group");
params.addValue("executor_group", query.executorGroups.get(0));
} else {
conditions.add("executor_group in (:executor_groups)");
params.addValue("executor_groups", query.executorGroups);
}
}
else if (!defaultQueryAllExecutors.get()){
//the old behaviour of nflow is that when you query for workflows you only get the executor group of the current instance
conditions.add(executorInfo.getExecutorGroupCondition());
}

String sqlSuffix = "from nflow_workflow wf ";
if (query.stateVariableKey != null) {
sqlSuffix += "inner join nflow_workflow_state wfs on wf.id = wfs.workflow_id and wfs.state_key = :state_key and " + sqlVariants.clobToComparable("wfs.state_value") + " = :state_value ";
Expand All @@ -701,7 +728,13 @@ public Stream<WorkflowInstance> queryWorkflowInstancesAsStream(QueryWorkflowInst
params.addValue("state_key", query.stateVariableKey);
params.addValue("state_value", query.stateVariableValue);
}
sqlSuffix += "where " + collectionToDelimitedString(conditions, " and ") + " order by id desc";
String collection = collectionToDelimitedString(conditions, " and ");
if (collection.isEmpty()){
//remove the where clause when there are no conditions
sqlSuffix += " order by id desc";
}else {
sqlSuffix += "where " + collection + " order by id desc";
}
long maxResults = getMaxResults(query.maxResults);
String sql = sqlVariants.limit("select " + ALL_WORKFLOW_COLUMNS + ", 0 as archived " + sqlSuffix, maxResults);
List<WorkflowInstance.Builder> results = namedJdbc.query(sql, params, workflowInstanceRowMapper);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,11 @@ public WorkflowExecutorService(ExecutorDao executorDao) {
public List<WorkflowExecutor> getWorkflowExecutors() {
return executorDao.getExecutors();
}
/**
* Return all workflow executors .
* @return The workflow executors.
*/
public List<WorkflowExecutor> getAllWorkflowExecutors() {
return executorDao.getAllExecutors();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ public class QueryWorkflowInstances extends ModelObject {
* Setting this to true will make the query return also workflow actions.
*/
public final boolean includeActions;
/**
* optional filter of executor groups, if empty it does not return all but instead backwards compactability of the current api group
*/
public final List<String> executorGroups;

/**
* Setting this to true will make the query return also the current state variables for the workflow.
Expand Down Expand Up @@ -114,6 +118,7 @@ public class QueryWorkflowInstances extends ModelObject {
this.stateVariableKey = builder.stateVariableKey;
this.stateVariableValue = builder.stateVariableValue;
this.includeActions = builder.includeActions;
this.executorGroups = builder.executorGroups;
this.includeCurrentStateVariables = builder.includeCurrentStateVariables;
this.includeActionStateVariables = builder.includeActionStateVariables;
this.includeChildWorkflows = builder.includeChildWorkflows;
Expand All @@ -137,6 +142,7 @@ public static class Builder {
String stateVariableKey;
String stateVariableValue;
boolean includeActions;
List<String> executorGroups = new ArrayList<>();
boolean includeCurrentStateVariables;
boolean includeActionStateVariables;
boolean includeChildWorkflows;
Expand All @@ -162,6 +168,7 @@ public Builder(QueryWorkflowInstances copy) {
this.stateVariableKey = copy.stateVariableKey;
this.stateVariableValue = copy.stateVariableValue;
this.includeActions = copy.includeActions;
this.executorGroups = copy.executorGroups;
this.includeCurrentStateVariables = copy.includeCurrentStateVariables;
this.includeActionStateVariables = copy.includeActionStateVariables;
this.includeChildWorkflows = copy.includeChildWorkflows;
Expand Down Expand Up @@ -273,6 +280,16 @@ public Builder setIncludeActions(boolean includeActions) {
return this;
}

/**
* Set whether specific executor groups should be included in the results.
* @param executorGroups list of executor names
* @return this.
*/
public Builder setExecutorGroups(String ... executorGroups) {
this.executorGroups.addAll(asList(executorGroups));
return this;
}

/**
* Set whether current workflow state variables should be included in the results. Default is `false`
* @param includeCurrentStateVariables True to include state variables, false otherwise.
Expand Down
1 change: 1 addition & 0 deletions nflow-engine/src/main/resources/nflow-engine.properties
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ nflow.db.max_pool_size=4
nflow.db.idle_timeout_seconds=600
nflow.db.create_on_startup=true
nflow.db.disable_batch_updates=false
nflow.db.query_all_executors=false
nflow.db.workflowInstanceType.cacheSize=10000
nflow.db.initialization_fail_timeout_seconds=10

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,50 @@ public void pollNextWorkflowInstances() {
assertThat(secondBatch.size(), equalTo(0));
}

/**
* Test creating a workflow with multiple executors and querying for them
*/
@Test
public void testCreateWorkflowMultipleExecutorsAndQuery() {

dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1))
.setPriority((short)1).setExecutorGroup("junit").build());
dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1))
.setPriority((short)1).setExecutorGroup("test_two").build());

//will return the one that matches the executor group of the current nflow
List<WorkflowInstance> defaultSearch = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true)
.build());
assertThat(defaultSearch.size(), is(1));

List<WorkflowInstance> workflows = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true)
.setExecutorGroups("junit").build());
assertThat(workflows.size(), is(1));

List<WorkflowInstance> workflowsAll = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true)
.setExecutorGroups("junit","test_two").build());

assertThat(workflowsAll.size(), is(2));

}
/**
* Test returning all executors (default being junit)
*/
@Test
public void testQueryWorkflowWithDefaultSetToTrue() {

dao.getDefaultQueryAllExecutors().set(true);

dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1))
.setPriority((short)1).setExecutorGroup("test").build());
dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1))
.setPriority((short)1).setExecutorGroup("test_two").build());
List<WorkflowInstance> workflows = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true)
.build());
assertThat(workflows.size(), is(2));

}

@Test
public void pollNextWorkflowInstancesReturnInstancesInCorrectOrder() {
long olderLowPrio = createInstance(2, (short) 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ protected WorkflowInstance.Builder constructWorkflowInstanceBuilder() {
.setExternalId(randomUUID().toString())
.setBusinessKey(randomUUID().toString())
.setRetries(0)
.setExecutorGroup("flowInstance1")
.setExecutorGroup("junit")
.setStateVariables(new LinkedHashMap<String, String>() {
{
put("requestData", "{ \"parameter\": \"abc\" }");
Expand Down
3 changes: 2 additions & 1 deletion nflow-engine/src/test/resources/junit.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ nflow.db.max_pool_size=20
nflow.db.idle_timeout_seconds=600
nflow.db.create_on_startup=true
nflow.db.disable_batch_updates=false
nflow.db.query_all_executors=false
nflow.db.workflowInstanceType.cacheSize=10000
nflow.db.initialization_fail_timeout_seconds=1
nflow.db.initialization_fail_timeout_seconds=1
22 changes: 17 additions & 5 deletions nflow-explorer/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,24 @@ function App() {
<div className="content-area">
<Routes>
<Route path="/" element={<Navigate to="/workflow" />} />
<Route path="/search" element={<Navigate to="/workflow"/>} />
<Route path="/workflow/create" element={ <CreateWorkflowInstancePage />} />
<Route path="/workflow/:id" element={<WorkflowInstanceDetailsPage />} />
<Route path="/search" element={<Navigate to="/workflow" />} />
<Route
path="/workflow/create"
element={<CreateWorkflowInstancePage />}
/>
<Route
path="/workflow/:id"
element={<WorkflowInstanceDetailsPage />}
/>
<Route path="/workflow" element={<WorkflowInstanceListPage />} />
<Route path="/workflow-definition/:type" element={<WorkflowDefinitionDetailsPage />} />
<Route path="/workflow-definition" element={<WorkflowDefinitionListPage />} />
<Route
path="/workflow-definition/:type"
element={<WorkflowDefinitionDetailsPage />}
/>
<Route
path="/workflow-definition"
element={<WorkflowDefinitionListPage />}
/>
<Route path="/executors" element={<ExecutorListPage />} />
<Route path="/about" element={<AboutPage />} />
<Route path="*" element={<NotFoundPage />} />
Expand Down
Loading
Loading