Skip to content

Commit

Permalink
Refactor Task Executor Registration to Primarily Use Configurations f…
Browse files Browse the repository at this point in the history
…or MachineDefinition. (#639)
  • Loading branch information
fdc-ntflx authored Feb 26, 2024
1 parent 3386199 commit 56745b3
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,18 @@ default Time getHeartbeatInterval() {
@DefaultNull
File getLocalStorageDir();

@Config("mantis.taskexecutor.hardware.cpu-cores")
@DefaultNull
Double getCpuCores();

@Config("mantis.taskexecutor.hardware.memory-in-mb")
@DefaultNull
Double getMemoryInMB();

@Config("mantis.taskexecutor.hardware.disk-in-mb")
@DefaultNull
Double getDiskInMB();

@Config("mantis.taskexecutor.hardware.network-bandwidth-in-mb")
@Default(value = "128.0")
double getNetworkBandwidthInMB();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public static <T extends WorkerConfiguration> WorkerConfigurationWritable toWrit
.metricsPort(configSource.getMetricsPort())
.metricsPublisher(configSource.getMetricsPublisher())
.metricsPublisherFrequencyInSeconds(configSource.getMetricsPublisherFrequencyInSeconds())
.cpuCores(configSource.getCpuCores())
.memoryInMB(configSource.getMemoryInMB())
.diskInMB(configSource.getDiskInMB())
.networkBandwidthInMB(configSource.getNetworkBandwidthInMB())
.sinkPort(configSource.getSinkPort())
.taskExecutorId(configSource.getTaskExecutorId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public class WorkerConfigurationWritable implements WorkerConfiguration {

URI blobStoreArtifactDir;
File localStorageDir;
Double cpuCores;
Double memoryInMB;
Double diskInMB;
double networkBandwidthInMB;
String taskExecutorAttributesStr;
int asyncHttpClientMaxConnectionsPerHost;
Expand Down Expand Up @@ -243,6 +246,21 @@ public File getLocalStorageDir() {
return this.localStorageDir;
}

@Override
public Double getCpuCores() {
return this.cpuCores;
}

@Override
public Double getMemoryInMB() {
return this.memoryInMB;
}

@Override
public Double getDiskInMB() {
return this.diskInMB;
}

@Override
public double getNetworkBandwidthInMB() {
return this.networkBandwidthInMB;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,17 @@

import io.mantisrx.common.WorkerPorts;
import io.mantisrx.runtime.MachineDefinition;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import java.util.Optional;

public class MachineDefinitionUtils {
/**
* Creates a MachineDefinition object using system parameters.
*
* @param workerPorts Worker ports object.
* @param networkBandwidthInMB Network bandwidth capacity in MB.
* @return MachineDefinition object.
*/
public static MachineDefinition sys(WorkerPorts workerPorts, double networkBandwidthInMB) {
return new MachineDefinition(
Hardware.getNumberCPUCores(),
Expand All @@ -27,4 +36,34 @@ public static MachineDefinition sys(WorkerPorts workerPorts, double networkBandw
Hardware.getSizeOfDisk() / 1024.0 / 1024.0,
workerPorts.getNumberOfPorts());
}

/**
* Creates a MachineDefinition object from the worker configuration, falling back to system parameters.
*
* @param workerConfiguration Worker configuration object.
* @param workerPorts Worker ports object.
* @return MachineDefinition object.
*/
public static MachineDefinition from(WorkerConfiguration workerConfiguration, WorkerPorts workerPorts) {
return fromWorkerConfiguration(workerConfiguration, workerPorts).orElseGet(() -> sys(workerPorts, workerConfiguration.getNetworkBandwidthInMB()));
}

/**
* Attempts to create a MachineDefinition from the WorkerConfiguration if all necessary parameters are present.
*
* @param workerConfiguration WorkerConfiguration object.
* @param workerPorts Worker ports object.
* @return An Optional of MachineDefinition if all parameters are present in WorkerConfiguration, else empty Optional.
*/
private static Optional<MachineDefinition> fromWorkerConfiguration(WorkerConfiguration workerConfiguration, WorkerPorts workerPorts) {
if (workerConfiguration.getCpuCores() != null && workerConfiguration.getMemoryInMB() != null && workerConfiguration.getDiskInMB() != null) {
return Optional.of(new MachineDefinition(
workerConfiguration.getCpuCores(),
workerConfiguration.getMemoryInMB(),
workerConfiguration.getNetworkBandwidthInMB(),
workerConfiguration.getDiskInMB(),
workerPorts.getNumberOfPorts()));
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ public TaskExecutor(
workerConfiguration.getDebugPort(), workerConfiguration.getConsolePort(),
workerConfiguration.getCustomPort(),
workerConfiguration.getSinkPort());
MachineDefinition machineDefinition =
MachineDefinitionUtils.sys(workerPorts, workerConfiguration.getNetworkBandwidthInMB());
MachineDefinition machineDefinition = MachineDefinitionUtils.from(workerConfiguration, workerPorts);
String hostName = workerConfiguration.getExternalAddress();

this.taskExecutorRegistration =
Expand Down

0 comments on commit 56745b3

Please sign in to comment.