Skip to content

Commit

Permalink
[Fix apache#3642] Fixing task deadline functionality
Browse files Browse the repository at this point in the history
There were three issues that prevents deadlines to work

1) HumanTaskNodeInstance was not getting registered for job service
events
2) The event published was not checking the right type of event
3) Job Service was using a different Default Unit of work manager
  • Loading branch information
fjtirado committed Aug 28, 2024
1 parent 4cd44dc commit 0030180
Show file tree
Hide file tree
Showing 8 changed files with 14 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
*
*/
public class DefaultUnitOfWorkManager implements UnitOfWorkManager {
private static DefaultUnitOfWorkManager instance = new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory());
// uses thread local to associate unit of works to execution context/thread
private ThreadLocal<UnitOfWork> currentUnitOfWork = new ThreadLocal<>();
// uses pass through unit of work as fallback if no unit of work has been started
Expand All @@ -48,6 +49,10 @@ public class DefaultUnitOfWorkManager implements UnitOfWorkManager {

private Set<UnitOfWorkEventListener> listeners = new LinkedHashSet<>();

public static DefaultUnitOfWorkManager get() {
return instance;
}

public DefaultUnitOfWorkManager(UnitOfWorkFactory factory) {
super();
this.factory = factory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.kie.kogito.process.impl.DefaultWorkItemHandlerConfig;
import org.kie.kogito.services.identity.NoOpIdentityProvider;
import org.kie.kogito.services.signal.DefaultSignalManagerHub;
import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory;
import org.kie.kogito.services.uow.DefaultUnitOfWorkManager;

public class LightProcessRuntimeServiceProvider extends AbstractProcessRuntimeServiceProvider {
Expand All @@ -32,7 +31,7 @@ public LightProcessRuntimeServiceProvider() {
new DefaultWorkItemHandlerConfig(),
new DefaultProcessEventListenerConfig(),
new DefaultSignalManagerHub(),
new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory()),
DefaultUnitOfWorkManager.get(),
new NoOpIdentityProvider());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@
import org.kie.kogito.jobs.ProcessJobDescription;
import org.kie.kogito.services.identity.NoOpIdentityProvider;
import org.kie.kogito.services.jobs.impl.LegacyInMemoryJobService;
import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory;
import org.kie.kogito.services.uow.DefaultUnitOfWorkManager;
import org.kie.kogito.signal.SignalManager;
import org.kie.kogito.uow.UnitOfWorkManager;
Expand All @@ -98,7 +97,7 @@ public ProcessRuntimeImpl(Application application, InternalWorkingMemory working
this.kruntime = workingMemory.getKnowledgeRuntime();
initProcessInstanceManager();
initSignalManager();
unitOfWorkManager = new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory());
unitOfWorkManager = DefaultUnitOfWorkManager.get();
jobService = new LegacyInMemoryJobService(kogitoProcessRuntime, unitOfWorkManager);
this.processEventSupport = new KogitoProcessEventSupportImpl(new NoOpIdentityProvider());
if (isActive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,12 +216,14 @@ private boolean checkAndReassign(Map<String, Reassignment> timers,
protected void addWorkItemListener() {
super.addWorkItemListener();
getProcessInstance().addEventListener(WORK_ITEM_TRANSITION, this, false);
getProcessInstance().addEventListener(TIMER_TRIGGERED_EVENT, this, false);
}

@Override
protected void removeWorkItemListener() {
super.removeWorkItemListener();
getProcessInstance().removeEventListener(WORK_ITEM_TRANSITION, this, false);
getProcessInstance().removeEventListener(TIMER_TRIGGERED_EVENT, this, false);
}

private KogitoProcessEventSupport getEventSupport() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.kie.kogito.process.WorkItemHandlerConfig;
import org.kie.kogito.services.identity.NoOpIdentityProvider;
import org.kie.kogito.services.signal.DefaultSignalManagerHub;
import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory;
import org.kie.kogito.services.uow.DefaultUnitOfWorkManager;
import org.kie.kogito.signal.SignalManagerHub;
import org.kie.kogito.uow.UnitOfWorkManager;
Expand Down Expand Up @@ -67,8 +66,7 @@ protected AbstractProcessConfig(
this.workItemHandlerConfig = mergeWorkItemHandler(workItemHandlerConfig, DefaultWorkItemHandlerConfig::new);
this.processEventListenerConfig = merge(processEventListenerConfigs, processEventListeners);
this.unitOfWorkManager = orDefault(unitOfWorkManager,
() -> new DefaultUnitOfWorkManager(
new CollectingUnitOfWorkFactory()));
() -> DefaultUnitOfWorkManager.get());
this.jobsService = orDefault(jobsService, () -> null);
this.versionResolver = orDefault(versionResolver, () -> null);
this.identityProvider = orDefault(identityProvider, NoOpIdentityProvider::new);
Expand Down
2 changes: 1 addition & 1 deletion quarkus/addons/task-notification/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
</dependency>
<dependency>
<groupId>org.kie.kogito</groupId>
<artifactId>kogito-events-api</artifactId>
<artifactId>kogito-events-core</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.kie.kogito.event.DataEvent;
import org.kie.kogito.event.EventPublisher;
import org.kie.kogito.event.usertask.UserTaskInstanceDeadlineDataEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,7 +43,7 @@ public class NotificationEventPublisher implements EventPublisher {

@Override
public void publish(DataEvent<?> event) {
if (event.getType().startsWith("UserTaskDeadline")) {
if (event instanceof UserTaskInstanceDeadlineDataEvent) {
logger.debug("About to publish event {} to topic {}", event, CHANNEL_NAME);
try {
emitter.send(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.kie.kogito.process.Processes;
import org.kie.kogito.process.version.ProjectVersionProcessVersionResolver;
import org.kie.kogito.services.jobs.impl.InMemoryJobService;
import org.kie.kogito.services.uow.CollectingUnitOfWorkFactory;
import org.kie.kogito.services.uow.DefaultUnitOfWorkManager;
import org.kie.kogito.uow.UnitOfWorkManager;

Expand All @@ -51,7 +50,7 @@ CorrelationService correlationService() {
@DefaultBean
@Produces
UnitOfWorkManager unitOfWorkManager() {
return new DefaultUnitOfWorkManager(new CollectingUnitOfWorkFactory());
return DefaultUnitOfWorkManager.get();
}

@DefaultBean
Expand Down

0 comments on commit 0030180

Please sign in to comment.