Skip to content

Commit

Permalink
[Feature] Support Iceberg remove orphan files
Browse files Browse the repository at this point in the history
Signed-off-by: Max-Cheng <[email protected]>
  • Loading branch information
Max-Cheng committed Feb 27, 2025
1 parent 48cc6fc commit fe814bc
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.starrocks.common.util.TimeUtils;
import com.starrocks.connector.BranchOptions;
import com.starrocks.connector.ConnectorAlterTableExecutor;
import com.starrocks.connector.HdfsEnvironment;
import com.starrocks.connector.TagOptions;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.qe.ConnectContext;
Expand All @@ -41,23 +42,38 @@
import com.starrocks.sql.ast.ModifyTablePropertiesClause;
import com.starrocks.sql.ast.TableRenameClause;
import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ExpireSnapshots;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.ManageSnapshots;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.UpdateSchema;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.analysis.OutFileClause.PARQUET_COMPRESSION_TYPE_MAP;
Expand All @@ -66,17 +82,25 @@
import static com.starrocks.connector.iceberg.IcebergMetadata.COMPRESSION_CODEC;
import static com.starrocks.connector.iceberg.IcebergMetadata.FILE_FORMAT;
import static com.starrocks.connector.iceberg.IcebergMetadata.LOCATION_PROPERTY;
import static com.starrocks.connector.iceberg.IcebergUtil.fileName;
import static com.starrocks.sql.common.UnsupportedException.unsupportedException;
import static org.apache.iceberg.ReachableFileUtil.metadataFileLocations;
import static org.apache.iceberg.ReachableFileUtil.statisticsFilesLocations;

public class IcebergAlterTableExecutor extends ConnectorAlterTableExecutor {
private org.apache.iceberg.Table table;
private Table table;
private IcebergCatalog icebergCatalog;
private Transaction transaction;
private HdfsEnvironment hdfsEnvironment;

public IcebergAlterTableExecutor(AlterTableStmt stmt, org.apache.iceberg.Table table, IcebergCatalog icebergCatalog) {
/**
 * Creates an executor for one ALTER TABLE statement against an Iceberg table.
 *
 * @param stmt            the statement handed to the parent executor
 * @param table           the Iceberg table the statement operates on
 * @param icebergCatalog  the catalog stored for use by this executor
 * @param hdfsEnvironment supplies the Hadoop configuration used when accessing the table's files
 */
public IcebergAlterTableExecutor(AlterTableStmt stmt,
                                 Table table,
                                 IcebergCatalog icebergCatalog,
                                 HdfsEnvironment hdfsEnvironment) {
    super(stmt);
    this.hdfsEnvironment = hdfsEnvironment;
    this.icebergCatalog = icebergCatalog;
    this.table = table;
}

@Override
Expand Down Expand Up @@ -430,6 +454,9 @@ public Void visitAlterTableOperationClause(AlterTableOperationClause clause, Con
case EXPIRE_SNAPSHOTS:
expireSnapshots(args);
break;
case REMOVE_ORPHAN_FILES:
removeOrphanFiles(args);
break;
default:
throw new StarRocksConnectorException("Unsupported table operation %s", op);
}
Expand Down Expand Up @@ -495,4 +522,107 @@ private void expireSnapshots(List<ConstantOperator> args) {
expireSnapshots.commit();
});
}

/**
 * Handles the REMOVE_ORPHAN_FILES table operation.
 *
 * <p>Collects the base names of every file the table still refers to (manifest lists, manifests,
 * content files listed in those manifests, metadata files, statistics files and
 * {@code version-hint.text}) and then appends an action to {@code actions} that scans the table
 * location and deletes files that are both older than the threshold and not in that set.
 * Files are matched by base name only, so a file is kept whenever any referenced file shares its name.
 *
 * <p>Nothing is scheduled when the table has no current snapshot.
 *
 * @param args at most one constant, {@code older_than}, which must be castable to DATETIME; it is
 *             interpreted in the zone returned by {@code TimeUtils.getTimeZone()}. When omitted, a
 *             default retention window is applied (see the note in the body).
 * @throws StarRocksConnectorException if more than one argument is given, the argument cannot be
 *                                     cast to DATETIME, or a manifest cannot be read
 */
