Skip to content

Commit

Permalink
table expire get current snap from sql system
Browse files Browse the repository at this point in the history
  • Loading branch information
nicochen committed Sep 21, 2023
1 parent cd02f32 commit 490c62e
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,29 @@ public interface TableOptimizeRuntimeMapper {
})
List<TableOptimizeRuntime> selectTableOptimizeRuntimes();

@Select("select catalog_name, db_name, table_name, current_snapshot_id, current_change_snapshotId," +
" latest_major_optimize_time, latest_full_optimize_time, latest_minor_optimize_time, latest_task_plan_group," +
" optimize_status, optimize_status_start_time " +
" from " + TABLE_NAME + " where " +
"catalog_name = #{tableIdentifier.catalog} and " +
"db_name = #{tableIdentifier.database} and " +
"table_name = #{tableIdentifier.tableName}")
@Results({
@Result(property = "currentSnapshotId", column = "current_snapshot_id"),
@Result(property = "currentChangeSnapshotId", column = "current_change_snapshotId"),
@Result(property = "latestMajorOptimizeTime", column = "latest_major_optimize_time",
typeHandler = MapLong2StringConverter.class),
@Result(property = "latestFullOptimizeTime", column = "latest_full_optimize_time",
typeHandler = MapLong2StringConverter.class),
@Result(property = "latestMinorOptimizeTime", column = "latest_minor_optimize_time",
typeHandler = MapLong2StringConverter.class),
@Result(property = "latestTaskPlanGroup", column = "latest_task_plan_group"),
@Result(property = "optimizeStatus", column = "optimize_status"),
@Result(property = "optimizeStatusStartTime", column = "optimize_status_start_time",
typeHandler = Long2TsConvertor.class)
})
TableOptimizeRuntime selectTableOptimizeRuntime(@Param("tableIdentifier") TableIdentifier tableIdentifier);

@Update("update " + TABLE_NAME + " set " +
"current_snapshot_id = #{runtime.currentSnapshotId}, " +
"current_change_snapshotId = #{runtime.currentChangeSnapshotId}, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package com.netease.arctic.ams.server.optimize;

import com.netease.arctic.ams.api.CommitMetaProducer;
import com.netease.arctic.ams.api.ErrorMessage;
import com.netease.arctic.ams.api.OptimizeType;
import com.netease.arctic.ams.server.model.BasicOptimizeTask;
import com.netease.arctic.ams.server.model.OptimizeCommitFailedHistory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.netease.arctic.ams.server.service;

import com.netease.arctic.ams.server.model.TableOptimizeRuntime;
import com.netease.arctic.ams.server.model.TableTaskHistory;
import com.netease.arctic.table.TableIdentifier;

Expand All @@ -38,4 +39,6 @@ List<TableTaskHistory> selectTaskHistoryByTableIdAndTime(TableIdentifier identif
void deleteTaskHistoryWithPlanGroup(TableIdentifier identifier, String taskPlanGroup);

void expireTaskHistory(TableIdentifier identifier, String latestTaskHistoryId, long expireTime);

TableOptimizeRuntime selectTableOptimizeRuntime(TableIdentifier identifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
package com.netease.arctic.ams.server.service.impl;

import com.netease.arctic.ams.api.DataFileInfo;
import com.netease.arctic.ams.api.NoSuchObjectException;
import com.netease.arctic.ams.server.optimize.TableOptimizeItem;
import com.netease.arctic.ams.server.model.TableOptimizeRuntime;
import com.netease.arctic.ams.server.service.ITableExpireService;
import com.netease.arctic.ams.server.service.ServiceContainer;
import com.netease.arctic.ams.server.utils.CatalogUtil;
Expand Down Expand Up @@ -226,20 +225,19 @@ public static long fetchLatestFlinkCommittedSnapshotTime(UnkeyedTable table) {
* @return commit time of snapshot for optimizing
*/
public static long fetchOptimizingSnapshotTime(UnkeyedTable table) {
try {
TableOptimizeItem tableOptimizeItem = ServiceContainer.getOptimizeService().getTableOptimizeItem(table.id());
if (!tableOptimizeItem.getOptimizeTasks().isEmpty()) {
long currentSnapshotId = tableOptimizeItem.getTableOptimizeRuntime().getCurrentSnapshotId();
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.snapshotId() == currentSnapshotId) {
return snapshot.timestampMillis();
}
TableOptimizeRuntime tableOptimizeRuntime = ServiceContainer.getTableTaskHistoryService()
.selectTableOptimizeRuntime(table.id());
if (tableOptimizeRuntime != null &&
tableOptimizeRuntime.getCurrentSnapshotId() != -1 &&
tableOptimizeRuntime.getCurrentSnapshotId() != 0) {
long currentSnapshotId = tableOptimizeRuntime.getCurrentSnapshotId();
for (Snapshot snapshot : table.snapshots()) {
if (snapshot.snapshotId() == currentSnapshotId) {
return snapshot.timestampMillis();
}
}
return Long.MAX_VALUE;
} catch (NoSuchObjectException e) {
return Long.MAX_VALUE;
}
return Long.MAX_VALUE;
}

public static long min(long a, long b, long c) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

package com.netease.arctic.ams.server.service.impl;

import com.netease.arctic.ams.server.mapper.TableOptimizeRuntimeMapper;
import com.netease.arctic.ams.server.mapper.TaskHistoryMapper;
import com.netease.arctic.ams.server.model.TableOptimizeRuntime;
import com.netease.arctic.ams.server.model.TableTaskHistory;
import com.netease.arctic.ams.server.service.IJDBCService;
import com.netease.arctic.ams.server.service.ITableTaskHistoryService;
Expand Down Expand Up @@ -102,6 +104,15 @@ public void expireTaskHistory(TableIdentifier identifier, String latestTaskHisto
}
}

@Override
public TableOptimizeRuntime selectTableOptimizeRuntime(TableIdentifier tableIdentifier) {
try (SqlSession sqlSession = getSqlSession(true)) {
TableOptimizeRuntimeMapper tableOptimizeRuntimeMapper =
getMapper(sqlSession, TableOptimizeRuntimeMapper.class);
return tableOptimizeRuntimeMapper.selectTableOptimizeRuntime(tableIdentifier);
}
}

@Override
public void close() throws IOException {

Expand Down

0 comments on commit 490c62e

Please sign in to comment.