diff --git a/README.md b/README.md index f072005..66879c8 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,6 @@ The Unit Test section covers these goals. The Load Test section covers these goals. - Process-Automator executes scenario. One scenario pilot a process. It is possible to execute multiple at the same time to handle a use case like @@ -313,3 +312,15 @@ automator.servers: audience: "" secret: "4BPUva1U4lDtoG2-torvAtx6w5RbHULUFhGZ-bBXOMWwZJG3d3VDlfPHjVO3Kz-N" ```` + +# Build + +Rebuilt the image via +```` +mvn clean install +mvn springboot:build-image +```` + +The docker image is build using the Dockerfile present on the root level. + +Push the image to ghcr.io/camunda-community-hub/process-execution-automator: diff --git a/doc/scenarioreference/README.md b/doc/scenarioreference/README.md index 77be35d..3c18979 100644 --- a/doc/scenarioreference/README.md +++ b/doc/scenarioreference/README.md @@ -251,6 +251,16 @@ Example: the variable `loopcrawl` will be a list of 500 random string. +**generateuniqueid()** +Generate a unique sequential number. +The prefix is used to allo wmultiple counter +Example: +```` +"tidblue": "generateuniqueid(blue)" +"tidred": "generateuniqueid(red)" +```` +Variables `tidblue` and `tidred` got a unique id, each following a different counter. + ## Verification diff --git a/pom.xml b/pom.xml index 0835411..8ee1824 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,8 @@ org.camunda.community.automator process-execution-automator - 1.3.0 + 1.4.0 + 17 @@ -222,7 +223,7 @@ org.springframework.boot spring-boot-maven-plugin - 3.0.6 + org.camunda.automator.AutomatorApplication exec diff --git a/src/main/java/org/camunda/automator/AutomatorAPI.java b/src/main/java/org/camunda/automator/AutomatorAPI.java index cdbb5ef..e4f7da0 100644 --- a/src/main/java/org/camunda/automator/AutomatorAPI.java +++ b/src/main/java/org/camunda/automator/AutomatorAPI.java @@ -67,7 +67,7 @@ public Scenario loadFromInputStream(InputStream scenarioInputStream, String orig /** * Search the engine from the scenario * - * @param scenario scenario + * @param scenario scenario * @param bpmnEngineList different engine configuration * @return the engine, null if no engine exist, an exception if the connection is not possible */ @@ -76,13 +76,13 @@ public BpmnEngine getBpmnEngineFromScenario(Scenario scenario, BpmnEngineList bp try { if (scenario.getServerName() != null) { - return getBpmnEngine( bpmnEngineList.getByServerName(scenario.getServerName()), true); + return getBpmnEngine(bpmnEngineList.getByServerName(scenario.getServerName()), true); } return null; } catch (AutomatorException e) { - logger.error("Can't connect the engine for the scenario [{}] serverName[{}]: {}", - scenario.getName(), scenario.getServerName(), e.getMessage()); + logger.error("Can't connect the engine for the scenario [{}] serverName[{}]: {}", scenario.getName(), + scenario.getServerName(), e.getMessage()); throw e; } @@ -121,8 +121,9 @@ public RunResult executeScenario(BpmnEngine bpmnEngine, RunParameters runParamet /* Deploy a process in the server */ /* ******************************************************************** */ - public BpmnEngine getBpmnEngine(BpmnEngineList.BpmnServerDefinition serverDefinition,boolean logDebug) throws AutomatorException { - return BpmnEngineFactory.getInstance().getEngineFromConfiguration( serverDefinition, logDebug); + public BpmnEngine getBpmnEngine(BpmnEngineList.BpmnServerDefinition serverDefinition, boolean logDebug) + throws AutomatorException { + return BpmnEngineFactory.getInstance().getEngineFromConfiguration(serverDefinition, logDebug); } /** diff --git a/src/main/java/org/camunda/automator/AutomatorCLI.java b/src/main/java/org/camunda/automator/AutomatorCLI.java index b2d041b..2b911d3 100644 --- a/src/main/java/org/camunda/automator/AutomatorCLI.java +++ b/src/main/java/org/camunda/automator/AutomatorCLI.java @@ -106,6 +106,15 @@ private static List detectRecursiveScenario(File folderRecursive) { return listFiles; } + /** + * To reduce the number of warning + * + * @param message message to log out + */ + private static void logOutLn(String message) { + System.out.println(message); + } + public void run(String[] args) { if (!isRunningCLI) return; @@ -223,13 +232,4 @@ public void run(String[] args) { } public enum ACTION {RUN, RECURSIVE, VERIFY, RUNVERIFY, RECURSIVVERIFY} - - /** - * To reduce the number of warning - * - * @param message message to log out - */ - private static void logOutLn(String message) { - System.out.println(message); - } } diff --git a/src/main/java/org/camunda/automator/bpmnengine/BpmnEngine.java b/src/main/java/org/camunda/automator/bpmnengine/BpmnEngine.java index 9977d8b..8bda01c 100644 --- a/src/main/java/org/camunda/automator/bpmnengine/BpmnEngine.java +++ b/src/main/java/org/camunda/automator/bpmnengine/BpmnEngine.java @@ -105,6 +105,7 @@ List searchUserTasksByProcessInstance(String processInstanceId, String u /** * @param workerId workerId * @param topic topic to register + * @param streamEnable true if the stream enable is open * @param lockTime lock time for the job * @param jobHandler C7: must implement ExternalTaskHandler. C8: must implement JobHandler * @param backoffSupplier backOffStrategy @@ -112,6 +113,7 @@ List searchUserTasksByProcessInstance(String processInstanceId, String u */ RegisteredTask registerServiceTask(String workerId, String topic, + boolean streamEnable, Duration lockTime, Object jobHandler, FixedBackoffSupplier backoffSupplier); diff --git a/src/main/java/org/camunda/automator/bpmnengine/BpmnEngineFactory.java b/src/main/java/org/camunda/automator/bpmnengine/BpmnEngineFactory.java index 7882b07..06a959f 100644 --- a/src/main/java/org/camunda/automator/bpmnengine/BpmnEngineFactory.java +++ b/src/main/java/org/camunda/automator/bpmnengine/BpmnEngineFactory.java @@ -27,8 +27,7 @@ public static BpmnEngineFactory getInstance() { return bpmnEngineFactory; } - public BpmnEngine getEngineFromConfiguration(BpmnEngineList.BpmnServerDefinition serverDefinition, - boolean logDebug) + public BpmnEngine getEngineFromConfiguration(BpmnEngineList.BpmnServerDefinition serverDefinition, boolean logDebug) throws AutomatorException { BpmnEngine engine = cacheEngine.get(serverDefinition.serverType); if (engine != null) @@ -45,7 +44,7 @@ public BpmnEngine getEngineFromConfiguration(BpmnEngineList.BpmnServerDefinition case CAMUNDA_8 -> BpmnEngineCamunda8.getFromServerDefinition(serverDefinition, logDebug); - case CAMUNDA_8_SAAS -> BpmnEngineCamunda8.getFromServerDefinition( serverDefinition, logDebug); + case CAMUNDA_8_SAAS -> BpmnEngineCamunda8.getFromServerDefinition(serverDefinition, logDebug); case DUMMY -> new BpmnEngineDummy(serverDefinition); diff --git a/src/main/java/org/camunda/automator/bpmnengine/camunda7/BpmnEngineCamunda7.java b/src/main/java/org/camunda/automator/bpmnengine/camunda7/BpmnEngineCamunda7.java index 3bf1f31..570e0ea 100644 --- a/src/main/java/org/camunda/automator/bpmnengine/camunda7/BpmnEngineCamunda7.java +++ b/src/main/java/org/camunda/automator/bpmnengine/camunda7/BpmnEngineCamunda7.java @@ -146,7 +146,7 @@ public void disconnection() throws AutomatorException { /** * Engine is ready. If not, a connection() method must be call * - * @return + * @return true if the engine is ready */ public boolean isReady() { if (count > 2) @@ -274,6 +274,7 @@ public void executeUserTask(String userTaskId, String userId, Map cacheProcessInstanceMarker = new HashMap<>(); Random random = new Random(System.currentTimeMillis()); + private BpmnEngineList.BpmnServerDefinition serverDefinition; private ZeebeClient zeebeClient; private CamundaOperateClient operateClient; private CamundaTaskListClient taskClient; @@ -281,7 +279,7 @@ public void connection() throws AutomatorException { } if (!isOk) - throw new AutomatorException("Invalid configuration " + analysis.toString()); + throw new AutomatorException("Invalid configuration " + analysis); clientBuilder.numJobWorkerExecutionThreads(serverDefinition.workerExecutionThreads); clientBuilder.defaultJobWorkerMaxJobsActive(serverDefinition.workerMaxJobsActive); @@ -330,7 +328,7 @@ public void disconnection() throws AutomatorException { /** * Engine is ready. If not, a connection() method must be call * - * @return + * @return true if the engine is ready */ public boolean isReady() { return zeebeClient != null; @@ -501,6 +499,7 @@ public List searchUserTasks(String userTaskId, int maxResult) throws Aut @Override public RegisteredTask registerServiceTask(String workerId, String topic, + boolean streamEnable, Duration lockTime, Object jobHandler, FixedBackoffSupplier backoffSupplier) { @@ -520,6 +519,7 @@ public RegisteredTask registerServiceTask(String workerId, .jobType(topic) .handler((JobHandler) jobHandler) .timeout(lockTime) + .streamEnabled(streamEnable) // according the parameter .name(workerId); if (backoffSupplier != null) { diff --git a/src/main/java/org/camunda/automator/bpmnengine/dummy/BpmnEngineDummy.java b/src/main/java/org/camunda/automator/bpmnengine/dummy/BpmnEngineDummy.java index 26c76d7..3fbbbb0 100644 --- a/src/main/java/org/camunda/automator/bpmnengine/dummy/BpmnEngineDummy.java +++ b/src/main/java/org/camunda/automator/bpmnengine/dummy/BpmnEngineDummy.java @@ -17,11 +17,9 @@ public class BpmnEngineDummy implements BpmnEngine { - private final BpmnEngineList.BpmnServerDefinition serverDefinition; private final Logger logger = LoggerFactory.getLogger(BpmnEngineDummy.class); public BpmnEngineDummy(BpmnEngineList.BpmnServerDefinition serverDefinition) { - this.serverDefinition = serverDefinition; } @Override @@ -30,9 +28,11 @@ public void init() { } public void connection() throws AutomatorException { + // nothing to do here } public void disconnection() throws AutomatorException { + // nothing to do here } /** @@ -78,6 +78,7 @@ public void executeUserTask(String userTaskId, String userId, Map getFromServersConnectionList() throws AutomatorException { // not possible to use a Stream: decode throw an exception List list = new ArrayList<>(); - int count=0; + int count = 0; for (String s : configurationServersEngine.serversConnection) { count++; if (s.isEmpty()) continue; - BpmnServerDefinition bpmnServerDefinition = decodeServerConnection(s, "Range in ConnectionString: #"+count); + BpmnServerDefinition bpmnServerDefinition = decodeServerConnection(s, "Range in ConnectionString: #" + count); if (bpmnServerDefinition.serverType == null) { logger.error("Server Type can't be detected in string [{}]", s); continue; @@ -165,54 +165,59 @@ private List getFromServersConnectionList() throws Automat /** * getFromServerList * in configuration, give a list of server. - * @return - * @throws AutomatorException + * + * @return the list of available server + * @throws AutomatorException in case of error */ private List getFromServersList() throws AutomatorException { List serverList = new ArrayList<>(); - int count=0; + int count = 0; for (Map serverMap : configurationServersEngine.getServersList()) { count++; BpmnServerDefinition bpmnServerDefinition = new BpmnServerDefinition(); - bpmnServerDefinition.name = getString("name", serverMap, null,"ServerList #"+count); - String contextLog = "ServerList #"+count+" Name ["+bpmnServerDefinition.name+"]"; - bpmnServerDefinition.workerMaxJobsActive = getInteger(CONF_WORKER_MAX_JOBS_ACTIVE, serverMap, DEFAULT_VALUE_MAX_JOBS_ACTIVE,contextLog); + bpmnServerDefinition.name = getString("name", serverMap, null, "ServerList #" + count); + String contextLog = "ServerList #" + count + " Name [" + bpmnServerDefinition.name + "]"; + bpmnServerDefinition.workerMaxJobsActive = getInteger(CONF_WORKER_MAX_JOBS_ACTIVE, serverMap, + DEFAULT_VALUE_MAX_JOBS_ACTIVE, contextLog); - if (CONF_TYPE_V_CAMUNDA_7.equalsIgnoreCase(getString(CONF_TYPE, serverMap, null,contextLog))) { + if (CONF_TYPE_V_CAMUNDA_7.equalsIgnoreCase(getString(CONF_TYPE, serverMap, null, contextLog))) { bpmnServerDefinition.serverType = CamundaEngine.CAMUNDA_7; - bpmnServerDefinition.camunda7ServerUrl = getString(CONF_URL, serverMap, null,contextLog); + bpmnServerDefinition.camunda7ServerUrl = getString(CONF_URL, serverMap, null, contextLog); if (bpmnServerDefinition.camunda7ServerUrl == null) - throw new AutomatorException("Incorrect Definition - [url] expected for ["+CONF_TYPE_V_CAMUNDA_7+"] type "+contextLog); + throw new AutomatorException( + "Incorrect Definition - [url] expected for [" + CONF_TYPE_V_CAMUNDA_7 + "] type " + contextLog); } - if (CONF_TYPE_V_CAMUNDA_8.equalsIgnoreCase(getString(CONF_TYPE, serverMap, null,contextLog))) { + if (CONF_TYPE_V_CAMUNDA_8.equalsIgnoreCase(getString(CONF_TYPE, serverMap, null, contextLog))) { bpmnServerDefinition.serverType = CamundaEngine.CAMUNDA_8; - bpmnServerDefinition.zeebeGatewayAddress = getString(CONF_ZEEBE_GATEWAY_ADDRESS, serverMap, null,contextLog); - bpmnServerDefinition.operateUserName = getString(CONF_OPERATE_USER_NAME, serverMap, null,contextLog); - bpmnServerDefinition.operateUserPassword = getString(CONF_OPERATE_USER_PASSWORD, serverMap, null,contextLog); - bpmnServerDefinition.operateUrl = getString(CONF_OPERATE_URL, serverMap, null,contextLog); - bpmnServerDefinition.taskListUrl = getString(CONF_TASK_LIST_URL, serverMap, null,contextLog); - bpmnServerDefinition.workerExecutionThreads = getInteger(CONF_WORKER_EXECUTION_THREADS, serverMap, DEFAULT_VALUE_EXECUTION_THREADS,contextLog); + bpmnServerDefinition.zeebeGatewayAddress = getString(CONF_ZEEBE_GATEWAY_ADDRESS, serverMap, null, contextLog); + bpmnServerDefinition.operateUserName = getString(CONF_OPERATE_USER_NAME, serverMap, null, contextLog); + bpmnServerDefinition.operateUserPassword = getString(CONF_OPERATE_USER_PASSWORD, serverMap, null, contextLog); + bpmnServerDefinition.operateUrl = getString(CONF_OPERATE_URL, serverMap, null, contextLog); + bpmnServerDefinition.taskListUrl = getString(CONF_TASK_LIST_URL, serverMap, null, contextLog); + bpmnServerDefinition.workerExecutionThreads = getInteger(CONF_WORKER_EXECUTION_THREADS, serverMap, + DEFAULT_VALUE_EXECUTION_THREADS, contextLog); if (bpmnServerDefinition.zeebeGatewayAddress == null) - throw new AutomatorException("Incorrect Definition - [zeebeGatewayAddress] expected for ["+CONF_TYPE_V_CAMUNDA_8+"] type"); + throw new AutomatorException( + "Incorrect Definition - [zeebeGatewayAddress] expected for [" + CONF_TYPE_V_CAMUNDA_8 + "] type"); } - if (CONF_TYPE_V_CAMUNDA_8_SAAS.equalsIgnoreCase(getString(CONF_TYPE, serverMap, null,contextLog))) { + if (CONF_TYPE_V_CAMUNDA_8_SAAS.equalsIgnoreCase(getString(CONF_TYPE, serverMap, null, contextLog))) { bpmnServerDefinition.serverType = CamundaEngine.CAMUNDA_8_SAAS; - bpmnServerDefinition.zeebeSaasRegion = getString(CONF_ZEEBE_SAAS_REGION, serverMap, null,contextLog); - bpmnServerDefinition.zeebeSaasClientSecret = getString(CONF_ZEEBE_SAAS_SECRET, serverMap, null,contextLog); - bpmnServerDefinition.zeebeSaasClusterId = getString(CONF_ZEEBE_SAAS_CLUSTER_ID, serverMap, null,contextLog); - bpmnServerDefinition.zeebeSaasClientId = getString(CONF_ZEEBE_SAAS_CLIENT_ID, serverMap, null,contextLog); - bpmnServerDefinition.zeebeSaasOAuthUrl = getString(CONF_ZEEBE_SAAS_OAUTHURL, serverMap, null,contextLog); - bpmnServerDefinition.zeebeSaasAudience = getString(CONF_ZEEBE_SAAS_AUDIENCE, serverMap, null,contextLog); - - bpmnServerDefinition.workerExecutionThreads = getInteger(CONF_WORKER_EXECUTION_THREADS, serverMap, DEFAULT_VALUE_EXECUTION_THREADS,contextLog); - bpmnServerDefinition.operateUserName = getString(CONF_OPERATE_USER_NAME, serverMap, null,contextLog); - bpmnServerDefinition.operateUserPassword = getString(CONF_OPERATE_USER_PASSWORD, serverMap, null,contextLog); - bpmnServerDefinition.operateUrl = getString(CONF_OPERATE_URL, serverMap, null,contextLog); - bpmnServerDefinition.taskListUrl = getString(CONF_TASK_LIST_URL, serverMap, null,contextLog); - if (bpmnServerDefinition.zeebeSaasRegion == null - || bpmnServerDefinition.zeebeSaasClientSecret == null || bpmnServerDefinition.zeebeSaasClusterId == null - || bpmnServerDefinition.zeebeSaasClientId == null) + bpmnServerDefinition.zeebeSaasRegion = getString(CONF_ZEEBE_SAAS_REGION, serverMap, null, contextLog); + bpmnServerDefinition.zeebeSaasClientSecret = getString(CONF_ZEEBE_SAAS_SECRET, serverMap, null, contextLog); + bpmnServerDefinition.zeebeSaasClusterId = getString(CONF_ZEEBE_SAAS_CLUSTER_ID, serverMap, null, contextLog); + bpmnServerDefinition.zeebeSaasClientId = getString(CONF_ZEEBE_SAAS_CLIENT_ID, serverMap, null, contextLog); + bpmnServerDefinition.zeebeSaasOAuthUrl = getString(CONF_ZEEBE_SAAS_OAUTHURL, serverMap, null, contextLog); + bpmnServerDefinition.zeebeSaasAudience = getString(CONF_ZEEBE_SAAS_AUDIENCE, serverMap, null, contextLog); + + bpmnServerDefinition.workerExecutionThreads = getInteger(CONF_WORKER_EXECUTION_THREADS, serverMap, + DEFAULT_VALUE_EXECUTION_THREADS, contextLog); + bpmnServerDefinition.operateUserName = getString(CONF_OPERATE_USER_NAME, serverMap, null, contextLog); + bpmnServerDefinition.operateUserPassword = getString(CONF_OPERATE_USER_PASSWORD, serverMap, null, contextLog); + bpmnServerDefinition.operateUrl = getString(CONF_OPERATE_URL, serverMap, null, contextLog); + bpmnServerDefinition.taskListUrl = getString(CONF_TASK_LIST_URL, serverMap, null, contextLog); + if (bpmnServerDefinition.zeebeSaasRegion == null || bpmnServerDefinition.zeebeSaasClientSecret == null + || bpmnServerDefinition.zeebeSaasClusterId == null || bpmnServerDefinition.zeebeSaasClientId == null) throw new AutomatorException( "Incorrect Definition - [zeebeCloudRegister],[zeebeCloudRegion], [zeebeClientSecret},[zeebeCloudClusterId],[zeebeCloudClientId] expected for [Camunda8SaaS] type"); } @@ -228,7 +233,8 @@ private List getFromServersList() throws AutomatorExceptio * @return a ServerDefinition * @throws AutomatorException on any error */ - private BpmnServerDefinition decodeServerConnection(String connectionString, String contextLog) throws AutomatorException { + private BpmnServerDefinition decodeServerConnection(String connectionString, String contextLog) + throws AutomatorException { StringTokenizer st = new StringTokenizer(connectionString, ","); BpmnServerDefinition bpmnServerDefinition = new BpmnServerDefinition(); bpmnServerDefinition.name = (st.hasMoreTokens() ? st.nextToken() : null); @@ -291,13 +297,12 @@ private List getFromServerConfiguration() { camunda7.camunda7UserName = configurationServersEngine.camunda7UserName; camunda7.camunda7Password = configurationServersEngine.camunda7Password; - camunda7.workerMaxJobsActive = parseInt("Camunda7."+CONF_WORKER_MAX_JOBS_ACTIVE, + camunda7.workerMaxJobsActive = parseInt("Camunda7." + CONF_WORKER_MAX_JOBS_ACTIVE, configurationServersEngine.C7WorkerMaxJobsActive, DEFAULT_VALUE_MAX_JOBS_ACTIVE, ""); - camunda7.workerExecutionThreads = parseInt("Camunda7."+CONF_WORKER_EXECUTION_THREADS, + camunda7.workerExecutionThreads = parseInt("Camunda7." + CONF_WORKER_EXECUTION_THREADS, configurationServersEngine.C7WorkerMaxJobsActive, DEFAULT_VALUE_EXECUTION_THREADS, ""); - ; - camunda7.workerMaxJobsActive = parseInt("Camunda7."+CONF_WORKER_MAX_JOBS_ACTIVE, + camunda7.workerMaxJobsActive = parseInt("Camunda7." + CONF_WORKER_MAX_JOBS_ACTIVE, configurationServersEngine.C7WorkerMaxJobsActive, DEFAULT_VALUE_MAX_JOBS_ACTIVE, ""); list.add(camunda7); logger.info("Configuration: Camunda7 Name[{}] url[{}] MaxJobsActive[{}]", camunda7.name, @@ -308,9 +313,9 @@ private List getFromServerConfiguration() { camunda8.serverType = CamundaEngine.CAMUNDA_8; camunda8.name = configurationServersEngine.zeebeName; camunda8.zeebeGatewayAddress = configurationServersEngine.zeebeGatewayAddress; - camunda8.workerExecutionThreads = parseInt("Camunda8."+CONF_WORKER_EXECUTION_THREADS, + camunda8.workerExecutionThreads = parseInt("Camunda8." + CONF_WORKER_EXECUTION_THREADS, configurationServersEngine.zeebeWorkerExecutionThreads, DEFAULT_VALUE_EXECUTION_THREADS, ""); - camunda8.workerMaxJobsActive = parseInt("Camunda8."+CONF_WORKER_MAX_JOBS_ACTIVE, + camunda8.workerMaxJobsActive = parseInt("Camunda8." + CONF_WORKER_MAX_JOBS_ACTIVE, configurationServersEngine.zeebeWorkerMaxJobsActive, DEFAULT_VALUE_MAX_JOBS_ACTIVE, ""); camunda8.operateUrl = configurationServersEngine.zeebeOperateUrl; camunda8.operateUserName = configurationServersEngine.zeebeOperateUserName; @@ -325,7 +330,7 @@ private List getFromServerConfiguration() { } if (hasValue(configurationServersEngine.zeebeSaasClusterId)) { BpmnServerDefinition camunda8 = new BpmnServerDefinition(); - camunda8.serverType= CamundaEngine.CAMUNDA_8_SAAS; + camunda8.serverType = CamundaEngine.CAMUNDA_8_SAAS; camunda8.name = configurationServersEngine.zeebeName; camunda8.zeebeSaasRegion = configurationServersEngine.zeebeSaasRegion; camunda8.zeebeSaasClusterId = configurationServersEngine.zeebeSaasClusterId; @@ -355,15 +360,15 @@ private List getFromServerConfiguration() { private String getString(String name, Map record, String defaultValue, String contextLog) { try { if (!record.containsKey(name)) { - if (defaultValue==null) - logger.error(contextLog+"Variable [{}] not defined in {}", name, contextLog); + if (defaultValue == null) + logger.error(contextLog + "Variable [{}] not defined in {}", name, contextLog); else - logger.info(contextLog+"Variable [{}] not defined in {}", name, contextLog); + logger.info(contextLog + "Variable [{}] not defined in {}", name, contextLog); return defaultValue; } return (String) record.get(name); } catch (Exception e) { - logger.error(contextLog+"Variable [{}] {} bad definition {}", name, contextLog, e.getMessage()); + logger.error(contextLog + "Variable [{}] {} bad definition {}", name, contextLog, e.getMessage()); return defaultValue; } } @@ -371,7 +376,7 @@ private String getString(String name, Map record, String default private Integer getInteger(String name, Map record, Integer defaultValue, String contextLog) { try { if (!record.containsKey(name)) { - if (defaultValue==null) + if (defaultValue == null) logger.error("Variable [{}] not defined in {}", name, contextLog); else logger.info("Variable [{}] not defined in {}", name, contextLog); @@ -449,8 +454,7 @@ public static class BpmnServerDefinition { public String operateUrl; public String taskListUrl; - - /** + /** * Camunda 7 */ public String camunda7ServerUrl; diff --git a/src/main/java/org/camunda/automator/configuration/ConfigurationServersEngine.java b/src/main/java/org/camunda/automator/configuration/ConfigurationServersEngine.java index 1c39f4b..aeac642 100644 --- a/src/main/java/org/camunda/automator/configuration/ConfigurationServersEngine.java +++ b/src/main/java/org/camunda/automator/configuration/ConfigurationServersEngine.java @@ -19,7 +19,6 @@ public class ConfigurationServersEngine { public List> serversList; - @Value("${automator.servers.camunda7.url:''}") public String camunda7Url; @Value("${automator.servers.camunda7.username:}") @@ -48,7 +47,6 @@ public class ConfigurationServersEngine { @Value("${automator.servers.camunda8.workerMaxJobsActive:''}") public String zeebeWorkerMaxJobsActive; - @Value("${automator.servers.camunda8Saas.region:''}") public String zeebeSaasRegion; @Value("${automator.servers.camunda8Saas.clusterId:''}") diff --git a/src/main/java/org/camunda/automator/configuration/ConfigurationStartup.java b/src/main/java/org/camunda/automator/configuration/ConfigurationStartup.java index 4f69d3f..85d55c9 100644 --- a/src/main/java/org/camunda/automator/configuration/ConfigurationStartup.java +++ b/src/main/java/org/camunda/automator/configuration/ConfigurationStartup.java @@ -26,6 +26,7 @@ public class ConfigurationStartup { public boolean deepTracking; @Value("${automator.startup.policyExecution:DEPLOYPROCESS|WARMINGUP|CREATION|SERVICETASK|USERTASK}") public String policyExecution; + /** * it may be necessary to wait the other component to warm up */ diff --git a/src/main/java/org/camunda/automator/definition/Scenario.java b/src/main/java/org/camunda/automator/definition/Scenario.java index c273979..6daeb13 100644 --- a/src/main/java/org/camunda/automator/definition/Scenario.java +++ b/src/main/java/org/camunda/automator/definition/Scenario.java @@ -8,7 +8,6 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import org.camunda.automator.configuration.BpmnEngineList; import org.camunda.automator.engine.AutomatorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,8 +30,10 @@ public class Scenario { private final List deployments = new ArrayList<>(); private final List flows = new ArrayList<>(); - - public enum TYPESCENARIO { FLOW, UNIT}; + /** + * Type UNIT + */ + private final List executions = new ArrayList<>(); public TYPESCENARIO typeScenario; @@ -41,24 +42,15 @@ public enum TYPESCENARIO { FLOW, UNIT}; */ private ScenarioWarmingUp warmingUp; private ScenarioFlowControl flowControl; - - /** - * Type UNIT - */ - private final List executions = new ArrayList<>(); - private String name; private String version; private String processName; private String processId; - /** * Server to run the scenario (optional, will be overide by the configuration) */ private String serverName; - private String serverType; - /** * This value is fulfill only if the scenario was read from a file */ @@ -69,13 +61,14 @@ public static Scenario createFromJson(String jsonContent) { builder.setPrettyPrinting(); Gson gson = builder.create(); - Scenario scnHead = gson.fromJson(jsonContent, Scenario.class); - if (scnHead == null) { + Scenario scenario = gson.fromJson(jsonContent, Scenario.class); + if (scenario == null) { logger.error("Scenario: Can't build scenario from content [{}]", jsonContent); return null; } - scnHead.afterUnSerialize(); - return scnHead; + scenario.afterUnSerialize(); + scenario.initialize(); + return scenario; } public static Scenario createFromFile(File scenarioFile) throws AutomatorException { @@ -83,6 +76,7 @@ public static Scenario createFromFile(File scenarioFile) throws AutomatorExcepti Scenario scenario = createFromInputStream(new FileInputStream(scenarioFile), scenarioFile.getAbsolutePath()); scenario.scenarioFile = scenarioFile.getAbsolutePath(); + scenario.initialize(); return scenario; } catch (FileNotFoundException e) { @@ -110,8 +104,8 @@ public static Scenario createFromInputStream(InputStream scenarioInput, String o Scenario scnHead = createFromJson(jsonContent.toString()); if (scnHead == null) { throw new AutomatorException("Scenario: can't load from JSON [" + jsonContent + "] "); - } + scnHead.initialize(); return scnHead; } catch (IOException e) { logger.error("CreateScenarioFromInputString: origin[{}] error {} : {} ", origin, e.getMessage(), e.toString()); @@ -120,6 +114,15 @@ public static Scenario createFromInputStream(InputStream scenarioInput, String o } + /** + * Initialize the scenario and complete it + */ + private void initialize() { + for (int i = 0; i < flows.size(); i++) { + flows.get(i).setStepNumber(i); + } + } + /** * Add a new execution * @@ -185,14 +188,12 @@ public File getScenarioFile() { } } - public String getServerName() { if (serverName == null || serverName.isEmpty()) return null; return serverName; } - private void afterUnSerialize() { // Attention, now we have to manually set the tree relation for (ScenarioExecution scnExecution : getExecutions()) { @@ -200,4 +201,6 @@ private void afterUnSerialize() { } } + public enum TYPESCENARIO {FLOW, UNIT} + } diff --git a/src/main/java/org/camunda/automator/definition/ScenarioStep.java b/src/main/java/org/camunda/automator/definition/ScenarioStep.java index 286521a..1871943 100644 --- a/src/main/java/org/camunda/automator/definition/ScenarioStep.java +++ b/src/main/java/org/camunda/automator/definition/ScenarioStep.java @@ -20,12 +20,12 @@ public class ScenarioStep { * operations; stringtodate() */ private final Map variablesOperation = Collections.emptyMap(); + private final Long fixedBackOffDelay = 0L; + private final MODEEXECUTION modeExecution = MODEEXECUTION.CLASSICAL; /** * In case of a Flow Step, the number of workers to execute this tasks */ - private final Integer nbWorkers = Integer.valueOf(1); - private final Long fixedBackOffDelay = Long.valueOf(0); - private final MODEEXECUTION modeExecution = MODEEXECUTION.CLASSICAL; + private Integer numberOfWorkers; /** * if the step is used in a WarmingUp operation, it can decide this is the time to finish it * Expression is @@ -43,6 +43,7 @@ public class ScenarioStep { * to execute a service task in C8, topic is mandatory */ private String topic; + private final Boolean streamEnable = false; private Map variables = Collections.emptyMap(); private String userId; /** @@ -66,6 +67,11 @@ public class ScenarioStep { */ private String processId; + /** + * Receive a step range in the scenario, which help to identify the step + */ + private int stepNumber = -1; + public ScenarioStep(ScenarioExecution scnExecution) { this.scnExecution = scnExecution; } @@ -91,8 +97,10 @@ public static ScenarioStep createStepUserTask(ScenarioExecution scnExecution, St } public String getInformation() { - return (name == null ? "" : (name + ":")) + getType().toString() + ": taskId:[" + getTaskId() + "] topic:" - + getTopic() + "]"; + return "step_" + stepNumber + " " // cartouche + + (name == null ? "" : ("[" + name + "]:")) // name + + getType().toString() // type + + ",taskId:[" + getTaskId() + "]" + (getTopic() == null ? "" : " topic:[" + getTopic() + "]"); } public Step getType() { @@ -126,6 +134,17 @@ public String getTopic() { return topic; } + public boolean isStreamEnable() { + return streamEnable; + } + + public int getStepNumber() { + return stepNumber; + } + + public void setStepNumber(int stepNumber) { + this.stepNumber = stepNumber; + } /* ******************************************************************** */ /* */ /* getter */ @@ -195,8 +214,12 @@ public String getFrequency() { return frequency; } - public int getNbWorkers() { - return nbWorkers == null || nbWorkers == 0 ? 1 : nbWorkers; + public int getNumberOfWorkers() { + return numberOfWorkers == null || numberOfWorkers == 0 ? 1 : numberOfWorkers; + } + + public void setNumberOfWorkers(int nbWorkers) { + this.numberOfWorkers = nbWorkers; } public String getProcessId() { @@ -232,6 +255,19 @@ public MODEEXECUTION getModeExecution() { return modeExecution == null ? MODEEXECUTION.CLASSICAL : modeExecution; } + /** + * Return an uniq ID of the step (use to + * + * @return the id of the step + */ + public String getId() { + return getType() + " " + switch (getType()) { + case STARTEVENT -> getProcessId() + "-" + getTaskId() + "-" + Thread.currentThread().getName(); + case SERVICETASK -> getTopic() + "-" + Thread.currentThread().getName(); + default -> ""; + }; + } + /** * MODE EXECUTION * CLASSICAL, WAIT: the worker wait the waitingTime time diff --git a/src/main/java/org/camunda/automator/engine/RunParameters.java b/src/main/java/org/camunda/automator/engine/RunParameters.java index 3b6a29b..4ca18e7 100644 --- a/src/main/java/org/camunda/automator/engine/RunParameters.java +++ b/src/main/java/org/camunda/automator/engine/RunParameters.java @@ -51,16 +51,17 @@ public LOGLEVEL getLogLevel() { return logLevel; } - public RunParameters setServerName(String serverName) { - this.serverName = serverName; + public RunParameters setLogLevel(LOGLEVEL logLevel) { + this.logLevel = logLevel; return this; } + public String getServerName() { return this.serverName; } - public RunParameters setLogLevel(LOGLEVEL logLevel) { - this.logLevel = logLevel; + public RunParameters setServerName(String serverName) { + this.serverName = serverName; return this; } diff --git a/src/main/java/org/camunda/automator/engine/RunResult.java b/src/main/java/org/camunda/automator/engine/RunResult.java index 0185ca3..7b5653e 100644 --- a/src/main/java/org/camunda/automator/engine/RunResult.java +++ b/src/main/java/org/camunda/automator/engine/RunResult.java @@ -157,9 +157,9 @@ public void addVerification(ScenarioVerificationBasic verification, boolean isSu this.listVerifications.add(verificationStatus); } -public boolean hasErrors() { - return ! listErrors.isEmpty(); -} + public boolean hasErrors() { + return !listErrors.isEmpty(); + } /* ******************************************************************** */ /* */ @@ -262,7 +262,7 @@ public int getNumberOfErrorSteps() { */ public String getSynthesis(boolean fullDetail) { StringBuilder synthesis = new StringBuilder(); - synthesis.append((isSuccess() && ! hasErrors()) ? "SUCCESS " : "FAIL "); + synthesis.append((isSuccess() && !hasErrors()) ? "SUCCESS " : "FAIL "); synthesis.append(runScenario.getScenario().getName()); synthesis.append("("); synthesis.append(runScenario.getScenario().getProcessId()); @@ -271,9 +271,9 @@ public String getSynthesis(boolean fullDetail) { StringBuilder append = synthesis.append(timeExecution); synthesis.append(" timeExecution(ms), "); RecordCreationPI recordCreationPI = recordCreationPIMap.get(runScenario.getScenario().getProcessId()); - synthesis.append(recordCreationPI==null? 0 : recordCreationPI.nbCreated); + synthesis.append(recordCreationPI == null ? 0 : recordCreationPI.nbCreated); synthesis.append(" PICreated, "); - synthesis.append(recordCreationPI==null? 0 : recordCreationPI.nbFailed); + synthesis.append(recordCreationPI == null ? 0 : recordCreationPI.nbFailed); synthesis.append(" PIFailed, "); synthesis.append(numberOfSteps); synthesis.append(" stepsExecuted, "); diff --git a/src/main/java/org/camunda/automator/engine/RunScenario.java b/src/main/java/org/camunda/automator/engine/RunScenario.java index 7994973..9892760 100644 --- a/src/main/java/org/camunda/automator/engine/RunScenario.java +++ b/src/main/java/org/camunda/automator/engine/RunScenario.java @@ -146,8 +146,7 @@ public RunResult runExecutions() { if (scenario.typeScenario.equals(Scenario.TYPESCENARIO.UNIT)) { List> listFutures = new ArrayList<>(); logger.info("RunScenario: ------ execution UNIT scenario [{}] {} execution on {} Threads", scenario.getName(), - scenario.getExecutions().size(), - runParameters.getNumberOfThreadsPerScenario()); + scenario.getExecutions().size(), runParameters.getNumberOfThreadsPerScenario()); for (int i = 0; i < scenario.getExecutions().size(); i++) { ScenarioExecution scnExecution = scenario.getExecutions().get(i); diff --git a/src/main/java/org/camunda/automator/engine/RunZeebeOperation.java b/src/main/java/org/camunda/automator/engine/RunZeebeOperation.java index 552eabb..c76088e 100644 --- a/src/main/java/org/camunda/automator/engine/RunZeebeOperation.java +++ b/src/main/java/org/camunda/automator/engine/RunZeebeOperation.java @@ -7,11 +7,14 @@ package org.camunda.automator.engine; import org.camunda.automator.definition.ScenarioStep; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; public class RunZeebeOperation { + private static final Logger logger = LoggerFactory.getLogger(RunZeebeOperation.class); // Static method only private RunZeebeOperation() { @@ -20,17 +23,22 @@ private RunZeebeOperation() { /** * Resolve variables */ - public static Map getVariablesStep(RunScenario runScenario, ScenarioStep step) + public static Map getVariablesStep(RunScenario runScenario, ScenarioStep step, int index) throws AutomatorException { Map variablesCompleted = new HashMap<>(); variablesCompleted.putAll(step.getVariables()); // execute all operations now for (Map.Entry entryOperation : step.getVariablesOperations().entrySet()) { + if (runScenario.getRunParameters().showLevelDebug()) + logger.info("Scenario Key[{}] Value[{}] Step {}", entryOperation.getKey(), entryOperation.getValue(), + step.getInformation()); variablesCompleted.put(entryOperation.getKey(), runScenario.getServiceAccess().serviceDataOperation.execute(entryOperation.getValue(), runScenario, - "Step " + step.getInformation())); + "Step " + step.getInformation(), index)); } + if (runScenario.getRunParameters().showLevelDebug() && !variablesCompleted.isEmpty()) + logger.info("SetVariable [{}] {}", step.getVariables(), step.getInformation()); return variablesCompleted; } diff --git a/src/main/java/org/camunda/automator/engine/flow/CreateProcessInstanceThread.java b/src/main/java/org/camunda/automator/engine/flow/CreateProcessInstanceThread.java new file mode 100644 index 0000000..06357a8 --- /dev/null +++ b/src/main/java/org/camunda/automator/engine/flow/CreateProcessInstanceThread.java @@ -0,0 +1,187 @@ +package org.camunda.automator.engine.flow; + +import org.camunda.automator.definition.ScenarioStep; +import org.camunda.automator.engine.AutomatorException; +import org.camunda.automator.engine.RunResult; +import org.camunda.automator.engine.RunScenario; +import org.camunda.automator.engine.RunZeebeOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class CreateProcessInstanceThread { + private final int executionBatchNumber; + private final ScenarioStep scenarioStep; + private final RunScenario runScenario; + private final RunResult runResult; + private final Logger logger = LoggerFactory.getLogger(CreateProcessInstanceThread.class); + private final List listStartProcess = new ArrayList<>(); + + /** + * @param executionBatchNumber Each time a new batch is running, this number increase + * @param scenarioStep scenario step + * @param runScenario scenario + * @param runResult result to fulfill + */ + public CreateProcessInstanceThread(int executionBatchNumber, + ScenarioStep scenarioStep, + RunScenario runScenario, + RunResult runResult) { + this.executionBatchNumber = executionBatchNumber; + this.scenarioStep = scenarioStep; + this.runScenario = runScenario; + this.runResult = runResult; + } + + /** + * After the duration, we stop + * + * @param durationToCreateProcessInstances maximum duration to produce all PI + */ + public void startProcessInstance(Duration durationToCreateProcessInstances) { + + int numberOfThreads = scenarioStep.getNumberOfWorkers() == 0 ? 1 : scenarioStep.getNumberOfWorkers(); + + ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); + int totalNumberOfPi = 0; + + int processInstancePerThread = (int) Math.ceil(1.0 * scenarioStep.getNumberOfExecutions() / numberOfThreads); + // Submit tasks to the executor + for (int i = 0; i < numberOfThreads; i++) { + int numberOfProcessInstanceToStart = Math.min(processInstancePerThread, + scenarioStep.getNumberOfExecutions() - totalNumberOfPi); + totalNumberOfPi += numberOfProcessInstanceToStart; + StartProcess task = new StartProcess(executionBatchNumber, i, numberOfProcessInstanceToStart, + durationToCreateProcessInstances, scenarioStep, runScenario, runResult); + executor.submit(task); + listStartProcess.add(task); + } + // Shut down the executor and wait for all tasks to complete + executor.shutdown(); + try { + executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); + } catch (InterruptedException e) { + logger.error("Error during waiting for the end of all tasks"); + } + } + + public List getListProcessInstances() { + return listStartProcess.stream().flatMap(t -> t.listProcessInstances.stream()).collect(Collectors.toList()); + } + + public int getTotalCreation() { + return listStartProcess.stream().mapToInt(t -> t.nbCreation).sum(); + } + + public int getTotalFailed() { + return listStartProcess.stream().mapToInt(t -> t.nbFailed).sum(); + } + + /** + * return true if the creation overload the durationToCreate: we can't create all PI in the duration + * + * @return true if it wasn't possible to create all PI during the duration + */ + public boolean isOverload() { + return listStartProcess.stream().anyMatch(t -> t.isOverload); + } + + /** + * This subclass start a range of process instances + */ + private class StartProcess implements Runnable { + private final ScenarioStep scenarioStep; + private final RunResult runResult; + private final RunScenario runScenario; + private final int executionBatchNumber; + private final int indexInBatch; + int numberOfProcessInstanceToStart; + List listProcessInstances = new ArrayList<>(); + int nbCreation = 0; + int nbFailed = 0; + /** + * the batch number + */ + boolean isOverload = false; + Duration durationToCreateProcessInstances; + + /** + * @param executionBatchNumber + * @param indexInBatch the component number, when multiple component where generated to handle the flow + * @param numberOfProcessInstanceToStart number of process instance to start by this object + * @param durationToCreateProcessInstances duration max allowed to create process instance + * @param scenarioStep step to use to create the process instance + * @param runScenario scenario to use + * @param runResult result object to save information + */ + public StartProcess(int executionBatchNumber, + int indexInBatch, + int numberOfProcessInstanceToStart, + Duration durationToCreateProcessInstances, + ScenarioStep scenarioStep, + RunScenario runScenario, + RunResult runResult) { + this.executionBatchNumber = executionBatchNumber; + this.indexInBatch = indexInBatch; + this.durationToCreateProcessInstances = durationToCreateProcessInstances; + this.numberOfProcessInstanceToStart = numberOfProcessInstanceToStart; + this.runResult = runResult; + this.runScenario = runScenario; + this.scenarioStep = scenarioStep; + } + + @Override + public void run() { + boolean alreadyLoggedError = false; + isOverload = false; + long begin = System.currentTimeMillis(); + for (int i = 0; i < numberOfProcessInstanceToStart; i++) { + + // operation + try { + Map variables = RunZeebeOperation.getVariablesStep(runScenario, scenarioStep, indexInBatch); + String processInstance = runScenario.getBpmnEngine() + .createProcessInstance(scenarioStep.getProcessId(), scenarioStep.getTaskId(), // activityId + variables); + + if (runScenario.getRunParameters().showLevelDebug()) + logger.info("batch_#{} Create ProcessInstance:{} Variables {}", executionBatchNumber, processInstance, + variables); + + if (listProcessInstances.size() < 21) + listProcessInstances.add(processInstance); + nbCreation++; + runResult.registerAddProcessInstance(scenarioStep.getProcessId(), true); + + } catch (AutomatorException e) { + if (!alreadyLoggedError) + runResult.addError(scenarioStep, + "batch_#" + executionBatchNumber + "-" + scenarioStep.getId() + " Error at creation: [" + e.getMessage() + + "]"); + alreadyLoggedError = true; + nbFailed++; + runResult.registerAddProcessInstance(scenarioStep.getProcessId(), false); + } + // do we have to stop the execution? + long currentTimeMillis = System.currentTimeMillis(); + Duration durationCurrent = durationToCreateProcessInstances.minusMillis(currentTimeMillis - begin); + if (durationCurrent.isNegative()) { + // take too long to create the required process instance, so stop now. + logger.info("batch_#" + executionBatchNumber + "-" + scenarioStep.getId() + + " Take too long to created ProcessInstances: created {} when expected {} in {} ms", nbCreation, + scenarioStep.getNumberOfExecutions(), currentTimeMillis - begin); + isOverload = true; + break; + } + } + } + } +} diff --git a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowBasic.java b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowBasic.java index d233b49..677c3fd 100644 --- a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowBasic.java +++ b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowBasic.java @@ -17,33 +17,20 @@ public abstract class RunScenarioFlowBasic { protected final RunResult runResult; private final ScenarioStep scenarioStep; private final RunScenario runScenario; - private final int index; - RunScenarioFlowBasic(ScenarioStep scenarioStep, int index, RunScenario runScenario, RunResult runResult) { - this.index = index; + RunScenarioFlowBasic(ScenarioStep scenarioStep, RunScenario runScenario, RunResult runResult) { this.scenarioStep = scenarioStep; this.runScenario = runScenario; this.runResult = runResult; } /** - * Return the index of this basicFlow + * Return an uniq ID of the step * - * @return index + * @return the ID of the step */ - public int getIndex() { - return index; - } - public String getId() { - String id = scenarioStep.getType() + " "; - id += switch (scenarioStep.getType()) { - case STARTEVENT -> scenarioStep.getProcessId() + "-" + scenarioStep.getTaskId() + "-" + Thread.currentThread() - .getName(); - case SERVICETASK -> scenarioStep.getTopic() + "-" + Thread.currentThread().getName(); - default -> ""; - }; - return id + "#" + getIndex(); + return scenarioStep.getId(); } public RunScenario getRunScenario() { diff --git a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowServiceTask.java b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowServiceTask.java index ec950c4..de94213 100644 --- a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowServiceTask.java +++ b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowServiceTask.java @@ -18,6 +18,7 @@ import org.camunda.automator.definition.ScenarioStep; import org.camunda.automator.engine.RunResult; import org.camunda.automator.engine.RunScenario; +import org.camunda.automator.engine.RunZeebeOperation; import org.camunda.bpm.client.task.ExternalTaskHandler; import org.camunda.bpm.client.task.ExternalTaskService; import org.slf4j.Logger; @@ -44,10 +45,9 @@ public class RunScenarioFlowServiceTask extends RunScenarioFlowBasic { public RunScenarioFlowServiceTask(TaskScheduler scheduler, ScenarioStep scenarioStep, - int index, RunScenario runScenario, RunResult runResult) { - super(scenarioStep, index, runScenario, runResult); + super(scenarioStep, runScenario, runResult); this.scheduler = scheduler; this.semaphore = new Semaphore(runScenario.getBpmnEngine().getWorkerExecutionThreads()); @@ -110,24 +110,9 @@ private void registerWorker() { registeredTask = bpmnEngine.registerServiceTask(getId(), // workerId getScenarioStep().getTopic(), // topic + getScenarioStep().isStreamEnable(), // stream durationSleep, // lock time new SimpleDelayHandler(this), new FixedBackoffSupplier(getScenarioStep().getFixedBackOffDelay())); - /* - // calculate the lock duration: this is * - ZeebeClient zeebeClient = ((BpmnEngineCamunda8) getRunScenario().getBpmnEngine()).getZeebeClient(); - - JobWorkerBuilderStep1.JobWorkerBuilderStep3 step3 = zeebeClient.newWorker() - .jobType(getScenarioStep().getTopic()) - .handler(new SimpleDelayC8Handler(this)) - .timeout(durationSleep) - .name(getId()); - - if (getScenarioStep().getFixedBackOffDelay() > 0) { - step3.backoffSupplier(new FixedBackoffSupplier(getScenarioStep().getFixedBackOffDelay())); - } - jobWorker = step3.open(); - */ - } private static class TrackActiveWorker { @@ -183,6 +168,8 @@ private void manageWaitExecution(org.camunda.bpm.client.task.ExternalTask extern ActivatedJob activatedJob, long waitTimeInMs) { long begin = System.currentTimeMillis(); + Map variables = new HashMap<>(); + Map currentVariables = new HashMap<>(); try { if (getRunScenario().getRunParameters().isDeepTracking()) trackActiveWorkers.movement(1); @@ -190,14 +177,18 @@ private void manageWaitExecution(org.camunda.bpm.client.task.ExternalTask extern if (waitTimeInMs > 0) Thread.sleep(waitTimeInMs); - Map variables = new HashMap<>(); + variables = RunZeebeOperation.getVariablesStep(flowServiceTask.getRunScenario(), + flowServiceTask.getScenarioStep(), 0); + /* C7 */ if (externalTask != null) { + currentVariables = externalTask.getAllVariables(); externalTaskService.complete(externalTask, variables); } /* C8 */ if (jobClient != null) { + currentVariables = activatedJob.getVariablesAsMap(); CompleteJobCommandStep1 completeCommand = jobClient.newCompleteCommand(activatedJob.getKey()); CommandWrapper command = new RefactoredCommandWrapper((FinalCommandStep) completeCommand, activatedJob.getDeadline(), activatedJob.toString(), exceptionHandlingStrategy); @@ -209,7 +200,11 @@ private void manageWaitExecution(org.camunda.bpm.client.task.ExternalTask extern } catch (Exception e) { logger.error( - "Error task[" + flowServiceTask.getId() + " " + externalTask.getBusinessKey() + " : " + e.getMessage()); + "Error task[{}] PI[{}] : {}", + flowServiceTask.getId(), + (externalTask != null ? externalTask.getProcessDefinitionKey() : activatedJob.getProcessInstanceKey()), + e.getMessage() + ); flowServiceTask.runResult.registerAddErrorStepExecution(); @@ -219,15 +214,14 @@ private void manageWaitExecution(org.camunda.bpm.client.task.ExternalTask extern if (getRunScenario().getRunParameters().isDeepTracking()) trackActiveWorkers.movement(-1); - if (getRunScenario().getRunParameters().showLevelMonitoring()) { - logger.info("Executed task[{}] in {} ms Sleep [{} s]", getId(), end - begin, durationSleep.getSeconds()); - /* - logger.info( - "Executed task[" + getId() + "] in " + (end - begin) + " ms" + " Sleep [" + durationSleep.getSeconds() - + " s]"); - */ + if (getRunScenario().getRunParameters().showLevelInfo()) { + logger.info("Executed task[{}] in {} ms PI[{}] CurrentVariable {} Variable {} Sleep [{} s]", getId(), + end - begin, + (externalTask != null ? externalTask.getProcessDefinitionKey() : activatedJob.getProcessInstanceKey()), + currentVariables, variables, durationSleep.getSeconds()); } + } private void manageAsynchronousExecution(org.camunda.bpm.client.task.ExternalTask externalTask, diff --git a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowStartEvent.java b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowStartEvent.java index 0491f33..65632cf 100644 --- a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowStartEvent.java +++ b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowStartEvent.java @@ -7,17 +7,14 @@ package org.camunda.automator.engine.flow; import org.camunda.automator.definition.ScenarioStep; -import org.camunda.automator.engine.AutomatorException; import org.camunda.automator.engine.RunResult; import org.camunda.automator.engine.RunScenario; -import org.camunda.automator.engine.RunZeebeOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.scheduling.TaskScheduler; import java.time.Duration; import java.time.Instant; -import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -26,14 +23,16 @@ public class RunScenarioFlowStartEvent extends RunScenarioFlowBasic { Logger logger = LoggerFactory.getLogger(RunScenarioFlowStartEvent.class); private boolean stopping; private boolean isRunning; - private int stepNumber = 0; + /** + * Each time we run a batch of start, execution Number increase + */ + private int executionBatchNumber = 1; public RunScenarioFlowStartEvent(TaskScheduler scheduler, ScenarioStep scenarioStep, - int index, RunScenario runScenario, RunResult runResult) { - super(scenarioStep, index, runScenario, runResult); + super(scenarioStep, runScenario, runResult); this.scheduler = scheduler; } @@ -41,11 +40,9 @@ public RunScenarioFlowStartEvent(TaskScheduler scheduler, public void execute() { stopping = false; isRunning = true; - Duration duration = Duration.parse(getScenarioStep().getFrequency()); - - StartEventRunnable startEventRunnable = new StartEventRunnable(scheduler, getScenarioStep(), runResult, - getRunScenario(), this); - scheduler.schedule(startEventRunnable, Instant.now()); + StartEventRunnable startEventRunnable = new StartEventRunnable(scheduler, getScenarioStep(), getRunScenario(), this, + runResult); + startEventRunnable.start(); } @Override @@ -92,9 +89,9 @@ class StartEventRunnable implements Runnable { public StartEventRunnable(TaskScheduler scheduler, ScenarioStep scenarioStep, - RunResult runResult, RunScenario runScenario, - RunScenarioFlowStartEvent flowStartEvent) { + RunScenarioFlowStartEvent flowStartEvent, + RunResult runResult) { this.scheduler = scheduler; this.scenarioStep = scenarioStep; this.runResult = runResult; @@ -102,9 +99,17 @@ public StartEventRunnable(TaskScheduler scheduler, this.flowStartEvent = flowStartEvent; } + /** + * Start it in a new tread + */ + public void start() { + scheduler.schedule(this, Instant.now()); + + } + @Override public void run() { - stepNumber++; + executionBatchNumber++; if (flowStartEvent.stopping) { if (runScenario.getRunParameters().showLevelMonitoring()) { logger.info("Stop now [" + getId() + "]"); @@ -121,68 +126,49 @@ public void run() { flowStartEvent.isRunning = false; return; } + Duration durationToCreateProcessInstances = Duration.parse(scenarioStep.getFrequency()); + long begin = System.currentTimeMillis(); - int nbCreation = 0; - int nbFailed = 0; boolean isOverloadSection = false; - Duration duration = Duration.parse(scenarioStep.getFrequency()); - List listProcessInstances = new ArrayList<>(); totalCreationGoal += scenarioStep.getNumberOfExecutions(); - boolean alreadyLoggedError = false; - for (int i = 0; i < scenarioStep.getNumberOfExecutions(); i++) { - - // operation - try { - String processInstance = runScenario.getBpmnEngine() - .createProcessInstance(scenarioStep.getProcessId(), scenarioStep.getTaskId(), // activityId - RunZeebeOperation.getVariablesStep(runScenario, scenarioStep)); - if (listProcessInstances.size() < 21) - listProcessInstances.add(processInstance); - nbCreation++; - totalCreation++; - runResult.registerAddProcessInstance(scenarioStep.getProcessId(), true); - } catch (AutomatorException e) { - if (!alreadyLoggedError) - runResult.addError(scenarioStep, - "Step #" + stepNumber + "-" + getId() + " Error at creation: [" + e.getMessage() + "]"); - alreadyLoggedError = true; - nbFailed++; - totalFailed++; - runResult.registerAddProcessInstance(scenarioStep.getProcessId(), false); - } - - // do we have to stop the execution? - long currentTimeMillis = System.currentTimeMillis(); - Duration durationCurrent = duration.minusMillis(currentTimeMillis - begin); - if (durationCurrent.isNegative()) { - // take too long to create the required process instance, so stop now. - logger.info("Step #" + stepNumber + "-" + getId() - + " Take too long to created ProcessInstances: created {} when expected {}", nbCreation, - scenarioStep.getNumberOfExecutions()); - isOverloadSection = true; - break; - } - - } // end of loop getNumberOfExecutions() + // generate process instance + CreateProcessInstanceThread createProcessInstanceThread = new CreateProcessInstanceThread(executionBatchNumber, + scenarioStep, runScenario, runResult); + createProcessInstanceThread.startProcessInstance(durationToCreateProcessInstances); + totalCreation += createProcessInstanceThread.getTotalCreation(); + totalFailed += createProcessInstanceThread.getTotalCreation(); + List listProcessInstances = createProcessInstanceThread.getListProcessInstances(); long end = System.currentTimeMillis(); - duration = duration.minusMillis(end - begin); - if (duration.isNegative()) { - duration = Duration.ZERO; - isOverloadSection = true; + + // do we have to stop the execution? + if (createProcessInstanceThread.isOverload()) { + // take too long to create the required process instance, so stop now. nbOverloaded++; + isOverloadSection = true; + } + + // calculate the time to wait now + Duration durationToWait = durationToCreateProcessInstances.minusMillis(end - begin); + if (durationToWait.isNegative()) { + durationToWait = Duration.ZERO; } - if (runScenario.getRunParameters().showLevelMonitoring()) { - logger.info("Step #" + stepNumber + "-" + getId() // id - + "] Create (real/scenario)[" + nbCreation + "/" + scenarioStep.getNumberOfExecutions() // creation/target - + "] Failed[" + nbFailed // failed + // report now + if (runScenario.getRunParameters().showLevelMonitoring() || createProcessInstanceThread.isOverload()) { + logger.info("Step #" + executionBatchNumber + "-" + getId() // id + + "] Create (real/scenario)[" + createProcessInstanceThread.getTotalCreation() + "/" + + scenarioStep.getNumberOfExecutions() // creation/target + + (isOverloadSection ? "OVERLOAD" : "") // Overload marker + + "] Failed[" + createProcessInstanceThread.getTotalCreation() // failed + "] in " + (end - begin) + " ms " // time of operation - + (isOverloadSection ? "OVERLOAD" : "") + " Sleep[" + duration.getSeconds() + " s] listPI(max20): " - + listProcessInstances.stream().collect(Collectors.joining(","))); + + " Sleep[" + durationToWait.getSeconds() + " s] listPI(max20): " + listProcessInstances.stream() + .collect(Collectors.joining(","))); } - scheduler.schedule(this, Instant.now().plusMillis(duration.toMillis())); + + // Wait to restart + scheduler.schedule(this, Instant.now().plusMillis(durationToWait.toMillis())); } } diff --git a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowUserTask.java b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowUserTask.java index 7c86f9e..c4141dc 100644 --- a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowUserTask.java +++ b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlowUserTask.java @@ -24,14 +24,13 @@ public RunScenarioFlowUserTask(TaskScheduler scheduler, int index, RunScenario runScenario, RunResult runResult) { - super(scenarioStep, index, runScenario, runResult); + super(scenarioStep, runScenario, runResult); this.scheduler = scheduler; } @Override public void execute() { - RunScenarioFlowUserTask.UserTaskRunnable startEventRunnable = new RunScenarioFlowUserTask.UserTaskRunnable( scheduler, getScenarioStep(), runResult, getRunScenario(), this); scheduler.schedule(startEventRunnable, Instant.now()); @@ -119,7 +118,7 @@ public void run() { for (String taskInstanceId : listActivities) { getRunScenario().getBpmnEngine() .executeUserTask(taskInstanceId, getScenarioStep().getUserId(), - RunZeebeOperation.getVariablesStep(getRunScenario(), getScenarioStep())); + RunZeebeOperation.getVariablesStep(getRunScenario(), getScenarioStep(), 0)); } } } catch (AutomatorException e) { diff --git a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlows.java b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlows.java index 533803c..69ce230 100644 --- a/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlows.java +++ b/src/main/java/org/camunda/automator/engine/flow/RunScenarioFlows.java @@ -44,7 +44,8 @@ public void execute(RunResult runResult) { RunScenarioWarmingUp runScenarioWarmingUp = new RunScenarioWarmingUp(serviceAccess, runScenario); Map recordCreationPIMap = new HashMap<>(); if (runScenario.getScenario().getFlowControl() == null) { - runResult.addError(null, "Scenario does not declare a [FlowControl] section. This section is mandatory for a Flow Scenario"); + runResult.addError(null, + "Scenario does not declare a [FlowControl] section. This section is mandatory for a Flow Scenario"); return; } @@ -93,14 +94,13 @@ private List startExecution() { case STARTEVENT -> { if (!runScenario.getRunParameters().isCreation()) { logger.info("According configuration, STARTEVENT[" + scenarioStep.getProcessId() + "] is fully disabled"); - } else - for (int i = 0; i < scenarioStep.getNbWorkers(); i++) { - RunScenarioFlowStartEvent runStartEvent = new RunScenarioFlowStartEvent( - serviceAccess.getTaskScheduler(scenarioStep.getProcessId() + "-" + i), scenarioStep, i, runScenario, - new RunResult(runScenario)); - runStartEvent.execute(); - listFlows.add(runStartEvent); - } + } else { + RunScenarioFlowStartEvent runStartEvent = new RunScenarioFlowStartEvent( + serviceAccess.getTaskScheduler(scenarioStep.getProcessId()), scenarioStep, runScenario, + new RunResult(runScenario)); + runStartEvent.execute(); + listFlows.add(runStartEvent); + } } case SERVICETASK -> { @@ -111,7 +111,7 @@ private List startExecution() { scenarioStep.getTopic(), runScenario.getRunParameters().getFilterServiceTask()); } else { RunScenarioFlowServiceTask runServiceTask = new RunScenarioFlowServiceTask( - serviceAccess.getTaskScheduler("serviceTask"), scenarioStep, 0, runScenario, new RunResult(runScenario)); + serviceAccess.getTaskScheduler("serviceTask"), scenarioStep, runScenario, new RunResult(runScenario)); runServiceTask.execute(); listFlows.add(runServiceTask); } diff --git a/src/main/java/org/camunda/automator/engine/flow/RunScenarioWarmingUp.java b/src/main/java/org/camunda/automator/engine/flow/RunScenarioWarmingUp.java index 14033df..4695c7c 100644 --- a/src/main/java/org/camunda/automator/engine/flow/RunScenarioWarmingUp.java +++ b/src/main/java/org/camunda/automator/engine/flow/RunScenarioWarmingUp.java @@ -12,7 +12,6 @@ import org.camunda.automator.engine.AutomatorException; import org.camunda.automator.engine.RunResult; import org.camunda.automator.engine.RunScenario; -import org.camunda.automator.engine.RunZeebeOperation; import org.camunda.automator.services.ServiceAccess; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,24 +75,23 @@ public void warmingUp(RunResult runResult) { .filter(t -> t.getType().equals(ScenarioStep.Step.USERTASK)) .toList()); } - logger.info("WarmingUp: Start ---- {} operations (useServiceTask {} useUserTask {}", listOperationWarmingUp.size(), + logger.info("WarmingUp: Start ---- {} operations (useServiceTask {} useUserTask {})", listOperationWarmingUp.size(), warmingUp.useServiceTasks, warmingUp.useUserTasks); for (ScenarioStep scenarioStep : listOperationWarmingUp) { switch (scenarioStep.getType()) { case STARTEVENT -> { - logger.info("WarmingUp: StartEvent Generate [{}] Frequency [{}] EndWarmingUp [{}]", + logger.info("WarmingUp: StartEvent GeneratePI[{}] Frequency[{}] EndWarmingUp[{}]", scenarioStep.getNumberOfExecutions(), scenarioStep.getFrequency(), scenarioStep.getEndWarmingUp()); - StartEventWarmingUpRunnable startEventWarmingUpRunnable = new StartEventWarmingUpRunnable( - serviceAccess.getTaskScheduler("warmingUp"), scenarioStep, runScenario, runResult); + serviceAccess.getTaskScheduler("warmingUp"), scenarioStep, 0, runScenario, runResult); listWarmingUpStartEvent.add(startEventWarmingUpRunnable); - startEventWarmingUpRunnable.run(); + startEventWarmingUpRunnable.start(); } case SERVICETASK -> { logger.info("WarmingUp: Start Service Task topic[{}]", scenarioStep.getTopic()); RunScenarioFlowServiceTask task = new RunScenarioFlowServiceTask(serviceAccess.getTaskScheduler("serviceTask"), - scenarioStep, 0, runScenario, new RunResult(runScenario)); + scenarioStep, runScenario, new RunResult(runScenario)); task.execute(); listWarmingUpServiceTask.add(task); } @@ -165,11 +163,13 @@ public void warmingUp(RunResult runResult) { /** * StartEventRunnable + * Must be runnable because we will schedule it. */ class StartEventWarmingUpRunnable implements Runnable { private final TaskScheduler scheduler; private final ScenarioStep scenarioStep; + private final int index; private final RunScenario runScenario; private final RunResult runResult; public boolean stop = false; @@ -178,13 +178,16 @@ class StartEventWarmingUpRunnable implements Runnable { public String warmingUpNotFinishedAnalysis = "Not verified yet"; public int nbInstancesCreated = 0; private int nbOverloaded = 0; + private int executionBatchNumber = 1; public StartEventWarmingUpRunnable(TaskScheduler scheduler, ScenarioStep scenarioStep, + int index, RunScenario runScenario, RunResult runResult) { this.scheduler = scheduler; this.scenarioStep = scenarioStep; + this.index = index; this.runScenario = runScenario; this.runResult = runResult; } @@ -193,11 +196,20 @@ public void pleaseStop(boolean stop) { this.stop = stop; } + /** + * Start it in a new tread + */ + public void start() { + scheduler.schedule(this, Instant.now()); + + } + @Override public void run() { if (stop) { return; } + executionBatchNumber++; // check if the condition is reach CheckFunctionResult checkFunctionResult = null; if (scenarioStep.getEndWarmingUp() != null) { @@ -213,47 +225,53 @@ public void run() { } // continue to generate PI long begin = System.currentTimeMillis(); - List listProcessInstance = new ArrayList<>(); - try { - for (int i = 0; i < scenarioStep.getNumberOfExecutions(); i++) { - String processInstance = runScenario.getBpmnEngine() - .createProcessInstance(scenarioStep.getProcessId(), scenarioStep.getTaskId(), // activityId - RunZeebeOperation.getVariablesStep(runScenario, scenarioStep)); - nbInstancesCreated++; - if (listProcessInstance.size() < 21) - listProcessInstance.add(processInstance); - } - } catch (AutomatorException e) { - logger.error("Error at creation: [{}]", e.getMessage()); + CreateProcessInstanceThread createProcessInstanceThread = new CreateProcessInstanceThread(executionBatchNumber, + scenarioStep, runScenario, runResult); + + Duration durationWarmup; + if (scenarioStep.getFrequency() == null || scenarioStep.getFrequency().isEmpty()) { + durationWarmup = Duration.ofHours(1); + } else { + durationWarmup = Duration.parse(scenarioStep.getFrequency()); } + + createProcessInstanceThread.startProcessInstance(durationWarmup); + long end = System.currentTimeMillis(); // one step generation? if (scenarioStep.getFrequency() == null || scenarioStep.getFrequency().isEmpty()) { if (runScenario.getRunParameters().showLevelMonitoring()) { - logger.info("WarmingUp:StartEvent Create[{}] in {} " + " ms" + " (oneShoot) listPI(max20): ", + logger.info("WarmingUp:StartEvent Create[{}] in {} ms (oneShoot) listPI(max20): ", scenarioStep.getNumberOfExecutions(), (end - begin), - listProcessInstance.stream().collect(Collectors.joining(","))); + createProcessInstanceThread.getListProcessInstances().stream().collect(Collectors.joining(","))); } warmingUpFinishedAnalysis += "GoalOneShoot"; warmingUpFinished = true; return; } - Duration duration = Duration.parse(scenarioStep.getFrequency()); - duration = duration.minusMillis(end - begin); - if (duration.isNegative()) { - duration = Duration.ZERO; + if (createProcessInstanceThread.isOverload()) { nbOverloaded++; } - + Duration durationToWait; + if (scenarioStep.getFrequency() == null || scenarioStep.getFrequency().isEmpty()) { + durationToWait = Duration.ZERO; + } else { + durationToWait = durationWarmup.minusMillis(end - begin); + if (durationToWait.isNegative()) { + durationToWait = Duration.ZERO; + } + } if (runScenario.getRunParameters().showLevelMonitoring()) { - logger.info( - "Warmingup Create[" + scenarioStep.getNumberOfExecutions() + "] in " + (end - begin) + " ms" + " Sleep [" - + duration.getSeconds() + " s]" + (checkFunctionResult == null ? - "" : - "EndWarmingUp:" + checkFunctionResult.analysis)); + logger.info("Warmingup batch_#{} Create real/scenario[{}/{}] in {} ms Sleep[{} s] {}", // log + executionBatchNumber, // batch + createProcessInstanceThread.getTotalCreation(), scenarioStep.getNumberOfExecutions(), + // Number of creation request + (end - begin), // duration + durationToWait.getSeconds(), // Sleep for the frequency + (checkFunctionResult == null ? "" : "EndWarmingUp:" + checkFunctionResult.analysis)); } - scheduler.schedule(this, Instant.now().plusMillis(duration.toMillis())); + scheduler.schedule(this, Instant.now().plusMillis(durationToWait.toMillis())); } /** diff --git a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnit.java b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnit.java index 14ba21d..3932074 100644 --- a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnit.java +++ b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnit.java @@ -187,12 +187,12 @@ public void runExecution() { case USERTASK -> { // wait for the user Task if (scnRunExecution.runScenario.getRunParameters().isUserTask()) - scnRunResult = userTask.executeUserTask(scnRunResult, step); + scnRunResult = userTask.executeUserTask(step, scnRunResult); } case SERVICETASK -> { // wait for the user Task if (scnRunExecution.runScenario.getRunParameters().isServiceTask()) { - scnRunResult = serviceTask.executeServiceTask(scnRunResult, step); + scnRunResult = serviceTask.executeServiceTask(step, scnRunResult); } } @@ -210,7 +210,6 @@ public void runExecution() { } if (scnRunExecution.runScenario.getRunParameters().showLevelMonitoring()) logger.info("ScnRunExecution.EndExecution [" + scnExecution.getName() + "] agent[" + agentName + "]"); - return; } /** diff --git a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitServiceTask.java b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitServiceTask.java index 7f0f00d..3430f2e 100644 --- a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitServiceTask.java +++ b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitServiceTask.java @@ -28,7 +28,7 @@ protected RunScenarioUnitServiceTask(RunScenario runScenario) { * @param step step to execute * @return result completed */ - public RunResult executeServiceTask(RunResult result, ScenarioStep step) { + public RunResult executeServiceTask(ScenarioStep step, RunResult result) { if (runScenario.getRunParameters().showLevelMonitoring()) { logger.info("Service TaskId[{}]", step.getTaskId()); } @@ -47,7 +47,7 @@ public RunResult executeServiceTask(RunResult result, ScenarioStep step) { waitingTimeInMs = duration.toMillis(); } if (waitingTimeInMs == null) - waitingTimeInMs = Long.valueOf(5 * 60 * 1000); + waitingTimeInMs = 5L * 60 * 1000; for (int index = 0; index < step.getNumberOfExecutions(); index++) { long beginTimeWait = System.currentTimeMillis(); @@ -71,9 +71,10 @@ public RunResult executeServiceTask(RunResult result, ScenarioStep step) { + result.getFirstProcessInstanceId() + "]"); return result; } + // this is a unit test : there is only one thread, index=1 runScenario.getBpmnEngine() .executeServiceTask(listActivities.get(0), step.getUserId(), - RunZeebeOperation.getVariablesStep(runScenario, step)); + RunZeebeOperation.getVariablesStep(runScenario, step, 1)); } catch (AutomatorException e) { result.addError(step, e.getMessage()); return result; diff --git a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitStartEvent.java b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitStartEvent.java index bd47d96..710eca4 100644 --- a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitStartEvent.java +++ b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitStartEvent.java @@ -32,8 +32,9 @@ public RunResult startEvent(RunResult result, ScenarioStep step) { if (runScenario.getRunParameters().showLevelMonitoring()) { logger.info("StartEvent EventId[{}]", step.getTaskId()); } - String processId = step.getScnExecution().getScnHead().getProcessId(); - Map processVariables = RunZeebeOperation.getVariablesStep(runScenario, step); + String processId = step.getScnExecution().getScnHead().getProcessId(); + // There is no multithread: index=1 + Map processVariables = RunZeebeOperation.getVariablesStep(runScenario, step, 1); String processInstanceId = runScenario.getBpmnEngine() .createProcessInstance(processId, step.getTaskId(), processVariables); diff --git a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitUserTask.java b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitUserTask.java index 94d8ee3..cb4efaf 100644 --- a/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitUserTask.java +++ b/src/main/java/org/camunda/automator/engine/unit/RunScenarioUnitUserTask.java @@ -24,11 +24,11 @@ protected RunScenarioUnitUserTask(RunScenario runScenario) { /** * Execute User task * - * @param result result to complete and return * @param step step to execute + * @param result result to complete and return * @return result completed */ - public RunResult executeUserTask(RunResult result, ScenarioStep step) { + public RunResult executeUserTask(ScenarioStep step, RunResult result) { if (runScenario.getRunParameters().showLevelMonitoring()) { logger.info("UserTask TaskId[{}]", step.getTaskId()); @@ -73,9 +73,10 @@ public RunResult executeUserTask(RunResult result, ScenarioStep step) { + result.getFirstProcessInstanceId() + "]"); return result; } + // unit test: there is no multi thread executing this part, index=1 runScenario.getBpmnEngine() .executeUserTask(listActivities.get(0), step.getUserId(), - RunZeebeOperation.getVariablesStep(runScenario, step)); + RunZeebeOperation.getVariablesStep(runScenario, step, 1)); } catch (AutomatorException e) { result.addError(step, e.getMessage()); return result; diff --git a/src/main/java/org/camunda/automator/services/AutomatorStartup.java b/src/main/java/org/camunda/automator/services/AutomatorStartup.java index 620a9a1..0e1fdbc 100644 --- a/src/main/java/org/camunda/automator/services/AutomatorStartup.java +++ b/src/main/java/org/camunda/automator/services/AutomatorStartup.java @@ -79,15 +79,20 @@ private List registerScenario() { if (configurationStartup.getScenarioFileAtStartup().isEmpty()) { logger.info("No scenario [File] from variable {} given", configurationStartup.getScenarioFileAtStartupName()); } else { - logger.info("Detect {} scenario [File] from variable {}", configurationStartup.getScenarioFileAtStartup().size(), - configurationStartup.getScenarioFileAtStartupName()); + logger.info("Detect {} scenario [File] from variable [{}] ScenarioPath[{}]", + configurationStartup.getScenarioFileAtStartup().size(), configurationStartup.getScenarioFileAtStartupName(), + configurationStartup.scenarioPath); for (String scenarioFileName : configurationStartup.getScenarioFileAtStartup()) { logger.info("Register scenario [File] [{}]", scenarioFileName); File scenarioFile = new File(configurationStartup.scenarioPath + "/" + scenarioFileName); if (!scenarioFile.exists()) { - logger.error("ScenarioFile: Can't find [{}/{}]", configurationStartup.scenarioPath, scenarioFileName); + scenarioFile = new File(scenarioFileName); + } + if (!scenarioFile.exists()) { + logger.error("ScenarioFile: Can't find File [{}/{}] or [{}]", configurationStartup.scenarioPath, + scenarioFileName, scenarioFileName); continue; } scenarioList.add(scenarioFile); @@ -98,12 +103,12 @@ private List registerScenario() { logger.info("No scenario [Resource] from variable {} given", configurationStartup.getScenarioResourceAtStartupName()); } else { - logger.info("Detect {} scenario [Resource] from variable {}", + logger.info("Detect {} scenario [Resource] from variable [{}]", configurationStartup.getScenarioResourceAtStartup().size(), configurationStartup.getScenarioResourceAtStartupName()); for (Resource resource : configurationStartup.getScenarioResourceAtStartup()) { - if (resource!=null) { + if (resource != null) { logger.info("Load scenario [Resource] from [{}]", resource.getDescription()); scenarioList.add(resource); } @@ -156,10 +161,9 @@ public void run() { logger.info( "AutomatorStartup parameters serverName[{}] warmingUp[{}] creation:[{}] serviceTask:[{}] userTask:[{}] ScenarioPath[{}] logLevel[{}] waitWarmingUpServer[{} s]", - runParameters.getServerName(), - runParameters.isWarmingUp(), runParameters.isCreation(), runParameters.isServiceTask(), - runParameters.isUserTask(), configurationStartup.scenarioPath, configurationStartup.logLevel, - configurationStartup.getWarmingUpServer().toMillis() / 1000); + runParameters.getServerName(), runParameters.isWarmingUp(), runParameters.isCreation(), + runParameters.isServiceTask(), runParameters.isUserTask(), configurationStartup.scenarioPath, + configurationStartup.logLevel, configurationStartup.getWarmingUpServer().toMillis() / 1000); try { String currentPath = new java.io.File(".").getCanonicalPath(); @@ -194,26 +198,25 @@ else if (scenarioObject instanceof Resource scenarioResource) { } if (scenario == null) continue; - logger.info("Start scenario [{}]", scenario.getName()); + logger.info("Start scenario [{}] on (1)ScenarioServerName[{}] (2)ConfigurationServerName[{}]", + scenario.getName(), scenario.getServerName(), runParameters.getServerName()); // BpmnEngine: find the correct one referenced in the scenario int countEngineIsNotReady = 0; BpmnEngine bpmnEngine = null; boolean pleaseTryAgain; + String message = ""; do { pleaseTryAgain = false; countEngineIsNotReady++; - String message = ""; try { - if (runParameters.showLevelDashboard()) { - logger.info("Connect to Bpmn Engine for scenario [{}]", scenario.getName()); - } if (scenario.getServerName() != null && !scenario.getServerName().isEmpty()) { + message += "ScenarioServerName[" + scenario.getServerName() + "];"; bpmnEngine = automatorAPI.getBpmnEngineFromScenario(scenario, engineConfiguration); - } else { if (runParameters.getServerName() == null) throw new AutomatorException("No Server define in configuration"); + message += "ConfigurationServerName[" + runParameters.getServerName() + "];"; BpmnEngineList.BpmnServerDefinition serverDefinition = engineConfiguration.getByServerName( runParameters.getServerName()); if (serverDefinition == null) @@ -222,12 +225,16 @@ else if (scenarioObject instanceof Resource scenarioResource) { bpmnEngine = automatorAPI.getBpmnEngine(serverDefinition, true); } + if (runParameters.showLevelDashboard()) { + logger.info("Scenario [{}] Connect to BpmnEngine {}", scenario.getName(), message); + } + if (!bpmnEngine.isReady()) { bpmnEngine.connection(); } } catch (AutomatorException e) { pleaseTryAgain = true; - message = e.getMessage(); + message += "Exception " + e.getMessage(); } if (pleaseTryAgain && countEngineIsNotReady < 10) { logger.info( @@ -242,7 +249,8 @@ else if (scenarioObject instanceof Resource scenarioResource) { } while (pleaseTryAgain && countEngineIsNotReady < 10); if (bpmnEngine == null) { - logger.error("Scenario [{}] file[{}] No BPM ENGINE running.", scenario.getName(), scenario.getName()); + logger.error("Scenario [{}] file[{}] Server {} No BPM ENGINE running.", scenario.getName(), + scenario.getName(), message); continue; } diff --git a/src/main/java/org/camunda/automator/services/ServiceDataOperation.java b/src/main/java/org/camunda/automator/services/ServiceDataOperation.java index bb7d9c5..dcfa511 100644 --- a/src/main/java/org/camunda/automator/services/ServiceDataOperation.java +++ b/src/main/java/org/camunda/automator/services/ServiceDataOperation.java @@ -3,6 +3,8 @@ import org.camunda.automator.engine.AutomatorException; import org.camunda.automator.engine.RunScenario; import org.camunda.automator.services.dataoperation.DataOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -11,6 +13,7 @@ @Component public class ServiceDataOperation { + static Logger logger = LoggerFactory.getLogger(ServiceDataOperation.class); @Autowired private List listDataOperation; @@ -27,13 +30,17 @@ private ServiceDataOperation() { * @param value value to process * @param runScenario scenario to get information * @param context give context in the exception in case of error + * @param index when multiple worker does the same operation, this is the index * @return the value calculated - * @throws AutomatorException + * @throws AutomatorException in case of error */ - public Object execute(String value, RunScenario runScenario, String context) throws AutomatorException { + public Object execute(String value, RunScenario runScenario, String context, int index) throws AutomatorException { for (DataOperation dataOperation : listDataOperation) { - if (dataOperation.match(value)) - return dataOperation.execute(value, runScenario); + if (dataOperation.match(value)) { + if (runScenario.getRunParameters().showLevelDebug()) + logger.info("Execute {} value[{}]", dataOperation.getName(), value); + return dataOperation.execute(value, runScenario, index); + } } String helpOperations = listDataOperation.stream().map(DataOperation::getHelp).collect(Collectors.joining(", ")); diff --git a/src/main/java/org/camunda/automator/services/dataoperation/DataOperation.java b/src/main/java/org/camunda/automator/services/dataoperation/DataOperation.java index b247807..ed8c3cc 100644 --- a/src/main/java/org/camunda/automator/services/dataoperation/DataOperation.java +++ b/src/main/java/org/camunda/automator/services/dataoperation/DataOperation.java @@ -13,7 +13,21 @@ public abstract class DataOperation { public abstract boolean match(String value); - public abstract Object execute(String value, RunScenario runScenario) throws AutomatorException; + /** + * return the name of the operation + * + * @return name + */ + public abstract String getName(); + + /** + * @param value value from the function + * @param runScenario scenario to run + * @param index when multiple workers run the same operation, each worker has a uniq index + * @return the result of the operation + * @throws AutomatorException in case of error + */ + public abstract Object execute(String value, RunScenario runScenario, int index) throws AutomatorException; protected boolean matchFunction(String value, String function) { return value.toUpperCase(Locale.ROOT).startsWith(function.toUpperCase(Locale.ROOT) + "("); diff --git a/src/main/java/org/camunda/automator/services/dataoperation/DataOperationGenerateList.java b/src/main/java/org/camunda/automator/services/dataoperation/DataOperationGenerateList.java index 94fbe3a..0fe64fb 100644 --- a/src/main/java/org/camunda/automator/services/dataoperation/DataOperationGenerateList.java +++ b/src/main/java/org/camunda/automator/services/dataoperation/DataOperationGenerateList.java @@ -2,6 +2,8 @@ import org.camunda.automator.engine.AutomatorException; import org.camunda.automator.engine.RunScenario; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.util.ArrayList; @@ -9,6 +11,14 @@ @Component public class DataOperationGenerateList extends DataOperation { + + Logger logger = LoggerFactory.getLogger(DataOperationGenerateList.class); + + @Override + public String getName() { + return "GenerateList"; + } + @Override public boolean match(String value) { return matchFunction(value, "generaterandomlist"); @@ -20,7 +30,8 @@ public String getHelp() { } @Override - public Object execute(String value, RunScenario runScenario) throws AutomatorException { + public Object execute(String value, RunScenario runScenario, int index) throws AutomatorException { + List args = extractArgument(value, true); List listValues = new ArrayList<>(); try { diff --git a/src/main/java/org/camunda/automator/services/dataoperation/DataOperationGenerateUniqueID.java b/src/main/java/org/camunda/automator/services/dataoperation/DataOperationGenerateUniqueID.java new file mode 100644 index 0000000..3833c1c --- /dev/null +++ b/src/main/java/org/camunda/automator/services/dataoperation/DataOperationGenerateUniqueID.java @@ -0,0 +1,45 @@ +package org.camunda.automator.services.dataoperation; + +import org.camunda.automator.engine.AutomatorException; +import org.camunda.automator.engine.RunScenario; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Component +public class DataOperationGenerateUniqueID extends DataOperation { + + private final long baseTimer = System.currentTimeMillis(); + private final Map mapUniqueId = new HashMap<>(); + + @Override + public String getName() { + return "GenerateUniqueId"; + } + + @Override + public boolean match(String value) { + return matchFunction(value, "generateuniqueid"); + } + + @Override + public String getHelp() { + return "generateuniqueid()"; + } + + @Override + public Object execute(String value, RunScenario runScenario, int index) throws AutomatorException { + List args = extractArgument(value, true); + String prefix = args.get(0); + if (prefix == null) + prefix = "default"; + + Long uniqueId = mapUniqueId.getOrDefault(prefix, 0L); + uniqueId++; + mapUniqueId.put(prefix, uniqueId); + return index + "-" + uniqueId + "-" + baseTimer; + + } +} diff --git a/src/main/java/org/camunda/automator/services/dataoperation/DataOperationLoadFile.java b/src/main/java/org/camunda/automator/services/dataoperation/DataOperationLoadFile.java index cb0db57..1847d6a 100644 --- a/src/main/java/org/camunda/automator/services/dataoperation/DataOperationLoadFile.java +++ b/src/main/java/org/camunda/automator/services/dataoperation/DataOperationLoadFile.java @@ -17,16 +17,19 @@ public boolean match(String value) { return matchFunction(value, "loadfile"); } + @Override + public String getName() { + return "LoadFile"; + } + @Override public String getHelp() { return "loadfile()"; } @Override - public Object execute(String value, RunScenario runScenario) throws AutomatorException { + public Object execute(String value, RunScenario runScenario, int index) throws AutomatorException { File fileLoad = loadFile(value, runScenario); - if (fileLoad == null) - return null; FileValue typedFileValue = Variables.fileValue(fileLoad.getName()).file(fileLoad) // .mimeType("text/plain") diff --git a/src/main/java/org/camunda/automator/services/dataoperation/DataOperationStringToDate.java b/src/main/java/org/camunda/automator/services/dataoperation/DataOperationStringToDate.java index 6c4ce8c..c8ffbd6 100644 --- a/src/main/java/org/camunda/automator/services/dataoperation/DataOperationStringToDate.java +++ b/src/main/java/org/camunda/automator/services/dataoperation/DataOperationStringToDate.java @@ -32,6 +32,11 @@ public boolean match(String value) { return matchFunction(value, "stringtodate"); } + @Override + public String getName() { + return "StringToDate"; + } + @Override public String getHelp() { return "stringtodate(" + FCT_LOCALDATETIME + "|" + FCT_DATETIME + "|" + FCT_DATE + "|" + FCT_ZONEDATETIME + "|" @@ -39,7 +44,7 @@ public String getHelp() { } @Override - public Object execute(String value, RunScenario runScenario) throws AutomatorException { + public Object execute(String value, RunScenario runScenario, int index) throws AutomatorException { List args = extractArgument(value, true); if (args.size() != 2) { diff --git a/src/main/resources/application.yaml b/src/main/resources/application.yaml index c0fda48..3c17091 100644 --- a/src/main/resources/application.yaml +++ b/src/main/resources/application.yaml @@ -8,7 +8,7 @@ automator: scenarioPath: ./src/main/resources/loadtest # list of scenario separate by ; - scenarioAtStartup: file:/C8CrawlUrlScn.json; + scenarioFileAtStartup: D:\pym\CamundaDrive\Challenge\loadtest\bankscn.json; # one scenario resource - to be accessible in a Docker container via a configMap scenarioResourceAtStartup: @@ -108,4 +108,4 @@ automator: server.port: 8381 -scheduler.poolSize: 10 \ No newline at end of file +scheduler.poolSize: 10 diff --git a/src/main/resources/banner.txt b/src/main/resources/banner.txt new file mode 100644 index 0000000..6285faf --- /dev/null +++ b/src/main/resources/banner.txt @@ -0,0 +1,7 @@ +__________ __ __ +\______ \_______ ____ ____ ____ ______ ______ _____ __ ___/ |_ ____ _____ _____ _/ |_ ___________ + | ___/\_ __ \/ _ \_/ ___\/ __ \ / ___// ___/ ______ \__ \ | | \ __\/ _ \ / \\__ \\ __\/ _ \_ __ \ + | | | | \( <_> ) \__\ ___/ \___ \ \___ \ /_____/ / __ \| | /| | ( <_> ) Y Y \/ __ \| | ( <_> ) | \/ + |____| |__| \____/ \___ >___ >____ >____ > (____ /____/ |__| \____/|__|_| (____ /__| \____/|__| + \/ \/ \/ \/ \/ \/ \/ + (v1.4.0) \ No newline at end of file