[Pick-2183][AMORO-2168] Support hive commit protocol in mixed-hive format. (#2220)

* [Pick-2183][AMORO-2168] Support hive commit protocol in mixed-hive format. (#2183)

Cherry-picked from the master branch (PR #2183).
---------

Co-authored-by: ZhouJinsong <[email protected]>

* consistent-write.enabled docs

* FileNameGenerator pattern.

* move to HiveTableProperties

* fix checkstyle

---------

Co-authored-by: ZhouJinsong <[email protected]>
baiyangtx and zhoujinsong authored Nov 1, 2023
1 parent e3c1e55 commit 0472fe2
Showing 27 changed files with 777 additions and 80 deletions.
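This commit wires the Hive commit protocol into mixed-hive writes: task writers emit dot-prefixed (hidden) data files in the Hive location, and those files become visible to Hive readers only after the commit renames them. The following is a minimal write-side sketch of that idea in plain Java NIO; the class and method names are illustrative assumptions, not Amoro's API.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Sketch of the write-side half of consistent write (assumed shape, not
 * Amoro's code): files are written with a "." prefix, which Hive readers
 * treat as hidden; the commit step later renames them to visible names
 * (see the sketch after the HiveOverwriteFiles hunk below).
 */
public class HiddenWriteSketch {
  static Path writeHidden(Path dir, String visibleName, byte[] data) throws IOException {
    Path hidden = dir.resolve("." + visibleName); // invisible to Hive until commit
    return Files.write(hidden, data);
  }
}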
@@ -52,7 +52,7 @@
*/
public class FileNameGenerator {

private static final String KEYED_FILE_NAME_PATTERN_STRING = "(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-.*";
private static final String KEYED_FILE_NAME_PATTERN_STRING = "\\.?(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-.*";
private static final Pattern KEYED_FILE_NAME_PATTERN = Pattern.compile(KEYED_FILE_NAME_PATTERN_STRING);

private static final String FORMAT = "%d-%s-%d-%05d-%d-%s-%05d";
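The new optional `\\.?` prefix lets FileNameGenerator parse both committed file names and the dot-prefixed in-flight names produced under consistent write. A quick check using the same pattern string; the sample name is hypothetical but follows the FORMAT template above.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class KeyedFileNamePatternCheck {
  // Same pattern string as the updated FileNameGenerator.
  private static final Pattern KEYED_FILE_NAME_PATTERN =
      Pattern.compile("\\.?(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-.*");

  public static void main(String[] args) {
    String committed = "5-I-1-00001-2-some-uuid-00001.parquet"; // hypothetical name
    String inFlight = "." + committed; // hidden name written under consistent write
    for (String name : new String[] {committed, inFlight}) {
      Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(name);
      System.out.printf("%s matches: %b%n", name, matcher.matches()); // both print true
    }
  }
}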
@@ -298,7 +298,7 @@ private TableProperties() {

public static final String LOG_STORE_DATA_VERSION = "log-store.data-version";
public static final String LOG_STORE_DATA_VERSION_DEFAULT = "v1";

public static final String LOG_STORE_PROPERTIES_PREFIX = "properties.";

public static final String OWNER = "owner";
19 changes: 19 additions & 0 deletions core/src/test/java/com/netease/arctic/io/DataTestHelpers.java
@@ -45,6 +45,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
@@ -122,6 +123,24 @@ public static List<DataFile> writeBaseStore(KeyedTable keyedTable, long txId, Li
}
}

public static List<DataFile> writeRecords(TaskWriter<Record> taskWriter, List<Record> records) {
try {
records.forEach(
d -> {
try {
taskWriter.write(d);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});

WriteResult result = taskWriter.complete();
return Lists.newArrayList(Arrays.asList(result.dataFiles()));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static List<DataFile> writeAndCommitChangeStore(
KeyedTable keyedTable, long txId, ChangeAction action,
List<Record> records) {
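The new writeRecords helper drains a TaskWriter and returns the completed data files, so tests can share one code path across writer types. A hypothetical usage follows; the builderFor factory, the WriteOperationKind import path, and ArcticTable's package are assumptions about the surrounding test code.

import com.netease.arctic.hive.io.writer.AdaptHiveGenericTaskWriterBuilder;
import com.netease.arctic.io.DataTestHelpers;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.WriteOperationKind;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;

import java.util.List;

public class WriteRecordsUsage {
  // Drain all records through one writer and collect the completed files.
  static List<DataFile> writeAll(ArcticTable table, List<Record> records) {
    TaskWriter<Record> writer = AdaptHiveGenericTaskWriterBuilder.builderFor(table)
        .withTransactionId(1L) // any transaction id a test assigns
        .buildWriter(WriteOperationKind.APPEND);
    return DataTestHelpers.writeRecords(writer, records);
  }
}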
@@ -18,6 +18,7 @@

package com.netease.arctic.flink.write;

import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation;
import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory;
import com.netease.arctic.hive.table.HiveLocationKind;
@@ -137,10 +138,14 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) {

Schema selectSchema = TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ?
new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId) :
encryptionManager, partitionId, taskId, transactionId, hiveConsistentWriteEnabled) :
new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId);
FileAppenderFactory<RowData> appenderFactory = TableTypeUtil.isHive(table) ?
@@ -18,6 +18,7 @@

package com.netease.arctic.flink.write;

import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation;
import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory;
import com.netease.arctic.hive.table.HiveLocationKind;
@@ -135,12 +136,17 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) {
schema = table.schema();
}

Schema selectSchema = TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
Schema selectSchema =
TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ?
new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId) :
encryptionManager, partitionId, taskId, transactionId, hiveConsistentWriteEnabled) :
new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId);
FileAppenderFactory<RowData> appenderFactory = TableTypeUtil.isHive(table) ?
@@ -18,6 +18,7 @@

package com.netease.arctic.flink.write;

import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation;
import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory;
import com.netease.arctic.hive.table.HiveLocationKind;
@@ -135,12 +136,17 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) {
schema = table.schema();
}

Schema selectSchema = TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
Schema selectSchema =
TypeUtil.reassignIds(
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema);
boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);

OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ?
new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId) :
encryptionManager, partitionId, taskId, transactionId, hiveConsistentWriteEnabled) :
new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId);
FileAppenderFactory<RowData> appenderFactory = TableTypeUtil.isHive(table) ?
@@ -51,6 +51,11 @@ public class HiveTableProperties {
public static final String AUTO_SYNC_HIVE_DATA_WRITE = "base.hive.auto-sync-data-write";
public static final boolean AUTO_SYNC_HIVE_DATA_WRITE_DEFAULT = false;

/** enable consistent write for hive store */
public static final String HIVE_CONSISTENT_WRITE_ENABLED = "base.hive.consistent-write.enabled";

public static final boolean HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT = true;

public static final String ALLOW_HIVE_TABLE_EXISTED = "allow-hive-table-existed";

public static final String WATERMARK_HIVE = "watermark.hive";
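Consistent write is on by default (base.hive.consistent-write.enabled = true). To opt a table out, the property can be set through the usual table-properties API; a sketch, assuming a table handle that exposes Iceberg's updateProperties().

import com.netease.arctic.hive.HiveTableProperties;
import org.apache.iceberg.Table;

public class DisableConsistentWrite {
  // Opt one table out of consistent write; new writers then emit visible
  // file names directly instead of dot-prefixed hidden ones.
  static void disable(Table table) {
    table.updateProperties()
        .set(HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED, "false")
        .commit();
  }
}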
@@ -19,6 +19,7 @@
package com.netease.arctic.hive.io.writer;

import com.netease.arctic.data.ChangeAction;
import com.netease.arctic.hive.HiveTableProperties;
import com.netease.arctic.hive.table.HiveLocationKind;
import com.netease.arctic.hive.table.SupportHive;
import com.netease.arctic.hive.utils.TableTypeUtil;
@@ -68,9 +69,14 @@ public class AdaptHiveGenericTaskWriterBuilder implements TaskWriterBuilder<Reco
private String customHiveSubdirectory;
private Long targetFileSize;
private boolean orderedWriter = false;
private Boolean hiveConsistentWrite;

private AdaptHiveGenericTaskWriterBuilder(ArcticTable table) {
this.table = table;
this.hiveConsistentWrite = PropertyUtil.propertyAsBoolean(
table.properties(),
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED,
HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT);
}

public AdaptHiveGenericTaskWriterBuilder withTransactionId(Long transactionId) {
@@ -97,7 +103,7 @@ public AdaptHiveGenericTaskWriterBuilder withCustomHiveSubdirectory(String custo
this.customHiveSubdirectory = customHiveSubdirectory;
return this;
}

public AdaptHiveGenericTaskWriterBuilder withTargetFileSize(long targetFileSize) {
this.targetFileSize = targetFileSize;
return this;
@@ -108,6 +114,11 @@ public AdaptHiveGenericTaskWriterBuilder withOrdered() {
return this;
}

public AdaptHiveGenericTaskWriterBuilder hiveConsistentWrite(boolean enabled) {
this.hiveConsistentWrite = enabled;
return this;
}

@Override
public TaskWriter<Record> buildWriter(WriteOperationKind writeOperationKind) {
LocationKind locationKind = AdaptHiveOperateToTableRelation.INSTANT.getLocationKindsFromOperateKind(
@@ -179,17 +190,41 @@ private GenericBaseTaskWriter buildBaseWriter(LocationKind locationKind) {
schema = table.schema();
}

OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ?
new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat,
table.io(), encryptionManager, partitionId, taskId, transactionId, customHiveSubdirectory) :
new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(),
encryptionManager, partitionId, taskId, transactionId);
FileAppenderFactory<Record> appenderFactory = TableTypeUtil.isHive(table) ?
new AdaptHiveGenericAppenderFactory(schema, table.spec()) :
new GenericAppenderFactory(schema, table.spec());
return new GenericBaseTaskWriter(fileFormat, appenderFactory,
OutputFileFactory outputFileFactory =
locationKind == HiveLocationKind.INSTANT ? new AdaptHiveOutputFileFactory(
((SupportHive) table).hiveLocation(),
table.spec(),
fileFormat,
table.io(),
encryptionManager,
partitionId,
taskId,
transactionId,
customHiveSubdirectory,
hiveConsistentWrite)
: new CommonOutputFileFactory(
baseLocation,
table.spec(),
fileFormat,
table.io(),
encryptionManager,
partitionId,
taskId,
transactionId);
FileAppenderFactory<Record> appenderFactory =
TableTypeUtil.isHive(table) ? new AdaptHiveGenericAppenderFactory(schema, table.spec())
: new GenericAppenderFactory(schema, table.spec());
return new GenericBaseTaskWriter(
fileFormat,
appenderFactory,
outputFileFactory,
table.io(), fileSizeBytes, mask, schema, table.spec(), primaryKeySpec, orderedWriter);
table.io(),
fileSizeBytes,
mask,
schema,
table.spec(),
primaryKeySpec,
orderedWriter);
}

private GenericChangeTaskWriter buildChangeWriter() {
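The builder now seeds hiveConsistentWrite from the table's properties in its constructor, and hiveConsistentWrite(boolean) overrides that per writer. A hypothetical override, again assuming the builderFor factory from the earlier sketch:

// Force visible file names for this writer regardless of the table property:
TaskWriter<Record> writer = AdaptHiveGenericTaskWriterBuilder.builderFor(table)
    .withTransactionId(txId)
    .hiveConsistentWrite(false) // per-writer override of the table default
    .buildWriter(WriteOperationKind.APPEND);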
@@ -50,36 +50,49 @@
*/
public class AdaptHiveOutputFileFactory implements OutputFileFactory {

private final String baseLocation;
private final String hiveLocation;
private final String hiveSubDirectory;
private final PartitionSpec partitionSpec;
private final ArcticFileIO io;
private final EncryptionManager encryptionManager;
private final FileNameGenerator fileNameGenerator;
private final boolean hiveConsistentWrite;

public AdaptHiveOutputFileFactory(
String baseLocation,
String hiveLocation,
PartitionSpec partitionSpec,
FileFormat format,
ArcticFileIO io,
EncryptionManager encryptionManager,
int partitionId,
long taskId,
Long transactionId) {
this(baseLocation, partitionSpec, format, io, encryptionManager, partitionId, taskId, transactionId, null);
Long transactionId,
boolean hiveConsistentWrite) {
this(
hiveLocation,
partitionSpec,
format,
io,
encryptionManager,
partitionId,
taskId,
transactionId,
null,
hiveConsistentWrite);
}

public AdaptHiveOutputFileFactory(
String baseLocation,
String hiveLocation,
PartitionSpec partitionSpec,
FileFormat format,
ArcticFileIO io,
EncryptionManager encryptionManager,
int partitionId,
long taskId,
Long transactionId,
String hiveSubDirectory) {
this.baseLocation = baseLocation;
String hiveSubDirectory,
boolean hiveConsistentWrite) {
this.hiveLocation = hiveLocation;
this.partitionSpec = partitionSpec;
this.io = io;
this.encryptionManager = encryptionManager;
@@ -90,15 +103,23 @@ public AdaptHiveOutputFileFactory(
this.hiveSubDirectory = hiveSubDirectory;
}
this.fileNameGenerator = new FileNameGenerator(format, partitionId, taskId, transactionId);
this.hiveConsistentWrite = hiveConsistentWrite;
}

private String generateFilename(TaskWriterKey key) {
return fileNameGenerator.fileName(key);
String filename = fileNameGenerator.fileName(key);
if (hiveConsistentWrite) {
filename = "." + filename;
}
return filename;
}

private String fileLocation(StructLike partitionData, String fileName) {
return String.format("%s/%s",
HiveTableUtil.newHiveDataLocation(baseLocation, partitionSpec, partitionData, hiveSubDirectory), fileName);
return String.format(
"%s/%s",
HiveTableUtil.newHiveDataLocation(
hiveLocation, partitionSpec, partitionData, hiveSubDirectory),
fileName);
}

public EncryptedOutputFile newOutputFile(TaskWriterKey key) {
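generateFilename only prepends a dot; the files stay hidden because Hive inherits Hadoop's hidden-file convention, under which readers skip names starting with '.' or '_'. A small check of that convention:

public class HiddenFileConventionCheck {
  // Mirrors the filter Hive inherits from Hadoop's FileInputFormat:
  // names starting with '.' or '_' are skipped by readers.
  static boolean visibleToHive(String fileName) {
    return !fileName.startsWith(".") && !fileName.startsWith("_");
  }

  public static void main(String[] args) {
    System.out.println(visibleToHive(".5-I-1-00001-2-uuid-00001.parquet")); // false: in-flight
    System.out.println(visibleToHive("5-I-1-00001-2-uuid-00001.parquet"));  // true: committed
  }
}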
@@ -29,6 +29,7 @@
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

import java.util.List;
import java.util.function.Consumer;

import static com.netease.arctic.op.OverwriteBaseFiles.PROPERTIES_TRANSACTION_ID;
@@ -59,11 +60,10 @@ public OverwriteFiles overwriteByRowFilter(Expression expr) {

@Override
public OverwriteFiles addFile(DataFile file) {
delegate.addFile(file);
String hiveLocationRoot = table.hiveLocation();
String dataFileLocation = file.path().toString();
if (dataFileLocation.toLowerCase().contains(hiveLocationRoot.toLowerCase())) {
// only handle file in hive location
if (!isHiveDataFile(file)) {
delegate.addFile(file);
} else {
// handle file in hive location when commit
this.addFiles.add(file);
}
return this;
@@ -72,9 +72,7 @@ public OverwriteFiles addFile(DataFile file) {
@Override
public OverwriteFiles deleteFile(DataFile file) {
delegate.deleteFile(file);
String hiveLocation = table.hiveLocation();
String dataFileLocation = file.path().toString();
if (dataFileLocation.toLowerCase().contains(hiveLocation.toLowerCase())) {
if (isHiveDataFile(file)) {
// only handle file in hive location
this.deleteFiles.add(file);
}
@@ -123,6 +121,11 @@ public OverwriteFiles validateNoConflictingDeletes() {
return this;
}

@Override
protected void postHiveDataCommitted(List<DataFile> committedDataFile) {
committedDataFile.forEach(delegate::addFile);
}

@Override
public OverwriteFiles set(String property, String value) {
if (PROPERTIES_TRANSACTION_ID.equals(property)) {
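addFile now buffers hive-location files instead of registering them with the Iceberg delegate immediately; only after the Hive side commits, renaming hidden files to visible names, does postHiveDataCommitted add the final files. A simplified sketch of that commit-side flow (assumed shape, not the actual Amoro code):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class DeferredHiveCommitSketch {
  private final List<Path> bufferedHiveFiles = new ArrayList<>();

  // addFile(): hive-location files are buffered, not yet visible to the table.
  void addFile(Path hiddenFile) {
    bufferedHiveFiles.add(hiddenFile);
  }

  // Hive commit step: drop the "." prefix so each file becomes visible, then
  // hand the renamed paths to the delegate (postHiveDataCommitted's role).
  List<Path> commitHiveFiles() throws IOException {
    List<Path> visible = new ArrayList<>();
    for (Path hidden : bufferedHiveFiles) {
      String name = hidden.getFileName().toString();
      visible.add(Files.move(hidden, hidden.resolveSibling(name.substring(1))));
    }
    return visible;
  }
}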