From f2533495a1bed693ff1ba145d835cfa51cef27db Mon Sep 17 00:00:00 2001 From: wangtao Date: Tue, 25 Jul 2023 11:37:47 +0800 Subject: [PATCH] cherry-pick to 0.3.x add try-catch for scheduleWithFixedDelay --- .../ams/server/optimize/OptimizeService.java | 12 ++-- .../service/impl/OptimizeExecuteService.java | 27 +++++--- .../service/impl/OrphanFilesCleanService.java | 26 ++++---- .../impl/RuntimeDataExpireService.java | 66 ++++++++++--------- .../service/impl/SupportHiveSyncService.java | 28 ++++---- .../service/impl/TableExpireService.java | 28 ++++---- .../service/impl/TrashCleanService.java | 30 +++++---- 7 files changed, 124 insertions(+), 93 deletions(-) diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/OptimizeService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/OptimizeService.java index 425279f7c9..e8714d66f5 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/OptimizeService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/OptimizeService.java @@ -112,11 +112,15 @@ private void init() { @Override public synchronized void checkOptimizeCheckTasks(long checkInterval) { - this.optimizeStatusCheckInterval = checkInterval; - if (checkTasks == null) { - checkTasks = new ScheduledTasks<>(ThreadPool.Type.OPTIMIZE_CHECK); + try { + this.optimizeStatusCheckInterval = checkInterval; + if (checkTasks == null) { + checkTasks = new ScheduledTasks<>(ThreadPool.Type.OPTIMIZE_CHECK); + } + internalCheckOptimizeChecker(checkInterval); + } catch (Throwable t) { + LOG.error("Failed to check optimize check tasks", t); } - internalCheckOptimizeChecker(checkInterval); } private void internalCheckOptimizeChecker(long checkInterval) { diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeExecuteService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeExecuteService.java index 2c4408c19e..3be4b7bfeb 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeExecuteService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OptimizeExecuteService.java @@ -28,6 +28,8 @@ import com.netease.arctic.ams.server.service.ServiceContainer; import com.netease.arctic.optimizer.Optimizer; import com.netease.arctic.optimizer.factory.OptimizerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; @@ -111,23 +113,28 @@ public OptimizerFactory findOptimizerFactory(String type) throws NoSuchObjectExc @Override public void close() throws IOException { - + } public static class OptimizerMonitor { + private static final Logger LOG = LoggerFactory.getLogger(OptimizerMonitor.class); private static final long OPTIMIZER_JOB_TIMEOUT = 10 * 60 * 1000; public void monitorStatus() { - long currentTime = System.currentTimeMillis(); - List optimizers = - ServiceContainer.getOptimizerService().getOptimizers(); - optimizers.forEach(optimizer -> { - if ((currentTime - optimizer.getUpdateTime().getTime()) > OPTIMIZER_JOB_TIMEOUT) { - ServiceContainer.getOptimizerService() - .updateOptimizerStatus(optimizer.getJobId(), TableTaskStatus.FAILED); - } - }); + try { + long currentTime = System.currentTimeMillis(); + List optimizers = + ServiceContainer.getOptimizerService().getOptimizers(); + optimizers.forEach(optimizer -> { + if ((currentTime - optimizer.getUpdateTime().getTime()) > OPTIMIZER_JOB_TIMEOUT) { + ServiceContainer.getOptimizerService() + .updateOptimizerStatus(optimizer.getJobId(), TableTaskStatus.FAILED); + } + }); + } catch (Throwable t) { + LOG.error("monitor optimizer status failed", t); + } } } } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java index 6c8142857d..9875b7397a 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/OrphanFilesCleanService.java @@ -69,18 +69,22 @@ public class OrphanFilesCleanService implements IOrphanFilesCleanService { @Override public synchronized void checkOrphanFilesCleanTasks() { - LOG.info("Schedule Orphan Cleaner"); - if (cleanTasks == null) { - cleanTasks = new ScheduledTasks<>(ThreadPool.Type.ORPHAN); + try { + LOG.info("Schedule Orphan Cleaner"); + if (cleanTasks == null) { + cleanTasks = new ScheduledTasks<>(ThreadPool.Type.ORPHAN); + } + List tables = ServiceContainer.getMetaService().listTables(); + Set ids = tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet()); + cleanTasks.checkRunningTask(ids, + () -> 0L, + () -> CHECK_INTERVAL, + TableOrphanFileClean::new, + true); + LOG.info("Schedule Orphan Cleaner finished with {} tasks", ids.size()); + } catch (Throwable t) { + LOG.error("unexpected error when checkOrphanFilesCleanTasks", t); } - List tables = ServiceContainer.getMetaService().listTables(); - Set ids = tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet()); - cleanTasks.checkRunningTask(ids, - () -> 0L, - () -> CHECK_INTERVAL, - TableOrphanFileClean::new, - true); - LOG.info("Schedule Orphan Cleaner finished with {} tasks", ids.size()); } @Override diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/RuntimeDataExpireService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/RuntimeDataExpireService.java index 1d12861605..6b6e34c62e 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/RuntimeDataExpireService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/RuntimeDataExpireService.java @@ -56,39 +56,43 @@ public RuntimeDataExpireService() { } public void doExpire() { - List tableMetadata = metaService.listTables(); - // expire and clear transaction table - tableMetadata.forEach(meta -> { - TableIdentifier identifier = meta.getTableIdentifier(); - transactionService.expire( - identifier.buildTableIdentifier(), - System.currentTimeMillis() - this.txDataExpireInterval); - }); + try { + List tableMetadata = metaService.listTables(); + // expire and clear transaction table + tableMetadata.forEach(meta -> { + TableIdentifier identifier = meta.getTableIdentifier(); + transactionService.expire( + identifier.buildTableIdentifier(), + System.currentTimeMillis() - this.txDataExpireInterval); + }); - // expire and clear table_task_history table - tableMetadata.forEach(meta -> { - TableIdentifier identifier = meta.getTableIdentifier(); - try { - TableOptimizeRuntime tableOptimizeRuntime = - optimizeService.getTableOptimizeItem(identifier).getTableOptimizeRuntime(); - tableTaskHistoryService.expireTaskHistory(identifier, - tableOptimizeRuntime.getLatestTaskPlanGroup(), - System.currentTimeMillis() - this.taskHistoryDataExpireInterval); - } catch (Exception e) { - LOG.error("failed to expire and clear table_task_history table", e); - } - }); + // expire and clear table_task_history table + tableMetadata.forEach(meta -> { + TableIdentifier identifier = meta.getTableIdentifier(); + try { + TableOptimizeRuntime tableOptimizeRuntime = + optimizeService.getTableOptimizeItem(identifier).getTableOptimizeRuntime(); + tableTaskHistoryService.expireTaskHistory(identifier, + tableOptimizeRuntime.getLatestTaskPlanGroup(), + System.currentTimeMillis() - this.taskHistoryDataExpireInterval); + } catch (Exception e) { + LOG.error("failed to expire and clear table_task_history table", e); + } + }); - // expire and clear optimize_history table - tableMetadata.forEach(meta -> { - TableIdentifier identifier = meta.getTableIdentifier(); - try { - optimizeService.expireOptimizeHistory(identifier, - System.currentTimeMillis() - this.optimizeHistoryDataExpireInterval); - } catch (Exception e) { - LOG.error("failed to expire and clear optimize_history table", e); - } - }); + // expire and clear optimize_history table + tableMetadata.forEach(meta -> { + TableIdentifier identifier = meta.getTableIdentifier(); + try { + optimizeService.expireOptimizeHistory(identifier, + System.currentTimeMillis() - this.optimizeHistoryDataExpireInterval); + } catch (Exception e) { + LOG.error("failed to expire and clear optimize_history table", e); + } + }); + } catch (Throwable t) { + LOG.error("failed to expire and clear runtime data", t); + } } @Override diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java index d8ed8b35a9..b7d3ea2f35 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/SupportHiveSyncService.java @@ -64,19 +64,23 @@ public class SupportHiveSyncService implements ISupportHiveSyncService { @Override public void checkHiveSyncTasks() { - LOG.info("Schedule Support Hive Sync"); - if (syncTasks == null) { - syncTasks = new ScheduledTasks<>(ThreadPool.Type.HIVE_SYNC); + try { + LOG.info("Schedule Support Hive Sync"); + if (syncTasks == null) { + syncTasks = new ScheduledTasks<>(ThreadPool.Type.HIVE_SYNC); + } + List tables = ServiceContainer.getMetaService().listTables(); + Set ids = + tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet()); + syncTasks.checkRunningTask(ids, + () -> 0L, + () -> SYNC_INTERVAL, + SupportHiveSyncService.SupportHiveSyncTask::new, + false); + LOG.info("Schedule Support Hive Sync finished with {} valid ids", ids.size()); + } catch (Throwable t) { + LOG.error("Schedule Support Hive Sync failed", t); } - List tables = ServiceContainer.getMetaService().listTables(); - Set ids = - tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet()); - syncTasks.checkRunningTask(ids, - () -> 0L, - () -> SYNC_INTERVAL, - SupportHiveSyncService.SupportHiveSyncTask::new, - false); - LOG.info("Schedule Support Hive Sync finished with {} valid ids", ids.size()); } @Override diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java index f17d5c8f1c..c965b6fb99 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java @@ -74,19 +74,23 @@ public class TableExpireService implements ITableExpireService { @Override public synchronized void checkTableExpireTasks() { - LOG.info("Schedule Expired Cleaner"); - if (cleanTasks == null) { - cleanTasks = new ScheduledTasks<>(ThreadPool.Type.EXPIRE); + try { + LOG.info("Schedule Expired Cleaner"); + if (cleanTasks == null) { + cleanTasks = new ScheduledTasks<>(ThreadPool.Type.EXPIRE); + } + List tables = ServiceContainer.getMetaService().listTables(); + Set ids = + tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet()); + cleanTasks.checkRunningTask(ids, + () -> 0L, + () -> EXPIRE_INTERVAL, + TableExpireTask::new, + false); + LOG.info("Schedule Expired Cleaner finished with {} valid ids", ids.size()); + } catch (Throwable t) { + LOG.error("unexpected error when checkTableExpireTasks", t); } - List tables = ServiceContainer.getMetaService().listTables(); - Set ids = - tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet()); - cleanTasks.checkRunningTask(ids, - () -> 0L, - () -> EXPIRE_INTERVAL, - TableExpireTask::new, - false); - LOG.info("Schedule Expired Cleaner finished with {} valid ids", ids.size()); } @Override diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TrashCleanService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TrashCleanService.java index 2dff07a9e1..f64022d91d 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TrashCleanService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TrashCleanService.java @@ -53,20 +53,24 @@ public class TrashCleanService implements Closeable { private ScheduledTasks cleanTasks; public synchronized void checkTrashCleanTasks() { - LOG.info("Schedule Trash Cleaner"); - if (cleanTasks == null) { - cleanTasks = new ScheduledTasks<>(ThreadPool.Type.TRASH_CLEAN); - } + try { + LOG.info("Schedule Trash Cleaner"); + if (cleanTasks == null) { + cleanTasks = new ScheduledTasks<>(ThreadPool.Type.TRASH_CLEAN); + } - List tables = ServiceContainer.getMetaService().listTables(); - Set ids = - tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet()); - cleanTasks.checkRunningTask(ids, - () -> RandomUtils.nextLong(0, CHECK_INTERVAL), - () -> CHECK_INTERVAL, - TableTrashCleanTask::new, - true); - LOG.info("Schedule Trash Cleaner finished with {} tasks", ids.size()); + List tables = ServiceContainer.getMetaService().listTables(); + Set ids = + tables.stream().map(TableMetadata::getTableIdentifier).collect(Collectors.toSet()); + cleanTasks.checkRunningTask(ids, + () -> RandomUtils.nextLong(0, CHECK_INTERVAL), + () -> CHECK_INTERVAL, + TableTrashCleanTask::new, + true); + LOG.info("Schedule Trash Cleaner finished with {} tasks", ids.size()); + } catch (Throwable t) { + LOG.error("Schedule Trash Cleaner unexpected error", t); + } } @Override