Skip to content

Commit

Permalink
[ARCTIC-1737] Fix the tasks of ScheduledExecutorService that halts …
Browse files Browse the repository at this point in the history
…when an exception or error is thrown (apache#1739)

add try-catch for scheduleWithFixedDelay
  • Loading branch information
wangtaohz authored Jul 26, 2023
1 parent 9f005a0 commit 7665174
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.netease.arctic.ams.server.service.ServiceContainer;
import com.netease.arctic.optimizer.Optimizer;
import com.netease.arctic.optimizer.factory.OptimizerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -111,23 +113,28 @@ public OptimizerFactory findOptimizerFactory(String type) throws NoSuchObjectExc

/**
 * No-op close: the body visible in this hunk is empty, so calling it does nothing.
 *
 * @throws IOException declared by the signature; nothing in the visible body throws it
 */
@Override
public void close() throws IOException {

}

/**
 * Monitor that flags optimizers as failed when their last update is too old.
 *
 * <p>NOTE(review): this text is a rendered diff whose +/- markers were stripped, so the body
 * of {@code monitorStatus()} appears twice below: once unguarded and once wrapped in
 * try/catch. The commit description ("add try-catch for scheduleWithFixedDelay") suggests the
 * guarded copy is the added version -- confirm against the repository.
 */
public static class OptimizerMonitor {
private static final Logger LOG = LoggerFactory.getLogger(OptimizerMonitor.class);

// Compared below against a System.currentTimeMillis() difference, i.e. 10 minutes in milliseconds.
private static final long OPTIMIZER_JOB_TIMEOUT = 10 * 60 * 1000;

/**
 * For each optimizer returned by the optimizer service, updates its status to
 * {@code TableTaskStatus.FAILED} when more than {@code OPTIMIZER_JOB_TIMEOUT} milliseconds
 * have elapsed since the optimizer's update time.
 */
public void monitorStatus() {
// Unguarded copy of the body (presumably the lines removed by this commit -- confirm).
long currentTime = System.currentTimeMillis();
List<com.netease.arctic.ams.server.model.Optimizer> optimizers =
ServiceContainer.getOptimizerService().getOptimizers();
optimizers.forEach(optimizer -> {
if ((currentTime - optimizer.getUpdateTime().getTime()) > OPTIMIZER_JOB_TIMEOUT) {
ServiceContainer.getOptimizerService()
.updateOptimizerStatus(optimizer.getJobId(), TableTaskStatus.FAILED);
}
});
// Guarded copy: same statements, but any Throwable is logged here instead of
// propagating to the caller.
try {
long currentTime = System.currentTimeMillis();
List<com.netease.arctic.ams.server.model.Optimizer> optimizers =
ServiceContainer.getOptimizerService().getOptimizers();
optimizers.forEach(optimizer -> {
if ((currentTime - optimizer.getUpdateTime().getTime()) > OPTIMIZER_JOB_TIMEOUT) {
ServiceContainer.getOptimizerService()
.updateOptimizerStatus(optimizer.getJobId(), TableTaskStatus.FAILED);
}
});
} catch (Throwable t) {
LOG.error("monitor optimizer status failed", t);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,22 @@ public class OrphanFilesCleanService implements IOrphanFilesCleanService {

/**
 * Refreshes the set of orphan-file clean tasks: creates {@code cleanTasks} on first use
 * (with {@code ThreadPool.Type.ORPHAN}), loads the table identifiers through
 * {@code CatalogUtil.loadTablesFromCatalog()} and hands them to
 * {@code cleanTasks.checkRunningTask(...)}.
 *
 * <p>NOTE(review): this is a rendered diff hunk without +/- markers, so the statements appear
 * twice -- unguarded, and wrapped in try/catch. The guarded copy is presumably the version
 * added by this commit; confirm against the repository.
 */
@Override
public synchronized void checkOrphanFilesCleanTasks() {
// Unguarded copy, first part (presumably removed lines -- confirm).
LOG.info("Schedule Orphan Cleaner");
if (cleanTasks == null) {
cleanTasks = new ScheduledTasks<>(ThreadPool.Type.ORPHAN);
}
// Guarded copy: any Throwable raised inside is logged by the catch block below rather than
// propagating to the caller.
try {
LOG.info("Schedule Orphan Cleaner");
if (cleanTasks == null) {
cleanTasks = new ScheduledTasks<>(ThreadPool.Type.ORPHAN);
}

// Unguarded copy, second part.
Set<TableIdentifier> tableIds = CatalogUtil.loadTablesFromCatalog();
// The two suppliers are presumably the initial delay and the repeat interval, and the
// boolean a ScheduledTasks option -- TODO confirm against ScheduledTasks.checkRunningTask.
cleanTasks.checkRunningTask(tableIds,
() -> 0L,
() -> CHECK_INTERVAL,
TableOrphanFileClean::new,
true);
LOG.info("Schedule Orphan Cleaner finished with {} tasks", tableIds.size());
// Guarded copy, continued.
Set<TableIdentifier> tableIds = CatalogUtil.loadTablesFromCatalog();
cleanTasks.checkRunningTask(tableIds,
() -> 0L,
() -> CHECK_INTERVAL,
TableOrphanFileClean::new,
true);
LOG.info("Schedule Orphan Cleaner finished with {} tasks", tableIds.size());
} catch (Throwable t) {
LOG.error("unexpected error when checkOrphanFilesCleanTasks", t);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,23 @@ public class SupportHiveSyncService implements ISupportHiveSyncService {

/**
 * Refreshes the set of Hive sync tasks: creates {@code syncTasks} on first use (with
 * {@code ThreadPool.Type.HIVE_SYNC}), lists tables through the meta service, collects their
 * identifiers into a set and hands that set to {@code syncTasks.checkRunningTask(...)}.
 *
 * <p>NOTE(review): this is a rendered diff hunk without +/- markers; the unguarded and the
 * try/catch-wrapped copies of the statements are interleaved and the braces do not balance as
 * shown. The guarded copy is presumably the version added by this commit -- confirm against
 * the repository.
 *
 * <p>NOTE(review): unlike the other check-tasks methods visible on this page, this method is
 * not declared {@code synchronized} although it lazily initialises {@code syncTasks} --
 * confirm whether that is intentional.
 */
@Override
public void checkHiveSyncTasks() {
// Unguarded copy, first part (presumably removed lines -- confirm).
LOG.info("Schedule Support Hive Sync");
if (syncTasks == null) {
syncTasks = new ScheduledTasks<>(ThreadPool.Type.HIVE_SYNC);
// Guarded copy: any Throwable raised inside is logged by the catch block below rather than
// propagating to the caller.
try {
LOG.info("Schedule Support Hive Sync");
if (syncTasks == null) {
syncTasks = new ScheduledTasks<>(ThreadPool.Type.HIVE_SYNC);
}
List<TableMetadata> tables = ServiceContainer.getMetaService().listTables();
Set<TableIdentifier> ids =
tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet());
// The two suppliers are presumably the initial delay and the repeat interval, and the
// boolean a ScheduledTasks option -- TODO confirm against ScheduledTasks.checkRunningTask.
syncTasks.checkRunningTask(ids,
() -> 0L,
() -> SYNC_INTERVAL,
SupportHiveSyncService.SupportHiveSyncTask::new,
false);
LOG.info("Schedule Support Hive Sync finished with {} valid ids", ids.size());
} catch (Throwable t) {
LOG.error("Schedule Support Hive Sync failed", t);
}
// Unguarded copy, remaining statements.
List<TableMetadata> tables = ServiceContainer.getMetaService().listTables();
Set<TableIdentifier> ids =
tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet());
syncTasks.checkRunningTask(ids,
() -> 0L,
() -> SYNC_INTERVAL,
SupportHiveSyncService.SupportHiveSyncTask::new,
false);
LOG.info("Schedule Support Hive Sync finished with {} valid ids", ids.size());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,22 @@ public class TableExpireService implements ITableExpireService {

/**
 * Refreshes the set of table-expire tasks: creates {@code cleanTasks} on first use (with
 * {@code ThreadPool.Type.EXPIRE}), loads the table identifiers through
 * {@code CatalogUtil.loadTablesFromCatalog()} and hands them to
 * {@code cleanTasks.checkRunningTask(...)}.
 *
 * <p>NOTE(review): this is a rendered diff hunk without +/- markers, so the statements appear
 * twice -- unguarded, and wrapped in try/catch. The guarded copy is presumably the version
 * added by this commit; confirm against the repository.
 */
@Override
public synchronized void checkTableExpireTasks() {
// Unguarded copy, first part (presumably removed lines -- confirm).
LOG.info("Schedule Expired Cleaner");
if (cleanTasks == null) {
cleanTasks = new ScheduledTasks<>(ThreadPool.Type.EXPIRE);
}
// Guarded copy: any Throwable raised inside is logged by the catch block below rather than
// propagating to the caller.
try {
LOG.info("Schedule Expired Cleaner");
if (cleanTasks == null) {
cleanTasks = new ScheduledTasks<>(ThreadPool.Type.EXPIRE);
}

// Unguarded copy, second part.
Set<TableIdentifier> tableIds = CatalogUtil.loadTablesFromCatalog();
// The two suppliers are presumably the initial delay and the repeat interval, and the
// boolean a ScheduledTasks option -- TODO confirm against ScheduledTasks.checkRunningTask.
cleanTasks.checkRunningTask(tableIds,
() -> 0L,
() -> EXPIRE_INTERVAL,
TableExpireTask::new,
false);
LOG.info("Schedule Expired Cleaner finished with {} valid ids", tableIds.size());
// Guarded copy, continued.
Set<TableIdentifier> tableIds = CatalogUtil.loadTablesFromCatalog();
cleanTasks.checkRunningTask(tableIds,
() -> 0L,
() -> EXPIRE_INTERVAL,
TableExpireTask::new,
false);
LOG.info("Schedule Expired Cleaner finished with {} valid ids", tableIds.size());
} catch (Throwable t) {
LOG.error("unexpected error when checkTableExpireTasks", t);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,22 @@ public class TrashCleanService implements Closeable {
private ScheduledTasks<TableIdentifier, TableTrashCleanTask> cleanTasks;

/**
 * Refreshes the set of trash-clean tasks: creates {@code cleanTasks} on first use (with
 * {@code ThreadPool.Type.TRASH_CLEAN}), loads the table identifiers through
 * {@code CatalogUtil.loadTablesFromCatalog()} and hands them to
 * {@code cleanTasks.checkRunningTask(...)}.
 *
 * <p>NOTE(review): this is a rendered diff hunk without +/- markers, so the statements appear
 * twice -- unguarded, and wrapped in try/catch. The guarded copy is presumably the version
 * added by this commit; confirm against the repository.
 */
public synchronized void checkTrashCleanTasks() {
// Unguarded copy, first part (presumably removed lines -- confirm).
LOG.info("Schedule Trash Cleaner");
if (cleanTasks == null) {
cleanTasks = new ScheduledTasks<>(ThreadPool.Type.TRASH_CLEAN);
}
// Guarded copy: any Throwable raised inside is logged by the catch block below rather than
// propagating to the caller.
try {
LOG.info("Schedule Trash Cleaner");
if (cleanTasks == null) {
cleanTasks = new ScheduledTasks<>(ThreadPool.Type.TRASH_CLEAN);
}

// Unguarded copy, second part.
Set<TableIdentifier> tableIds = CatalogUtil.loadTablesFromCatalog();
// First supplier yields a random long bounded by 0 and CHECK_INTERVAL; the two suppliers
// are presumably the initial delay and the repeat interval -- TODO confirm against
// ScheduledTasks.checkRunningTask.
cleanTasks.checkRunningTask(tableIds,
() -> RandomUtils.nextLong(0, CHECK_INTERVAL),
() -> CHECK_INTERVAL,
TableTrashCleanTask::new,
true);
LOG.info("Schedule Trash Cleaner finished with {} tasks", tableIds.size());
// Guarded copy, continued.
Set<TableIdentifier> tableIds = CatalogUtil.loadTablesFromCatalog();
cleanTasks.checkRunningTask(tableIds,
() -> RandomUtils.nextLong(0, CHECK_INTERVAL),
() -> CHECK_INTERVAL,
TableTrashCleanTask::new,
true);
LOG.info("Schedule Trash Cleaner finished with {} tasks", tableIds.size());
} catch (Throwable t) {
LOG.error("Schedule Trash Cleaner unexpected error", t);
}
}

@Override
Expand Down

0 comments on commit 7665174

Please sign in to comment.