Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-3258] Use SimpleFuture to refactor TaskRuntime and eliminate t… #3259

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class OptimizingQueue extends PersistentBase {

private final QuotaProvider quotaProvider;
private final Queue<TableOptimizingProcess> tableQueue = new LinkedTransferQueue<>();
private final Queue<TaskRuntime<RewriteStageTask>> retryTaskQueue = new LinkedTransferQueue<>();
private final Queue<TaskRuntime<?>> retryTaskQueue = new LinkedTransferQueue<>();
private final SchedulingPolicy scheduler;
private final TableManager tableManager;
private final Executor planExecutor;
Expand Down Expand Up @@ -146,6 +146,14 @@ public void refreshTable(TableRuntime tableRuntime) {

public void releaseTable(TableRuntime tableRuntime) {
scheduler.removeTable(tableRuntime);
List<OptimizingProcess> processList =
tableQueue.stream()
.filter(process -> process.tableRuntime == tableRuntime)
.collect(Collectors.toList());
for (OptimizingProcess process : processList) {
process.close();
clearProcess(process);
}
LOG.info(
"Release queue {} with table {}",
optimizerGroup.getName(),
Expand All @@ -156,15 +164,15 @@ public boolean containsTable(ServerTableIdentifier identifier) {
return scheduler.getTableRuntime(identifier) != null;
}

private void clearProcess(TableOptimizingProcess optimizingProcess) {
private void clearProcess(OptimizingProcess optimizingProcess) {
tableQueue.removeIf(process -> process.getProcessId() == optimizingProcess.getProcessId());
retryTaskQueue.removeIf(
taskRuntime -> taskRuntime.getTaskId().getProcessId() == optimizingProcess.getProcessId());
}

public TaskRuntime pollTask(long maxWaitTime) {
public TaskRuntime<?> pollTask(long maxWaitTime) {
long deadline = calculateDeadline(maxWaitTime);
TaskRuntime task = fetchTask();
TaskRuntime<?> task = fetchTask();
while (task == null && waitTask(deadline)) {
task = fetchTask();
}
Expand All @@ -191,11 +199,12 @@ private boolean waitTask(long waitDeadline) {
}
}

private TaskRuntime fetchTask() {
return Optional.ofNullable(retryTaskQueue.poll()).orElseGet(this::fetchScheduledTask);
private TaskRuntime<?> fetchTask() {
TaskRuntime<?> retryTask = retryTaskQueue.poll();
return retryTask != null ? retryTask : fetchScheduledTask();
}

private TaskRuntime fetchScheduledTask() {
private TaskRuntime<?> fetchScheduledTask() {
return tableQueue.stream()
.map(TableOptimizingProcess::poll)
.filter(Objects::nonNull)
Expand Down Expand Up @@ -302,7 +311,7 @@ public List<TaskRuntime<?>> collectTasks(Predicate<TaskRuntime<?>> predicate) {
.collect(Collectors.toList());
}

public void retryTask(TaskRuntime taskRuntime) {
public void retryTask(TaskRuntime<?> taskRuntime) {
taskRuntime.reset();
retryTaskQueue.offer(taskRuntime);
}
Expand Down Expand Up @@ -344,7 +353,9 @@ SchedulingPolicy getSchedulingPolicy() {
return scheduler;
}

private class TableOptimizingProcess implements OptimizingProcess, TaskRuntime.TaskOwner {
private class TableOptimizingProcess implements OptimizingProcess {

private final Lock lock = new ReentrantLock();
private final long processId;
private final OptimizingType optimizingType;
private final TableRuntime tableRuntime;
Expand All @@ -353,18 +364,17 @@ private class TableOptimizingProcess implements OptimizingProcess, TaskRuntime.T
private final long targetChangeSnapshotId;
private final Map<OptimizingTaskId, TaskRuntime<RewriteStageTask>> taskMap = Maps.newHashMap();
private final Queue<TaskRuntime<RewriteStageTask>> taskQueue = new LinkedList<>();
private final Lock lock = new ReentrantLock();
private volatile Status status = OptimizingProcess.Status.RUNNING;
private volatile String failedReason;
private long endTime = AmoroServiceConstants.INVALID_TIME;
private Map<String, Long> fromSequence = Maps.newHashMap();
private Map<String, Long> toSequence = Maps.newHashMap();
private boolean hasCommitted = false;

public TaskRuntime poll() {
public TaskRuntime<?> poll() {
lock.lock();
try {
return taskQueue.poll();
return status != Status.CLOSED || status != Status.FAILED ? taskQueue.poll() : null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

status != Status.CLOSED && status != Status.FAILED

?

} finally {
lock.unlock();
}
Expand Down Expand Up @@ -426,16 +436,13 @@ public void close() {
}
this.status = OptimizingProcess.Status.CLOSED;
this.endTime = System.currentTimeMillis();
persistProcessCompleted(false);
clearProcess(this);
persistAndSetCompleted(false);
} finally {
lock.unlock();
}
releaseResourcesIfNecessary();
}

@Override
public void acceptResult(TaskRuntime taskRuntime) {
private void acceptResult(TaskRuntime<?> taskRuntime) {
lock.lock();
try {
try {
Expand All @@ -447,6 +454,9 @@ public void acceptResult(TaskRuntime taskRuntime) {
taskRuntime.getTaskId(),
throwable);
}
if (taskRuntime.getStatus() == TaskRuntime.Status.CANCELED) {
return;
}
if (isClosed()) {
throw new OptimizingClosedException(processId);
}
Expand All @@ -456,7 +466,6 @@ public void acceptResult(TaskRuntime taskRuntime) {
&& tableRuntime.getOptimizingStatus().isProcessing()
&& tableRuntime.getOptimizingStatus() != OptimizingStatus.COMMITTING) {
tableRuntime.beginCommitting();
clearProcess(this);
}
} else if (taskRuntime.getStatus() == TaskRuntime.Status.FAILED) {
if (taskRuntime.getRetry() < tableRuntime.getMaxExecuteRetryCount()) {
Expand All @@ -466,27 +475,18 @@ public void acceptResult(TaskRuntime taskRuntime) {
taskRuntime.getFailReason());
retryTask(taskRuntime);
} else {
clearProcess(this);
this.failedReason = taskRuntime.getFailReason();
this.status = OptimizingProcess.Status.FAILED;
this.endTime = taskRuntime.getEndTime();
persistProcessCompleted(false);
persistAndSetCompleted(false);
clearProcess(this);
}
}
} finally {
lock.unlock();
}
}

// the cleanup of task should be done after unlock to avoid deadlock
@Override
public void releaseResourcesIfNecessary() {
if (this.status == OptimizingProcess.Status.FAILED
|| this.status == OptimizingProcess.Status.CLOSED) {
cancelTasks();
}
}

@Override
public boolean isClosed() {
return status == OptimizingProcess.Status.CLOSED;
Expand Down Expand Up @@ -568,15 +568,14 @@ public void commit() {
buildCommit().commit();
status = Status.SUCCESS;
endTime = System.currentTimeMillis();
persistProcessCompleted(true);
persistAndSetCompleted(true);
clearProcess(this);
} catch (Exception e) {
LOG.error("{} Commit optimizing failed ", tableRuntime.getTableIdentifier(), e);
status = Status.FAILED;
failedReason = ExceptionUtil.getErrorMessage(e, 4000);
endTime = System.currentTimeMillis();
persistProcessCompleted(false);
} finally {
clearProcess(this);
persistAndSetCompleted(false);
}
} finally {
lock.unlock();
Expand Down Expand Up @@ -644,8 +643,13 @@ private void beginAndPersistProcess() {
() -> tableRuntime.beginProcess(this));
}

private void persistProcessCompleted(boolean success) {
private void persistAndSetCompleted(boolean success) {
doAsTransaction(
() -> {
if (!success) {
cancelTasks();
}
},
() ->
doAs(
OptimizingMapper.class,
Expand All @@ -657,7 +661,8 @@ private void persistProcessCompleted(boolean success) {
endTime,
getSummary(),
getFailedReason())),
() -> tableRuntime.completeProcess(success));
() -> tableRuntime.completeProcess(success),
() -> clearProcess(this));
}

/** The cancellation should be invoked outside the process lock to avoid deadlock. */
Expand All @@ -676,7 +681,7 @@ private void loadTaskRuntimes(OptimizingProcess optimizingProcess) {
Map<Integer, RewriteFilesInput> inputs = TaskFilesPersistence.loadTaskInputs(processId);
taskRuntimes.forEach(
taskRuntime -> {
taskRuntime.claimOwnership(this);
taskRuntime.getCompletedFuture().whenCompleted(() -> acceptResult(taskRuntime));
taskRuntime
.getTaskDescriptor()
.setInput(inputs.get(taskRuntime.getTaskId().getTaskId()));
Expand Down Expand Up @@ -706,7 +711,8 @@ private void loadTaskRuntimes(List<RewriteStageTask> taskDescriptors) {
tableRuntime.getTableIdentifier(),
taskRuntime.getTaskId(),
taskRuntime.getSummary());
taskMap.put(taskRuntime.getTaskId(), taskRuntime.claimOwnership(this));
taskRuntime.getCompletedFuture().whenCompleted(() -> acceptResult(taskRuntime));
taskMap.put(taskRuntime.getTaskId(), taskRuntime);
taskQueue.offer(taskRuntime);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.process.OptimizingStages;
import org.apache.amoro.process.ProcessStage;

// TODO delete this class
public enum OptimizingType {
MINOR(OptimizingStatus.MINOR_OPTIMIZING),
MAJOR(OptimizingStatus.MAJOR_OPTIMIZING),
Expand All @@ -32,4 +36,14 @@ public enum OptimizingType {
public OptimizingStatus getStatus() {
return status;
}

public ProcessStage getStage() {
if (status == OptimizingStatus.MINOR_OPTIMIZING) {
return OptimizingStages.MINOR;
} else if (status == OptimizingStatus.MAJOR_OPTIMIZING) {
return OptimizingStages.MAJOR;
} else {
return OptimizingStages.FULL;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.process.ProcessStage;
import org.apache.amoro.server.dashboard.utils.OptimizingUtil;
import org.apache.amoro.server.persistence.TaskFilesPersistence;
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
Expand All @@ -34,14 +35,20 @@ public class RewriteStageTask
extends StagedTaskDescriptor<RewriteFilesInput, RewriteFilesOutput, MetricsSummary> {

private String partition;
private ProcessStage stage;

// only for mybatis and could be optimized by type handler afterward
public RewriteStageTask() {}

public RewriteStageTask(
long tableId, String partition, RewriteFilesInput input, Map<String, String> properties) {
OptimizingType optimizingType,
long tableId,
String partition,
RewriteFilesInput input,
Map<String, String> properties) {
super(tableId, input, properties);
this.partition = partition;
this.stage = optimizingType.getStage();
}

@Override
Expand All @@ -66,6 +73,11 @@ protected RewriteFilesOutput deserializeOutput(byte[] outputBytes) {
return TaskFilesPersistence.loadTaskOutput(outputBytes);
}

@Override
public ProcessStage getStage() {
return stage;
}

public String getPartition() {
return partition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.amoro.api.OptimizingTask;
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.process.ProcessStage;
import org.apache.amoro.utils.SerializationUtil;

import java.util.Map;
Expand Down Expand Up @@ -55,6 +56,8 @@ public void reset() {
calculateSummary();
}

public abstract ProcessStage getStage();

protected abstract void calculateSummary();

protected abstract O deserializeOutput(byte[] outputBytes);
Expand Down
Loading
Loading