Skip to content

Commit

Permalink
[AMORO-3258] Use SimpleFuture to refactor TaskRuntime and eliminate task lock
Browse files Browse the repository at this point in the history
  • Loading branch information
majin.nathan committed Oct 17, 2024
1 parent 97e4caf commit ad9ea59
Show file tree
Hide file tree
Showing 16 changed files with 157 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class OptimizingQueue extends PersistentBase {

private final QuotaProvider quotaProvider;
private final Queue<TableOptimizingProcess> tableQueue = new LinkedTransferQueue<>();
private final Queue<TaskRuntime<RewriteStageTask>> retryTaskQueue = new LinkedTransferQueue<>();
private final Queue<TaskRuntime<?>> retryTaskQueue = new LinkedTransferQueue<>();
private final SchedulingPolicy scheduler;
private final TableManager tableManager;
private final Executor planExecutor;
Expand Down Expand Up @@ -146,6 +146,14 @@ public void refreshTable(TableRuntime tableRuntime) {

public void releaseTable(TableRuntime tableRuntime) {
scheduler.removeTable(tableRuntime);
List<OptimizingProcess> processList =
tableQueue.stream()
.filter(process -> process.tableRuntime == tableRuntime)
.collect(Collectors.toList());
for (OptimizingProcess process : processList) {
process.close();
clearProcess(process);
}
LOG.info(
"Release queue {} with table {}",
optimizerGroup.getName(),
Expand All @@ -156,15 +164,15 @@ public boolean containsTable(ServerTableIdentifier identifier) {
return scheduler.getTableRuntime(identifier) != null;
}

/**
 * Drops every trace of the given optimizing process from this queue: the process itself is removed
 * from {@code tableQueue}, and any of its tasks still waiting in {@code retryTaskQueue} are removed
 * as well. Both removals match on the process id.
 *
 * @param optimizingProcess the process whose queue entries should be removed
 */
// Diff view: the next line is the pre-change signature; the line after it is its replacement,
// which accepts the OptimizingProcess interface instead of the concrete TableOptimizingProcess.
private void clearProcess(TableOptimizingProcess optimizingProcess) {
private void clearProcess(OptimizingProcess optimizingProcess) {
tableQueue.removeIf(process -> process.getProcessId() == optimizingProcess.getProcessId());
// Retry tasks are matched through the process id embedded in their task id.
retryTaskQueue.removeIf(
taskRuntime -> taskRuntime.getTaskId().getProcessId() == optimizingProcess.getProcessId());
}

public TaskRuntime pollTask(long maxWaitTime) {
public TaskRuntime<?> pollTask(long maxWaitTime) {
long deadline = calculateDeadline(maxWaitTime);
TaskRuntime task = fetchTask();
TaskRuntime<?> task = fetchTask();
while (task == null && waitTask(deadline)) {
task = fetchTask();
}
Expand All @@ -191,11 +199,12 @@ private boolean waitTask(long waitDeadline) {
}
}

/**
 * Picks the next task to hand out. A task waiting in {@code retryTaskQueue} takes precedence; only
 * when that queue yields nothing is {@code fetchScheduledTask()} consulted.
 *
 * @return the polled retry task if there is one, otherwise the result of {@code
 *     fetchScheduledTask()}
 */
// Diff view: the first two lines below are the pre-change form (raw TaskRuntime, Optional chain);
// the three lines after them are the replacement (wildcard type, explicit null check).
private TaskRuntime fetchTask() {
return Optional.ofNullable(retryTaskQueue.poll()).orElseGet(this::fetchScheduledTask);
private TaskRuntime<?> fetchTask() {
TaskRuntime<?> retryTask = retryTaskQueue.poll();
return retryTask != null ? retryTask : fetchScheduledTask();
}

private TaskRuntime fetchScheduledTask() {
private TaskRuntime<?> fetchScheduledTask() {
return tableQueue.stream()
.map(TableOptimizingProcess::poll)
.filter(Objects::nonNull)
Expand Down Expand Up @@ -302,7 +311,7 @@ public List<TaskRuntime<?>> collectTasks(Predicate<TaskRuntime<?>> predicate) {
.collect(Collectors.toList());
}

/**
 * Schedules a task for another attempt: the task is reset and then offered to {@code
 * retryTaskQueue}.
 *
 * @param taskRuntime the task to reset and re-queue
 */
// Diff view: the next line is the pre-change signature (raw TaskRuntime); the line after it is
// the replacement using the wildcard type.
public void retryTask(TaskRuntime taskRuntime) {
public void retryTask(TaskRuntime<?> taskRuntime) {
taskRuntime.reset();
retryTaskQueue.offer(taskRuntime);
}
Expand Down Expand Up @@ -344,7 +353,9 @@ SchedulingPolicy getSchedulingPolicy() {
return scheduler;
}

private class TableOptimizingProcess implements OptimizingProcess, TaskRuntime.TaskOwner {
private class TableOptimizingProcess implements OptimizingProcess {

private final Lock lock = new ReentrantLock();
private final long processId;
private final OptimizingType optimizingType;
private final TableRuntime tableRuntime;
Expand All @@ -353,18 +364,17 @@ private class TableOptimizingProcess implements OptimizingProcess, TaskRuntime.T
private final long targetChangeSnapshotId;
private final Map<OptimizingTaskId, TaskRuntime<RewriteStageTask>> taskMap = Maps.newHashMap();
private final Queue<TaskRuntime<RewriteStageTask>> taskQueue = new LinkedList<>();
private final Lock lock = new ReentrantLock();
private volatile Status status = OptimizingProcess.Status.RUNNING;
private volatile String failedReason;
private long endTime = AmoroServiceConstants.INVALID_TIME;
private Map<String, Long> fromSequence = Maps.newHashMap();
private Map<String, Long> toSequence = Maps.newHashMap();
private boolean hasCommitted = false;

public TaskRuntime poll() {
public TaskRuntime<?> poll() {
lock.lock();
try {
return taskQueue.poll();
return status != Status.CLOSED || status != Status.FAILED ? taskQueue.poll() : null;
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -426,16 +436,13 @@ public void close() {
}
this.status = OptimizingProcess.Status.CLOSED;
this.endTime = System.currentTimeMillis();
persistProcessCompleted(false);
clearProcess(this);
persistAndSetCompleted(false);
} finally {
lock.unlock();
}
releaseResourcesIfNecessary();
}

@Override
public void acceptResult(TaskRuntime taskRuntime) {
private void acceptResult(TaskRuntime<?> taskRuntime) {
lock.lock();
try {
try {
Expand All @@ -447,6 +454,9 @@ public void acceptResult(TaskRuntime taskRuntime) {
taskRuntime.getTaskId(),
throwable);
}
if (taskRuntime.getStatus() == TaskRuntime.Status.CANCELED) {
return;
}
if (isClosed()) {
throw new OptimizingClosedException(processId);
}
Expand All @@ -456,7 +466,6 @@ public void acceptResult(TaskRuntime taskRuntime) {
&& tableRuntime.getOptimizingStatus().isProcessing()
&& tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) {
tableRuntime.beginCommitting();
clearProcess(this);
}
} else if (taskRuntime.getStatus() == TaskRuntime.Status.FAILED) {
if (taskRuntime.getRetry() < tableRuntime.getMaxExecuteRetryCount()) {
Expand All @@ -466,27 +475,18 @@ public void acceptResult(TaskRuntime taskRuntime) {
taskRuntime.getFailReason());
retryTask(taskRuntime);
} else {
clearProcess(this);
this.failedReason = taskRuntime.getFailReason();
this.status = OptimizingProcess.Status.FAILED;
this.endTime = taskRuntime.getEndTime();
persistProcessCompleted(false);
persistAndSetCompleted(false);
clearProcess(this);
}
}
} finally {
lock.unlock();
}
}

/**
 * Invokes {@code cancelTasks()} when this process has ended in the {@code FAILED} or {@code CLOSED}
 * status; does nothing for any other status.
 *
 * <p>The cleanup of tasks should be done after the process lock has been released, to avoid a
 * deadlock.
 *
 * <p>NOTE(review): in this commit view the method appears to be on the removed side of the diff
 * (the {@code TaskRuntime.TaskOwner} interface is dropped from the class declaration) — confirm
 * before relying on it.
 */
@Override
public void releaseResourcesIfNecessary() {
if (this.status == OptimizingProcess.Status.FAILED
|| this.status == OptimizingProcess.Status.CLOSED) {
cancelTasks();
}
}

@Override
public boolean isClosed() {
return status == OptimizingProcess.Status.CLOSED;
Expand Down Expand Up @@ -568,15 +568,14 @@ public void commit() {
buildCommit().commit();
status = Status.SUCCESS;
endTime = System.currentTimeMillis();
persistProcessCompleted(true);
persistAndSetCompleted(true);
clearProcess(this);
} catch (Exception e) {
LOG.error("{} Commit optimizing failed ", tableRuntime.getTableIdentifier(), e);
status = Status.FAILED;
failedReason = ExceptionUtil.getErrorMessage(e, 4000);
endTime = System.currentTimeMillis();
persistProcessCompleted(false);
} finally {
clearProcess(this);
persistAndSetCompleted(false);
}
} finally {
lock.unlock();
Expand Down Expand Up @@ -644,8 +643,13 @@ private void beginAndPersistProcess() {
() -> tableRuntime.beginProcess(this));
}

private void persistProcessCompleted(boolean success) {
private void persistAndSetCompleted(boolean success) {
doAsTransaction(
() -> {
if (!success) {
cancelTasks();
}
},
() ->
doAs(
OptimizingMapper.class,
Expand All @@ -657,7 +661,8 @@ private void persistProcessCompleted(boolean success) {
endTime,
getSummary(),
getFailedReason())),
() -> tableRuntime.completeProcess(success));
() -> tableRuntime.completeProcess(success),
() -> clearProcess(this));
}

/** The cancellation should be invoked outside the process lock to avoid deadlock. */
Expand All @@ -676,7 +681,7 @@ private void loadTaskRuntimes(OptimizingProcess optimizingProcess) {
Map<Integer, RewriteFilesInput> inputs = TaskFilesPersistence.loadTaskInputs(processId);
taskRuntimes.forEach(
taskRuntime -> {
taskRuntime.claimOwnership(this);
taskRuntime.getCompletedFuture().whenCompleted(() -> acceptResult(taskRuntime));
taskRuntime
.getTaskDescriptor()
.setInput(inputs.get(taskRuntime.getTaskId().getTaskId()));
Expand Down Expand Up @@ -706,7 +711,8 @@ private void loadTaskRuntimes(List<RewriteStageTask> taskDescriptors) {
tableRuntime.getTableIdentifier(),
taskRuntime.getTaskId(),
taskRuntime.getSummary());
taskMap.put(taskRuntime.getTaskId(), taskRuntime.claimOwnership(this));
taskRuntime.getCompletedFuture().whenCompleted(() -> acceptResult(taskRuntime));
taskMap.put(taskRuntime.getTaskId(), taskRuntime);
taskQueue.offer(taskRuntime);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.process.OptimizingStages;
import org.apache.amoro.process.ProcessStage;

// TODO delete this class
public enum OptimizingType {
MINOR(OptimizingStatus.MINOR_OPTIMIZING),
MAJOR(OptimizingStatus.MAJOR_OPTIMIZING),
Expand All @@ -32,4 +36,14 @@ public enum OptimizingType {
public OptimizingStatus getStatus() {
return status;
}

/**
 * Translates this optimizing type into the matching {@link ProcessStage}, keyed on the
 * {@link OptimizingStatus} this type carries.
 *
 * @return {@code OptimizingStages.MINOR} for minor optimizing, {@code OptimizingStages.MAJOR} for
 *     major optimizing, and {@code OptimizingStages.FULL} for every other status
 */
public ProcessStage getStage() {
  if (status == OptimizingStatus.MINOR_OPTIMIZING) {
    return OptimizingStages.MINOR;
  }
  if (status == OptimizingStatus.MAJOR_OPTIMIZING) {
    return OptimizingStages.MAJOR;
  }
  // Anything that is neither minor nor major is treated as the full stage.
  return OptimizingStages.FULL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.process.ProcessStage;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
import org.apache.amoro.server.persistence.TaskFilesPersistence;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
Expand All @@ -34,14 +35,20 @@ public class RewriteStageTask
extends StagedTaskDescriptor<RewriteFilesInput, RewriteFilesOutput, MetricsSummary> {

private String partition;
private ProcessStage stage;

// only for mybatis and could be optimized by type handler afterward
public RewriteStageTask() {}

/**
 * Creates a rewrite task for one partition of a table.
 *
 * @param optimizingType the optimizing type; its {@code getStage()} result is stored as this
 *     task's stage
 * @param tableId passed to the superclass constructor
 * @param partition the partition value stored on this task
 * @param input passed to the superclass constructor
 * @param properties passed to the superclass constructor
 */
// Diff view: the single parameter line directly below the constructor name is the pre-change
// parameter list; the five parameter lines after it are the replacement, which adds
// optimizingType as the first argument.
public RewriteStageTask(
long tableId, String partition, RewriteFilesInput input, Map<String, String> properties) {
OptimizingType optimizingType,
long tableId,
String partition,
RewriteFilesInput input,
Map<String, String> properties) {
super(tableId, input, properties);
this.partition = partition;
// The stage is derived once from the optimizing type at construction time.
this.stage = optimizingType.getStage();
}

@Override
Expand All @@ -66,6 +73,11 @@ protected RewriteFilesOutput deserializeOutput(byte[] outputBytes) {
return TaskFilesPersistence.loadTaskOutput(outputBytes);
}

/**
 * Returns the stage held in this task's {@code stage} field.
 *
 * @return the stored {@link ProcessStage}
 */
@Override
public ProcessStage getStage() {
  return this.stage;
}

public String getPartition() {
return partition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.amoro.api.OptimizingTask;
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.process.ProcessStage;
import org.apache.amoro.utils.SerializationUtil;

import java.util.Map;
Expand Down Expand Up @@ -55,6 +56,8 @@ public void reset() {
calculateSummary();
}

/**
 * Returns the {@link ProcessStage} of this task descriptor. The method is abstract, so each
 * concrete descriptor decides how its stage is determined.
 *
 * @return the stage of this task
 */
public abstract ProcessStage getStage();

protected abstract void calculateSummary();

protected abstract O deserializeOutput(byte[] outputBytes);
Expand Down
Loading

0 comments on commit ad9ea59

Please sign in to comment.