From 0030180194e66af0a954f01bcc15f7e9ee9a76bd Mon Sep 17 00:00:00 2001 From: Javier Date: Tue, 27 Aug 2024 21:44:59 +0200 Subject: [PATCH] [Fix #3642] Fixing task deadline functionality There were three issues that prevents deadlines to work 1) HumanTaskNodeInstance was not getting registered for job service events 2) The event published was not checking the right type of event 3) Job Service was using a different Default Unit of work manager --- .../kie/kogito/services/uow/DefaultUnitOfWorkManager.java | 5 +++++ .../process/instance/LightProcessRuntimeServiceProvider.java | 3 +-- .../java/org/jbpm/process/instance/ProcessRuntimeImpl.java | 3 +-- .../jbpm/workflow/instance/node/HumanTaskNodeInstance.java | 2 ++ .../org/kie/kogito/process/impl/AbstractProcessConfig.java | 4 +--- quarkus/addons/task-notification/runtime/pom.xml | 2 +- .../notification/quarkus/NotificationEventPublisher.java | 3 ++- .../org/kie/kogito/quarkus/workflow/KogitoBeanProducer.java | 3 +-- 8 files changed, 14 insertions(+), 11 deletions(-) diff --git a/api/kogito-services/src/main/java/org/kie/kogito/services/uow/DefaultUnitOfWorkManager.java b/api/kogito-services/src/main/java/org/kie/kogito/services/uow/DefaultUnitOfWorkManager.java index 888ad47581c..4882740af26 100644 --- a/api/kogito-services/src/main/java/org/kie/kogito/services/uow/DefaultUnitOfWorkManager.java +++ b/api/kogito-services/src/main/java/org/kie/kogito/services/uow/DefaultUnitOfWorkManager.java @@ -37,6 +37,7 @@ * */ public class DefaultUnitOfWorkManager implements UnitOfWorkManager { + private static DefaultUnitOfWorkManager instance = new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory()); // uses thread local to associate unit of works to execution context/thread private ThreadLocal currentUnitOfWork = new ThreadLocal<>(); // uses pass through unit of work as fallback if no unit of work has been started @@ -48,6 +49,10 @@ public class DefaultUnitOfWorkManager implements UnitOfWorkManager { private Set listeners = new LinkedHashSet<>(); + public static DefaultUnitOfWorkManager get() { + return instance; + } + public DefaultUnitOfWorkManager(UnitOfWorkFactory factory) { super(); this.factory = factory; diff --git a/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/LightProcessRuntimeServiceProvider.java b/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/LightProcessRuntimeServiceProvider.java index 2f551cb2028..f243afec74f 100644 --- a/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/LightProcessRuntimeServiceProvider.java +++ b/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/LightProcessRuntimeServiceProvider.java @@ -22,7 +22,6 @@ import org.kie.kogito.process.impl.DefaultWorkItemHandlerConfig; import org.kie.kogito.services.identity.NoOpIdentityProvider; import org.kie.kogito.services.signal.DefaultSignalManagerHub; -import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory; import org.kie.kogito.services.uow.DefaultUnitOfWorkManager; public class LightProcessRuntimeServiceProvider extends AbstractProcessRuntimeServiceProvider { @@ -32,7 +31,7 @@ public LightProcessRuntimeServiceProvider() { new DefaultWorkItemHandlerConfig(), new DefaultProcessEventListenerConfig(), new DefaultSignalManagerHub(), - new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory()), + DefaultUnitOfWorkManager.get(), new NoOpIdentityProvider()); } } diff --git a/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessRuntimeImpl.java b/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessRuntimeImpl.java index 3eaec2fce36..3630476f015 100755 --- a/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessRuntimeImpl.java +++ b/jbpm/jbpm-flow/src/main/java/org/jbpm/process/instance/ProcessRuntimeImpl.java @@ -73,7 +73,6 @@ import org.kie.kogito.jobs.ProcessJobDescription; import org.kie.kogito.services.identity.NoOpIdentityProvider; import org.kie.kogito.services.jobs.impl.LegacyInMemoryJobService; -import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory; import org.kie.kogito.services.uow.DefaultUnitOfWorkManager; import org.kie.kogito.signal.SignalManager; import org.kie.kogito.uow.UnitOfWorkManager; @@ -98,7 +97,7 @@ public ProcessRuntimeImpl(Application application, InternalWorkingMemory working this.kruntime = workingMemory.getKnowledgeRuntime(); initProcessInstanceManager(); initSignalManager(); - unitOfWorkManager = new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory()); + unitOfWorkManager = DefaultUnitOfWorkManager.get(); jobService = new LegacyInMemoryJobService(kogitoProcessRuntime, unitOfWorkManager); this.processEventSupport = new KogitoProcessEventSupportImpl(new NoOpIdentityProvider()); if (isActive()) { diff --git a/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/HumanTaskNodeInstance.java b/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/HumanTaskNodeInstance.java index 3c1bc2a3415..3fc1522821d 100755 --- a/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/HumanTaskNodeInstance.java +++ b/jbpm/jbpm-flow/src/main/java/org/jbpm/workflow/instance/node/HumanTaskNodeInstance.java @@ -216,12 +216,14 @@ private boolean checkAndReassign(Map timers, protected void addWorkItemListener() { super.addWorkItemListener(); getProcessInstance().addEventListener(WORK_ITEM_TRANSITION, this, false); + getProcessInstance().addEventListener(TIMER_TRIGGERED_EVENT, this, false); } @Override protected void removeWorkItemListener() { super.removeWorkItemListener(); getProcessInstance().removeEventListener(WORK_ITEM_TRANSITION, this, false); + getProcessInstance().removeEventListener(TIMER_TRIGGERED_EVENT, this, false); } private KogitoProcessEventSupport getEventSupport() { diff --git a/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/AbstractProcessConfig.java b/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/AbstractProcessConfig.java index 25d278f628c..05364f76d7d 100644 --- a/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/AbstractProcessConfig.java +++ b/jbpm/jbpm-flow/src/main/java/org/kie/kogito/process/impl/AbstractProcessConfig.java @@ -36,7 +36,6 @@ import org.kie.kogito.process.WorkItemHandlerConfig; import org.kie.kogito.services.identity.NoOpIdentityProvider; import org.kie.kogito.services.signal.DefaultSignalManagerHub; -import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory; import org.kie.kogito.services.uow.DefaultUnitOfWorkManager; import org.kie.kogito.signal.SignalManagerHub; import org.kie.kogito.uow.UnitOfWorkManager; @@ -67,8 +66,7 @@ protected AbstractProcessConfig( this.workItemHandlerConfig = mergeWorkItemHandler(workItemHandlerConfig, DefaultWorkItemHandlerConfig::new); this.processEventListenerConfig = merge(processEventListenerConfigs, processEventListeners); this.unitOfWorkManager = orDefault(unitOfWorkManager, - () -> new DefaultUnitOfWorkManager( - new CollectingUnitOfWorkFactory())); + () -> DefaultUnitOfWorkManager.get()); this.jobsService = orDefault(jobsService, () -> null); this.versionResolver = orDefault(versionResolver, () -> null); this.identityProvider = orDefault(identityProvider, NoOpIdentityProvider::new); diff --git a/quarkus/addons/task-notification/runtime/pom.xml b/quarkus/addons/task-notification/runtime/pom.xml index 4c5a8d41a9b..14c776bd1e1 100644 --- a/quarkus/addons/task-notification/runtime/pom.xml +++ b/quarkus/addons/task-notification/runtime/pom.xml @@ -55,7 +55,7 @@ org.kie.kogito - kogito-events-api + kogito-events-core io.quarkus diff --git a/quarkus/addons/task-notification/runtime/src/main/java/org/kie/kogito/task/notification/quarkus/NotificationEventPublisher.java b/quarkus/addons/task-notification/runtime/src/main/java/org/kie/kogito/task/notification/quarkus/NotificationEventPublisher.java index 1984f5cdedf..0bec2025262 100644 --- a/quarkus/addons/task-notification/runtime/src/main/java/org/kie/kogito/task/notification/quarkus/NotificationEventPublisher.java +++ b/quarkus/addons/task-notification/runtime/src/main/java/org/kie/kogito/task/notification/quarkus/NotificationEventPublisher.java @@ -24,6 +24,7 @@ import org.eclipse.microprofile.reactive.messaging.Emitter; import org.kie.kogito.event.DataEvent; import org.kie.kogito.event.EventPublisher; +import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +43,7 @@ public class NotificationEventPublisher implements EventPublisher { @Override public void publish(DataEvent event) { - if (event.getType().startsWith("UserTaskDeadline")) { + if (event instanceof UserTaskInstanceDeadlineDataEvent) { logger.debug("About to publish event {} to topic {}", event, CHANNEL_NAME); try { emitter.send(event); diff --git a/quarkus/extensions/kogito-quarkus-workflow-extension-common/kogito-quarkus-workflow-common/src/main/java/org/kie/kogito/quarkus/workflow/KogitoBeanProducer.java b/quarkus/extensions/kogito-quarkus-workflow-extension-common/kogito-quarkus-workflow-common/src/main/java/org/kie/kogito/quarkus/workflow/KogitoBeanProducer.java index ea7fe04f789..888afab6cfb 100644 --- a/quarkus/extensions/kogito-quarkus-workflow-extension-common/kogito-quarkus-workflow-common/src/main/java/org/kie/kogito/quarkus/workflow/KogitoBeanProducer.java +++ b/quarkus/extensions/kogito-quarkus-workflow-extension-common/kogito-quarkus-workflow-common/src/main/java/org/kie/kogito/quarkus/workflow/KogitoBeanProducer.java @@ -28,7 +28,6 @@ import org.kie.kogito.process.Processes; import org.kie.kogito.process.version.ProjectVersionProcessVersionResolver; import org.kie.kogito.services.jobs.impl.InMemoryJobService; -import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory; import org.kie.kogito.services.uow.DefaultUnitOfWorkManager; import org.kie.kogito.uow.UnitOfWorkManager; @@ -51,7 +50,7 @@ CorrelationService correlationService() { @DefaultBean @Produces UnitOfWorkManager unitOfWorkManager() { - return new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory()); + return DefaultUnitOfWorkManager.get(); } @DefaultBean