Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-3066] Optimizing the efficiency for optimizing-processes rest api #3257

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
6 changes: 6 additions & 0 deletions amoro-ams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,12 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper</artifactId>
<version>${pagehelper.version}</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.amoro.server.dashboard;

import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.CommitMetaProducer;
Expand Down Expand Up @@ -65,7 +68,6 @@
import org.apache.amoro.utils.MixedDataFiles;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.HasTableOperations;
Expand Down Expand Up @@ -505,29 +507,34 @@ public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, int offset) {
TableIdentifier tableIdentifier = amoroTable.id();
List<OptimizingProcessMeta> processMetaList =
getAs(
OptimizingMapper.class,
mapper ->
mapper.selectOptimizingProcesses(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName()));

processMetaList =
processMetaList.stream()
.filter(
p ->
StringUtils.isBlank(type)
|| type.equalsIgnoreCase(p.getOptimizingType().getStatus().displayValue()))
.filter(p -> status == null || status.name().equalsIgnoreCase(p.getStatus().name()))
.collect(Collectors.toList());

int total = processMetaList.size();
processMetaList =
processMetaList.stream().skip(offset).limit(limit).collect(Collectors.toList());
if (CollectionUtils.isEmpty(processMetaList)) {
return Pair.of(Collections.emptyList(), 0);
int total = 0;
// page helper is 1-based
int pageNumber = (offset / limit) + 1;
List<OptimizingProcessMeta> processMetaList = Collections.emptyList();
try (Page<?> ignored = PageHelper.startPage(pageNumber, limit, true)) {
processMetaList =
getAs(
OptimizingMapper.class,
mapper ->
mapper.selectOptimizingProcesses(
klion26 marked this conversation as resolved.
Show resolved Hide resolved
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName(),
type,
status,
offset,
limit));
PageInfo<OptimizingProcessMeta> pageInfo = new PageInfo<>(processMetaList);
total = (int) pageInfo.getTotal();
LOG.info(
"Get optimizing processes total : {} , pageNumber:{}, limit:{}, offset:{}",
total,
pageNumber,
limit,
offset);
if (pageInfo.getSize() == 0) {
return Pair.of(Collections.emptyList(), 0);
}
}
List<Long> processIds =
processMetaList.stream()
Expand All @@ -537,6 +544,7 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
getAs(OptimizingMapper.class, mapper -> mapper.selectOptimizeTaskMetas(processIds)).stream()
.collect(Collectors.groupingBy(OptimizingTaskMeta::getProcessId));

LOG.info("Get {} optimizing tasks. ", optimizingTasks.size());
return Pair.of(
processMetaList.stream()
.map(p -> buildOptimizingProcessInfo(p, optimizingTasks.get(p.getProcessId())))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,8 @@ public void getOptimizingProcesses(Context ctx) {

int offset = (page - 1) * pageSize;
int limit = pageSize;
ServerCatalog serverCatalog = tableService.getServerCatalog(catalog);
Preconditions.checkArgument(offset >= 0, "offset[%s] must >= 0", offset);
Preconditions.checkArgument(limit >= 0, "limit[%s] must >= 0", limit);
Preconditions.checkState(serverCatalog.tableExists(db, table), "no such table");

TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table);
ProcessStatus processStatus =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.process.ProcessStatus;

public interface OptimizingProcess {

long getProcessId();
Expand All @@ -36,7 +38,7 @@ public interface OptimizingProcess {

OptimizingType getOptimizingType();

Status getStatus();
ProcessStatus getStatus();

long getRunningQuotaTime(long calculatingStartTime, long calculatingEndTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.manager.MetricManager;
Expand Down Expand Up @@ -354,7 +355,7 @@ private class TableOptimizingProcess implements OptimizingProcess, TaskRuntime.T
private final Map<OptimizingTaskId, TaskRuntime<RewriteStageTask>> taskMap = Maps.newHashMap();
private final Queue<TaskRuntime<RewriteStageTask>> taskQueue = new LinkedList<>();
private final Lock lock = new ReentrantLock();
private volatile Status status = OptimizingProcess.Status.RUNNING;
private volatile ProcessStatus status = ProcessStatus.RUNNING;
private volatile String failedReason;
private long endTime = AmoroServiceConstants.INVALID_TIME;
private Map<String, Long> fromSequence = Maps.newHashMap();
Expand Down Expand Up @@ -396,7 +397,7 @@ public TableOptimizingProcess(TableRuntime tableRuntime) {
if (tableRuntime.getToSequence() != null) {
toSequence = tableRuntime.getToSequence();
}
if (this.status != OptimizingProcess.Status.CLOSED) {
if (this.status != ProcessStatus.CLOSED) {
tableRuntime.recover(this);
}
loadTaskRuntimes(this);
Expand All @@ -413,18 +414,18 @@ public OptimizingType getOptimizingType() {
}

@Override
public Status getStatus() {
public ProcessStatus getStatus() {
return status;
}

@Override
public void close() {
lock.lock();
try {
if (this.status != Status.RUNNING) {
if (this.status != ProcessStatus.RUNNING) {
return;
}
this.status = OptimizingProcess.Status.CLOSED;
this.status = ProcessStatus.CLOSED;
this.endTime = System.currentTimeMillis();
persistProcessCompleted(false);
clearProcess(this);
Expand Down Expand Up @@ -468,7 +469,7 @@ public void acceptResult(TaskRuntime taskRuntime) {
} else {
clearProcess(this);
this.failedReason = taskRuntime.getFailReason();
this.status = OptimizingProcess.Status.FAILED;
this.status = ProcessStatus.FAILED;
this.endTime = taskRuntime.getEndTime();
persistProcessCompleted(false);
}
Expand All @@ -481,15 +482,14 @@ public void acceptResult(TaskRuntime taskRuntime) {
// the cleanup of task should be done after unlock to avoid deadlock
@Override
public void releaseResourcesIfNecessary() {
if (this.status == OptimizingProcess.Status.FAILED
|| this.status == OptimizingProcess.Status.CLOSED) {
if (this.status == ProcessStatus.FAILED || this.status == ProcessStatus.CLOSED) {
cancelTasks();
}
}

@Override
public boolean isClosed() {
return status == OptimizingProcess.Status.CLOSED;
return status == ProcessStatus.CLOSED;
}

@Override
Expand Down Expand Up @@ -566,12 +566,12 @@ public void commit() {
try {
hasCommitted = true;
buildCommit().commit();
status = Status.SUCCESS;
status = ProcessStatus.SUCCESS;
endTime = System.currentTimeMillis();
persistProcessCompleted(true);
} catch (Exception e) {
LOG.error("{} Commit optimizing failed ", tableRuntime.getTableIdentifier(), e);
status = Status.FAILED;
status = ProcessStatus.FAILED;
failedReason = ExceptionUtil.getErrorMessage(e, 4000);
endTime = System.currentTimeMillis();
persistProcessCompleted(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@

package org.apache.amoro.server.persistence;

import static com.github.pagehelper.page.PageAutoDialect.registerDialectAlias;

import com.github.pagehelper.PageInterceptor;
import com.github.pagehelper.dialect.helper.MySqlDialect;
import com.github.pagehelper.dialect.helper.PostgreSqlDialect;
import com.github.pagehelper.dialect.helper.SqlServer2012Dialect;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.persistence.mapper.ApiTokensMapper;
Expand Down Expand Up @@ -78,6 +84,9 @@ public static SqlSessionFactoryProvider getInstance() {
private volatile SqlSessionFactory sqlSessionFactory;

public void init(Configurations config) throws SQLException {

registerDialectAliases();

BasicDataSource dataSource = new BasicDataSource();
dataSource.setUrl(config.getString(AmoroManagementConf.DB_CONNECTION_URL));
dataSource.setDriverClassName(config.getString(AmoroManagementConf.DB_DRIVER_CLASS_NAME));
Expand Down Expand Up @@ -116,6 +125,12 @@ public void init(Configurations config) throws SQLException {
configuration.addMapper(ResourceMapper.class);
configuration.addMapper(TableBlockerMapper.class);

PageInterceptor interceptor = new PageInterceptor();
Properties interceptorProperties = new Properties();
interceptorProperties.setProperty("reasonable", "false");
interceptor.setProperties(interceptorProperties);
configuration.addInterceptor(interceptor);

DatabaseIdProvider provider = new VendorDatabaseIdProvider();
Properties properties = new Properties();
properties.setProperty("MySQL", "mysql");
Expand All @@ -133,6 +148,12 @@ public void init(Configurations config) throws SQLException {
createTablesIfNeed(config);
}

private void registerDialectAliases() {
registerDialectAlias("postgres", PostgreSqlDialect.class);
registerDialectAlias("mysql", MySqlDialect.class);
registerDialectAlias("derby", SqlServer2012Dialect.class);
}

/**
* create tables for database
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.OptimizingType;
Expand Down Expand Up @@ -69,7 +69,7 @@ void insertOptimizingProcess(
@Param("processId") long processId,
@Param("targetSnapshotId") long targetSnapshotId,
@Param("targetChangeSnapshotId") long targetChangeSnapshotId,
@Param("status") OptimizingProcess.Status status,
@Param("status") ProcessStatus status,
@Param("optimizingType") OptimizingType optimizingType,
@Param("planTime") long planTime,
@Param("summary") MetricsSummary summary,
Expand All @@ -85,19 +85,23 @@ void insertOptimizingProcess(
void updateOptimizingProcess(
@Param("tableId") long tableId,
@Param("processId") long processId,
@Param("optimizingStatus") OptimizingProcess.Status status,
@Param("optimizingStatus") ProcessStatus status,
@Param("endTime") long endTime,
@Param("summary") MetricsSummary summary,
@Param("failedReason") String failedReason);

@Select(
"SELECT a.process_id, a.table_id, a.catalog_name, a.db_name, a.table_name, a.target_snapshot_id,"
"<script>"
+ "SELECT a.process_id, a.table_id, a.catalog_name, a.db_name, a.table_name, a.target_snapshot_id,"
+ " a.target_change_snapshot_id, a.status, a.optimizing_type, a.plan_time, a.end_time,"
+ " a.fail_reason, a.summary, a.from_sequence, a.to_sequence FROM table_optimizing_process a"
+ " INNER JOIN table_identifier b ON a.table_id = b.table_id"
+ " WHERE a.catalog_name = #{catalogName} AND a.db_name = #{dbName} AND a.table_name = #{tableName}"
+ " AND b.catalog_name = #{catalogName} AND b.db_name = #{dbName} AND b.table_name = #{tableName}"
+ " ORDER BY process_id desc")
+ " <if test='optimizingType != null'> AND a.optimizing_type = #{optimizingType}</if>"
+ " <if test='optimizingStatus != null'> AND a.status = #{optimizingStatus}</if>"
+ " ORDER BY process_id desc"
+ "</script>")
@Results({
@Result(property = "processId", column = "process_id"),
@Result(property = "tableId", column = "table_id"),
Expand All @@ -124,7 +128,11 @@ void updateOptimizingProcess(
List<OptimizingProcessMeta> selectOptimizingProcesses(
@Param("catalogName") String catalogName,
@Param("dbName") String dbName,
@Param("tableName") String tableName);
@Param("tableName") String tableName,
@Param("optimizingType") String optimizingType,
@Param("optimizingStatus") ProcessStatus optimizingStatus,
@Param("pageNum") int pageNum,
@Param("pageSize") int pageSize);

/** Optimizing TaskRuntime operation below */
@Insert({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.amoro.AmoroTable;
import org.apache.amoro.config.TableConfiguration;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.plan.OptimizingEvaluator;
import org.apache.amoro.server.table.TableManager;
Expand Down Expand Up @@ -73,8 +74,7 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or
if (originalConfig.getOptimizingConfig().isEnabled()
&& !tableRuntime.getTableConfiguration().getOptimizingConfig().isEnabled()) {
OptimizingProcess optimizingProcess = tableRuntime.getOptimizingProcess();
if (optimizingProcess != null
&& optimizingProcess.getStatus() == OptimizingProcess.Status.RUNNING) {
if (optimizingProcess != null && optimizingProcess.getStatus() == ProcessStatus.RUNNING) {
optimizingProcess.close();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.amoro.io.MixedDataTestHelpers;
import org.apache.amoro.optimizing.RewriteFilesOutput;
import org.apache.amoro.optimizing.TableOptimizing;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.OptimizingStatus;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.resource.OptimizerInstance;
Expand Down Expand Up @@ -386,7 +386,7 @@ private void assertTaskCompleted(TaskRuntime taskRuntime) {
Assertions.assertEquals(
0, optimizingService().listTasks(defaultResourceGroup().getName()).size());
Assertions.assertEquals(
OptimizingProcess.Status.RUNNING,
ProcessStatus.RUNNING,
tableService()
.getRuntime(serverTableIdentifier().getId())
.getOptimizingProcess()
Expand Down
Loading
Loading