diff --git a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java index 1a75a00296..2d2a173ef1 100644 --- a/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java +++ b/components/mediation/tasks/org.wso2.micro.integrator.ntask.core/src/main/java/org/wso2/micro/integrator/ntask/coordination/task/TaskEventListener.java @@ -74,6 +74,18 @@ public void memberAdded(NodeDetail nodeDetail) { LOG.error("Exception occurred while resolving un assigned tasks upon member addition " + nodeDetail .getNodeId(), e); } + } else if (clusterCoordinator.getThisNodeId().equals(nodeDetail.getNodeId()) + && isMemberRejoinedAfterUnresponsiveness()) { + // This node became unresponsive and rejoined the cluster hence removing all tasks assigned to this node + // then start the scheduler again after cleaning the locally running tasks. + becameUnresponsive(nodeDetail.getNodeId()); + try { + //Remove from database + taskStore.deleteTasks(nodeDetail.getNodeId()); + } catch (TaskCoordinationException e) { + LOG.error("Error while removing the tasks of this node.", e); + } + reJoined(nodeDetail.getNodeId()); } } @@ -119,6 +131,16 @@ public void becameUnresponsive(String nodeId) { }); } + /** + * Check whether the member has rejoined after being unresponsive. + * + * @return true if the member has rejoined after being unresponsive, false otherwise + */ + public boolean isMemberRejoinedAfterUnresponsiveness() { + return taskManager.getLocallyRunningCoordinatedTasks().size() > 0; + } + + @Override public void reJoined(String nodeId) {