Skip to content

Commit

Permalink
[Improvement] Support filter by status and type when showing the table optimizing process (#3230)
Browse files Browse the repository at this point in the history

* [AMORO][WEB] Support filtering by status and type when showing the table optimizing process

(cherry picked from commit 80ad146b9007115a40b1ab6c4dee9c492b02d71c)

* add mock rest api call

---------

Co-authored-by: zhangyongxiang.alpha <[email protected]>
  • Loading branch information
baiyangtx and zhangyongxiang.alpha authored Sep 29, 2024
1 parent 222d816 commit 967eea2
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ private EndpointGroup apiGroup() {
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes",
tableController::getOptimizingProcesses);
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-types",
tableController::getOptimizingTypes);
get(
"/catalogs/{catalog}/dbs/{db}/tables/{table}/optimizing-processes/{processId}/tasks",
tableController::getOptimizingProcessTasks);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.amoro.server.optimizing.MetricsSummary;
import org.apache.amoro.server.optimizing.OptimizingProcessMeta;
import org.apache.amoro.server.optimizing.OptimizingTaskMeta;
import org.apache.amoro.server.optimizing.OptimizingType;
import org.apache.amoro.server.optimizing.TaskRuntime;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.OptimizingMapper;
Expand Down Expand Up @@ -64,6 +65,7 @@
import org.apache.amoro.utils.MixedDataFiles;
import org.apache.amoro.utils.MixedTableUtil;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.HasTableOperations;
Expand Down Expand Up @@ -501,7 +503,7 @@ public List<ConsumerInfo> getTableConsumerInfos(AmoroTable<?> amoroTable) {

@Override
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, int limit, int offset) {
AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, int offset) {
TableIdentifier tableIdentifier = amoroTable.id();
List<OptimizingProcessMeta> processMetaList =
getAs(
Expand All @@ -511,6 +513,16 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
tableIdentifier.getCatalog(),
tableIdentifier.getDatabase(),
tableIdentifier.getTableName()));

processMetaList =
processMetaList.stream()
.filter(
p ->
StringUtils.isBlank(type)
|| type.equalsIgnoreCase(p.getOptimizingType().getStatus().displayValue()))
.filter(p -> status == null || status.name().equalsIgnoreCase(p.getStatus().name()))
.collect(Collectors.toList());

int total = processMetaList.size();
processMetaList =
processMetaList.stream().skip(offset).limit(limit).collect(Collectors.toList());
Expand All @@ -532,6 +544,15 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
total);
}

@Override
public Map<String, String> getTableOptimizingTypes(AmoroTable<?> amoroTable) {
  // Every optimizing type this format supports, keyed by the enum name and
  // mapped to its display value for the UI filter dropdown.
  Map<String, String> types = Maps.newHashMap();
  OptimizingType[] supported = OptimizingType.values();
  for (int i = 0; i < supported.length; i++) {
    types.put(supported[i].name(), supported[i].getStatus().displayValue());
  }
  return types;
}

@Override
public List<OptimizingTaskInfo> getOptimizingTaskInfos(
AmoroTable<?> amoroTable, String processId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.amoro.TableFormat;
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.table.TableService;
Expand Down Expand Up @@ -124,10 +125,11 @@ public List<ConsumerInfo> getTableConsumersInfos(TableIdentifier tableIdentifier
}

public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
TableIdentifier tableIdentifier, int limit, int offset) {
TableIdentifier tableIdentifier, String type, ProcessStatus status, int limit, int offset) {
AmoroTable<?> amoroTable = loadTable(tableIdentifier);
FormatTableDescriptor formatTableDescriptor = formatDescriptorMap.get(amoroTable.format());
return formatTableDescriptor.getOptimizingProcessesInfo(amoroTable, limit, offset);
return formatTableDescriptor.getOptimizingProcessesInfo(
amoroTable, type, status, limit, offset);
}

