Spark 3.5: Procedure to rewrite table path (#11931)
dramaticlly authored Jan 24, 2025
1 parent 681ff57 commit 72a165a
Showing 4 changed files with 309 additions and 2 deletions.
TestRewriteTablePathProcedure.java
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.extensions;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.nio.file.Path;
import java.util.List;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.RewriteTablePathUtil;
import org.apache.iceberg.Table;
import org.apache.spark.sql.AnalysisException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.io.TempDir;

public class TestRewriteTablePathProcedure extends ExtensionsTestBase {
@TempDir private Path staging;
@TempDir private Path targetTableDir;

@BeforeEach
public void setupTableLocation() {
sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", tableName);
}

@AfterEach
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}

@TestTemplate
public void testRewriteTablePathWithPositionalArgument() {
String location = targetTableDir.toFile().toURI().toString();
Table table = validationCatalog.loadTable(tableIdent);
String metadataJson =
(((HasTableOperations) table).operations()).current().metadataFileLocation();

List<Object[]> result =
sql(
"CALL %s.system.rewrite_table_path('%s', '%s', '%s')",
catalogName, tableIdent, table.location(), location);
assertThat(result).hasSize(1);
assertThat(result.get(0)[0])
.as("Should return correct latest version")
.isEqualTo(RewriteTablePathUtil.fileName(metadataJson));
assertThat(result.get(0)[1])
.as("Should return file_list_location")
.asString()
.startsWith(table.location())
.endsWith("file-list");
checkFileListLocationCount((String) result.get(0)[1], 1);
}

@TestTemplate
public void testRewriteTablePathWithNamedArgument() {
Table table = validationCatalog.loadTable(tableIdent);
String v0Metadata =
RewriteTablePathUtil.fileName(
(((HasTableOperations) table).operations()).current().metadataFileLocation());
sql("INSERT INTO TABLE %s VALUES (1, 'a')", tableName);
String v1Metadata =
RewriteTablePathUtil.fileName(
(((HasTableOperations) table).operations()).refresh().metadataFileLocation());

String targetLocation = targetTableDir.toFile().toURI().toString();
String stagingLocation = staging.toFile().toURI().toString();
String expectedFileListLocation = stagingLocation + "file-list";

List<Object[]> result =
sql(
"CALL %s.system.rewrite_table_path("
+ "table => '%s', "
+ "target_prefix => '%s', "
+ "source_prefix => '%s', "
+ "end_version => '%s', "
+ "start_version => '%s', "
+ "staging_location => '%s')",
catalogName,
tableIdent,
targetLocation,
table.location(),
v1Metadata,
v0Metadata,
stagingLocation);
assertThat(result).hasSize(1);
assertThat(result.get(0)[0]).as("Should return correct latest version").isEqualTo(v1Metadata);
assertThat(result.get(0)[1])
.as("Should return correct file_list_location")
.isEqualTo(expectedFileListLocation);
checkFileListLocationCount((String) result.get(0)[1], 4);
}

@TestTemplate
public void testProcedureWithInvalidInput() {
String targetLocation = targetTableDir.toFile().toURI().toString();

assertThatThrownBy(
() -> sql("CALL %s.system.rewrite_table_path('%s')", catalogName, tableIdent))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("Missing required parameters: [source_prefix,target_prefix]");
assertThatThrownBy(
() ->
sql(
"CALL %s.system.rewrite_table_path('%s','%s')",
catalogName, tableIdent, targetLocation))
.isInstanceOf(AnalysisException.class)
.hasMessageContaining("Missing required parameters: [target_prefix]");
assertThatThrownBy(
() ->
sql(
"CALL %s.system.rewrite_table_path('%s', '%s','%s')",
catalogName, "notExists", targetLocation, targetLocation))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Couldn't load table");

Table table = validationCatalog.loadTable(tableIdent);
String v0Metadata =
RewriteTablePathUtil.fileName(
(((HasTableOperations) table).operations()).current().metadataFileLocation());
assertThatThrownBy(
() ->
sql(
"CALL %s.system.rewrite_table_path("
+ "table => '%s', "
+ "source_prefix => '%s', "
+ "target_prefix => '%s', "
+ "start_version => '%s')",
catalogName, tableIdent, table.location(), targetLocation, "v20.metadata.json"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Cannot find provided version file %s in metadata log.", "v20.metadata.json");
assertThatThrownBy(
() ->
sql(
"CALL %s.system.rewrite_table_path("
+ "table => '%s', "
+ "source_prefix => '%s', "
+ "target_prefix => '%s', "
+ "start_version => '%s',"
+ "end_version => '%s')",
catalogName,
tableIdent,
table.location(),
targetLocation,
v0Metadata,
"v11.metadata.json"))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Cannot find provided version file %s in metadata log.", "v11.metadata.json");
}

private void checkFileListLocationCount(String fileListLocation, long expectedFileCount) {
long fileCount = spark.read().format("text").load(fileListLocation).count();
assertThat(fileCount).isEqualTo(expectedFileCount);
}
}
RewriteTablePathSparkAction.java
@@ -219,8 +219,10 @@ private String validateVersion(TableMetadata tableMetadata, String versionFileName) {
}
}

- Preconditions.checkNotNull(
- versionFile, "Version file %s does not exist in metadata log.", versionFile);
+ Preconditions.checkArgument(
+ versionFile != null,
+ "Cannot find provided version file %s in metadata log.",
+ versionFileName);
Preconditions.checkArgument(
fileExist(versionFile), "Version file %s does not exist.", versionFile);
return versionFile;
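Why this change matters for the new tests above: Guava's checkNotNull throws NullPointerException, and the old message formatted the null versionFile itself rather than the file name the caller passed; checkArgument throws the IllegalArgumentException that TestRewriteTablePathProcedure asserts on. A minimal standalone sketch of the difference, assuming plain (non-relocated) Guava Preconditions:

import com.google.common.base.Preconditions;

public class PreconditionsDemo {
  public static void main(String[] args) {
    String versionFile = null; // lookup failed: version not found in the metadata log
    String requested = "v20.metadata.json";

    try {
      // Old behavior: NullPointerException, and %s is filled with the null versionFile.
      Preconditions.checkNotNull(
          versionFile, "Version file %s does not exist in metadata log.", versionFile);
    } catch (NullPointerException e) {
      System.out.println(e.getMessage()); // "Version file null does not exist in metadata log."
    }

    try {
      // New behavior: IllegalArgumentException naming the version file that was requested.
      Preconditions.checkArgument(
          versionFile != null, "Cannot find provided version file %s in metadata log.", requested);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // "Cannot find provided version file v20.metadata.json in metadata log."
    }
  }
}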
RewriteTablePathProcedure.java
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.procedures;

import org.apache.iceberg.actions.RewriteTablePath;
import org.apache.iceberg.spark.actions.RewriteTablePathSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;

public class RewriteTablePathProcedure extends BaseProcedure {

private static final ProcedureParameter TABLE_PARAM =
ProcedureParameter.required("table", DataTypes.StringType);
private static final ProcedureParameter SOURCE_PREFIX_PARAM =
ProcedureParameter.required("source_prefix", DataTypes.StringType);
private static final ProcedureParameter TARGET_PREFIX_PARAM =
ProcedureParameter.required("target_prefix", DataTypes.StringType);
private static final ProcedureParameter START_VERSION_PARAM =
ProcedureParameter.optional("start_version", DataTypes.StringType);
private static final ProcedureParameter END_VERSION_PARAM =
ProcedureParameter.optional("end_version", DataTypes.StringType);
private static final ProcedureParameter STAGING_LOCATION_PARAM =
ProcedureParameter.optional("staging_location", DataTypes.StringType);

private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
TABLE_PARAM,
SOURCE_PREFIX_PARAM,
TARGET_PREFIX_PARAM,
START_VERSION_PARAM,
END_VERSION_PARAM,
STAGING_LOCATION_PARAM
};

private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
new StructField("latest_version", DataTypes.StringType, true, Metadata.empty()),
new StructField("file_list_location", DataTypes.StringType, true, Metadata.empty())
});

public static SparkProcedures.ProcedureBuilder builder() {
return new BaseProcedure.Builder<RewriteTablePathProcedure>() {
@Override
protected RewriteTablePathProcedure doBuild() {
return new RewriteTablePathProcedure(tableCatalog());
}
};
}

private RewriteTablePathProcedure(TableCatalog tableCatalog) {
super(tableCatalog);
}

@Override
public ProcedureParameter[] parameters() {
return PARAMETERS;
}

@Override
public StructType outputType() {
return OUTPUT_TYPE;
}

@Override
public InternalRow[] call(InternalRow args) {
ProcedureInput input = new ProcedureInput(spark(), tableCatalog(), PARAMETERS, args);
Identifier tableIdent = input.ident(TABLE_PARAM);
String sourcePrefix = input.asString(SOURCE_PREFIX_PARAM);
String targetPrefix = input.asString(TARGET_PREFIX_PARAM);
String startVersion = input.asString(START_VERSION_PARAM, null);
String endVersion = input.asString(END_VERSION_PARAM, null);
String stagingLocation = input.asString(STAGING_LOCATION_PARAM, null);

return withIcebergTable(
tableIdent,
table -> {
RewriteTablePathSparkAction action = SparkActions.get().rewriteTablePath(table);

if (startVersion != null) {
action.startVersion(startVersion);
}
if (endVersion != null) {
action.endVersion(endVersion);
}
if (stagingLocation != null) {
action.stagingLocation(stagingLocation);
}

return toOutputRows(action.rewriteLocationPrefix(sourcePrefix, targetPrefix).execute());
});
}

private InternalRow[] toOutputRows(RewriteTablePath.Result result) {
return new InternalRow[] {
newInternalRow(
UTF8String.fromString(result.latestVersion()),
UTF8String.fromString(result.fileListLocation()))
};
}

@Override
public String description() {
return "RewriteTablePathProcedure";
}
}
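For reference, a minimal usage sketch of the procedure defined above. The catalog name, table identifier, and prefixes are hypothetical; only the three required parameters (table, source_prefix, target_prefix) are passed, while start_version, end_version, and staging_location stay at their defaults:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class RewriteTablePathExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("rewrite-table-path-demo").getOrCreate();

    Dataset<Row> result =
        spark.sql(
            "CALL my_catalog.system.rewrite_table_path("
                + "table => 'db.sample', "
                + "source_prefix => 's3://old-bucket/db/sample', "
                + "target_prefix => 's3://new-bucket/db/sample')");

    // One row is returned: latest_version (file name of the latest rewritten metadata
    // JSON) and file_list_location (a text file listing the files to copy).
    result.show(false);
  }
}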
SparkProcedures.java
@@ -62,6 +62,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
mapBuilder.put("rewrite_position_delete_files", RewritePositionDeleteFilesProcedure::builder);
mapBuilder.put("fast_forward", FastForwardBranchProcedure::builder);
mapBuilder.put("compute_table_stats", ComputeTableStatsProcedure::builder);
mapBuilder.put("rewrite_table_path", RewriteTablePathProcedure::builder);
return mapBuilder.build();
}

