From 490c62edc630e4dac328febfcba55d85a38f8fff Mon Sep 17 00:00:00 2001 From: nicochen Date: Thu, 21 Sep 2023 15:08:01 +0800 Subject: [PATCH] table expire get current snap from sql system --- .../mapper/TableOptimizeRuntimeMapper.java | 23 ++++++++++++++++++ .../server/optimize/BasicOptimizeCommit.java | 1 - .../service/ITableTaskHistoryService.java | 3 +++ .../service/impl/TableExpireService.java | 24 +++++++++---------- .../service/impl/TableTaskHistoryService.java | 11 +++++++++ 5 files changed, 48 insertions(+), 14 deletions(-) diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/TableOptimizeRuntimeMapper.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/TableOptimizeRuntimeMapper.java index fdde451e36..a24c5384cb 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/TableOptimizeRuntimeMapper.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/mapper/TableOptimizeRuntimeMapper.java @@ -62,6 +62,29 @@ public interface TableOptimizeRuntimeMapper { }) List selectTableOptimizeRuntimes(); + @Select("select catalog_name, db_name, table_name, current_snapshot_id, current_change_snapshotId," + + " latest_major_optimize_time, latest_full_optimize_time, latest_minor_optimize_time, latest_task_plan_group," + + " optimize_status, optimize_status_start_time " + + " from " + TABLE_NAME + " where " + + "catalog_name = #{tableIdentifier.catalog} and " + + "db_name = #{tableIdentifier.database} and " + + "table_name = #{tableIdentifier.tableName}") + @Results({ + @Result(property = "currentSnapshotId", column = "current_snapshot_id"), + @Result(property = "currentChangeSnapshotId", column = "current_change_snapshotId"), + @Result(property = "latestMajorOptimizeTime", column = "latest_major_optimize_time", + typeHandler = MapLong2StringConverter.class), + @Result(property = "latestFullOptimizeTime", column = "latest_full_optimize_time", + typeHandler = MapLong2StringConverter.class), + @Result(property = "latestMinorOptimizeTime", column = "latest_minor_optimize_time", + typeHandler = MapLong2StringConverter.class), + @Result(property = "latestTaskPlanGroup", column = "latest_task_plan_group"), + @Result(property = "optimizeStatus", column = "optimize_status"), + @Result(property = "optimizeStatusStartTime", column = "optimize_status_start_time", + typeHandler = Long2TsConvertor.class) + }) + TableOptimizeRuntime selectTableOptimizeRuntime(@Param("tableIdentifier") TableIdentifier tableIdentifier); + @Update("update " + TABLE_NAME + " set " + "current_snapshot_id = #{runtime.currentSnapshotId}, " + "current_change_snapshotId = #{runtime.currentChangeSnapshotId}, " + diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/BasicOptimizeCommit.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/BasicOptimizeCommit.java index d675f36864..132c345fb0 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/BasicOptimizeCommit.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/BasicOptimizeCommit.java @@ -19,7 +19,6 @@ package com.netease.arctic.ams.server.optimize; import com.netease.arctic.ams.api.CommitMetaProducer; -import com.netease.arctic.ams.api.ErrorMessage; import com.netease.arctic.ams.api.OptimizeType; import com.netease.arctic.ams.server.model.BasicOptimizeTask; import com.netease.arctic.ams.server.model.OptimizeCommitFailedHistory; diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/ITableTaskHistoryService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/ITableTaskHistoryService.java index d96bd146bf..c33f8cb59d 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/ITableTaskHistoryService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/ITableTaskHistoryService.java @@ -18,6 +18,7 @@ package com.netease.arctic.ams.server.service; +import com.netease.arctic.ams.server.model.TableOptimizeRuntime; import com.netease.arctic.ams.server.model.TableTaskHistory; import com.netease.arctic.table.TableIdentifier; @@ -38,4 +39,6 @@ List selectTaskHistoryByTableIdAndTime(TableIdentifier identif void deleteTaskHistoryWithPlanGroup(TableIdentifier identifier, String taskPlanGroup); void expireTaskHistory(TableIdentifier identifier, String latestTaskHistoryId, long expireTime); + + TableOptimizeRuntime selectTableOptimizeRuntime(TableIdentifier identifier); } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java index b22a5fdde4..3d2317c5ab 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableExpireService.java @@ -19,8 +19,7 @@ package com.netease.arctic.ams.server.service.impl; import com.netease.arctic.ams.api.DataFileInfo; -import com.netease.arctic.ams.api.NoSuchObjectException; -import com.netease.arctic.ams.server.optimize.TableOptimizeItem; +import com.netease.arctic.ams.server.model.TableOptimizeRuntime; import com.netease.arctic.ams.server.service.ITableExpireService; import com.netease.arctic.ams.server.service.ServiceContainer; import com.netease.arctic.ams.server.utils.CatalogUtil; @@ -226,20 +225,19 @@ public static long fetchLatestFlinkCommittedSnapshotTime(UnkeyedTable table) { * @return commit time of snapshot for optimizing */ public static long fetchOptimizingSnapshotTime(UnkeyedTable table) { - try { - TableOptimizeItem tableOptimizeItem = ServiceContainer.getOptimizeService().getTableOptimizeItem(table.id()); - if (!tableOptimizeItem.getOptimizeTasks().isEmpty()) { - long currentSnapshotId = tableOptimizeItem.getTableOptimizeRuntime().getCurrentSnapshotId(); - for (Snapshot snapshot : table.snapshots()) { - if (snapshot.snapshotId() == currentSnapshotId) { - return snapshot.timestampMillis(); - } + TableOptimizeRuntime tableOptimizeRuntime = ServiceContainer.getTableTaskHistoryService() + .selectTableOptimizeRuntime(table.id()); + if (tableOptimizeRuntime != null && + tableOptimizeRuntime.getCurrentSnapshotId() != -1 && + tableOptimizeRuntime.getCurrentSnapshotId() != 0) { + long currentSnapshotId = tableOptimizeRuntime.getCurrentSnapshotId(); + for (Snapshot snapshot : table.snapshots()) { + if (snapshot.snapshotId() == currentSnapshotId) { + return snapshot.timestampMillis(); } } - return Long.MAX_VALUE; - } catch (NoSuchObjectException e) { - return Long.MAX_VALUE; } + return Long.MAX_VALUE; } public static long min(long a, long b, long c) { diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableTaskHistoryService.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableTaskHistoryService.java index 07696266f2..48ece83339 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableTaskHistoryService.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/service/impl/TableTaskHistoryService.java @@ -18,7 +18,9 @@ package com.netease.arctic.ams.server.service.impl; +import com.netease.arctic.ams.server.mapper.TableOptimizeRuntimeMapper; import com.netease.arctic.ams.server.mapper.TaskHistoryMapper; +import com.netease.arctic.ams.server.model.TableOptimizeRuntime; import com.netease.arctic.ams.server.model.TableTaskHistory; import com.netease.arctic.ams.server.service.IJDBCService; import com.netease.arctic.ams.server.service.ITableTaskHistoryService; @@ -102,6 +104,15 @@ public void expireTaskHistory(TableIdentifier identifier, String latestTaskHisto } } + @Override + public TableOptimizeRuntime selectTableOptimizeRuntime(TableIdentifier tableIdentifier) { + try (SqlSession sqlSession = getSqlSession(true)) { + TableOptimizeRuntimeMapper tableOptimizeRuntimeMapper = + getMapper(sqlSession, TableOptimizeRuntimeMapper.class); + return tableOptimizeRuntimeMapper.selectTableOptimizeRuntime(tableIdentifier); + } + } + @Override public void close() throws IOException {