Skip to content

Commit

Permalink
simplified TaskSchedulingContext
Browse files Browse the repository at this point in the history
  • Loading branch information
shounakmk219 committed Feb 3, 2025
1 parent 7040e3b commit 1fc7a9d
Show file tree
Hide file tree
Showing 12 changed files with 382 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -88,6 +89,7 @@
import org.apache.pinot.core.auth.TargetType;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.task.AdhocTaskConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.glassfish.grizzly.http.server.Request;
import org.glassfish.jersey.server.ManagedAsync;
Expand Down Expand Up @@ -649,43 +651,25 @@ public Map<String, String> scheduleTasks(
List<String> generationErrors = new ArrayList<>();
List<String> schedulingErrors = new ArrayList<>();
TaskSchedulingContext context = new TaskSchedulingContext()
.setTriggeredBy(PinotTaskManager.Triggers.MANUAL_TRIGGER.name())
.setTriggeredBy(CommonConstants.TaskTriggers.MANUAL_TRIGGER.name())
.setMinionInstanceTag(minionInstanceTag)
.setLeader(false);
if (taskType != null) {
Map<String, Set<String>> tableToTaskNamesMap = new HashMap<>();
Set<String> taskTypes = new HashSet<>(1);
taskTypes.add(taskType);
// Schedule task for the given task type
if (tableName != null) {
tableToTaskNamesMap.put(DatabaseUtils.translateTableName(tableName, headers), taskTypes);
} else {
_pinotHelixResourceManager.getAllTables(database).forEach(table -> tableToTaskNamesMap.put(table, taskTypes));
}
context.setTableToTaskNamesMap(tableToTaskNamesMap);
TaskSchedulingInfo taskInfos = _pinotTaskManager.scheduleTasks(context).get(taskType);
response.put(taskType, StringUtils.join(taskInfos.getScheduledTaskNames(), ','));
generationErrors.addAll(taskInfos.getGenerationErrors());
schedulingErrors.addAll(taskInfos.getSchedulingErrors());
context.setTasksToSchedule(Collections.singleton(taskType));
}
if (tableName != null) {
context.setTablesToSchedule(Collections.singleton(DatabaseUtils.translateTableName(tableName, headers)));
} else {
Map<String, Set<String>> tableToTaskNamesMap = new HashMap<>();
// Schedule tasks for all task types
if (tableName != null) {
tableToTaskNamesMap.put(DatabaseUtils.translateTableName(tableName, headers), null);
} else {
_pinotHelixResourceManager.getAllTables(database)
.forEach(table -> tableToTaskNamesMap.put(table, null));
}
context.setTableToTaskNamesMap(tableToTaskNamesMap);
Map<String, TaskSchedulingInfo> allTaskInfos = _pinotTaskManager.scheduleTasks(context);
allTaskInfos.forEach((key, value) -> {
if (value.getScheduledTaskNames() != null) {
response.put(key, String.join(",", value.getScheduledTaskNames()));
}
generationErrors.addAll(value.getGenerationErrors());
schedulingErrors.addAll(value.getSchedulingErrors());
});
context.setDatabasesToSchedule(Collections.singleton(database));
}
Map<String, TaskSchedulingInfo> allTaskInfos = _pinotTaskManager.scheduleTasks(context);
allTaskInfos.forEach((key, value) -> {
if (value.getScheduledTaskNames() != null) {
response.put(key, String.join(",", value.getScheduledTaskNames()));
}
generationErrors.addAll(value.getGenerationErrors());
schedulingErrors.addAll(value.getSchedulingErrors());
});
response.put(GENERATION_ERRORS_KEY, String.join(",", generationErrors));
response.put(SCHEDULING_ERRORS_KEY, String.join(",", schedulingErrors));
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
*/
package org.apache.pinot.controller.helix.core.minion;

import java.util.Collections;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.spi.utils.CommonConstants;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
Expand Down Expand Up @@ -64,8 +66,10 @@ public void execute(JobExecutionContext jobExecutionContext)
ControllerMeter.CRON_SCHEDULER_JOB_SKIPPED, 1L);
return;
}
TaskSchedulingContext context = new TaskSchedulingContext(table, taskType)
.setTriggeredBy(PinotTaskManager.Triggers.CRON_TRIGGER.name());
TaskSchedulingContext context = new TaskSchedulingContext()
.setTablesToSchedule(Collections.singleton(table))
.setTasksToSchedule(Collections.singleton(taskType))
.setTriggeredBy(CommonConstants.TaskTriggers.CRON_TRIGGER.name());
long jobStartTime = System.currentTimeMillis();
pinotTaskManager.scheduleTasks(context);
LOGGER.info("Finished CronJob: table - {}, task - {}, next runtime is {}", table, taskType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
*/
package org.apache.pinot.controller.helix.core.minion;

import com.google.common.base.Preconditions;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -114,10 +116,6 @@ public class PinotTaskManager extends ControllerPeriodicTask<Void> {

private final TaskManagerStatusCache<TaskGeneratorMostRecentRunInfo> _taskManagerStatusCache;

public enum Triggers {
CRON_TRIGGER, MANUAL_TRIGGER, ADHOC_TRIGGER
}

public PinotTaskManager(PinotHelixTaskResourceManager helixTaskResourceManager,
PinotHelixResourceManager helixResourceManager, LeadControllerManager leadControllerManager,
ControllerConf controllerConf, ControllerMetrics controllerMetrics,
Expand Down Expand Up @@ -212,8 +210,8 @@ public Map<String, String> createTask(String taskType, String tableName, @Nullab
LOGGER.warn("No ad-hoc task generated for task type: {}", taskType);
continue;
}
pinotTaskConfigs.forEach(pinotTaskConfig ->
pinotTaskConfig.getConfigs().computeIfAbsent(TRIGGERED_BY, k -> Triggers.ADHOC_TRIGGER.name()));
pinotTaskConfigs.forEach(pinotTaskConfig -> pinotTaskConfig.getConfigs()
.computeIfAbsent(TRIGGERED_BY, k -> CommonConstants.TaskTriggers.ADHOC_TRIGGER.name()));
LOGGER.info("Submitting ad-hoc task for task type: {} with task configs: {}", taskType, pinotTaskConfigs);
_controllerMetrics.addMeteredTableValue(taskType, ControllerMeter.NUMBER_ADHOC_TASKS_SUBMITTED, 1);
responseMap.put(tableNameWithType,
Expand Down Expand Up @@ -489,6 +487,115 @@ public void registerTaskGenerator(PinotTaskGenerator taskGenerator) {
_taskGeneratorRegistry.registerTaskGenerator(taskGenerator);
}

/**
* Schedules tasks (all task types) for all tables.
* It might be called from the non-leader controller.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
@Deprecated(forRemoval = true)
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForAllTables(@Nullable String minionInstanceTag) {
TaskSchedulingContext context = new TaskSchedulingContext()
.setMinionInstanceTag(minionInstanceTag);
return scheduleTasks(context);
}

/**
* Schedules tasks (all task types) for all tables in the given database.
* It might be called from the non-leader controller.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
@Deprecated(forRemoval = true)
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForDatabase(@Nullable String database,
@Nullable String minionInstanceTag) {
TaskSchedulingContext context = new TaskSchedulingContext()
.setDatabasesToSchedule(Collections.singleton(database))
.setMinionInstanceTag(minionInstanceTag);
return scheduleTasks(context);
}

/**
* Schedules tasks (all task types) for the given table.
* It might be called from the non-leader controller.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of tasks scheduled.
*/
@Deprecated(forRemoval = true)
public synchronized Map<String, TaskSchedulingInfo> scheduleAllTasksForTable(String tableNameWithType,
@Nullable String minionInstanceTag) {
TaskSchedulingContext context = new TaskSchedulingContext()
.setTablesToSchedule(Collections.singleton(tableNameWithType))
.setMinionInstanceTag(minionInstanceTag);
return scheduleTasks(context);
}

/**
* Schedules task for the given task type for all tables.
* It might be called from the non-leader controller.
* Returns {@link TaskSchedulingInfo} which consists
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Deprecated(forRemoval = true)
public synchronized TaskSchedulingInfo scheduleTaskForAllTables(String taskType, @Nullable String minionInstanceTag) {
TaskSchedulingContext context = new TaskSchedulingContext()
.setTasksToSchedule(Collections.singleton(taskType))
.setMinionInstanceTag(minionInstanceTag);
return scheduleTasks(context).get(taskType);
}

/**
* Schedules task for the given task type for all tables in the given database.
* It might be called from the non-leader controller.
* Returns {@link TaskSchedulingInfo} which consists
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Deprecated(forRemoval = true)
public synchronized TaskSchedulingInfo scheduleTaskForDatabase(String taskType, @Nullable String database,
@Nullable String minionInstanceTag) {
TaskSchedulingContext context = new TaskSchedulingContext()
.setTasksToSchedule(Collections.singleton(taskType))
.setDatabasesToSchedule(Collections.singleton(database))
.setMinionInstanceTag(minionInstanceTag);
return scheduleTasks(context).get(taskType);
}

/**
* Schedules task for the given task type for the give table.
* It might be called from the non-leader controller.
* Returns {@link TaskSchedulingInfo} which consists
* - list of scheduled task names (empty list if nothing to schedule),
* or {@code null} if no task is scheduled due to scheduling errors.
* - list of task generation errors if any
* - list of task scheduling errors if any
*/
@Deprecated(forRemoval = true)
public synchronized TaskSchedulingInfo scheduleTaskForTable(String taskType, String tableNameWithType,
@Nullable String minionInstanceTag) {
TaskSchedulingContext context = new TaskSchedulingContext()
.setTasksToSchedule(Collections.singleton(taskType))
.setTablesToSchedule(Collections.singleton(tableNameWithType))
.setMinionInstanceTag(minionInstanceTag);
return scheduleTasks(context).get(taskType);
}

/**
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of the tasks scheduled.
*/
@Deprecated(forRemoval = true)
protected synchronized Map<String, TaskSchedulingInfo> scheduleTasks(List<String> tableNamesWithType,
boolean isLeader, @Nullable String minionInstanceTag) {
TaskSchedulingContext context = new TaskSchedulingContext()
.setTablesToSchedule(new HashSet<>(tableNamesWithType))
.setLeader(isLeader)
.setMinionInstanceTag(minionInstanceTag);
return scheduleTasks(context);
}

/**
* Helper method to schedule tasks (all task types) for the given tables that have the tasks enabled.
* Returns a map from the task type to the {@link TaskSchedulingInfo} of the tasks scheduled.
Expand All @@ -497,22 +604,28 @@ public synchronized Map<String, TaskSchedulingInfo> scheduleTasks(TaskScheduling
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_TIMES_SCHEDULE_TASKS_CALLED, 1L);

Map<String, List<TableConfig>> enabledTableConfigMap = new HashMap<>();
Map<String, Set<String>> tableToTasksMap = context.getTableToTaskNamesMap();
if (context.getTableToTaskNamesMap().isEmpty()) {
_pinotHelixResourceManager.getAllTables().forEach(table -> tableToTasksMap.put(table, null));
}
for (Map.Entry<String, Set<String>> entry : context.getTableToTaskNamesMap().entrySet()) {
String tableNameWithType = entry.getKey();
Set<String> taskNames = entry.getValue();
Set<String> tablesToSchedule = context.getTablesToSchedule();
Set<String> databasesToSchedule = context.getDatabasesToSchedule();
Set<String> tasksToSchedule = context.getTasksToSchedule();
Set<String> finalTablesToSchedule = new HashSet<>();
if (tablesToSchedule != null) {
finalTablesToSchedule.addAll(tablesToSchedule);
}
if (databasesToSchedule != null) {
databasesToSchedule.forEach(database ->
finalTablesToSchedule.addAll(_pinotHelixResourceManager.getAllTables(database)));
}
for (String tableNameWithType : finalTablesToSchedule.isEmpty()
? _pinotHelixResourceManager.getAllTables() : finalTablesToSchedule) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig != null && tableConfig.getTaskConfig() != null) {
Set<String> enabledTaskTypes = tableConfig.getTaskConfig().getTaskTypeConfigsMap().keySet();
Set<String> validTasks;
if (taskNames == null || taskNames.isEmpty()) {
if (tasksToSchedule == null || tasksToSchedule.isEmpty()) {
// if no specific task types are provided schedule for all tasks
validTasks = enabledTaskTypes;
} else {
validTasks = new HashSet<>(taskNames);
validTasks = new HashSet<>(tasksToSchedule);
validTasks.retainAll(enabledTaskTypes);
}
for (String taskType : validTasks) {
Expand Down Expand Up @@ -546,6 +659,28 @@ public synchronized Map<String, TaskSchedulingInfo> scheduleTasks(TaskScheduling
return tasksScheduled;
}

@Deprecated(forRemoval = true)
protected synchronized TaskSchedulingInfo scheduleTask(String taskType, List<String> tables,
@Nullable String minionInstanceTag) {
PinotTaskGenerator taskGenerator = _taskGeneratorRegistry.getTaskGenerator(taskType);
Preconditions.checkState(taskGenerator != null, "Task type: %s is not registered", taskType);

// Scan all table configs to get the tables with task enabled
List<TableConfig> enabledTableConfigs = new ArrayList<>();
for (String tableNameWithType : tables) {
TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
if (tableConfig != null && tableConfig.getTaskConfig() != null && tableConfig.getTaskConfig()
.isTaskTypeEnabled(taskType)) {
enabledTableConfigs.add(tableConfig);
}
}

_helixTaskResourceManager.ensureTaskQueueExists(taskType);
addTaskTypeMetricsUpdaterIfNeeded(taskType);
return scheduleTask(taskGenerator, enabledTableConfigs, false, minionInstanceTag,
CommonConstants.TaskTriggers.UNKNOWN.name());
}

/**
* Helper method to schedule task with the given task generator for the given tables that have the task enabled.
* Returns
Expand Down Expand Up @@ -649,9 +784,9 @@ protected TaskSchedulingInfo scheduleTask(PinotTaskGenerator taskGenerator, List

@Override
protected void processTables(List<String> tableNamesWithType, Properties taskProperties) {
TaskSchedulingContext context = new TaskSchedulingContext(tableNamesWithType)
TaskSchedulingContext context = new TaskSchedulingContext()
.setLeader(true)
.setTriggeredBy(Triggers.CRON_TRIGGER.name());
.setTriggeredBy(CommonConstants.TaskTriggers.CRON_TRIGGER.name());
// cron schedule
scheduleTasks(context);
}
Expand Down
Loading

0 comments on commit 1fc7a9d

Please sign in to comment.