Skip to content

Commit

Permalink
fixup! [AMORO-3066] Optimizing the efficiency for optimizing-processe…
Browse files Browse the repository at this point in the history
…s rest api
  • Loading branch information
klion26 committed Oct 17, 2024
1 parent bad18a7 commit d03ee6f
Show file tree
Hide file tree
Showing 7 changed files with 343 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName(),
type,
status,
offset,
limit));
PageInfo<OptimizingProcessMeta> pageInfo = new PageInfo<>(processMetaList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.amoro.server.optimizing;

import org.apache.amoro.process.ProcessStatus;

public interface OptimizingProcess {

long getProcessId();
Expand All @@ -36,7 +38,7 @@ public interface OptimizingProcess {

OptimizingType getOptimizingType();

Status getStatus();
ProcessStatus getStatus();

long getRunningQuotaTime(long calculatingStartTime, long calculatingEndTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.amoro.api.OptimizingTaskId;
import org.apache.amoro.exception.OptimizingClosedException;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.resource.ResourceGroup;
import org.apache.amoro.server.AmoroServiceConstants;
import org.apache.amoro.server.manager.MetricManager;
Expand Down Expand Up @@ -354,7 +355,7 @@ private class TableOptimizingProcess implements OptimizingProcess, TaskRuntime.T
private final Map<OptimizingTaskId, TaskRuntime<RewriteStageTask>> taskMap = Maps.newHashMap();
private final Queue<TaskRuntime<RewriteStageTask>> taskQueue = new LinkedList<>();
private final Lock lock = new ReentrantLock();
private volatile Status status = OptimizingProcess.Status.RUNNING;
private volatile ProcessStatus status = ProcessStatus.RUNNING;
private volatile String failedReason;
private long endTime = AmoroServiceConstants.INVALID_TIME;
private Map<String, Long> fromSequence = Maps.newHashMap();
Expand Down Expand Up @@ -396,7 +397,7 @@ public TableOptimizingProcess(TableRuntime tableRuntime) {
if (tableRuntime.getToSequence() != null) {
toSequence = tableRuntime.getToSequence();
}
if (this.status != OptimizingProcess.Status.CLOSED) {
if (this.status != ProcessStatus.CLOSED) {
tableRuntime.recover(this);
}
loadTaskRuntimes(this);
Expand All @@ -413,18 +414,18 @@ public OptimizingType getOptimizingType() {
}

@Override
public Status getStatus() {
public ProcessStatus getStatus() {
return status;
}

@Override
public void close() {
lock.lock();
try {
if (this.status != Status.RUNNING) {
if (this.status != ProcessStatus.RUNNING) {
return;
}
this.status = OptimizingProcess.Status.CLOSED;
this.status = ProcessStatus.CLOSED;
this.endTime = System.currentTimeMillis();
persistProcessCompleted(false);
clearProcess(this);
Expand Down Expand Up @@ -468,7 +469,7 @@ public void acceptResult(TaskRuntime taskRuntime) {
} else {
clearProcess(this);
this.failedReason = taskRuntime.getFailReason();
this.status = OptimizingProcess.Status.FAILED;
this.status = ProcessStatus.FAILED;
this.endTime = taskRuntime.getEndTime();
persistProcessCompleted(false);
}
Expand All @@ -481,15 +482,14 @@ public void acceptResult(TaskRuntime taskRuntime) {
// the cleanup of task should be done after unlock to avoid deadlock
@Override
public void releaseResourcesIfNecessary() {
if (this.status == OptimizingProcess.Status.FAILED
|| this.status == OptimizingProcess.Status.CLOSED) {
if (this.status == ProcessStatus.FAILED || this.status == ProcessStatus.CLOSED) {
cancelTasks();
}
}

@Override
public boolean isClosed() {
return status == OptimizingProcess.Status.CLOSED;
return status == ProcessStatus.CLOSED;
}

@Override
Expand Down Expand Up @@ -566,12 +566,12 @@ public void commit() {
try {
hasCommitted = true;
buildCommit().commit();
status = Status.SUCCESS;
status = ProcessStatus.SUCCESS;
endTime = System.currentTimeMillis();
persistProcessCompleted(true);
} catch (Exception e) {
LOG.error("{} Commit optimizing failed ", tableRuntime.getTableIdentifier(), e);
status = Status.FAILED;
status = ProcessStatus.FAILED;
failedReason = ExceptionUtil.getErrorMessage(e, 4000);
endTime = System.currentTimeMillis();
persistProcessCompleted(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.github.pagehelper.PageInterceptor;
import com.github.pagehelper.dialect.helper.MySqlDialect;
import com.github.pagehelper.dialect.helper.PostgreSqlDialect;
import com.github.pagehelper.dialect.helper.SqlServerDialect;
import com.github.pagehelper.dialect.helper.SqlServer2012Dialect;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.persistence.mapper.ApiTokensMapper;
Expand Down Expand Up @@ -149,9 +149,9 @@ public void init(Configurations config) throws SQLException {
}

private void registerDialectAliases() {
registerDialectAlias("postgresql", PostgreSqlDialect.class);
registerDialectAlias("postgres", PostgreSqlDialect.class);
registerDialectAlias("mysql", MySqlDialect.class);
registerDialectAlias("derby", SqlServerDialect.class);
registerDialectAlias("derby", SqlServer2012Dialect.class);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.optimizing.RewriteFilesInput;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.server.optimizing.OptimizingProcess;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.OptimizingType;
Expand Down Expand Up @@ -69,7 +69,7 @@ void insertOptimizingProcess(
@Param("processId") long processId,
@Param("targetSnapshotId") long targetSnapshotId,
@Param("targetChangeSnapshotId") long targetChangeSnapshotId,
@Param("status") OptimizingProcess.Status status,
@Param("status") ProcessStatus status,
@Param("optimizingType") OptimizingType optimizingType,
@Param("planTime") long planTime,
@Param("summary") MetricsSummary summary,
Expand All @@ -85,19 +85,23 @@ void insertOptimizingProcess(
void updateOptimizingProcess(
@Param("tableId") long tableId,
@Param("processId") long processId,
@Param("optimizingStatus") OptimizingProcess.Status status,
@Param("optimizingStatus") ProcessStatus status,
@Param("endTime") long endTime,
@Param("summary") MetricsSummary summary,
@Param("failedReason") String failedReason);

@Select(
"SELECT a.process_id, a.table_id, a.catalog_name, a.db_name, a.table_name, a.target_snapshot_id,"
"<script>"
+ "SELECT a.process_id, a.table_id, a.catalog_name, a.db_name, a.table_name, a.target_snapshot_id,"
+ " a.target_change_snapshot_id, a.status, a.optimizing_type, a.plan_time, a.end_time,"
+ " a.fail_reason, a.summary, a.from_sequence, a.to_sequence FROM table_optimizing_process a"
+ " INNER JOIN table_identifier b ON a.table_id = b.table_id"
+ " WHERE a.catalog_name = #{catalogName} AND a.db_name = #{dbName} AND a.table_name = #{tableName}"
+ " AND b.catalog_name = #{catalogName} AND b.db_name = #{dbName} AND b.table_name = #{tableName}"
+ " ORDER BY process_id desc")
+ " <if test='optimizingType != null'> AND a.optimizing_type = #{optimizingType}</if>"
+ " <if test='optimizingStatus != null'> AND a.status = #{optimizingStatus}</if>"
+ " ORDER BY process_id desc"
+ "</script>")
@Results({
@Result(property = "processId", column = "process_id"),
@Result(property = "tableId", column = "table_id"),
Expand Down Expand Up @@ -125,6 +129,8 @@ List<OptimizingProcessMeta> selectOptimizingProcesses(
@Param("catalogName") String catalogName,
@Param("dbName") String dbName,
@Param("tableName") String tableName,
@Param("optimizingType") String optimizingType,
@Param("optimizingStatus") ProcessStatus optimizingStatus,
@Param("pageNum") int pageNum,
@Param("pageSize") int pageSize);

Expand Down
Loading

0 comments on commit d03ee6f

Please sign in to comment.