Skip to content

Commit

Permalink
fix failed task will not retry
Browse files Browse the repository at this point in the history
  • Loading branch information
nicochen committed Oct 13, 2023
1 parent 489e9d6 commit 9e20fd8
Showing 1 changed file with 34 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -731,6 +731,40 @@ private List<OptimizeTaskItem> plan(long currentTime) {
for (TableIdentifier tableIdentifier : tableSort) {
try {
TableOptimizeItem tableItem = ServiceContainer.getOptimizeService().getTableOptimizeItem(tableIdentifier);

if (tableItem.optimizeRunning()) {
LOG.debug("{} is running continue", tableIdentifier);

ArcticTable arcticTable = tableItem.getArcticTable(true);

Map<String, String> properties = arcticTable.properties();
int queueId = ServiceContainer.getOptimizeQueueService().getQueueId(properties);

// queue was updated
if (optimizeQueue.getOptimizeQueueMeta().getQueueId() != queueId) {
releaseTable(tableIdentifier);
ServiceContainer.getOptimizeQueueService().getQueue(queueId).bindTable(tableIdentifier);
continue;
}

tableItem.checkTaskExecuteTimeout();
// if enable_optimize is false
if (!tableItem.allowOptimizing()) {
LOG.debug("{} is not enable optimize or retry too frequently continue", tableIdentifier);
continue;
}

// add failed tasks and retry
List<OptimizeTaskItem> toExecuteTasks = addTask(tableItem, Collections.emptyList());
if (!toExecuteTasks.isEmpty()) {
LOG.info("{} add {} failed tasks into queue and retry",
tableItem.getTableIdentifier(), toExecuteTasks.size());
return toExecuteTasks;
} else {
continue;
}
}

if (tableItem.getTableOptimizeRuntime().getOptimizeStatus() != TableOptimizeRuntime.OptimizeStatus.Pending) {
// only table in pending should plan
continue;
Expand All @@ -754,20 +788,6 @@ private List<OptimizeTaskItem> plan(long currentTime) {
continue;
}

if (tableItem.optimizeRunning()) {
LOG.debug("{} is running continue", tableIdentifier);

// add failed tasks and retry
List<OptimizeTaskItem> toExecuteTasks = addTask(tableItem, Collections.emptyList());
if (!toExecuteTasks.isEmpty()) {
LOG.info("{} add {} failed tasks into queue and retry",
tableItem.getTableIdentifier(), toExecuteTasks.size());
return toExecuteTasks;
} else {
continue;
}
}

OptimizePlanResult optimizePlanResult = OptimizePlanResult.EMPTY;
long startPlanTime = System.currentTimeMillis();
if (tableItem.startPlanIfNot()) {
Expand Down

0 comments on commit 9e20fd8

Please sign in to comment.