From 7665174536d236ee59acff8f1ed39c96182a1381 Mon Sep 17 00:00:00 2001 From: wangtaohz <103108928+wangtaohz@users.noreply.github.com> Date: Wed, 26 Jul 2023 13:53:21 +0800 Subject: [PATCH] [ARCTIC-1737] Fix the tasks of `ScheduledExecutorService` that halts when an exception or error is thrown (#1739) add try-catch for scheduleWithFixedDelay --- .../service/impl/OptimizeExecuteService.java | 27 +++++++++++------- .../service/impl/OrphanFilesCleanService.java | 26 +++++++++-------- .../service/impl/SupportHiveSyncService.java | 28 +++++++++++-------- .../service/impl/TableExpireService.java | 26 +++++++++-------- .../service/impl/TrashCleanService.java | 26 +++++++++-------- 5 files changed, 78 insertions(+), 55 deletions(-) diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeExecuteService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeExecuteService.java index 2c4408c19e..3be4b7bfeb 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeExecuteService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeExecuteService.java @@ -28,6 +28,8 @@ import com.netease.arctic.ams.server.service.ServiceContainer; import com.netease.arctic.optimizer.Optimizer; import com.netease.arctic.optimizer.factory.OptimizerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; @@ -111,23 +113,28 @@ public OptimizerFactory findOptimizerFactory(String type) throws NoSuchObjectExc @Override public void close() throws IOException { - + } public static class OptimizerMonitor { + private static final Logger LOG = LoggerFactory.getLogger(OptimizerMonitor.class); private static final long OPTIMIZER_JOB_TIMEOUT = 10 * 60 * 1000; public void monitorStatus() { - long currentTime = System.currentTimeMillis(); - List optimizers = - ServiceContainer.getOptimizerService().getOptimizers(); - optimizers.forEach(optimizer -> { - if ((currentTime - optimizer.getUpdateTime().getTime()) > OPTIMIZER_JOB_TIMEOUT) { - ServiceContainer.getOptimizerService() - .updateOptimizerStatus(optimizer.getJobId(), TableTaskStatus.FAILED); - } - }); + try { + long currentTime = System.currentTimeMillis(); + List optimizers = + ServiceContainer.getOptimizerService().getOptimizers(); + optimizers.forEach(optimizer -> { + if ((currentTime - optimizer.getUpdateTime().getTime()) > OPTIMIZER_JOB_TIMEOUT) { + ServiceContainer.getOptimizerService() + .updateOptimizerStatus(optimizer.getJobId(), TableTaskStatus.FAILED); + } + }); + } catch (Throwable t) { + LOG.error("monitor optimizer status failed", t); + } } } } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java index a860c2840a..631687a4e5 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java @@ -75,18 +75,22 @@ public class OrphanFilesCleanService implements IOrphanFilesCleanService { @Override public synchronized void checkOrphanFilesCleanTasks() { - LOG.info("Schedule Orphan Cleaner"); - if (cleanTasks == null) { - cleanTasks = new ScheduledTasks<>(ThreadPool.Type.ORPHAN); - } + try { + LOG.info("Schedule Orphan Cleaner"); + if (cleanTasks == null) { + cleanTasks = new ScheduledTasks<>(ThreadPool.Type.ORPHAN); + } - Set tableIds = CatalogUtil.loadTablesFromCatalog(); - cleanTasks.checkRunningTask(tableIds, - () -> 0L, - () -> CHECK_INTERVAL, - TableOrphanFileClean::new, - true); - LOG.info("Schedule Orphan Cleaner finished with {} tasks", tableIds.size()); + Set tableIds = CatalogUtil.loadTablesFromCatalog(); + cleanTasks.checkRunningTask(tableIds, + () -> 0L, + () -> CHECK_INTERVAL, + TableOrphanFileClean::new, + true); + LOG.info("Schedule Orphan Cleaner finished with {} tasks", tableIds.size()); + } catch (Throwable t) { + LOG.error("unexpected error when checkOrphanFilesCleanTasks", t); + } } @Override diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java index af1dd85b99..0aae84d881 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java @@ -65,19 +65,23 @@ public class SupportHiveSyncService implements ISupportHiveSyncService { @Override public void checkHiveSyncTasks() { - LOG.info("Schedule Support Hive Sync"); - if (syncTasks == null) { - syncTasks = new ScheduledTasks<>(ThreadPool.Type.HIVE_SYNC); + try { + LOG.info("Schedule Support Hive Sync"); + if (syncTasks == null) { + syncTasks = new ScheduledTasks<>(ThreadPool.Type.HIVE_SYNC); + } + List tables = ServiceContainer.getMetaService().listTables(); + Set ids = + tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet()); + syncTasks.checkRunningTask(ids, + () -> 0L, + () -> SYNC_INTERVAL, + SupportHiveSyncService.SupportHiveSyncTask::new, + false); + LOG.info("Schedule Support Hive Sync finished with {} valid ids", ids.size()); + } catch (Throwable t) { + LOG.error("Schedule Support Hive Sync failed", t); } - List tables = ServiceContainer.getMetaService().listTables(); - Set ids = - tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet()); - syncTasks.checkRunningTask(ids, - () -> 0L, - () -> SYNC_INTERVAL, - SupportHiveSyncService.SupportHiveSyncTask::new, - false); - LOG.info("Schedule Support Hive Sync finished with {} valid ids", ids.size()); } @Override diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java index 586b4b1083..b22a5fdde4 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java @@ -75,18 +75,22 @@ public class TableExpireService implements ITableExpireService { @Override public synchronized void checkTableExpireTasks() { - LOG.info("Schedule Expired Cleaner"); - if (cleanTasks == null) { - cleanTasks = new ScheduledTasks<>(ThreadPool.Type.EXPIRE); - } + try { + LOG.info("Schedule Expired Cleaner"); + if (cleanTasks == null) { + cleanTasks = new ScheduledTasks<>(ThreadPool.Type.EXPIRE); + } - Set tableIds = CatalogUtil.loadTablesFromCatalog(); - cleanTasks.checkRunningTask(tableIds, - () -> 0L, - () -> EXPIRE_INTERVAL, - TableExpireTask::new, - false); - LOG.info("Schedule Expired Cleaner finished with {} valid ids", tableIds.size()); + Set tableIds = CatalogUtil.loadTablesFromCatalog(); + cleanTasks.checkRunningTask(tableIds, + () -> 0L, + () -> EXPIRE_INTERVAL, + TableExpireTask::new, + false); + LOG.info("Schedule Expired Cleaner finished with {} valid ids", tableIds.size()); + } catch (Throwable t) { + LOG.error("unexpected error when checkTableExpireTasks", t); + } } @Override diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TrashCleanService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TrashCleanService.java index a3956ff5d4..e9a722732e 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TrashCleanService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TrashCleanService.java @@ -50,18 +50,22 @@ public class TrashCleanService implements Closeable { private ScheduledTasks cleanTasks; public synchronized void checkTrashCleanTasks() { - LOG.info("Schedule Trash Cleaner"); - if (cleanTasks == null) { - cleanTasks = new ScheduledTasks<>(ThreadPool.Type.TRASH_CLEAN); - } + try { + LOG.info("Schedule Trash Cleaner"); + if (cleanTasks == null) { + cleanTasks = new ScheduledTasks<>(ThreadPool.Type.TRASH_CLEAN); + } - Set tableIds = CatalogUtil.loadTablesFromCatalog(); - cleanTasks.checkRunningTask(tableIds, - () -> RandomUtils.nextLong(0, CHECK_INTERVAL), - () -> CHECK_INTERVAL, - TableTrashCleanTask::new, - true); - LOG.info("Schedule Trash Cleaner finished with {} tasks", tableIds.size()); + Set tableIds = CatalogUtil.loadTablesFromCatalog(); + cleanTasks.checkRunningTask(tableIds, + () -> RandomUtils.nextLong(0, CHECK_INTERVAL), + () -> CHECK_INTERVAL, + TableTrashCleanTask::new, + true); + LOG.info("Schedule Trash Cleaner finished with {} tasks", tableIds.size()); + } catch (Throwable t) { + LOG.error("Schedule Trash Cleaner unexpected error", t); + } } @Override