private void removeOrphanFiles(List<ConstantOperator> args) {
    if (args.size() > 1) {
        throw new StarRocksConnectorException("invalid args. only support " +
                "`older_than` in the remove orphan files operation");
    }

    long olderThanMillis;
    if (args.isEmpty()) {
        // The previous sentinel of -1 could never be greater than a file's modification time, so the
        // operation silently deleted nothing. Fall back to a conservative retention window instead, which
        // also protects files that in-flight writers have created but not yet committed.
        // NOTE(review): 7 days is a deliberately cautious default -- confirm the desired value.
        Duration defaultRetention = Duration.ofDays(7);
        olderThanMillis = System.currentTimeMillis() - defaultRetention.toMillis();
    } else {
        LocalDateTime time = Optional.ofNullable(args.get(0))
                .flatMap(arg -> arg.castTo(Type.DATETIME)
                        .map(ConstantOperator::getDatetime))
                .orElseThrow(() -> new StarRocksConnectorException("invalid arg %s", args.get(0)));
        // Convert through Instant so that any sub-second part of the datetime is kept.
        olderThanMillis = time.atZone(TimeUtils.getTimeZone().toZoneId()).toInstant().toEpochMilli();
    }

    if (table.currentSnapshot() == null) {
        return;
    }

    Set<String> processedManifestFilePaths = new HashSet<>();
    Set<String> validFileNames = new HashSet<>();

    for (Snapshot snapshot : table.snapshots()) {
        if (snapshot.manifestListLocation() != null) {
            validFileNames.add(fileName(snapshot.manifestListLocation()));
        }

        for (ManifestFile manifest : snapshot.allManifests(table.io())) {
            // Manifests are shared between snapshots; read each one only once.
            if (!processedManifestFilePaths.add(manifest.path())) {
                continue;
            }

            validFileNames.add(fileName(manifest.path()));
            try (ManifestReader<? extends ContentFile<?>> manifestReader = readerForManifest(table, manifest)) {
                for (ContentFile<?> contentFile : manifestReader) {
                    validFileNames.add(fileName(contentFile.location()));
                }
            } catch (IOException e) {
                throw new StarRocksConnectorException("Unable to list manifest file content from " + manifest.path(), e);
            }
        }
    }

    metadataFileLocations(table, false).stream()
            .map(IcebergUtil::fileName)
            .forEach(validFileNames::add);

    statisticsFilesLocations(table).stream()
            .map(IcebergUtil::fileName)
            .forEach(validFileNames::add);

    validFileNames.add("version-hint.text");

    // Deletion is deferred: it runs together with the other queued actions of this executor.
    actions.add(() -> scanAndDeleteInvalidFiles(table.location(), olderThanMillis, validFileNames));
}

/**
 * Opens the manifest reader that matches the manifest's content kind.
 *
 * @param table    supplies the file IO and, for delete manifests, the partition specs
 * @param manifest the manifest to open
 * @return a reader over the manifest's entries; the caller is responsible for closing it
 */
private static ManifestReader<? extends ContentFile<?>> readerForManifest(Table table, ManifestFile manifest) {
    ManifestReader<? extends ContentFile<?>> reader = switch (manifest.content()) {
        case DELETES -> ManifestFiles.readDeleteManifest(manifest, table.io(), table.specs());
        case DATA -> ManifestFiles.read(manifest, table.io());
    };
    return reader;
}

/**
 * Recursively lists every file under {@code tableLocation} and deletes those that were last modified
 * before {@code expiration} and whose base name is not contained in {@code validFiles}.
 *
 * <p>Candidates are buffered and deleted in batches so the pending list stays bounded on large tables.
 *
 * @param tableLocation root location of the table
 * @param expiration    epoch milliseconds; only files with a strictly smaller modification time are deleted
 * @param validFiles    base names of files that must be kept
 * @throws StarRocksConnectorException if listing or deleting fails
 */
