diff --git a/amoro-ams/pom.xml b/amoro-ams/pom.xml
index 1db3a8f81a..e731558fd2 100644
--- a/amoro-ams/pom.xml
+++ b/amoro-ams/pom.xml
@@ -394,6 +394,12 @@
+
+ com.github.pagehelper
+ pagehelper
+ ${pagehelper.version}
+
+
org.apache.iceberg
iceberg-data
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
index 6d499259a6..792a9170d9 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/MixedAndIcebergTableDescriptor.java
@@ -18,6 +18,9 @@
package org.apache.amoro.server.dashboard;
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CommitMetaProducer;
@@ -65,7 +68,6 @@
import org.apache.amoro.utils.MixedDataFiles;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.HasTableOperations;
@@ -505,29 +507,34 @@ public List getTableConsumerInfos(AmoroTable> amoroTable) {
public Pair, Integer> getOptimizingProcessesInfo(
AmoroTable> amoroTable, String type, ProcessStatus status, int limit, int offset) {
TableIdentifier tableIdentifier = amoroTable.id();
- List processMetaList =
- getAs(
- OptimizingMapper.class,
- mapper ->
- mapper.selectOptimizingProcesses(
- tableIdentifier.getCatalog(),
- tableIdentifier.getDatabase(),
- tableIdentifier.getTableName()));
-
- processMetaList =
- processMetaList.stream()
- .filter(
- p ->
- StringUtils.isBlank(type)
- || type.equalsIgnoreCase(p.getOptimizingType().getStatus().displayValue()))
- .filter(p -> status == null || status.name().equalsIgnoreCase(p.getStatus().name()))
- .collect(Collectors.toList());
-
- int total = processMetaList.size();
- processMetaList =
- processMetaList.stream().skip(offset).limit(limit).collect(Collectors.toList());
- if (CollectionUtils.isEmpty(processMetaList)) {
- return Pair.of(Collections.emptyList(), 0);
+ int total = 0;
+ // page helper is 1-based
+ int pageNumber = (offset / limit) + 1;
+ List processMetaList = Collections.emptyList();
+ try (Page> ignored = PageHelper.startPage(pageNumber, limit, true)) {
+ processMetaList =
+ getAs(
+ OptimizingMapper.class,
+ mapper ->
+ mapper.selectOptimizingProcesses(
+ tableIdentifier.getCatalog(),
+ tableIdentifier.getDatabase(),
+ tableIdentifier.getTableName(),
+ type,
+ status,
+ offset,
+ limit));
+ PageInfo pageInfo = new PageInfo<>(processMetaList);
+ total = (int) pageInfo.getTotal();
+ LOG.info(
+ "Get optimizing processes total : {} , pageNumber:{}, limit:{}, offset:{}",
+ total,
+ pageNumber,
+ limit,
+ offset);
+ if (pageInfo.getSize() == 0) {
+ return Pair.of(Collections.emptyList(), 0);
+ }
}
List processIds =
processMetaList.stream()
@@ -537,6 +544,7 @@ public Pair, Integer> getOptimizingProcessesInfo(
getAs(OptimizingMapper.class, mapper -> mapper.selectOptimizeTaskMetas(processIds)).stream()
.collect(Collectors.groupingBy(OptimizingTaskMeta::getProcessId));
+ LOG.info("Get {} optimizing tasks. ", optimizingTasks.size());
return Pair.of(
processMetaList.stream()
.map(p -> buildOptimizingProcessInfo(p, optimizingTasks.get(p.getProcessId())))
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
index 9db6f074df..e009840844 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/dashboard/controller/TableController.java
@@ -310,16 +310,20 @@ public void getOptimizingProcesses(Context ctx) {
String db = ctx.pathParam("db");
String table = ctx.pathParam("table");
String type = ctx.queryParam("type");
+
+ if (StringUtils.isBlank(type)) {
+ // treat all blank string to null
+ type = null;
+ }
+
String status = ctx.queryParam("status");
Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);
int offset = (page - 1) * pageSize;
int limit = pageSize;
- ServerCatalog serverCatalog = tableService.getServerCatalog(catalog);
Preconditions.checkArgument(offset >= 0, "offset[%s] must >= 0", offset);
Preconditions.checkArgument(limit >= 0, "limit[%s] must >= 0", limit);
- Preconditions.checkState(serverCatalog.tableExists(db, table), "no such table");
TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table);
ProcessStatus processStatus =
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java
index 5e0f47447d..d8e0f58a4f 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingProcess.java
@@ -18,6 +18,8 @@
package org.apache.amoro.server.optimizing;
+import org.apache.amoro.process.ProcessStatus;
+
public interface OptimizingProcess {
long getProcessId();
@@ -36,7 +38,7 @@ public interface OptimizingProcess {
OptimizingType getOptimizingType();
- Status getStatus();
+ ProcessStatus getStatus();
long getRunningQuotaTime(long calculatingStartTime, long calculatingEndTime);
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
index ccbe9ccd5a..710ec729db 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/optimizing/OptimizingQueue.java
@@ -24,6 +24,7 @@
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.optimizing.RewriteFilesInput;
+import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.manager.MetricManager;
@@ -354,7 +355,7 @@ private class TableOptimizingProcess implements OptimizingProcess, TaskRuntime.T
private final Map> taskMap = Maps.newHashMap();
private final Queue> taskQueue = new LinkedList<>();
private final Lock lock = new ReentrantLock();
- private volatile Status status = OptimizingProcess.Status.RUNNING;
+ private volatile ProcessStatus status = ProcessStatus.RUNNING;
private volatile String failedReason;
private long endTime = AmoroServiceConstants.INVALID_TIME;
private Map fromSequence = Maps.newHashMap();
@@ -396,7 +397,7 @@ public TableOptimizingProcess(TableRuntime tableRuntime) {
if (tableRuntime.getToSequence() != null) {
toSequence = tableRuntime.getToSequence();
}
- if (this.status != OptimizingProcess.Status.CLOSED) {
+ if (this.status != ProcessStatus.CLOSED) {
tableRuntime.recover(this);
}
loadTaskRuntimes(this);
@@ -413,7 +414,7 @@ public OptimizingType getOptimizingType() {
}
@Override
- public Status getStatus() {
+ public ProcessStatus getStatus() {
return status;
}
@@ -421,10 +422,10 @@ public Status getStatus() {
public void close() {
lock.lock();
try {
- if (this.status != Status.RUNNING) {
+ if (this.status != ProcessStatus.RUNNING) {
return;
}
- this.status = OptimizingProcess.Status.CLOSED;
+ this.status = ProcessStatus.CLOSED;
this.endTime = System.currentTimeMillis();
persistProcessCompleted(false);
clearProcess(this);
@@ -468,7 +469,7 @@ public void acceptResult(TaskRuntime taskRuntime) {
} else {
clearProcess(this);
this.failedReason = taskRuntime.getFailReason();
- this.status = OptimizingProcess.Status.FAILED;
+ this.status = ProcessStatus.FAILED;
this.endTime = taskRuntime.getEndTime();
persistProcessCompleted(false);
}
@@ -481,15 +482,14 @@ public void acceptResult(TaskRuntime taskRuntime) {
// the cleanup of task should be done after unlock to avoid deadlock
@Override
public void releaseResourcesIfNecessary() {
- if (this.status == OptimizingProcess.Status.FAILED
- || this.status == OptimizingProcess.Status.CLOSED) {
+ if (this.status == ProcessStatus.FAILED || this.status == ProcessStatus.CLOSED) {
cancelTasks();
}
}
@Override
public boolean isClosed() {
- return status == OptimizingProcess.Status.CLOSED;
+ return status == ProcessStatus.CLOSED;
}
@Override
@@ -566,12 +566,12 @@ public void commit() {
try {
hasCommitted = true;
buildCommit().commit();
- status = Status.SUCCESS;
+ status = ProcessStatus.SUCCESS;
endTime = System.currentTimeMillis();
persistProcessCompleted(true);
} catch (Exception e) {
LOG.error("{} Commit optimizing failed ", tableRuntime.getTableIdentifier(), e);
- status = Status.FAILED;
+ status = ProcessStatus.FAILED;
failedReason = ExceptionUtil.getErrorMessage(e, 4000);
endTime = System.currentTimeMillis();
persistProcessCompleted(false);
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
index 4560e5a4e7..14f4b8fdf3 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java
@@ -18,6 +18,12 @@
package org.apache.amoro.server.persistence;
+import static com.github.pagehelper.page.PageAutoDialect.registerDialectAlias;
+
+import com.github.pagehelper.PageInterceptor;
+import com.github.pagehelper.dialect.helper.MySqlDialect;
+import com.github.pagehelper.dialect.helper.PostgreSqlDialect;
+import com.github.pagehelper.dialect.helper.SqlServer2012Dialect;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.persistence.mapper.ApiTokensMapper;
@@ -78,6 +84,9 @@ public static SqlSessionFactoryProvider getInstance() {
private volatile SqlSessionFactory sqlSessionFactory;
public void init(Configurations config) throws SQLException {
+
+ registerDialectAliases();
+
BasicDataSource dataSource = new BasicDataSource();
dataSource.setUrl(config.getString(AmoroManagementConf.DB_CONNECTION_URL));
dataSource.setDriverClassName(config.getString(AmoroManagementConf.DB_DRIVER_CLASS_NAME));
@@ -116,6 +125,12 @@ public void init(Configurations config) throws SQLException {
configuration.addMapper(ResourceMapper.class);
configuration.addMapper(TableBlockerMapper.class);
+ PageInterceptor interceptor = new PageInterceptor();
+ Properties interceptorProperties = new Properties();
+ interceptorProperties.setProperty("reasonable", "false");
+ interceptor.setProperties(interceptorProperties);
+ configuration.addInterceptor(interceptor);
+
DatabaseIdProvider provider = new VendorDatabaseIdProvider();
Properties properties = new Properties();
properties.setProperty("MySQL", "mysql");
@@ -133,6 +148,12 @@ public void init(Configurations config) throws SQLException {
createTablesIfNeed(config);
}
+ private void registerDialectAliases() {
+ registerDialectAlias("postgres", PostgreSqlDialect.class);
+ registerDialectAlias("mysql", MySqlDialect.class);
+ registerDialectAlias("derby", SqlServer2012Dialect.class);
+ }
+
/**
* create tables for database
*
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
index 817e7cfd2a..7de47f5b54 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/OptimizingMapper.java
@@ -20,8 +20,8 @@
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.optimizing.RewriteFilesInput;
+import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.MetricsSummary;
-import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.OptimizingType;
@@ -69,7 +69,7 @@ void insertOptimizingProcess(
@Param("processId") long processId,
@Param("targetSnapshotId") long targetSnapshotId,
@Param("targetChangeSnapshotId") long targetChangeSnapshotId,
- @Param("status") OptimizingProcess.Status status,
+ @Param("status") ProcessStatus status,
@Param("optimizingType") OptimizingType optimizingType,
@Param("planTime") long planTime,
@Param("summary") MetricsSummary summary,
@@ -85,19 +85,23 @@ void insertOptimizingProcess(
void updateOptimizingProcess(
@Param("tableId") long tableId,
@Param("processId") long processId,
- @Param("optimizingStatus") OptimizingProcess.Status status,
+ @Param("optimizingStatus") ProcessStatus status,
@Param("endTime") long endTime,
@Param("summary") MetricsSummary summary,
@Param("failedReason") String failedReason);
@Select(
- "SELECT a.process_id, a.table_id, a.catalog_name, a.db_name, a.table_name, a.target_snapshot_id,"
+ "")
@Results({
@Result(property = "processId", column = "process_id"),
@Result(property = "tableId", column = "table_id"),
@@ -124,7 +128,11 @@ void updateOptimizingProcess(
List selectOptimizingProcesses(
@Param("catalogName") String catalogName,
@Param("dbName") String dbName,
- @Param("tableName") String tableName);
+ @Param("tableName") String tableName,
+ @Param("optimizingType") String optimizingType,
+ @Param("optimizingStatus") ProcessStatus optimizingStatus,
+ @Param("pageNum") int pageNum,
+ @Param("pageSize") int pageSize);
/** Optimizing TaskRuntime operation below */
@Insert({
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
index e613027f03..b290323ab3 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/executor/TableRuntimeRefreshExecutor.java
@@ -20,6 +20,7 @@
import org.apache.amoro.AmoroTable;
import org.apache.amoro.config.TableConfiguration;
+import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator;
import org.apache.amoro.server.table.TableManager;
@@ -73,8 +74,7 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or
if (originalConfig.getOptimizingConfig().isEnabled()
&& !tableRuntime.getTableConfiguration().getOptimizingConfig().isEnabled()) {
OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess();
- if (optimizingProcess != null
- && optimizingProcess.getStatus() == OptimizingProcess.Status.RUNNING) {
+ if (optimizingProcess != null && optimizingProcess.getStatus() == ProcessStatus.RUNNING) {
optimizingProcess.close();
}
}
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
index 99b950a2b1..0c15cbf6f0 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/TestDefaultOptimizingService.java
@@ -33,7 +33,7 @@
import org.apache.amoro.io.MixedDataTestHelpers;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.TableOptimizing;
-import org.apache.amoro.server.optimizing.OptimizingProcess;
+import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.resource.OptimizerInstance;
@@ -386,7 +386,7 @@ private void assertTaskCompleted(TaskRuntime taskRuntime) {
Assertions.assertEquals(
0, optimizingService().listTasks(defaultResourceGroup().getName()).size());
Assertions.assertEquals(
- OptimizingProcess.Status.RUNNING,
+ ProcessStatus.RUNNING,
tableService()
.getRuntime(serverTableIdentifier().getId())
.getOptimizingProcess()
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
index 7bd4011544..356d886b0a 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/dashboard/TestIcebergServerTableDescriptor.java
@@ -18,18 +18,55 @@
package org.apache.amoro.server.dashboard;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+import org.apache.amoro.AmoroTable;
+import org.apache.amoro.ServerTableIdentifier;
+import org.apache.amoro.TableFormat;
+import org.apache.amoro.config.Configurations;
import org.apache.amoro.formats.AmoroCatalogTestHelper;
import org.apache.amoro.formats.IcebergHadoopCatalogTestHelper;
+import org.apache.amoro.formats.iceberg.IcebergTable;
import org.apache.amoro.hive.formats.IcebergHiveCatalogTestHelper;
+import org.apache.amoro.process.ProcessStatus;
+import org.apache.amoro.server.AmoroManagementConf;
+import org.apache.amoro.server.optimizing.MetricsSummary;
+import org.apache.amoro.server.optimizing.OptimizingType;
+import org.apache.amoro.server.persistence.SqlSessionFactoryProvider;
+import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
+import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.server.table.DerbyPersistence;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.table.descriptor.FormatTableDescriptor;
+import org.apache.amoro.table.descriptor.OptimizingProcessInfo;
import org.apache.amoro.table.descriptor.TestServerTableDescriptor;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.ibatis.session.SqlSession;
import org.apache.iceberg.Table;
import org.apache.iceberg.types.Types;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
@RunWith(Parameterized.class)
public class TestIcebergServerTableDescriptor extends TestServerTableDescriptor {
+ Persistency persistency = null;
public TestIcebergServerTableDescriptor(AmoroCatalogTestHelper> amoroCatalogTestHelper) {
super(amoroCatalogTestHelper);
@@ -51,6 +88,189 @@ protected void tableOperationsAddColumns() {
.commit();
}
+ @After
+ public void after() throws IOException {
+ if (persistency != null) {
+ persistency.truncateAllTables();
+ }
+ super.after();
+ }
+
+ @Test
+ public void testOptimizingPorcess() {
+ Persistency persistency = new Persistency();
+
+ String catalogName = "catalog1";
+ String dbName = "db1";
+ String tableName = "table1";
+
+ ServerTableIdentifier identifier =
+ ServerTableIdentifier.of(1L, catalogName, dbName, tableName, TableFormat.ICEBERG);
+ persistency.insertTable(identifier);
+ MetricsSummary dummySummery = new MetricsSummary();
+ dummySummery.setNewDeleteFileCnt(1);
+ dummySummery.setNewFileCnt(1);
+ dummySummery.setNewFileSize(1);
+ dummySummery.setNewDeleteFileCnt(1);
+ dummySummery.setNewDeleteSize(1);
+ persistency.insertOptimizingProcess(
+ identifier,
+ 1L,
+ 1,
+ 1,
+ ProcessStatus.SUCCESS,
+ OptimizingType.MAJOR,
+ 1L,
+ dummySummery,
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ persistency.insertOptimizingProcess(
+ identifier,
+ 2L,
+ 2L,
+ 2L,
+ ProcessStatus.SUCCESS,
+ OptimizingType.MINOR,
+ 2L,
+ dummySummery,
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ persistency.insertOptimizingProcess(
+ identifier,
+ 3L,
+ 3L,
+ 3L,
+ ProcessStatus.SUCCESS,
+ OptimizingType.FULL,
+ 3L,
+ dummySummery,
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ persistency.insertOptimizingProcess(
+ identifier,
+ 4L,
+ 4L,
+ 4L,
+ ProcessStatus.FAILED,
+ OptimizingType.MAJOR,
+ 4L,
+ dummySummery,
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ persistency.insertOptimizingProcess(
+ identifier,
+ 5L,
+ 5L,
+ 5L,
+ ProcessStatus.SUCCESS,
+ OptimizingType.MINOR,
+ 5L,
+ dummySummery,
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ persistency.insertOptimizingProcess(
+ identifier,
+ 6L,
+ 6L,
+ 6L,
+ ProcessStatus.SUCCESS,
+ OptimizingType.FULL,
+ 6L,
+ dummySummery,
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ persistency.insertOptimizingProcess(
+ identifier,
+ 7L,
+ 7L,
+ 7L,
+ ProcessStatus.FAILED,
+ OptimizingType.MAJOR,
+ 7L,
+ dummySummery,
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ persistency.insertOptimizingProcess(
+ identifier,
+ 8L,
+ 8L,
+ 8L,
+ ProcessStatus.SUCCESS,
+ OptimizingType.MINOR,
+ 8L,
+ dummySummery,
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ persistency.insertOptimizingProcess(
+ identifier,
+ 9L,
+ 9L,
+ 9L,
+ ProcessStatus.FAILED,
+ OptimizingType.FULL,
+ 9L,
+ dummySummery,
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ persistency.insertOptimizingProcess(
+ identifier,
+ 10L,
+ 10L,
+ 10L,
+ ProcessStatus.SUCCESS,
+ OptimizingType.MINOR,
+ 10L,
+ dummySummery,
+ Collections.emptyMap(),
+ Collections.emptyMap());
+
+ AmoroTable> table = mock(IcebergTable.class);
+ TableIdentifier tableIdentifier =
+ TableIdentifier.of(
+ identifier.getCatalog(), identifier.getDatabase(), identifier.getTableName());
+ doReturn(tableIdentifier).when(table).id();
+
+ Pair, Integer> res =
+ persistency.getOptimizingProcessesInfo(table, null, null, 4, 4);
+ Integer expectResturnItemSizeForNoTypeNoStatusOffset0Limit5 = 4;
+ Integer expectTotalForNoTypeNoStatusOffset0Limit5 = 10;
+ Assert.assertEquals(
+ expectResturnItemSizeForNoTypeNoStatusOffset0Limit5, (Integer) res.getLeft().size());
+ Assert.assertEquals(expectTotalForNoTypeNoStatusOffset0Limit5, res.getRight());
+
+ res = persistency.getOptimizingProcessesInfo(table, null, ProcessStatus.SUCCESS, 5, 0);
+ Integer expectReturnItemSizeForOnlyStatusOffset0limit5 = 5;
+ Integer expectedTotalForOnlyStatusOffset0Limit5 = 7;
+ Assert.assertEquals(
+ expectReturnItemSizeForOnlyStatusOffset0limit5, (Integer) res.getLeft().size());
+ Assert.assertEquals(expectedTotalForOnlyStatusOffset0Limit5, res.getRight());
+
+ res = persistency.getOptimizingProcessesInfo(table, OptimizingType.MINOR.name(), null, 5, 0);
+ Integer expectedRetItemsSizeForOnlyTypeOffset0Limit5 = 4;
+ Integer expectedRetTotalForOnlyTypeOffset0Limit5 = 4;
+ Assert.assertEquals(
+ expectedRetItemsSizeForOnlyTypeOffset0Limit5, (Integer) res.getLeft().size());
+ Assert.assertEquals(expectedRetTotalForOnlyTypeOffset0Limit5, res.getRight());
+
+ res =
+ persistency.getOptimizingProcessesInfo(
+ table, OptimizingType.MINOR.name(), ProcessStatus.SUCCESS, 2, 2);
+ Integer expectedRetItemSizeForBothTypeAndStatusOffset2Limit2 = 2;
+ Integer expectedRetTotalForBothTypeAndStatusOffset2Limit2 = 4;
+ Assert.assertEquals(
+ expectedRetItemSizeForBothTypeAndStatusOffset2Limit2, (Integer) res.getLeft().size());
+ Assert.assertEquals(expectedRetTotalForBothTypeAndStatusOffset2Limit2, res.getRight());
+ }
+
@Override
protected void tableOperationsRenameColumns() {
getTable().updateSchema().renameColumn("new_col", "renamed_col").commit();
@@ -87,4 +307,93 @@ protected FormatTableDescriptor getTableDescriptor() {
private Table getTable() {
return (Table) getAmoroCatalog().loadTable(TEST_DB, TEST_TABLE).originalTable();
}
+
+ /**
+ * Test persistence class used to test MixedAndIcebergTableDescriptor, it will use derby as the
+ * underly db.
+ */
+ private static class Persistency extends MixedAndIcebergTableDescriptor {
+ private static final Logger LOG = LoggerFactory.getLogger(DerbyPersistence.class);
+
+ private static TemporaryFolder SINGLETON_FOLDER;
+
+ Persistency() {
+ try {
+ SINGLETON_FOLDER = new TemporaryFolder();
+ SINGLETON_FOLDER.create();
+ String derbyFilePath = SINGLETON_FOLDER.newFolder("derby").getPath();
+ String derbyUrl = String.format("jdbc:derby:%s/derby;create=true", derbyFilePath);
+ Configurations configurations = new Configurations();
+ configurations.set(AmoroManagementConf.DB_CONNECTION_URL, derbyUrl);
+ configurations.set(AmoroManagementConf.DB_TYPE, AmoroManagementConf.DB_TYPE_DERBY);
+ configurations.set(
+ AmoroManagementConf.DB_DRIVER_CLASS_NAME, "org.apache.derby.jdbc.EmbeddedDriver");
+ SqlSessionFactoryProvider.getInstance().init(configurations);
+ LOG.info("Initialized derby persistent with url: {}", derbyUrl);
+ Runtime.getRuntime()
+ .addShutdownHook(
+ new Thread(
+ () -> {
+ SINGLETON_FOLDER.delete();
+ LOG.info("Deleted resources in derby persistent.");
+ }));
+ truncateAllTables();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void truncateAllTables() {
+ try (SqlSession sqlSession =
+ SqlSessionFactoryProvider.getInstance().get().openSession(true)) {
+ try (Connection connection = sqlSession.getConnection()) {
+ try (Statement statement = connection.createStatement()) {
+ String query = "SELECT TABLENAME FROM SYS.SYSTABLES WHERE TABLETYPE='T'";
+ List tableList = Lists.newArrayList();
+ try (ResultSet rs = statement.executeQuery(query)) {
+ while (rs.next()) {
+ tableList.add(rs.getString(1));
+ }
+ }
+ for (String table : tableList) {
+ statement.execute("TRUNCATE TABLE " + table);
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Clear table failed", e);
+ }
+ }
+
+ public void insertTable(ServerTableIdentifier identifier) {
+ doAs(TableMetaMapper.class, mapper -> mapper.insertTable(identifier));
+ }
+
+ public void insertOptimizingProcess(
+ ServerTableIdentifier identifier,
+ long processId,
+ long targetSnapshotId,
+ long targetChangeSnapshotId,
+ ProcessStatus status,
+ OptimizingType type,
+ long planTime,
+ MetricsSummary summary,
+ Map fromSequence,
+ Map toSequence) {
+ doAs(
+ OptimizingMapper.class,
+ mapper ->
+ mapper.insertOptimizingProcess(
+ identifier,
+ processId,
+ targetSnapshotId,
+ targetChangeSnapshotId,
+ status,
+ type,
+ planTime,
+ summary,
+ fromSequence,
+ toSequence));
+ }
+ }
}
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
index 965e7d43bc..f4acb99137 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/BaseOptimizingChecker.java
@@ -116,7 +116,11 @@ protected OptimizingProcessMeta waitOptimizeResult() {
mapper.selectOptimizingProcesses(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
- tableIdentifier.getTableName()));
+ tableIdentifier.getTableName(),
+ null,
+ null,
+ 0,
+ Integer.MAX_VALUE));
if (tableOptimizingProcesses == null || tableOptimizingProcesses.isEmpty()) {
LOG.info("optimize history is empty");
return Status.RUNNING;
@@ -153,7 +157,11 @@ protected OptimizingProcessMeta waitOptimizeResult() {
mapper.selectOptimizingProcesses(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
- tableIdentifier.getTableName()))
+ tableIdentifier.getTableName(),
+ null,
+ null,
+ 0,
+ Integer.MAX_VALUE))
.stream()
.filter(p -> p.getProcessId() > lastProcessId)
.filter(p -> p.getStatus().equals(OptimizingProcess.Status.SUCCESS))
@@ -182,11 +190,15 @@ protected void assertOptimizeHangUp() {
mapper.selectOptimizingProcesses(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
- tableIdentifier.getTableName()))
+ tableIdentifier.getTableName(),
+ null,
+ null,
+ 0,
+ Integer.MAX_VALUE))
.stream()
.filter(p -> p.getProcessId() > lastProcessId)
.collect(Collectors.toList());
- Assert.assertFalse("optimize is not stopped", tableOptimizingProcesses.size() > 0);
+ Assert.assertTrue("optimize is not stopped", tableOptimizingProcesses.isEmpty());
}
protected boolean waitUntilFinish(Supplier statusSupplier, final long timeout)
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
index 24ce7c0366..bf668fbcfb 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/optimizing/TestOptimizingQueue.java
@@ -43,6 +43,7 @@
import org.apache.amoro.metrics.MetricKey;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.TableOptimizing;
+import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.manager.MetricManager;
import org.apache.amoro.server.metrics.MetricRegistry;
@@ -219,14 +220,14 @@ public void testCommitTask() {
// 7.commit
OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess();
- Assert.assertEquals(OptimizingProcess.Status.RUNNING, optimizingProcess.getStatus());
+ Assert.assertEquals(ProcessStatus.RUNNING, optimizingProcess.getStatus());
optimizingProcess.commit();
- Assert.assertEquals(OptimizingProcess.Status.SUCCESS, optimizingProcess.getStatus());
+ Assert.assertEquals(ProcessStatus.SUCCESS, optimizingProcess.getStatus());
Assert.assertNull(tableRuntime.getOptimizingProcess());
// 8.commit again, throw exceptions, and status not changed.
Assert.assertThrows(IllegalStateException.class, optimizingProcess::commit);
- Assert.assertEquals(OptimizingProcess.Status.SUCCESS, optimizingProcess.getStatus());
+ Assert.assertEquals(ProcessStatus.SUCCESS, optimizingProcess.getStatus());
Assert.assertEquals(0, queue.collectTasks().size());
queue.dispose();
diff --git a/pom.xml b/pom.xml
index 2b7290b220..173d1d3ba8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -149,6 +149,7 @@
1.3.2
32.1.1-jre
0.14.1
+ 6.1.0
compile
compile