Skip to content

Commit

Permalink
Merge pull request #3265 from malakaganga/fix_task
Browse files Browse the repository at this point in the history
Fix running tasks in non-coordinator when former coordinator rejoins the cluster
  • Loading branch information
malakaganga authored Mar 6, 2024
2 parents b18f5b4 + 6cb5f52 commit 9bf3174
Showing 1 changed file with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,18 @@ public void memberAdded(NodeDetail nodeDetail) {
LOG.error("Exception occurred while resolving un assigned tasks upon member addition " + nodeDetail
.getNodeId(), e);
}
} else if (clusterCoordinator.getThisNodeId().equals(nodeDetail.getNodeId())
&& isMemberRejoinedAfterUnresponsiveness()) {
// This node became unresponsive and rejoined the cluster hence removing all tasks assigned to this node
// then start the scheduler again after cleaning the locally running tasks.
becameUnresponsive(nodeDetail.getNodeId());
try {
//Remove from database
taskStore.deleteTasks(nodeDetail.getNodeId());
} catch (TaskCoordinationException e) {
LOG.error("Error while removing the tasks of this node.", e);
}
reJoined(nodeDetail.getNodeId());
}
}

Expand Down Expand Up @@ -119,6 +131,16 @@ public void becameUnresponsive(String nodeId) {
});
}

/**
 * Reports whether this node still has coordinated tasks running locally, which is
 * taken as the indication that it rejoined the cluster after being unresponsive.
 *
 * @return true if the task manager lists at least one locally running coordinated task,
 *         false otherwise
 */
public boolean isMemberRejoinedAfterUnresponsiveness() {
    // Any task still tracked as locally running counts as evidence of a rejoin.
    final int locallyRunningCount = taskManager.getLocallyRunningCoordinatedTasks().size();
    return locallyRunningCount > 0;
}


@Override
public void reJoined(String nodeId) {

Expand Down

0 comments on commit 9bf3174

Please sign in to comment.