private void scanAndDeleteInvalidFiles(String tableLocation, long expiration, Set<String> validFiles) {
    final int deleteBatchSize = 5000;
    try {
        Path tablePath = new Path(tableLocation);
        URI uri = tablePath.toUri();
        FileSystem fileSystem = FileSystem.get(uri, hdfsEnvironment.getConfiguration());
        RemoteIterator<LocatedFileStatus> allFiles = fileSystem.listFiles(tablePath, true);
        List<Path> filesToDelete = new ArrayList<>();
        while (allFiles.hasNext()) {
            // The listing entry is itself a FileStatus, so its modification time is used directly
            // instead of issuing one more getFileStatus call per file.
            FileStatus status = allFiles.next();
            Path path = status.getPath();
            if (status.getModificationTime() < expiration && !validFiles.contains(path.getName())) {
                filesToDelete.add(path);
                if (filesToDelete.size() >= deleteBatchSize) {
                    deleteAndClear(fileSystem, filesToDelete);
                }
            }
        }
        deleteAndClear(fileSystem, filesToDelete);
    } catch (IOException e) {
        // Delete failures now arrive here as well, instead of escaping as a bare RuntimeException.
        throw new StarRocksConnectorException("Failed accessing data: ", e);
    }
}

/** Deletes each path non-recursively, then empties the list so it can be reused for the next batch. */
private static void deleteAndClear(FileSystem fileSystem, List<Path> paths) throws IOException {
    for (Path path : paths) {
        fileSystem.delete(path, false);
    }
    paths.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public void alterTable(ConnectContext context, AlterTableStmt stmt) throws StarR
"Failed to load iceberg table: " + stmt.getTbl().toString());
}

IcebergAlterTableExecutor executor = new IcebergAlterTableExecutor(stmt, table, icebergCatalog);
IcebergAlterTableExecutor executor = new IcebergAlterTableExecutor(stmt, table, icebergCatalog, hdfsEnvironment);
executor.execute();

synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public enum IcebergTableOperation {
EXPIRE_SNAPSHOTS,
FAST_FORWARD,
CHERRYPICK_SNAPSHOT,
REMOVE_ORPHAN_FILES,
UNKNOWN;

public static IcebergTableOperation fromString(String opStr) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.connector.iceberg;

import org.apache.iceberg.types.Type.PrimitiveType;
import org.apache.iceberg.types.Types.NestedField;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Stream;

/**
 * Static helper methods for the Iceberg connector.
 */
public final class IcebergUtil {
    private IcebergUtil() {
        // static helpers only; never instantiated
    }

    /**
     * Flattens a list of fields into (field id, primitive type) entries, descending into nested types.
     */
    private static Stream<Entry<Integer, PrimitiveType>> primitiveFieldTypes(List<NestedField> nestedFields) {
        return nestedFields.stream()
                .flatMap(field -> primitiveFieldTypes(field));
    }

    /**
     * Returns the (field id, primitive type) entry of a primitive field, or the flattened entries of
     * the children of a nested field.
     *
     * @throws IllegalStateException if the field's type is neither primitive nor nested
     */
    private static Stream<Entry<Integer, PrimitiveType>> primitiveFieldTypes(NestedField nestedField) {
        org.apache.iceberg.types.Type type = nestedField.type();
        if (!type.isPrimitiveType()) {
            if (!type.isNestedType()) {
                throw new IllegalStateException("Unsupported field type: " + nestedField);
            }
            List<NestedField> children = type.asNestedType().fields();
            return primitiveFieldTypes(children);
        }

        Entry<Integer, PrimitiveType> leaf = Map.entry(nestedField.fieldId(), type.asPrimitiveType());
        return Stream.of(leaf);
    }

    /**
     * Returns the part of {@code path} after its last {@code '/'}, or the whole string when it
     * contains no slash.
     */
    public static String fileName(String path) {
        int lastSlash = path.lastIndexOf('/');
        return path.substring(lastSlash + 1);
    }
}

0 comments on commit fe814bc

Please sign in to comment.