public List<OptimizingTaskInfo> getOptimizingProcessTaskInfos(
Expand All @@ -137,6 +139,12 @@ public List<OptimizingTaskInfo> getOptimizingProcessTaskInfos(
return formatTableDescriptor.getOptimizingTaskInfos(amoroTable, processId);
}

/** Look up the format-specific descriptor for the table and delegate to it. */
public Map<String, String> getTableOptimizingTypes(TableIdentifier tableIdentifier) {
  AmoroTable<?> amoroTable = loadTable(tableIdentifier);
  return formatDescriptorMap.get(amoroTable.format()).getTableOptimizingTypes(amoroTable);
}

private AmoroTable<?> loadTable(TableIdentifier identifier) {
ServerCatalog catalog = tableService.getServerCatalog(identifier.getCatalog());
return catalog.loadTable(identifier.getDatabase(), identifier.getTableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.amoro.hive.utils.HiveTableUtil;
import org.apache.amoro.hive.utils.UpgradeHiveTableUtil;
import org.apache.amoro.mixed.CatalogLoader;
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.properties.HiveTableProperties;
import org.apache.amoro.server.catalog.ServerCatalog;
Expand Down Expand Up @@ -308,6 +309,8 @@ public void getOptimizingProcesses(Context ctx) {
String catalog = ctx.pathParam("catalog");
String db = ctx.pathParam("db");
String table = ctx.pathParam("table");
String type = ctx.queryParam("type");
String status = ctx.queryParam("status");
Integer page = ctx.queryParamAsClass("page", Integer.class).getOrDefault(1);
Integer pageSize = ctx.queryParamAsClass("pageSize", Integer.class).getOrDefault(20);

Expand All @@ -319,15 +322,30 @@ public void getOptimizingProcesses(Context ctx) {
Preconditions.checkState(serverCatalog.tableExists(db, table), "no such table");

TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table);
ProcessStatus processStatus =
StringUtils.isBlank(status) ? null : ProcessStatus.valueOf(status);
Pair<List<OptimizingProcessInfo>, Integer> optimizingProcessesInfo =
tableDescriptor.getOptimizingProcessesInfo(
tableIdentifier.buildTableIdentifier(), limit, offset);
tableIdentifier.buildTableIdentifier(), type, processStatus, limit, offset);
List<OptimizingProcessInfo> result = optimizingProcessesInfo.getLeft();
int total = optimizingProcessesInfo.getRight();

ctx.json(OkResponse.of(PageResult.of(result, total)));
}

/**
 * Get the optimizing types supported by a table, used to populate the
 * process-type filter on the optimizing page.
 *
 * @param ctx context for handling the request and response
 */
public void getOptimizingTypes(Context ctx) {
  String catalog = ctx.pathParam("catalog");
  String db = ctx.pathParam("db");
  String table = ctx.pathParam("table");
  // Fail fast with a clear message before building the identifier.
  ServerCatalog serverCatalog = tableService.getServerCatalog(catalog);
  Preconditions.checkState(serverCatalog.tableExists(db, table), "no such table");
  TableIdentifier tableIdentifier = TableIdentifier.of(catalog, db, table);

  Map<String, String> values =
      tableDescriptor.getTableOptimizingTypes(tableIdentifier.buildTableIdentifier());
  ctx.json(OkResponse.of(values));
}

/**
* Get tasks of optimizing process.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
*/
public enum ProcessStatus {
UNKNOWN,
PENDING,

/** This status containing scheduled and running phases */
ACTIVE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import org.apache.amoro.AmoroTable;
import org.apache.amoro.TableFormat;
import org.apache.amoro.process.ProcessStatus;
import org.apache.commons.lang3.tuple.Pair;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;

/** API for obtaining metadata information of various formats. */
Expand Down Expand Up @@ -60,7 +62,10 @@ List<PartitionFileBaseInfo> getTableFiles(

/** Get the paged optimizing process information of the {@link AmoroTable} and total size. */
Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, int limit, int offset);
AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, int offset);

/** Return the optimizing types supported by the {@link AmoroTable}. */
Map<String, String> getTableOptimizingTypes(AmoroTable<?> amoroTable);

/** Get the paged optimizing process tasks information of the {@link AmoroTable}. */
List<OptimizingTaskInfo> getOptimizingTaskInfos(AmoroTable<?> amoroTable, String processId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@
public class HudiTableDescriptor implements FormatTableDescriptor {

private static final Logger LOG = LoggerFactory.getLogger(HudiTableDescriptor.class);
private static final String COMPACTION = "compaction";
private static final String CLUSTERING = "clustering";

private ExecutorService ioExecutors;

Expand Down Expand Up @@ -328,7 +330,7 @@ private Stream<PartitionFileBaseInfo> fileSliceToFileStream(String partition, Fi

@Override
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, int limit, int offset) {
AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, int offset) {
HoodieJavaTable hoodieTable = (HoodieJavaTable) amoroTable.originalTable();
HoodieDefaultTimeline timeline = new HoodieActiveTimeline(hoodieTable.getMetaClient(), false);
List<HoodieInstant> instants = timeline.getInstants();
Expand Down Expand Up @@ -369,7 +371,24 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
})
.filter(Objects::nonNull)
.collect(Collectors.toList());
return Pair.of(infos, infos.size());
infos =
infos.stream()
.filter(
i ->
StringUtils.isNullOrEmpty(type) || type.equalsIgnoreCase(i.getOptimizingType()))
.filter(i -> status == null || status == i.getStatus())
.collect(Collectors.toList());
int total = infos.size();
infos = infos.stream().skip(offset).limit(limit).collect(Collectors.toList());
return Pair.of(infos, total);
}

@Override
public Map<String, String> getTableOptimizingTypes(AmoroTable<?> amoroTable) {
  // Hudi surfaces two table-maintenance operations as optimizing types;
  // key and display value are identical for both.
  Map<String, String> types = Maps.newHashMap();
  for (String hudiType : new String[] {COMPACTION, CLUSTERING}) {
    types.put(hudiType, hudiType);
  }
  return types;
}

protected OptimizingProcessInfo getOptimizingInfo(
Expand Down Expand Up @@ -455,7 +474,7 @@ private void fillCompactProcessInfo(OptimizingProcessInfo processInfo, byte[] re
processInfo.getSummary().put("strategy", strategy.getCompactorClassName());
processInfo.getSummary().putAll(strategy.getStrategyParams());
}
processInfo.setOptimizingType("Compact");
processInfo.setOptimizingType(COMPACTION);
}

private OptimizingProcessInfo fillClusterProcessInfo(
Expand All @@ -481,7 +500,7 @@ private OptimizingProcessInfo fillClusterProcessInfo(
processInfo.setInputFiles(FilesStatistics.build(inputFileCount, inputFileSize));
int tasks = plan.getInputGroups().size();
processInfo.setTotalTasks(tasks);
processInfo.setOptimizingType("Cluster");
processInfo.setOptimizingType(CLUSTERING);

HoodieClusteringStrategy strategy = plan.getStrategy();
if (strategy != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.amoro.process.ProcessStatus;
import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableList;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.amoro.shade.guava32.com.google.common.collect.Streams;
import org.apache.amoro.table.TableIdentifier;
import org.apache.amoro.table.descriptor.AMSColumnInfo;
Expand All @@ -45,6 +46,7 @@
import org.apache.amoro.table.descriptor.TableSummary;
import org.apache.amoro.table.descriptor.TagOrBranchInfo;
import org.apache.amoro.utils.CommonUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
Expand Down Expand Up @@ -359,26 +361,22 @@ public List<PartitionFileBaseInfo> getTableFiles(

@Override
public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
AmoroTable<?> amoroTable, int limit, int offset) {
AmoroTable<?> amoroTable, String type, ProcessStatus status, int limit, int offset) {
// Temporary solution for Paimon. TODO: Get compaction info from Paimon compaction task
List<OptimizingProcessInfo> processInfoList = new ArrayList<>();
TableIdentifier tableIdentifier = amoroTable.id();
FileStoreTable fileStoreTable = (FileStoreTable) amoroTable.originalTable();
FileStore<?> store = fileStoreTable.store();
boolean isPrimaryTable = !fileStoreTable.primaryKeys().isEmpty();
int maxLevel = CoreOptions.fromMap(fileStoreTable.options()).numLevels() - 1;
int total;
try {
List<Snapshot> compactSnapshots =
Streams.stream(store.snapshotManager().snapshots())
.filter(s -> s.commitKind() == Snapshot.CommitKind.COMPACT)
.collect(Collectors.toList());
total = compactSnapshots.size();
processInfoList =
compactSnapshots.stream()
.sorted(Comparator.comparing(Snapshot::id).reversed())
.skip(offset)
.limit(limit)
.map(
s -> {
OptimizingProcessInfo optimizingProcessInfo = new OptimizingProcessInfo();
Expand Down Expand Up @@ -438,9 +436,25 @@ public Pair<List<OptimizingProcessInfo>, Integer> getOptimizingProcessesInfo(
} catch (IOException e) {
throw new RuntimeException(e);
}
processInfoList =
processInfoList.stream()
.filter(p -> StringUtils.isBlank(type) || type.equalsIgnoreCase(p.getOptimizingType()))
.filter(p -> status == null || status == p.getStatus())
.collect(Collectors.toList());
int total = processInfoList.size();
processInfoList =
processInfoList.stream().skip(offset).limit(limit).collect(Collectors.toList());
return Pair.of(processInfoList, total);
}

@Override
public Map<String, String> getTableOptimizingTypes(AmoroTable<?> amoroTable) {
  // Paimon compactions are surfaced as either full or minor optimizing.
  // Keys are canonical type names; values are lower-case display labels.
  // Fix: "MINOR" previously mapped to "MINOR" instead of "minor", which was
  // inconsistent with "FULL" -> "full" and with the display values other
  // descriptors (and the web mock API) return.
  Map<String, String> types = Maps.newHashMap();
  types.put("FULL", "full");
  types.put("MINOR", "minor");
  return types;
}

@Override
public List<OptimizingTaskInfo> getOptimizingTaskInfos(
AmoroTable<?> amoroTable, String processId) {
Expand Down
13 changes: 13 additions & 0 deletions amoro-web/mock/modules/table.js
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,19 @@ export default [
}
}),
},
{
  // Mock response for the optimizing-types endpoint: a map of canonical
  // optimizing type name -> lower-case display label, used to populate the
  // process-type filter dropdown on the table optimizing page.
  url: '/mock/ams/v1/tables/catalogs/test_catalog/dbs/db/tables/user/optimizing-types',
  method: 'get',
  response: () => ({
    "message": "success",
    "code": 200,
    "result": {
      "MINOR": "minor",
      "MAJOR": "major",
      "FULL": "full",
    }
  }),
},
{
url: '/mock/ams/v1/tables/catalogs/test_catalog/dbs/db/tables/user/operations',
method: 'get',
Expand Down
19 changes: 17 additions & 2 deletions amoro-web/src/services/table.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,13 +156,28 @@ export function getOptimizingProcesses(
catalog: string
db: string
table: string
type: string
status: string
page: number
pageSize: number
token?: string
},
) {
const { catalog, db, table, page, pageSize, token } = params
return request.get(`ams/v1/tables/catalogs/${catalog}/dbs/${db}/tables/${table}/optimizing-processes`, { params: { page, pageSize, token } })
const { catalog, db, table, type, status, page, pageSize, token } = params
return request.get(`ams/v1/tables/catalogs/${catalog}/dbs/${db}/tables/${table}/optimizing-processes`, { params: { page, pageSize, token, type, status } })
}

// get optimizing process types (type name -> display label) for the filter dropdown
export function getTableOptimizingTypes(
  params: {
    catalog: string
    db: string
    table: string
    token?: string
  },
) {
  const url = `ams/v1/tables/catalogs/${params.catalog}/dbs/${params.db}/tables/${params.table}/optimizing-types`
  return request.get(url, { params: { token: params.token } })
}

// get optimizing taskes
Expand Down
Loading

0 comments on commit 967eea2

Please sign in to comment.