Skip to content

Commit

Permalink
[Fix apache#3642] There were three issues that prevents deadlines to …
Browse files Browse the repository at this point in the history
…work

1) HumanTaskNodeInstance was not getting registered for job service
events
2) The event published was not checking the right type of event
3) Job Service was using a different Default Unit of work manager
  • Loading branch information
fjtirado committed Aug 27, 2024
1 parent 4cd44dc commit 3c44ac3
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
*
*/
public class DefaultUnitOfWorkManager implements UnitOfWorkManager {
private static DefaultUnitOfWorkManager instance = new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory());
// uses thread local to associate unit of works to execution context/thread
private ThreadLocal<UnitOfWork> currentUnitOfWork = new ThreadLocal<>();
// uses pass through unit of work as fallback if no unit of work has been started
Expand All @@ -48,6 +49,11 @@ public class DefaultUnitOfWorkManager implements UnitOfWorkManager {

private Set<UnitOfWorkEventListener> listeners = new LinkedHashSet<>();


public static DefaultUnitOfWorkManager get () {
return instance ;
}

public DefaultUnitOfWorkManager(UnitOfWorkFactory factory) {
super();
this.factory = factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public LightProcessRuntimeServiceProvider() {
new DefaultWorkItemHandlerConfig(),
new DefaultProcessEventListenerConfig(),
new DefaultSignalManagerHub(),
new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory()),
DefaultUnitOfWorkManager.get(),
new NoOpIdentityProvider());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public ProcessRuntimeImpl(Application application, InternalWorkingMemory working
this.kruntime = workingMemory.getKnowledgeRuntime();
initProcessInstanceManager();
initSignalManager();
unitOfWorkManager = new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory());
unitOfWorkManager = DefaultUnitOfWorkManager.get();
jobService = new LegacyInMemoryJobService(kogitoProcessRuntime, unitOfWorkManager);
this.processEventSupport = new KogitoProcessEventSupportImpl(new NoOpIdentityProvider());
if (isActive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,14 @@ private boolean checkAndReassign(Map<String, Reassignment> timers,
protected void addWorkItemListener() {
super.addWorkItemListener();
getProcessInstance().addEventListener(WORK_ITEM_TRANSITION, this, false);
getProcessInstance().addEventListener(TIMER_TRIGGERED_EVENT, this, false);
}

@Override
protected void removeWorkItemListener() {
super.removeWorkItemListener();
getProcessInstance().removeEventListener(WORK_ITEM_TRANSITION, this, false);
getProcessInstance().removeEventListener(TIMER_TRIGGERED_EVENT, this, false);
}

private KogitoProcessEventSupport getEventSupport() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import org.drools.core.process.impl.DefaultWorkItemManager;
import org.kie.api.event.process.ProcessEventListener;
import org.kie.kogito.Addons;
import org.kie.kogito.auth.IdentityProvider;
Expand Down Expand Up @@ -67,8 +68,7 @@ protected AbstractProcessConfig(
this.workItemHandlerConfig = mergeWorkItemHandler(workItemHandlerConfig, DefaultWorkItemHandlerConfig::new);
this.processEventListenerConfig = merge(processEventListenerConfigs, processEventListeners);
this.unitOfWorkManager = orDefault(unitOfWorkManager,
() -> new DefaultUnitOfWorkManager(
new CollectingUnitOfWorkFactory()));
() -> DefaultUnitOfWorkManager.get());
this.jobsService = orDefault(jobsService, () -> null);
this.versionResolver = orDefault(versionResolver, () -> null);
this.identityProvider = orDefault(identityProvider, NoOpIdentityProvider::new);
Expand Down
2 changes: 1 addition & 1 deletion quarkus/addons/task-notification/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-events-api</artifactId>
<artifactId>kogito-events-core</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,7 +43,7 @@ public class NotificationEventPublisher implements EventPublisher {

@Override
public void publish(DataEvent<?> event) {
if (event.getType().startsWith("UserTaskDeadline")) {
if (event instanceof UserTaskInstanceDeadlineDataEvent) {
logger.debug("About to publish event {} to topic {}", event, CHANNEL_NAME);
try {
emitter.send(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ CorrelationService correlationService() {
@DefaultBean
@Produces
UnitOfWorkManager unitOfWorkManager() {
return new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory());
return DefaultUnitOfWorkManager.get();
}

@DefaultBean
Expand Down

0 comments on commit 3c44ac3

Please sign in to comment.