Skip to content

Commit

Permalink
Merge branch 'master' into action-api
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoujinsong authored Oct 11, 2024
2 parents b77e5ce + 699afcf commit a849c4e
Show file tree
Hide file tree
Showing 10 changed files with 69 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ public void getOptimizerTables(Context ctx) {
.collect(Collectors.toList());

PageResult<TableOptimizingInfo> amsPageResult =
PageResult.of(tableRuntimes, offset, pageSize, OptimizingUtil::buildTableOptimizeInfo);
PageResult.of(
tableRuntimes.stream()
.map(OptimizingUtil::buildTableOptimizeInfo)
.collect(Collectors.toList()),
tableService.listRuntimes(dbFilterStr, tableFilterStr).size());
ctx.json(OkResponse.of(amsPageResult));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects;
import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
import org.apache.amoro.shade.guava32.com.google.common.collect.Sets;
import org.apache.amoro.utils.TableFileUtil;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileContent;
Expand Down Expand Up @@ -226,8 +227,12 @@ public boolean fileShouldRewrite(DataFile dataFile, List<ContentFile<?>> deletes

public boolean segmentShouldRewritePos(DataFile dataFile, List<ContentFile<?>> deletes) {
Preconditions.checkArgument(!isFragmentFile(dataFile), "Unsupported fragment file.");
if (deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count()
>= 2) {
long posDeleteFileCount =
deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count();
if (posDeleteFileCount == 1) {
return !TableFileUtil.isOptimizingPosDeleteFile(
dataFile.path().toString(), deletes.get(0).path().toString());
} else if (posDeleteFileCount > 1) {
combinePosSegmentFileCount++;
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -471,6 +473,33 @@ public TableRuntime getRuntime(Long tableId) {
return tableRuntimeMap.get(tableId);
}

public Map<Long, TableRuntime> listRuntimes(
@Nullable String dbFilter, @Nullable String tableFilter) {
checkStarted();
// no filter, will return all the table runtime.
if (dbFilter == null && tableFilter == null) {
return Collections.unmodifiableMap(tableRuntimeMap);
}

Map<Long, TableRuntime> filteredRuntimes = new HashMap<>();
for (Map.Entry<Long, TableRuntime> entry : tableRuntimeMap.entrySet()) {
ServerTableIdentifier identifier = entry.getValue().getTableIdentifier();
// skip the runtime which fails the db filter.
if (dbFilter != null && !identifier.getDatabase().contains(dbFilter)) {
continue;
}

// skip the runtime which fails the table filter.
if (tableFilter != null && !identifier.getTableName().contains(tableFilter)) {
continue;
}

filteredRuntimes.put(entry.getKey(), entry.getValue());
}

return filteredRuntimes;
}

@Override
public boolean contains(Long tableId) {
checkStarted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import org.apache.amoro.AmoroTable;
import org.apache.amoro.ServerTableIdentifier;

import javax.annotation.Nullable;

import java.util.Map;

public interface TableManager extends TableRuntimeHandler {

/**
Expand All @@ -33,6 +37,9 @@ public interface TableManager extends TableRuntimeHandler {

TableRuntime getRuntime(Long tableId);

/** Return the table runtimes associated to the given filter. */
Map<Long, TableRuntime> listRuntimes(@Nullable String dbFilter, @Nullable String tableFilter);

default boolean contains(Long tableId) {
return getRuntime(tableId) != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,12 @@

package org.apache.amoro.process;

/**
* Status of any {@link AmoroProcess}. Only UNKNOWN, RUNNING, FINISHED(SUCCESS, CLOSED, FAILED) are
* necessary Stage classes are used to define multiple phases of one process such as OptimizingStage
*/
/** Status of any {@link AmoroProcess}. */
public enum ProcessStatus {
UNKNOWN,
PENDING,

/** This status containing scheduled and running phases */
ACTIVE,
RUNNING,
SUBMITTED,
SUCCESS,
CLOSED,
FAILED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public abstract class TableProcess<T extends TableProcessState> implements Amoro
protected final TableRuntime tableRuntime;
private final SimpleFuture submitFuture = new SimpleFuture();
private final SimpleFuture completeFuture = new SimpleFuture();
private volatile ProcessStatus status = ProcessStatus.ACTIVE;
private volatile ProcessStatus status = ProcessStatus.RUNNING;
private volatile String failedReason;

protected TableProcess(T state, TableRuntime tableRuntime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class TableProcessState implements ProcessState {
private final ServerTableIdentifier tableIdentifier;
@StateField private long startTime;
@StateField private long endTime = -1L;
@StateField private ProcessStatus status = ProcessStatus.ACTIVE;
@StateField private ProcessStatus status = ProcessStatus.SUBMITTED;
@StateField private volatile String failedReason;
private volatile Map<String, String> summary;

Expand Down Expand Up @@ -108,7 +108,7 @@ protected void setStatus(ProcessStatus status) {
|| status == ProcessStatus.FAILED
|| status == ProcessStatus.CLOSED) {
endTime = System.currentTimeMillis();
} else if (this.status != ProcessStatus.ACTIVE && status == ProcessStatus.ACTIVE) {
} else if (this.status != ProcessStatus.SUBMITTED && status == ProcessStatus.SUBMITTED) {
endTime = -1L;
failedReason = null;
summary = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ protected OptimizingProcessInfo getOptimizingInfo(
HoodieInstant inf =
instantMap.get(instantTimestamp + "_" + HoodieInstant.State.INFLIGHT.name());
if (inf != null) {
processInfo.setStatus(ProcessStatus.ACTIVE);
processInfo.setStatus(ProcessStatus.RUNNING);
}
}
return processInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,10 @@ private void flushDeletes() {
String fileDir = TableFileUtil.getFileDir(filePath.get().toString());
String deleteFilePath =
format.addExtension(
String.format("%s/%s-delete-%s", fileDir, fileName, fileNameSuffix));
String.format(
"%s/%s",
fileDir,
TableFileUtil.optimizingPosDeleteFileName(fileName, fileNameSuffix)));
EncryptedOutputFile outputFile =
encryptionManager.encrypt(fileIO.newOutputFile(deleteFilePath));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

public class TableFileUtil {
private static final Logger LOG = LoggerFactory.getLogger(TableFileUtil.class);
private static final String POS_DELETE_FILE_IDENTIFIER = "delete";

/**
* Parse file name form file path
Expand Down Expand Up @@ -192,4 +193,13 @@ public static String getParent(String path) {
Path p = new Path(path);
return p.getParent().toString();
}

public static String optimizingPosDeleteFileName(String dataFileName, String suffix) {
return String.format("%s-%s-%s", dataFileName, POS_DELETE_FILE_IDENTIFIER, suffix);
}

public static boolean isOptimizingPosDeleteFile(String dataFilePath, String posDeleteFilePath) {
return getFileName(posDeleteFilePath)
.startsWith(String.format("%s-%s", getFileName(dataFilePath), POS_DELETE_FILE_IDENTIFIER));
}
}

0 comments on commit a849c4e

Please sign in to comment.