diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java index 8fb025e01..19498584c 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/ExecutorDao.java @@ -176,6 +176,21 @@ public List getExecutors() { return new WorkflowExecutor(id, host, pid, executorGroup, started, active, expires, stopped, recovered); }); } + public List getAllExecutors() { + return jdbc.query("select id, host,executor_group, pid, started, active, expires, stopped, recovered from nflow_executor " + + " order by id asc", (rs, rowNum) -> { + int id = rs.getInt("id"); + String host = rs.getString("host"); + int pid = rs.getInt("pid"); + String executorGroupName = rs.getString("executor_group"); + var started = sqlVariants.getDateTime(rs, "started"); + var active = sqlVariants.getDateTime(rs, "active"); + var expires = sqlVariants.getDateTime(rs, "expires"); + var stopped = sqlVariants.getDateTime(rs, "stopped"); + var recovered = sqlVariants.getDateTime(rs, "recovered"); + return new WorkflowExecutor(id, host, pid, executorGroupName, started, active, expires, stopped, recovered); + }); + } public void markShutdown(boolean graceful) { try { diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowDefinitionDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowDefinitionDao.java index 031a3d2c4..43c79b701 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowDefinitionDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowDefinitionDao.java @@ -98,7 +98,7 @@ private String sha1(String serializedDefinition) { } public List queryStoredWorkflowDefinitions(Collection types) { - String sql = "select definition from nflow_workflow_definition where " + executorInfo.getExecutorGroupCondition(); + String sql = "select definition from nflow_workflow_definition " ; MapSqlParameterSource params = new MapSqlParameterSource(); if (!isEmpty(types)) { sql += " and type in (:types)"; diff --git a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java index cb1ccf592..32150ed00 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java +++ b/nflow-engine/src/main/java/io/nflow/engine/internal/dao/WorkflowInstanceDao.java @@ -114,6 +114,7 @@ public class WorkflowInstanceDao { private final long workflowInstanceQueryMaxActionsDefault; private final int workflowInstanceTypeCacheSize; private final AtomicBoolean disableBatchUpdates = new AtomicBoolean(); + private final AtomicBoolean defaultQueryAllExecutors = new AtomicBoolean(false); AtomicInteger instanceStateTextLength = new AtomicInteger(); AtomicInteger actionStateTextLength = new AtomicInteger(); AtomicInteger stateVariableValueMaxLength = new AtomicInteger(); @@ -142,6 +143,7 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdb workflowInstanceQueryMaxActions = env.getRequiredProperty("nflow.workflow.instance.query.max.actions", Long.class); workflowInstanceQueryMaxActionsDefault = env.getRequiredProperty("nflow.workflow.instance.query.max.actions.default", Long.class); + defaultQueryAllExecutors.set(env.getRequiredProperty("nflow.db.query_all_executors", Boolean.class)); disableBatchUpdates.set(env.getRequiredProperty("nflow.db.disable_batch_updates", Boolean.class)); if (disableBatchUpdates.get()) { logger.info("nFlow DB batch updates are disabled (system property nflow.db.disable_batch_updates=true)"); @@ -150,6 +152,12 @@ public WorkflowInstanceDao(SQLVariants sqlVariants, @NFlow JdbcTemplate nflowJdb instanceStateTextLength.set(env.getProperty("nflow.workflow.instance.state.text.length", Integer.class, -1)); actionStateTextLength.set(env.getProperty("nflow.workflow.action.state.text.length", Integer.class, -1)); stateVariableValueMaxLength.set(env.getProperty("nflow.workflow.state.variable.value.length", Integer.class, -1)); + + } + + protected AtomicBoolean getDefaultQueryAllExecutors() { + + return defaultQueryAllExecutors; } private int getInstanceStateTextLength() { @@ -193,7 +201,8 @@ private long insertWorkflowInstanceWithCte(WorkflowInstance instance) { StringBuilder sqlb = new StringBuilder(256); sqlb.append("with wf as (").append(insertWorkflowInstanceSql()).append(" returning id)"); Object[] instanceValues = new Object[] { instance.type, instance.priority, instance.parentWorkflowId, - instance.parentActionId, instance.businessKey, instance.externalId, executorInfo.getExecutorGroup(), + instance.parentActionId, instance.businessKey, instance.externalId, + instance.executorGroup == null ? executorInfo.getExecutorGroup() : instance.executorGroup, instance.status.name(), instance.state, abbreviate(instance.stateText, getInstanceStateTextLength()), toTimestamp(instance.nextActivation), instance.signal.orElse(null) }; int pos = instanceValues.length; @@ -245,7 +254,11 @@ public PreparedStatement createPreparedStatement(Connection connection) throws S ps.setObject(p++, instance.parentActionId); ps.setString(p++, instance.businessKey); ps.setString(p++, instance.externalId); - ps.setString(p++, executorInfo.getExecutorGroup()); + if (instance.executorGroup!=null) { + ps.setString(p++, instance.executorGroup); + } else { + ps.setString(p++, executorInfo.getExecutorGroup()); + } ps.setString(p++, instance.status.name()); ps.setString(p++, instance.state); ps.setString(p++, abbreviate(instance.stateText, getInstanceStateTextLength())); @@ -692,7 +705,21 @@ public Stream queryWorkflowInstancesAsStream(QueryWorkflowInst List conditions = new ArrayList<>(); MapSqlParameterSource params = new MapSqlParameterSource(); queryOptionsToSqlAndParams(query, conditions, params); - conditions.add(executorInfo.getExecutorGroupCondition()); + + if (!isEmpty(query.executorGroups)) { + if (query.executorGroups.size() == 1) { + conditions.add("executor_group = :executor_group"); + params.addValue("executor_group", query.executorGroups.get(0)); + } else { + conditions.add("executor_group in (:executor_groups)"); + params.addValue("executor_groups", query.executorGroups); + } + } + else if (!defaultQueryAllExecutors.get()){ + //the old behaviour of nflow is that when you query for workflows you only get the executor group of the current instance + conditions.add(executorInfo.getExecutorGroupCondition()); + } + String sqlSuffix = "from nflow_workflow wf "; if (query.stateVariableKey != null) { sqlSuffix += "inner join nflow_workflow_state wfs on wf.id = wfs.workflow_id and wfs.state_key = :state_key and " + sqlVariants.clobToComparable("wfs.state_value") + " = :state_value "; @@ -701,7 +728,13 @@ public Stream queryWorkflowInstancesAsStream(QueryWorkflowInst params.addValue("state_key", query.stateVariableKey); params.addValue("state_value", query.stateVariableValue); } - sqlSuffix += "where " + collectionToDelimitedString(conditions, " and ") + " order by id desc"; + String collection = collectionToDelimitedString(conditions, " and "); + if (collection.isEmpty()){ + //remove the where clause when there are no conditions + sqlSuffix += " order by id desc"; + }else { + sqlSuffix += "where " + collection + " order by id desc"; + } long maxResults = getMaxResults(query.maxResults); String sql = sqlVariants.limit("select " + ALL_WORKFLOW_COLUMNS + ", 0 as archived " + sqlSuffix, maxResults); List results = namedJdbc.query(sql, params, workflowInstanceRowMapper); diff --git a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowExecutorService.java b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowExecutorService.java index b6fca63ee..2f5f68c6e 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowExecutorService.java +++ b/nflow-engine/src/main/java/io/nflow/engine/service/WorkflowExecutorService.java @@ -29,4 +29,11 @@ public WorkflowExecutorService(ExecutorDao executorDao) { public List getWorkflowExecutors() { return executorDao.getExecutors(); } + /** + * Return all workflow executors . + * @return The workflow executors. + */ + public List getAllWorkflowExecutors() { + return executorDao.getAllExecutors(); + } } diff --git a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java index 443548778..5821a3bcf 100644 --- a/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java +++ b/nflow-engine/src/main/java/io/nflow/engine/workflow/instance/QueryWorkflowInstances.java @@ -69,6 +69,10 @@ public class QueryWorkflowInstances extends ModelObject { * Setting this to true will make the query return also workflow actions. */ public final boolean includeActions; + /** + * optional filter of executor groups, if empty it does not return all but instead backwards compactability of the current api group + */ + public final List executorGroups; /** * Setting this to true will make the query return also the current state variables for the workflow. @@ -114,6 +118,7 @@ public class QueryWorkflowInstances extends ModelObject { this.stateVariableKey = builder.stateVariableKey; this.stateVariableValue = builder.stateVariableValue; this.includeActions = builder.includeActions; + this.executorGroups = builder.executorGroups; this.includeCurrentStateVariables = builder.includeCurrentStateVariables; this.includeActionStateVariables = builder.includeActionStateVariables; this.includeChildWorkflows = builder.includeChildWorkflows; @@ -137,6 +142,7 @@ public static class Builder { String stateVariableKey; String stateVariableValue; boolean includeActions; + List executorGroups = new ArrayList<>(); boolean includeCurrentStateVariables; boolean includeActionStateVariables; boolean includeChildWorkflows; @@ -162,6 +168,7 @@ public Builder(QueryWorkflowInstances copy) { this.stateVariableKey = copy.stateVariableKey; this.stateVariableValue = copy.stateVariableValue; this.includeActions = copy.includeActions; + this.executorGroups = copy.executorGroups; this.includeCurrentStateVariables = copy.includeCurrentStateVariables; this.includeActionStateVariables = copy.includeActionStateVariables; this.includeChildWorkflows = copy.includeChildWorkflows; @@ -273,6 +280,16 @@ public Builder setIncludeActions(boolean includeActions) { return this; } + /** + * Set whether specific executor groups should be included in the results. + * @param executorGroups list of executor names + * @return this. + */ + public Builder setExecutorGroups(String ... executorGroups) { + this.executorGroups.addAll(asList(executorGroups)); + return this; + } + /** * Set whether current workflow state variables should be included in the results. Default is `false` * @param includeCurrentStateVariables True to include state variables, false otherwise. diff --git a/nflow-engine/src/main/resources/nflow-engine.properties b/nflow-engine/src/main/resources/nflow-engine.properties index 0599223a7..35bb936eb 100644 --- a/nflow-engine/src/main/resources/nflow-engine.properties +++ b/nflow-engine/src/main/resources/nflow-engine.properties @@ -65,6 +65,7 @@ nflow.db.max_pool_size=4 nflow.db.idle_timeout_seconds=600 nflow.db.create_on_startup=true nflow.db.disable_batch_updates=false +nflow.db.query_all_executors=false nflow.db.workflowInstanceType.cacheSize=10000 nflow.db.initialization_fail_timeout_seconds=10 diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java index b326b8dd0..f28693b55 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/dao/WorkflowInstanceDaoTest.java @@ -698,6 +698,50 @@ public void pollNextWorkflowInstances() { assertThat(secondBatch.size(), equalTo(0)); } + /** + * Test creating a workflow with multiple executors and querying for them + */ + @Test +public void testCreateWorkflowMultipleExecutorsAndQuery() { + + dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1)) + .setPriority((short)1).setExecutorGroup("junit").build()); + dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1)) + .setPriority((short)1).setExecutorGroup("test_two").build()); + + //will return the one that matches the executor group of the current nflow + List defaultSearch = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true) + .build()); + assertThat(defaultSearch.size(), is(1)); + + List workflows = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true) + .setExecutorGroups("junit").build()); + assertThat(workflows.size(), is(1)); + + List workflowsAll = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true) + .setExecutorGroups("junit","test_two").build()); + + assertThat(workflowsAll.size(), is(2)); + + } + /** + * Test returning all executors (default being junit) + */ + @Test +public void testQueryWorkflowWithDefaultSetToTrue() { + + dao.getDefaultQueryAllExecutors().set(true); + + dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1)) + .setPriority((short)1).setExecutorGroup("test").build()); + dao.insertWorkflowInstance(constructWorkflowInstanceBuilder().setNextActivation(now().minusMinutes(1)) + .setPriority((short)1).setExecutorGroup("test_two").build()); + List workflows = dao.queryWorkflowInstances(new QueryWorkflowInstances.Builder().setIncludeActions(true) + .build()); + assertThat(workflows.size(), is(2)); + + } + @Test public void pollNextWorkflowInstancesReturnInstancesInCorrectOrder() { long olderLowPrio = createInstance(2, (short) 1); diff --git a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java index f29d65ccd..476db1f1d 100644 --- a/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java +++ b/nflow-engine/src/test/java/io/nflow/engine/internal/executor/BaseNflowTest.java @@ -30,7 +30,7 @@ protected WorkflowInstance.Builder constructWorkflowInstanceBuilder() { .setExternalId(randomUUID().toString()) .setBusinessKey(randomUUID().toString()) .setRetries(0) - .setExecutorGroup("flowInstance1") + .setExecutorGroup("junit") .setStateVariables(new LinkedHashMap() { { put("requestData", "{ \"parameter\": \"abc\" }"); diff --git a/nflow-engine/src/test/resources/junit.properties b/nflow-engine/src/test/resources/junit.properties index aafd514b1..51964be84 100644 --- a/nflow-engine/src/test/resources/junit.properties +++ b/nflow-engine/src/test/resources/junit.properties @@ -18,5 +18,6 @@ nflow.db.max_pool_size=20 nflow.db.idle_timeout_seconds=600 nflow.db.create_on_startup=true nflow.db.disable_batch_updates=false +nflow.db.query_all_executors=false nflow.db.workflowInstanceType.cacheSize=10000 -nflow.db.initialization_fail_timeout_seconds=1 \ No newline at end of file +nflow.db.initialization_fail_timeout_seconds=1 diff --git a/nflow-explorer/src/App.tsx b/nflow-explorer/src/App.tsx index 68cd4fd67..17a163d60 100644 --- a/nflow-explorer/src/App.tsx +++ b/nflow-explorer/src/App.tsx @@ -46,12 +46,24 @@ function App() {
} /> - } /> - } /> - } /> + } /> + } + /> + } + /> } /> - } /> - } /> + } + /> + } + /> } /> } /> } /> diff --git a/nflow-explorer/src/component/Navigation.tsx b/nflow-explorer/src/component/Navigation.tsx index 646c01e19..92b3e102a 100644 --- a/nflow-explorer/src/component/Navigation.tsx +++ b/nflow-explorer/src/component/Navigation.tsx @@ -2,7 +2,19 @@ import React, {useState} from 'react'; import {matchPath, NavLink, useLocation, useNavigate} from 'react-router-dom'; import './Navigation.scss'; -import {AppBar, Box, Button, IconButton, Menu, MenuItem, Select, Tab, Tabs, Toolbar, Typography} from '@mui/material'; +import { + AppBar, + Box, + Button, + IconButton, + Menu, + MenuItem, + Select, + Tab, + Tabs, + Toolbar, + Typography +} from '@mui/material'; import MenuIcon from '@mui/icons-material/Menu'; import {useConfig} from '../config'; import {Config} from '../types'; @@ -23,14 +35,16 @@ const Navigation = () => { const [selectedEndpointId, setSelectedEndpointId] = useState( config.activeNflowEndpoint.id ); - const [anchorElNav, setAnchorElNav] = React.useState(null); + const [anchorElNav, setAnchorElNav] = React.useState( + null + ); const pages = [ ['/workflow', 'Workflow instances'], ['/workflow-definition', 'Workflow definitions'], ['/executors', 'Executors'], ['/about', 'About'] - ] + ]; const handleOpenNavMenu = (event: React.MouseEvent) => { setAnchorElNav(event.currentTarget); @@ -40,24 +54,24 @@ const Navigation = () => { }; const useRouteMatch = (patterns: readonly string[]) => { - const { pathname } = useLocation(); + const {pathname} = useLocation(); for (let i = 0; i < patterns.length; i += 1) { - const pattern = patterns[i].replace("#", ""); + const pattern = patterns[i].replace('#', ''); const possibleMatch = matchPath(pattern, pathname); if (possibleMatch !== null) { return possibleMatch; } } return null; - } - const routeMatch = useRouteMatch(pages.map((page) => page[0])); + }; + const routeMatch = useRouteMatch(pages.map(page => page[0])); const currentTab = routeMatch?.pattern?.path; return ( -
- +
+ { anchorEl={anchorElNav} anchorOrigin={{ vertical: 'bottom', - horizontal: 'left', + horizontal: 'left' }} keepMounted transformOrigin={{ vertical: 'top', - horizontal: 'left', + horizontal: 'left' }} open={Boolean(anchorElNav)} onClose={handleCloseNavMenu} sx={{ - display: { xs: 'block', md: 'none' }, + display: {xs: 'block', md: 'none'} }} > - {pages.map((page) => ( - + {pages.map(page => ( + {page[1]} ))} @@ -92,41 +111,55 @@ const Navigation = () => { {renderLogo(config)} {config.nflowEndpoints.length > 1 && ( - - )} - + + )}
- - - + + + diff --git a/nflow-explorer/src/component/Selection.tsx b/nflow-explorer/src/component/Selection.tsx index a958ccd91..4ddfc23ae 100644 --- a/nflow-explorer/src/component/Selection.tsx +++ b/nflow-explorer/src/component/Selection.tsx @@ -13,7 +13,7 @@ function Selection(props: { }) { let currentIndex = ++index; return ( - + {props.label} @@ -77,7 +81,11 @@ const UpdateWorkflowInstanceStateForm = function (props: { - +