Skip to content

Commit

Permalink
Make isJobArtifactCachingEnabled onfigurable (#555)
Browse files Browse the repository at this point in the history
* Make isJobArtifactCachingEnabled onfigurable

* Fix test
  • Loading branch information
fdc-ntflx authored Sep 13, 2023
1 parent 731f3db commit adeca1f
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ class ResourceClusterActor extends AbstractActorWithTimers {
private final int maxJobArtifactsToCache;
private final String jobClustersWithArtifactCachingEnabled;

static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, Duration assignmentTimeout, Duration disabledTaskExecutorsCheckInterval, Clock clock, RpcService rpcService, MantisJobStore mantisJobStore, JobMessageRouter jobMessageRouter, int maxJobArtifactsToCache, String jobClustersWithArtifactCachingEnabled) {
return Props.create(ResourceClusterActor.class, clusterID, heartbeatTimeout, assignmentTimeout, disabledTaskExecutorsCheckInterval, clock, rpcService, mantisJobStore, jobMessageRouter, maxJobArtifactsToCache, jobClustersWithArtifactCachingEnabled)
private final boolean isJobArtifactCachingEnabled;

static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, Duration assignmentTimeout, Duration disabledTaskExecutorsCheckInterval, Clock clock, RpcService rpcService, MantisJobStore mantisJobStore, JobMessageRouter jobMessageRouter, int maxJobArtifactsToCache, String jobClustersWithArtifactCachingEnabled, boolean isJobArtifactCachingEnabled) {
return Props.create(ResourceClusterActor.class, clusterID, heartbeatTimeout, assignmentTimeout, disabledTaskExecutorsCheckInterval, clock, rpcService, mantisJobStore, jobMessageRouter, maxJobArtifactsToCache, jobClustersWithArtifactCachingEnabled, isJobArtifactCachingEnabled)
.withMailbox("akka.actor.metered-mailbox");
}

Expand All @@ -131,11 +133,13 @@ static Props props(final ClusterID clusterID, final Duration heartbeatTimeout, D
MantisJobStore mantisJobStore,
JobMessageRouter jobMessageRouter,
int maxJobArtifactsToCache,
String jobClustersWithArtifactCachingEnabled) {
String jobClustersWithArtifactCachingEnabled,
boolean isJobArtifactCachingEnabled) {
this.clusterID = clusterID;
this.heartbeatTimeout = heartbeatTimeout;
this.assignmentTimeout = assignmentTimeout;
this.disabledTaskExecutorsCheckInterval = disabledTaskExecutorsCheckInterval;
this.isJobArtifactCachingEnabled = isJobArtifactCachingEnabled;

this.clock = clock;
this.rpcService = rpcService;
Expand Down Expand Up @@ -565,7 +569,7 @@ private void onTaskExecutorRegistration(TaskExecutorRegistration registration) {
updateHeartbeatTimeout(registration.getTaskExecutorID());
}
log.info("Successfully registered {} with the resource cluster {}", registration.getTaskExecutorID(), this);
if (!jobArtifactsToCache.isEmpty() && isJobArtifactCachingEnabled()) {
if (!jobArtifactsToCache.isEmpty() && isJobArtifactCachingEnabled) {
self().tell(new CacheJobArtifactsOnTaskExecutorRequest(taskExecutorID, clusterID), self());
}
sender().tell(Ack.getInstance(), self());
Expand Down Expand Up @@ -1154,11 +1158,6 @@ public AvailabilityState onTaskExecutorStatusChange(TaskExecutorReport report) {
}
}

private Boolean isJobArtifactCachingEnabled() {
return true;
// TODO: fix this -> return ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("mantis.job.artifact.caching.enabled", "false").equals("true");
}

private Predicate<Entry<TaskExecutorID, TaskExecutorState>> filterByAttrs(HasAttributes hasAttributes) {
if (hasAttributes.getAttributes().isEmpty()) {
return e -> true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,8 @@ private ActorRef createResourceClusterActorFor(ClusterID clusterID) {
mantisJobStore,
jobMessageRouter,
masterConfiguration.getMaxJobArtifactsToCache(),
masterConfiguration.getJobClustersWithArtifactCachingEnabled()),
masterConfiguration.getJobClustersWithArtifactCachingEnabled(),
masterConfiguration.isJobArtifactCachingEnabled()),
"ResourceClusterActor-" + clusterID.getResourceID());
log.info("Created resource cluster actor for {}", clusterID);
return clusterActor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,10 @@ default Duration getSchedulerIntervalBetweenRetries() {
@Default("")
String getJobClustersWithArtifactCachingEnabled();

@Config("mantis.artifactCaching.enabled")
@Default("true")
boolean isJobArtifactCachingEnabled();

// rate limit actions on resource cluster actor to control backlog.
@Config("mantis.master.resource.cluster.actions.permitsPerSecond")
@Default("2000")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,8 @@ public void setupActor() {
mantisJobStore,
jobMessageRouter,
0,
"");
"",
false);

resourceClusterActor = actorSystem.actorOf(props);
resourceCluster =
Expand Down

0 comments on commit adeca1f

Please sign in to comment.