diff --git a/core/src/main/java/com/netease/arctic/data/file/FileNameGenerator.java b/core/src/main/java/com/netease/arctic/data/file/FileNameGenerator.java index dbe8b5bb8a..294bafed2b 100644 --- a/core/src/main/java/com/netease/arctic/data/file/FileNameGenerator.java +++ b/core/src/main/java/com/netease/arctic/data/file/FileNameGenerator.java @@ -52,7 +52,7 @@ */ public class FileNameGenerator { - private static final String KEYED_FILE_NAME_PATTERN_STRING = "(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-.*"; + private static final String KEYED_FILE_NAME_PATTERN_STRING = "\\.?(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-.*"; private static final Pattern KEYED_FILE_NAME_PATTERN = Pattern.compile(KEYED_FILE_NAME_PATTERN_STRING); private static final String FORMAT = "%d-%s-%d-%05d-%d-%s-%05d"; diff --git a/core/src/main/java/com/netease/arctic/table/TableProperties.java b/core/src/main/java/com/netease/arctic/table/TableProperties.java index 4fc5cd71fb..736009b29b 100644 --- a/core/src/main/java/com/netease/arctic/table/TableProperties.java +++ b/core/src/main/java/com/netease/arctic/table/TableProperties.java @@ -298,7 +298,7 @@ private TableProperties() { public static final String LOG_STORE_DATA_VERSION = "log-store.data-version"; public static final String LOG_STORE_DATA_VERSION_DEFAULT = "v1"; - + public static final String LOG_STORE_PROPERTIES_PREFIX = "properties."; public static final String OWNER = "owner"; diff --git a/core/src/test/java/com/netease/arctic/io/DataTestHelpers.java b/core/src/test/java/com/netease/arctic/io/DataTestHelpers.java index 323937e1ac..d1d98eb83f 100644 --- a/core/src/test/java/com/netease/arctic/io/DataTestHelpers.java +++ b/core/src/test/java/com/netease/arctic/io/DataTestHelpers.java @@ -45,6 +45,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.TaskWriter; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; @@ -122,6 +123,24 @@ public static List writeBaseStore(KeyedTable keyedTable, long txId, Li } } + public static List writeRecords(TaskWriter taskWriter, List records) { + try { + records.forEach( + d -> { + try { + taskWriter.write(d); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + WriteResult result = taskWriter.complete(); + return Lists.newArrayList(Arrays.asList(result.dataFiles())); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + public static List writeAndCommitChangeStore( KeyedTable keyedTable, long txId, ChangeAction action, List records) { diff --git a/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java b/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java index 9c8a2f9956..896b607521 100644 --- a/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java +++ b/flink/v1.12/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java @@ -18,6 +18,7 @@ package com.netease.arctic.flink.write; +import com.netease.arctic.hive.HiveTableProperties; import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation; import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory; import com.netease.arctic.hive.table.HiveLocationKind; @@ -137,10 +138,14 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) { Schema selectSchema = TypeUtil.reassignIds( 
FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema); + boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean( + table.properties(), + HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED, + HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT); OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ? new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(), - encryptionManager, partitionId, taskId, transactionId) : + encryptionManager, partitionId, taskId, transactionId, hiveConsistentWriteEnabled) : new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(), encryptionManager, partitionId, taskId, transactionId); FileAppenderFactory appenderFactory = TableTypeUtil.isHive(table) ? diff --git a/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java b/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java index 8aaa4224fc..a088164e47 100644 --- a/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java +++ b/flink/v1.14/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java @@ -18,6 +18,7 @@ package com.netease.arctic.flink.write; +import com.netease.arctic.hive.HiveTableProperties; import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation; import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory; import com.netease.arctic.hive.table.HiveLocationKind; @@ -135,12 +136,17 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) { schema = table.schema(); } - Schema selectSchema = TypeUtil.reassignIds( - FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema); + Schema selectSchema = + TypeUtil.reassignIds( + FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema); + boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean( + table.properties(), + HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED, + HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT); OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ? new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(), - encryptionManager, partitionId, taskId, transactionId) : + encryptionManager, partitionId, taskId, transactionId, hiveConsistentWriteEnabled) : new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(), encryptionManager, partitionId, taskId, transactionId); FileAppenderFactory appenderFactory = TableTypeUtil.isHive(table) ? 
diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java index 8aaa4224fc..a088164e47 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/FlinkTaskWriterBuilder.java @@ -18,6 +18,7 @@ package com.netease.arctic.flink.write; +import com.netease.arctic.hive.HiveTableProperties; import com.netease.arctic.hive.io.writer.AdaptHiveOperateToTableRelation; import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory; import com.netease.arctic.hive.table.HiveLocationKind; @@ -135,12 +136,17 @@ private FlinkBaseTaskWriter buildBaseWriter(LocationKind locationKind) { schema = table.schema(); } - Schema selectSchema = TypeUtil.reassignIds( - FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema); + Schema selectSchema = + TypeUtil.reassignIds( + FlinkSchemaUtil.convert(FlinkSchemaUtil.toSchema(flinkSchema)), schema); + boolean hiveConsistentWriteEnabled = PropertyUtil.propertyAsBoolean( + table.properties(), + HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED, + HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT); OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ? new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(), - encryptionManager, partitionId, taskId, transactionId) : + encryptionManager, partitionId, taskId, transactionId, hiveConsistentWriteEnabled) : new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(), encryptionManager, partitionId, taskId, transactionId); FileAppenderFactory appenderFactory = TableTypeUtil.isHive(table) ? 
diff --git a/hive/src/main/java/com/netease/arctic/hive/HiveTableProperties.java b/hive/src/main/java/com/netease/arctic/hive/HiveTableProperties.java index e31a9277fa..8a913c7038 100644 --- a/hive/src/main/java/com/netease/arctic/hive/HiveTableProperties.java +++ b/hive/src/main/java/com/netease/arctic/hive/HiveTableProperties.java @@ -51,6 +51,11 @@ public class HiveTableProperties { public static final String AUTO_SYNC_HIVE_DATA_WRITE = "base.hive.auto-sync-data-write"; public static final boolean AUTO_SYNC_HIVE_DATA_WRITE_DEFAULT = false; + /** enable consistent write for hive store */ + public static final String HIVE_CONSISTENT_WRITE_ENABLED = "base.hive.consistent-write.enabled"; + + public static final boolean HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT = true; + public static final String ALLOW_HIVE_TABLE_EXISTED = "allow-hive-table-existed"; public static final String WATERMARK_HIVE = "watermark.hive"; diff --git a/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveGenericTaskWriterBuilder.java b/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveGenericTaskWriterBuilder.java index 3b6f1afbf2..bff14c61b2 100644 --- a/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveGenericTaskWriterBuilder.java +++ b/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveGenericTaskWriterBuilder.java @@ -19,6 +19,7 @@ package com.netease.arctic.hive.io.writer; import com.netease.arctic.data.ChangeAction; +import com.netease.arctic.hive.HiveTableProperties; import com.netease.arctic.hive.table.HiveLocationKind; import com.netease.arctic.hive.table.SupportHive; import com.netease.arctic.hive.utils.TableTypeUtil; @@ -68,9 +69,14 @@ public class AdaptHiveGenericTaskWriterBuilder implements TaskWriterBuilder buildWriter(WriteOperationKind writeOperationKind) { LocationKind locationKind = AdaptHiveOperateToTableRelation.INSTANT.getLocationKindsFromOperateKind( @@ -179,17 +190,41 @@ private GenericBaseTaskWriter buildBaseWriter(LocationKind locationKind) { schema = table.schema(); } - OutputFileFactory outputFileFactory = locationKind == HiveLocationKind.INSTANT ? - new AdaptHiveOutputFileFactory(((SupportHive) table).hiveLocation(), table.spec(), fileFormat, - table.io(), encryptionManager, partitionId, taskId, transactionId, customHiveSubdirectory) : - new CommonOutputFileFactory(baseLocation, table.spec(), fileFormat, table.io(), - encryptionManager, partitionId, taskId, transactionId); - FileAppenderFactory appenderFactory = TableTypeUtil.isHive(table) ? - new AdaptHiveGenericAppenderFactory(schema, table.spec()) : - new GenericAppenderFactory(schema, table.spec()); - return new GenericBaseTaskWriter(fileFormat, appenderFactory, + OutputFileFactory outputFileFactory = + locationKind == HiveLocationKind.INSTANT ? new AdaptHiveOutputFileFactory( + ((SupportHive) table).hiveLocation(), + table.spec(), + fileFormat, + table.io(), + encryptionManager, + partitionId, + taskId, + transactionId, + customHiveSubdirectory, + hiveConsistentWrite) + : new CommonOutputFileFactory( + baseLocation, + table.spec(), + fileFormat, + table.io(), + encryptionManager, + partitionId, + taskId, + transactionId); + FileAppenderFactory appenderFactory = + TableTypeUtil.isHive(table) ? 
new AdaptHiveGenericAppenderFactory(schema, table.spec()) + : new GenericAppenderFactory(schema, table.spec()); + return new GenericBaseTaskWriter( + fileFormat, + appenderFactory, outputFileFactory, - table.io(), fileSizeBytes, mask, schema, table.spec(), primaryKeySpec, orderedWriter); + table.io(), + fileSizeBytes, + mask, + schema, + table.spec(), + primaryKeySpec, + orderedWriter); } private GenericChangeTaskWriter buildChangeWriter() { diff --git a/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java b/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java index 8e5e02a9d2..5dd9dddf31 100644 --- a/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java +++ b/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java @@ -50,27 +50,39 @@ */ public class AdaptHiveOutputFileFactory implements OutputFileFactory { - private final String baseLocation; + private final String hiveLocation; private final String hiveSubDirectory; private final PartitionSpec partitionSpec; private final ArcticFileIO io; private final EncryptionManager encryptionManager; private final FileNameGenerator fileNameGenerator; + private final boolean hiveConsistentWrite; public AdaptHiveOutputFileFactory( - String baseLocation, + String hiveLocation, PartitionSpec partitionSpec, FileFormat format, ArcticFileIO io, EncryptionManager encryptionManager, int partitionId, long taskId, - Long transactionId) { - this(baseLocation, partitionSpec, format, io, encryptionManager, partitionId, taskId, transactionId, null); + Long transactionId, + boolean hiveConsistentWrite) { + this( + hiveLocation, + partitionSpec, + format, + io, + encryptionManager, + partitionId, + taskId, + transactionId, + null, + hiveConsistentWrite); } public AdaptHiveOutputFileFactory( - String baseLocation, + String hiveLocation, PartitionSpec partitionSpec, FileFormat format, ArcticFileIO io, @@ -78,8 +90,9 @@ public AdaptHiveOutputFileFactory( int partitionId, long taskId, Long transactionId, - String hiveSubDirectory) { - this.baseLocation = baseLocation; + String hiveSubDirectory, + boolean hiveConsistentWrite) { + this.hiveLocation = hiveLocation; this.partitionSpec = partitionSpec; this.io = io; this.encryptionManager = encryptionManager; @@ -90,15 +103,23 @@ public AdaptHiveOutputFileFactory( this.hiveSubDirectory = hiveSubDirectory; } this.fileNameGenerator = new FileNameGenerator(format, partitionId, taskId, transactionId); + this.hiveConsistentWrite = hiveConsistentWrite; } private String generateFilename(TaskWriterKey key) { - return fileNameGenerator.fileName(key); + String filename = fileNameGenerator.fileName(key); + if (hiveConsistentWrite) { + filename = "." 
+ filename; + } + return filename; } private String fileLocation(StructLike partitionData, String fileName) { - return String.format("%s/%s", - HiveTableUtil.newHiveDataLocation(baseLocation, partitionSpec, partitionData, hiveSubDirectory), fileName); + return String.format( + "%s/%s", + HiveTableUtil.newHiveDataLocation( + hiveLocation, partitionSpec, partitionData, hiveSubDirectory), + fileName); } public EncryptedOutputFile newOutputFile(TaskWriterKey key) { diff --git a/hive/src/main/java/com/netease/arctic/hive/op/OverwriteHiveFiles.java b/hive/src/main/java/com/netease/arctic/hive/op/OverwriteHiveFiles.java index ccbf696117..41f024e05f 100644 --- a/hive/src/main/java/com/netease/arctic/hive/op/OverwriteHiveFiles.java +++ b/hive/src/main/java/com/netease/arctic/hive/op/OverwriteHiveFiles.java @@ -29,6 +29,7 @@ import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import java.util.List; import java.util.function.Consumer; import static com.netease.arctic.op.OverwriteBaseFiles.PROPERTIES_TRANSACTION_ID; @@ -59,11 +60,10 @@ public OverwriteFiles overwriteByRowFilter(Expression expr) { @Override public OverwriteFiles addFile(DataFile file) { - delegate.addFile(file); - String hiveLocationRoot = table.hiveLocation(); - String dataFileLocation = file.path().toString(); - if (dataFileLocation.toLowerCase().contains(hiveLocationRoot.toLowerCase())) { - // only handle file in hive location + if (!isHiveDataFile(file)) { + delegate.addFile(file); + } else { + // handle file in hive location when commit this.addFiles.add(file); } return this; @@ -72,9 +72,7 @@ public OverwriteFiles addFile(DataFile file) { @Override public OverwriteFiles deleteFile(DataFile file) { delegate.deleteFile(file); - String hiveLocation = table.hiveLocation(); - String dataFileLocation = file.path().toString(); - if (dataFileLocation.toLowerCase().contains(hiveLocation.toLowerCase())) { + if (isHiveDataFile(file)) { // only handle file in hive location this.deleteFiles.add(file); } @@ -123,6 +121,11 @@ public OverwriteFiles validateNoConflictingDeletes() { return this; } + @Override + protected void postHiveDataCommitted(List committedDataFile) { + committedDataFile.forEach(delegate::addFile); + } + @Override public OverwriteFiles set(String property, String value) { if (PROPERTIES_TRANSACTION_ID.equals(property)) { diff --git a/hive/src/main/java/com/netease/arctic/hive/op/ReplaceHivePartitions.java b/hive/src/main/java/com/netease/arctic/hive/op/ReplaceHivePartitions.java index 1e9adec92f..547b9d15f7 100644 --- a/hive/src/main/java/com/netease/arctic/hive/op/ReplaceHivePartitions.java +++ b/hive/src/main/java/com/netease/arctic/hive/op/ReplaceHivePartitions.java @@ -22,6 +22,7 @@ import com.netease.arctic.hive.HiveTableProperties; import com.netease.arctic.hive.exceptions.CannotAlterHiveLocationException; import com.netease.arctic.hive.table.UnkeyedHiveTable; +import com.netease.arctic.hive.utils.HiveCommitUtil; import com.netease.arctic.hive.utils.HivePartitionUtil; import com.netease.arctic.hive.utils.HiveTableUtil; import com.netease.arctic.op.UpdatePartitionProperties; @@ -98,12 +99,13 @@ public ReplaceHivePartitions( @Override public ReplacePartitions addFile(DataFile file) { - delegate.addFile(file); String tableLocation = table.hiveLocation(); String dataFileLocation = file.path().toString(); if (dataFileLocation.toLowerCase().contains(tableLocation.toLowerCase())) { // only handle file in hive location this.addFiles.add(file); + } else { 
+ delegate.addFile(file); } return this; } @@ -140,6 +142,12 @@ public Snapshot apply() { @Override public void commit() { if (!addFiles.isEmpty()) { + List dataFiles = + HiveCommitUtil.commitConsistentWriteFiles(this.addFiles, table.io(), table.spec()); + this.addFiles.clear(); + this.addFiles.addAll(dataFiles); + this.addFiles.forEach(delegate::addFile); + commitTimestamp = (int) (System.currentTimeMillis() / 1000); if (table.spec().isUnpartitioned()) { generateUnpartitionTableLocation(); diff --git a/hive/src/main/java/com/netease/arctic/hive/op/RewriteHiveFiles.java b/hive/src/main/java/com/netease/arctic/hive/op/RewriteHiveFiles.java index f10f3723cb..67069cff68 100644 --- a/hive/src/main/java/com/netease/arctic/hive/op/RewriteHiveFiles.java +++ b/hive/src/main/java/com/netease/arctic/hive/op/RewriteHiveFiles.java @@ -8,6 +8,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotUpdate; import org.apache.iceberg.Transaction; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import java.util.ArrayList; import java.util.List; @@ -20,6 +21,12 @@ public class RewriteHiveFiles extends UpdateHiveFiles implements R private final RewriteFiles delegate; + private final Set filesToDelete = Sets.newHashSet(); + private final Set filesToAdd = Sets.newHashSet(); + private final Set deleteFilesToReplace = Sets.newHashSet(); + private final Set deleteFilesToAdd = Sets.newHashSet(); + private long dataSequenceNumber = -1; + public RewriteHiveFiles(Transaction transaction, boolean insideTransaction, UnkeyedHiveTable table, HMSClientPool hmsClient, HMSClientPool transactionClient) { super(transaction, insideTransaction, table, hmsClient, transactionClient); @@ -33,24 +40,37 @@ SnapshotUpdate getSnapshotUpdateDelegate() { @Override public RewriteFiles rewriteFiles(Set filesToDelete, Set filesToAdd) { - delegate.rewriteFiles(filesToDelete, filesToAdd); + this.filesToDelete.addAll(filesToDelete); + // only add datafile not in hive location + filesToAdd.stream().filter(dataFile -> !isHiveDataFile(dataFile)).forEach(this.filesToAdd::add); markHiveFiles(filesToDelete, filesToAdd); return this; } @Override - public RewriteFiles rewriteFiles(Set filesToDelete, Set filesToAdd, long sequenceNumber) { - delegate.rewriteFiles(filesToDelete, filesToAdd, sequenceNumber); + public RewriteFiles rewriteFiles( + Set filesToDelete, Set filesToAdd, long sequenceNumber) { + this.dataSequenceNumber = sequenceNumber; + this.filesToDelete.addAll(filesToDelete); + // only add datafile not in hive location + filesToAdd.stream().filter(dataFile -> !isHiveDataFile(dataFile)).forEach(this.filesToAdd::add); markHiveFiles(filesToDelete, filesToAdd); return this; } @Override - public RewriteFiles rewriteFiles(Set dataFilesToReplace, - Set deleteFilesToReplace, - Set dataFilesToAdd, - Set deleteFilesToAdd) { - delegate.rewriteFiles(dataFilesToReplace, deleteFilesToReplace, dataFilesToAdd, deleteFilesToAdd); + public RewriteFiles rewriteFiles( + Set dataFilesToReplace, + Set deleteFilesToReplace, + Set dataFilesToAdd, + Set deleteFilesToAdd) { + this.filesToDelete.addAll(dataFilesToReplace); + this.deleteFilesToReplace.addAll(deleteFilesToReplace); + this.deleteFilesToAdd.addAll(deleteFilesToAdd); + // only add datafile not in hive location + dataFilesToAdd.stream() + .filter(dataFile -> !isHiveDataFile(dataFile)) + .forEach(this.filesToAdd::add); markHiveFiles(dataFilesToReplace, dataFilesToAdd); return this; @@ -71,6 +91,19 @@ public RewriteFiles validateFromSnapshot(long snapshotId) { return 
this; } + @Override + protected void postHiveDataCommitted(List committedDataFile) { + this.filesToAdd.addAll(committedDataFile); + if (this.dataSequenceNumber != -1) { + this.delegate.rewriteFiles(this.filesToDelete, this.filesToAdd, this.dataSequenceNumber); + } else { + this.delegate.rewriteFiles( + this.filesToDelete, this.deleteFilesToReplace, + this.filesToAdd, this.deleteFilesToAdd + ); + } + } + @Override public RewriteFiles set(String property, String value) { if (PROPERTIES_TRANSACTION_ID.equals(property)) { diff --git a/hive/src/main/java/com/netease/arctic/hive/op/UpdateHiveFiles.java b/hive/src/main/java/com/netease/arctic/hive/op/UpdateHiveFiles.java index 497e6beaea..8c5be37e9c 100644 --- a/hive/src/main/java/com/netease/arctic/hive/op/UpdateHiveFiles.java +++ b/hive/src/main/java/com/netease/arctic/hive/op/UpdateHiveFiles.java @@ -4,6 +4,7 @@ import com.netease.arctic.hive.HiveTableProperties; import com.netease.arctic.hive.exceptions.CannotAlterHiveLocationException; import com.netease.arctic.hive.table.UnkeyedHiveTable; +import com.netease.arctic.hive.utils.HiveCommitUtil; import com.netease.arctic.hive.utils.HiveMetaSynchronizer; import com.netease.arctic.hive.utils.HivePartitionUtil; import com.netease.arctic.hive.utils.HiveTableUtil; @@ -99,6 +100,8 @@ public UpdateHiveFiles(Transaction transaction, boolean insideTransaction, Unkey abstract SnapshotUpdate getSnapshotUpdateDelegate(); + protected abstract void postHiveDataCommitted(List committedDataFile); + @Override public void commit() { commitTimestamp = (int) (System.currentTimeMillis() / 1000); @@ -106,6 +109,12 @@ public void commit() { if (syncDataToHive) { HiveMetaSynchronizer.syncArcticDataToHive(table); } + List committedDataFiles = + HiveCommitUtil.commitConsistentWriteFiles(this.addFiles, table.io(), table.spec()); + this.addFiles.clear(); + this.addFiles.addAll(committedDataFiles); + postHiveDataCommitted(this.addFiles); + if (table.spec().isUnpartitioned()) { generateUnpartitionTableLocation(); } else { @@ -476,4 +485,10 @@ private String partitionToString(Partition p) { return "Partition(values: [" + Joiner.on("/").join(p.getValues()) + "], location: " + p.getSd().getLocation() + ")"; } + + protected boolean isHiveDataFile(DataFile dataFile) { + String hiveLocation = table.hiveLocation(); + String dataFileLocation = dataFile.path().toString(); + return dataFileLocation.toLowerCase().contains(hiveLocation.toLowerCase()); + } } diff --git a/hive/src/main/java/com/netease/arctic/hive/table/SupportHive.java b/hive/src/main/java/com/netease/arctic/hive/table/SupportHive.java index f575b2d9fb..eefdba025b 100644 --- a/hive/src/main/java/com/netease/arctic/hive/table/SupportHive.java +++ b/hive/src/main/java/com/netease/arctic/hive/table/SupportHive.java @@ -19,11 +19,12 @@ package com.netease.arctic.hive.table; import com.netease.arctic.hive.HMSClientPool; +import com.netease.arctic.table.ArcticTable; /** * Mix-in interface to mark task use hive as base store */ -public interface SupportHive { +public interface SupportHive extends ArcticTable { /** * Base path to store hive data files diff --git a/hive/src/main/java/com/netease/arctic/hive/utils/HiveCommitUtil.java b/hive/src/main/java/com/netease/arctic/hive/utils/HiveCommitUtil.java new file mode 100644 index 0000000000..3e10c4b0e7 --- /dev/null +++ b/hive/src/main/java/com/netease/arctic/hive/utils/HiveCommitUtil.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.hive.utils; + +import com.netease.arctic.io.ArcticFileIO; +import com.netease.arctic.utils.TableFileUtils; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +import java.util.List; + +/** Util class to help commit datafile in hive location. */ +public class HiveCommitUtil { + + /** + * When hive consistent write enabled, the writer will write files with the filename ".filename". + * During commit phase, it is necessary to rename it to a visible file to ensure its final + * consistency. + */ + public static List commitConsistentWriteFiles( + List dataFiles, ArcticFileIO fileIO, PartitionSpec spec) { + return applyConsistentWriteFile( + dataFiles, + spec, + (location, committed) -> { + if (!fileIO.exists(committed)) { + fileIO.rename(location, committed); + } + }); + } + + public static List applyConsistentWriteFile( + List dataFiles, PartitionSpec spec, HiveFileCommitter hiveFileCommitter) { + List afterCommittedFiles = Lists.newArrayList(); + for (DataFile file : dataFiles) { + String filename = TableFileUtils.getFileName(file.path().toString()); + if (!filename.startsWith(".")) { + afterCommittedFiles.add(file); + continue; + } + String committedFilename = filename.substring(1); + String committedLocation = + TableFileUtils.getFileDir(file.path().toString()) + "/" + committedFilename; + + hiveFileCommitter.commit(file.path().toString(), committedLocation); + DataFile committedDatafile = + DataFiles.builder(spec).copy(file).withPath(committedLocation).build(); + afterCommittedFiles.add(committedDatafile); + } + return afterCommittedFiles; + } + + @FunctionalInterface + public interface HiveFileCommitter { + void commit(String fileLocation, String committedLocation); + } +} diff --git a/hive/src/test/java/com/netease/arctic/hive/HiveTableTestBase.java b/hive/src/test/java/com/netease/arctic/hive/HiveTableTestBase.java index e4c33241fb..6bb061c659 100644 --- a/hive/src/test/java/com/netease/arctic/hive/HiveTableTestBase.java +++ b/hive/src/test/java/com/netease/arctic/hive/HiveTableTestBase.java @@ -37,7 +37,11 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.StructLikeMap; @@ -51,6 +55,10 @@ import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -287,4 +295,18 @@ public static void asserFilesName(List exceptedFiles, ArcticTable table) } Assert.assertEquals(exceptedFiles, fileNameList); } + + public List records(String... partitionValues) { + GenericRecord record = GenericRecord.create(HIVE_TABLE_SCHEMA); + + ImmutableList.Builder builder = ImmutableList.builder(); + for (String partitionValue : partitionValues) { + builder.add(record.copy(ImmutableMap.of(COLUMN_NAME_ID, 1, COLUMN_NAME_NAME, partitionValue, + COLUMN_NAME_OP_TIME, LocalDateTime.of(2022, 1, 1, 12, 0, 0), + COLUMN_NAME_OP_TIME_WITH_ZONE, OffsetDateTime.of( + LocalDateTime.of(2022, 1, 1, 12, 0, 0), ZoneOffset.UTC), + COLUMN_NAME_D, new BigDecimal("100")))); + } + return builder.build(); + } } diff --git a/hive/src/test/java/com/netease/arctic/hive/io/HiveDataTestHelpers.java b/hive/src/test/java/com/netease/arctic/hive/io/HiveDataTestHelpers.java new file mode 100644 index 0000000000..f9b5c8f939 --- /dev/null +++ b/hive/src/test/java/com/netease/arctic/hive/io/HiveDataTestHelpers.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.netease.arctic.hive.io; + +import com.netease.arctic.data.ChangeAction; +import com.netease.arctic.hive.HiveTableProperties; +import com.netease.arctic.hive.io.writer.AdaptHiveGenericTaskWriterBuilder; +import com.netease.arctic.hive.table.HiveLocationKind; +import com.netease.arctic.hive.table.SupportHive; +import com.netease.arctic.io.DataTestHelpers; +import com.netease.arctic.table.ArcticTable; +import com.netease.arctic.table.BaseLocationKind; +import com.netease.arctic.table.ChangeLocationKind; +import com.netease.arctic.table.LocationKind; +import com.netease.arctic.table.UnkeyedTable; +import com.netease.arctic.utils.TableFileUtils; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.util.PropertyUtil; +import org.junit.Assert; + +public class HiveDataTestHelpers { + + public static WriterHelper writerOf(ArcticTable table) { + return new WriterHelper(table); + } + + public static class WriterHelper { + ArcticTable table; + + public WriterHelper(ArcticTable table) { + this.table = table; + } + + boolean orderedWrite = false; + String customHiveLocation = null; + + Long txId = null; + + Boolean consistentWriteEnabled = null; + + public WriterHelper customHiveLocation(String customHiveLocation) { + this.customHiveLocation = customHiveLocation; + return this; + } + + public WriterHelper transactionId(Long txId) { + this.txId = txId; + return this; + } + + public WriterHelper consistentWriteEnabled(boolean consistentWriteEnabled) { + this.consistentWriteEnabled = consistentWriteEnabled; + return this; + } + + public WriterHelper orderedWrite(boolean orderedWrite) { + this.orderedWrite = orderedWrite; + return this; + } + + public List writeChange(List records, ChangeAction action) { + AdaptHiveGenericTaskWriterBuilder builder = + AdaptHiveGenericTaskWriterBuilder.builderFor(table) + .withChangeAction(action) + .withTransactionId(txId); + if (orderedWrite) { + builder.withOrdered(); + } + try (TaskWriter writer = builder.buildWriter(ChangeLocationKind.INSTANT)) { + return DataTestHelpers.writeRecords(writer, records); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public List writeBase(List records) { + return writeRecords(BaseLocationKind.INSTANT, records); + } + + public List writeHive(List records) { + return writeRecords(HiveLocationKind.INSTANT, records); + } + + private List writeRecords(LocationKind writeLocationKind, List records) { + try (TaskWriter writer = newBaseWriter(writeLocationKind)) { + return DataTestHelpers.writeRecords(writer, records); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private TaskWriter newBaseWriter(LocationKind writeLocationKind) { + AdaptHiveGenericTaskWriterBuilder builder = + AdaptHiveGenericTaskWriterBuilder.builderFor(table); + if (table.isKeyedTable()) { + builder.withTransactionId(txId); + } + if (orderedWrite) { + builder.withOrdered(); + } + if (customHiveLocation != null) { + builder.withCustomHiveSubdirectory(customHiveLocation); + } + if (this.consistentWriteEnabled != null) { + builder.hiveConsistentWrite(this.consistentWriteEnabled); + } + return builder.buildWriter(writeLocationKind); + } + } + + public static List lastedAddedFiles(Table tableStore) { + 
tableStore.refresh(); + return Lists.newArrayList(tableStore.currentSnapshot().addedFiles()); + } + + /** + * Assert the consistent-write process, with this parameter enabled, the written file is a hidden + * file. + */ + public static void assertWriteConsistentFilesName(SupportHive table, List files) { + boolean consistentWriteEnabled = PropertyUtil.propertyAsBoolean( + table.properties(), + HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED, + HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT); + String hiveLocation = table.hiveLocation(); + for (DataFile f : files) { + String filename = TableFileUtils.getFileName(f.path().toString()); + if (isHiveFile(hiveLocation, f)) { + Assert.assertEquals(consistentWriteEnabled, filename.startsWith(".")); + } else { + Assert.assertFalse(filename.startsWith(".")); + } + } + } + + /** Assert the consistent-write commit, all file will not be hidden file after commit. */ + public static void assertWriteConsistentFilesCommit(ArcticTable table) { + table.refresh(); + UnkeyedTable unkeyedTable = baseStore(table); + unkeyedTable + .newScan() + .planFiles() + .forEach( + t -> { + String filename = TableFileUtils.getFileName(t.file().path().toString()); + Assert.assertFalse(filename.startsWith(".")); + }); + } + + public static boolean isHiveFile(String hiveLocation, DataFile file) { + String location = file.path().toString(); + return location.toLowerCase().startsWith(hiveLocation.toLowerCase()); + } + + /** Return the base store of the arctic table. */ + private static UnkeyedTable baseStore(ArcticTable arcticTable) { + if (arcticTable.isKeyedTable()) { + return arcticTable.asKeyedTable().baseTable(); + } else { + return arcticTable.asUnkeyedTable(); + } + } +} diff --git a/hive/src/test/java/com/netease/arctic/hive/op/AutoSyncHiveTest.java b/hive/src/test/java/com/netease/arctic/hive/op/AutoSyncHiveTest.java index b8c375a5d9..d8c102809f 100644 --- a/hive/src/test/java/com/netease/arctic/hive/op/AutoSyncHiveTest.java +++ b/hive/src/test/java/com/netease/arctic/hive/op/AutoSyncHiveTest.java @@ -22,6 +22,7 @@ import com.google.common.collect.Sets; import com.netease.arctic.hive.HiveTableProperties; import com.netease.arctic.hive.HiveTableTestBase; +import com.netease.arctic.hive.io.HiveDataTestHelpers; import com.netease.arctic.hive.io.writer.AdaptHiveGenericTaskWriterBuilder; import com.netease.arctic.hive.table.HiveLocationKind; import com.netease.arctic.hive.utils.HivePartitionUtil; @@ -81,8 +82,10 @@ public void testAutoSyncUnpartitionedTableHiveDataWrite() throws TException, IOE private void testAutoSyncUnpartitionedTableHiveDataWrite(ArcticTable testTable) throws TException, IOException { testTable.updateProperties().set(HiveTableProperties.AUTO_SYNC_HIVE_DATA_WRITE, "true").commit(); - List dataFiles = writeDataFiles(testTable, HiveLocationKind.INSTANT, - writeRecords("p1")); + List dataFiles = HiveDataTestHelpers.writerOf(testTable) + .transactionId(1L) + .consistentWriteEnabled(false) + .writeHive(records("p1")); UnkeyedTable tableStore; if (testTable.isKeyedTable()) { tableStore = testTable.asKeyedTable().baseTable(); @@ -111,8 +114,11 @@ private void testAutoSyncUnpartitionedTableHiveDataWrite(ArcticTable testTable) private void testAutoSyncPartitionedTableHiveDataWrite(ArcticTable testTable) throws TException, IOException { testTable.updateProperties().set(HiveTableProperties.AUTO_SYNC_HIVE_DATA_WRITE, "true").commit(); Table hiveTable = hms.getClient().getTable(testTable.id().getDatabase(), testTable.id().getTableName()); - List dataFiles 
= writeDataFiles(testTable, HiveLocationKind.INSTANT, - writeRecords("p1", "p2")); + List dataFiles = HiveDataTestHelpers.writerOf(testTable) + .transactionId(1L) + .consistentWriteEnabled(false) + .writeHive(records("p1", "p2")); + UnkeyedTable tableStore; if (testTable.isKeyedTable()) { tableStore = testTable.asKeyedTable().baseTable(); @@ -124,8 +130,10 @@ private void testAutoSyncPartitionedTableHiveDataWrite(ArcticTable testTable) th overwriteFiles.commit(); //test add new hive partition - List newFiles = writeDataFiles(testTable, HiveLocationKind.INSTANT, - writeRecords("p3")); + List newFiles = HiveDataTestHelpers.writerOf(testTable) + .transactionId(2L) + .consistentWriteEnabled(false) + .writeHive(records("p3")); dataFiles.addAll(newFiles); Partition newPartition = HivePartitionUtil.newPartition(hiveTable, Lists.newArrayList("p3"), TableFileUtils.getFileDir(newFiles.get(0).path().toString()), newFiles, diff --git a/hive/src/test/java/com/netease/arctic/hive/op/TestOverwriteFiles.java b/hive/src/test/java/com/netease/arctic/hive/op/TestOverwriteFiles.java index 5f8e94b710..df027e5609 100644 --- a/hive/src/test/java/com/netease/arctic/hive/op/TestOverwriteFiles.java +++ b/hive/src/test/java/com/netease/arctic/hive/op/TestOverwriteFiles.java @@ -4,7 +4,10 @@ import com.netease.arctic.hive.HiveTableTestBase; import com.netease.arctic.hive.MockDataFileBuilder; import com.netease.arctic.hive.exceptions.CannotAlterHiveLocationException; +import com.netease.arctic.hive.io.HiveDataTestHelpers; +import com.netease.arctic.hive.table.SupportHive; import com.netease.arctic.hive.table.UnkeyedHiveTable; +import com.netease.arctic.io.DataTestHelpers; import com.netease.arctic.op.OverwriteBaseFiles; import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.TableIdentifier; @@ -17,6 +20,7 @@ import org.apache.iceberg.OverwriteFiles; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.data.Record; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -420,6 +424,22 @@ public void testOverwriteCommitHMSFailed() throws TException { hms.getClient().alter_table(testHiveTable.id().getDatabase(), "new_table", hiveTable); } + @Test + public void testConsistentWriteCommit() { + UnkeyedHiveTable table = testHiveTable; + List records = records("p1", "p2"); + List files = HiveDataTestHelpers.writerOf(table) + .transactionId(1L) + .writeHive(records); + HiveDataTestHelpers.assertWriteConsistentFilesName(table, files); + + OverwriteFiles overwriteFiles = table.newOverwrite(); + files.forEach(overwriteFiles::addFile); + overwriteFiles.commit(); + + HiveDataTestHelpers.assertWriteConsistentFilesCommit(table); + } + private void applyOverwrite( Map partitionAndLocations, Predicate deleteFunc, diff --git a/hive/src/test/java/com/netease/arctic/hive/op/TestRewriteFiles.java b/hive/src/test/java/com/netease/arctic/hive/op/TestRewriteFiles.java index 512cafe9f6..25c97caa73 100644 --- a/hive/src/test/java/com/netease/arctic/hive/op/TestRewriteFiles.java +++ b/hive/src/test/java/com/netease/arctic/hive/op/TestRewriteFiles.java @@ -4,7 +4,9 @@ import com.netease.arctic.hive.HiveTableTestBase; import com.netease.arctic.hive.MockDataFileBuilder; import com.netease.arctic.hive.exceptions.CannotAlterHiveLocationException; +import com.netease.arctic.hive.io.HiveDataTestHelpers; import 
com.netease.arctic.hive.table.UnkeyedHiveTable; +import com.netease.arctic.io.DataTestHelpers; import com.netease.arctic.op.OverwriteBaseFiles; import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.TableIdentifier; @@ -18,6 +20,7 @@ import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -380,6 +383,28 @@ public void testExceptionDeleteCreateSamePartition() throws TException { Assert.assertThrows(CannotAlterHiveLocationException.class, rewriteFiles::commit); } + + @Test + public void testConsistentWriteCommit() { + UnkeyedHiveTable table = testHiveTable; + List records = records("p1", "p2"); + List files = HiveDataTestHelpers.writerOf(table) + .transactionId(1L) + .writeHive(records); + OverwriteFiles overwriteFiles = table.newOverwrite(); + files.forEach(overwriteFiles::addFile); + overwriteFiles.commit(); + files = HiveDataTestHelpers.lastedAddedFiles(table); + + List newDataFiles = HiveDataTestHelpers.writerOf(table) + .transactionId(2L) + .writeHive(records); + RewriteFiles rewriteFiles = table.newRewrite(); + rewriteFiles.rewriteFiles(Sets.newHashSet(files), Sets.newHashSet(newDataFiles)); + HiveDataTestHelpers.assertWriteConsistentFilesCommit(table); + } + + private void applyUpdateHiveFiles( Map partitionAndLocations, Predicate deleteFunc, diff --git a/hive/src/test/java/com/netease/arctic/hive/op/TestRewritePartitions.java b/hive/src/test/java/com/netease/arctic/hive/op/TestRewritePartitions.java index 447a3efc02..d67778a67a 100644 --- a/hive/src/test/java/com/netease/arctic/hive/op/TestRewritePartitions.java +++ b/hive/src/test/java/com/netease/arctic/hive/op/TestRewritePartitions.java @@ -21,6 +21,8 @@ import com.netease.arctic.hive.HiveTableTestBase; import com.netease.arctic.hive.MockDataFileBuilder; import com.netease.arctic.hive.exceptions.CannotAlterHiveLocationException; +import com.netease.arctic.hive.io.HiveDataTestHelpers; +import com.netease.arctic.hive.table.UnkeyedHiveTable; import com.netease.arctic.op.RewritePartitions; import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.UnkeyedTable; @@ -28,11 +30,15 @@ import org.apache.hadoop.hive.metastore.api.Table; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; +import org.apache.iceberg.OverwriteFiles; import org.apache.iceberg.ReplacePartitions; +import org.apache.iceberg.RewriteFiles; import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.data.Record; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.thrift.TException; import org.junit.Assert; import org.junit.Test; @@ -176,6 +182,29 @@ public void testExceptionAddFileWithDifferentLocation() throws TException { Assert.assertThrows(CannotAlterHiveLocationException.class, replacePartitions::commit); } + + @Test + public void testConsistentWriteCommit() { + UnkeyedHiveTable table = testHiveTable; + List records = records("p1", "p2"); + List files = HiveDataTestHelpers.writerOf(table) + .transactionId(1L) + .writeHive(records); + 
HiveDataTestHelpers.assertWriteConsistentFilesName(table, files); + + OverwriteFiles overwriteFiles = table.newOverwrite(); + files.forEach(overwriteFiles::addFile); + overwriteFiles.commit(); + + List newDataFiles = HiveDataTestHelpers.writerOf(table) + .transactionId(2L) + .writeHive(records); + ReplacePartitions replacePartitions = table.newReplacePartitions(); + newDataFiles.forEach(replacePartitions::addFile); + HiveDataTestHelpers.assertWriteConsistentFilesCommit(table); + } + + private void applyRewritePartitions( Map partitionLocations, List> overwriteFiles) { diff --git a/hive/src/test/java/com/netease/arctic/hive/utils/HiveMetaSynchronizerTest.java b/hive/src/test/java/com/netease/arctic/hive/utils/HiveMetaSynchronizerTest.java index 58d503232a..d68af976e9 100644 --- a/hive/src/test/java/com/netease/arctic/hive/utils/HiveMetaSynchronizerTest.java +++ b/hive/src/test/java/com/netease/arctic/hive/utils/HiveMetaSynchronizerTest.java @@ -26,6 +26,7 @@ import com.netease.arctic.hive.HMSClientPool; import com.netease.arctic.hive.HiveTableProperties; import com.netease.arctic.hive.HiveTableTestBase; +import com.netease.arctic.hive.io.HiveDataTestHelpers; import com.netease.arctic.hive.io.writer.AdaptHiveGenericTaskWriterBuilder; import com.netease.arctic.hive.op.RewriteHiveFiles; import com.netease.arctic.hive.table.HiveLocationKind; @@ -96,9 +97,18 @@ public void testSyncDataToHive() throws IOException, TException { Assert.assertEquals(0, Iterables.size(testHiveTable.snapshots())); List dataFiles = writeDataFiles(testHiveTable, HiveLocationKind.INSTANT, writeRecords("p1", "p2")); + OverwriteFiles overwriteFiles = testHiveTable.newOverwrite(); + dataFiles.forEach(overwriteFiles::addFile); + overwriteFiles.commit(); + Assert.assertEquals(1, Iterables.size(testHiveTable.snapshots())); + List partitions = hms.getClient() + .listPartitions(HIVE_TABLE_ID.getDatabase(), HIVE_TABLE_ID.getTableName(), Short.MAX_VALUE); + Assert.assertEquals(2, partitions.size()); + String partition1FilePath = null; String partition2FilePath = null; String partition3FilePath = null; + dataFiles = HiveDataTestHelpers.lastedAddedFiles(testHiveTable); for (DataFile dataFile : dataFiles) { if (dataFile.partition().get(0, String.class).equals("p1")) { partition1FilePath = dataFile.path().toString(); @@ -109,17 +119,11 @@ public void testSyncDataToHive() throws IOException, TException { Assert.assertNotNull(partition1FilePath); Assert.assertNotNull(partition2FilePath); - OverwriteFiles overwriteFiles = testHiveTable.newOverwrite(); - dataFiles.forEach(overwriteFiles::addFile); - overwriteFiles.commit(); - Assert.assertEquals(1, Iterables.size(testHiveTable.snapshots())); - List partitions = hms.getClient() - .listPartitions(HIVE_TABLE_ID.getDatabase(), HIVE_TABLE_ID.getTableName(), Short.MAX_VALUE); - Assert.assertEquals(2, partitions.size()); //test add new hive partition - List newFiles = writeDataFiles(testHiveTable, HiveLocationKind.INSTANT, - writeRecords("p3")); + List newFiles = HiveDataTestHelpers.writerOf(testHiveTable) + .consistentWriteEnabled(false) + .writeHive(records("p3")); Assert.assertEquals(1, newFiles.size()); partition3FilePath = newFiles.get(0).path().toString(); Partition newPartition = HivePartitionUtil.newPartition(hiveTable, Lists.newArrayList("p3"), @@ -132,9 +136,9 @@ public void testSyncDataToHive() throws IOException, TException { partitions = hms.getClient() .listPartitions(HIVE_TABLE_ID.getDatabase(), HIVE_TABLE_ID.getTableName(), Short.MAX_VALUE); Assert.assertEquals(3, 
partitions.size()); - Assert.assertEquals( - Sets.newHashSet(partition1FilePath, partition2FilePath, partition3FilePath), - listTableFiles(testHiveTable).stream().map(DataFile::path).collect(Collectors.toSet())); + Assert.assertTrue( + listTableFiles(testHiveTable).stream().map(DataFile::path) + .collect(Collectors.toSet()).contains(partition3FilePath)); //test drop hive partition hms.getClient().dropPartition(HIVE_TABLE_ID.getDatabase(), HIVE_TABLE_ID.getTableName(), @@ -168,8 +172,10 @@ public void testSyncDataToHive() throws IOException, TException { //should not sync hive data to arctic when both hive location and hive transient_lastDdlTime not changed OverwriteFiles p4OverwriteFiles = testHiveTable.newOverwrite(); - List p4DataFiles = writeDataFiles(testHiveTable, HiveLocationKind.INSTANT, - writeRecords("p4")); + List p4DataFiles = HiveDataTestHelpers.writerOf(testHiveTable) + .consistentWriteEnabled(false) + .writeHive(records("p4")); + p4DataFiles.forEach(p4OverwriteFiles::addFile); p4OverwriteFiles.commit(); Partition hiveOldPartition = hms.getClient().getPartition(HIVE_TABLE_ID.getDatabase(), HIVE_TABLE_ID.getTableName(), @@ -196,8 +202,10 @@ public void testSyncDataToHive() throws IOException, TException { public void testKeyedTableSyncDataToHive() throws IOException, TException { Table hiveTable = hms.getClient() .getTable(UN_PARTITION_HIVE_PK_TABLE_ID.getDatabase(), UN_PARTITION_HIVE_PK_TABLE_ID.getTableName()); - List p1DataFiles = writeDataFiles(testUnPartitionKeyedHiveTable, HiveLocationKind.INSTANT, - writeRecords("p1")); + List p1DataFiles = HiveDataTestHelpers.writerOf(testUnPartitionKeyedHiveTable) + .consistentWriteEnabled(false) + .transactionId(1L) + .writeHive(records("p1")); hiveTable.getSd().setLocation(p1DataFiles.get(0).path().toString().substring( 0, p1DataFiles.get(0).path().toString().lastIndexOf("/"))); @@ -217,8 +225,10 @@ public void testKeyedTableSyncDataToHive() throws IOException, TException { //should not sync hive data to arctic when both hive location and hive transient_lastDdlTime not changed hiveTable = hms.getClient() .getTable(UN_PARTITION_HIVE_PK_TABLE_ID.getDatabase(), UN_PARTITION_HIVE_PK_TABLE_ID.getTableName()); - List p1NewFiles = writeDataFiles(testUnPartitionKeyedHiveTable, HiveLocationKind.INSTANT, - writeRecords("p1")); + List p1NewFiles = HiveDataTestHelpers.writerOf(testUnPartitionKeyedHiveTable) + .transactionId(2L) + .consistentWriteEnabled(false) + .writeHive(records("p1")); RewriteFiles p1NewRewrite = testUnPartitionKeyedHiveTable.baseTable().newRewrite(); p1NewRewrite.rewriteFiles(new HashSet<>(p1DataFiles), new HashSet<>(p1NewFiles)); p1NewRewrite.commit(); diff --git a/site/docs/ch/configurations.md b/site/docs/ch/configurations.md index 4baf155954..ae1739accc 100644 --- a/site/docs/ch/configurations.md +++ b/site/docs/ch/configurations.md @@ -90,10 +90,11 @@ Self-optimizing 配置对 Iceberg format, Mixed streaming format 都会生效。 ### Mixed Hive format 相关配置 -| 配置名称 | 默认值 | 描述 | -| ---------------------------------- | ---------------- | ---------------------------------- | -| base.hive.auto-sync-schema-change | true | 是否从 HMS 自动同步 Hive 的 schema 变更 | -| base.hive.auto-sync-data-write | false | 是否自动同步 Hive 的原生的数据写入,有 Hive 原生数据写入时需要打开 | +| 配置名称 | 默认值 | 描述 | +|------------------------------------|-------|---------------------------------------------| +| base.hive.auto-sync-schema-change | true | 是否从 HMS 自动同步 Hive 的 schema 变更 | +| base.hive.auto-sync-data-write | false | 是否自动同步 Hive 的原生的数据写入,有 Hive 原生数据写入时需要打开 | +| 
base.hive.consistent-write.enabled | true | 写入 hive 路径的文件会先写隐藏文件,commit 期间 rename 为可见文件 | ### Trash 相关配置 diff --git a/spark/v3.1/spark/src/main/java/com/netease/arctic/spark/io/TaskWriters.java b/spark/v3.1/spark/src/main/java/com/netease/arctic/spark/io/TaskWriters.java index 76825079c0..877f887fd0 100644 --- a/spark/v3.1/spark/src/main/java/com/netease/arctic/spark/io/TaskWriters.java +++ b/spark/v3.1/spark/src/main/java/com/netease/arctic/spark/io/TaskWriters.java @@ -18,6 +18,7 @@ package com.netease.arctic.spark.io; +import com.netease.arctic.hive.HiveTableProperties; import com.netease.arctic.hive.io.writer.AdaptHiveOutputFileFactory; import com.netease.arctic.hive.table.SupportHive; import com.netease.arctic.io.writer.ChangeTaskWriter; @@ -94,7 +95,7 @@ public TaskWriters withDataSourceSchema(StructType dsSchema) { this.dsSchema = dsSchema; return this; } - + public TaskWriters withHiveSubdirectory(String hiveSubdirectory) { this.hiveSubdirectory = hiveSubdirectory; return this; @@ -133,18 +134,21 @@ public TaskWriter newBaseWriter(boolean isOverwrite) { .builderFor(icebergTable, schema, dsSchema) .writeHive(isHiveTable) .build(); + boolean consistentWriteEnabled = PropertyUtil.propertyAsBoolean( + table.properties(), + HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED, + HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED_DEFAULT); OutputFileFactory outputFileFactory; if (isHiveTable && isOverwrite) { outputFileFactory = new AdaptHiveOutputFileFactory( ((SupportHive) table).hiveLocation(), table.spec(), fileFormat, table.io(), - encryptionManager, partitionId, taskId, transactionId, hiveSubdirectory); + encryptionManager, partitionId, taskId, transactionId, hiveSubdirectory, consistentWriteEnabled); } else { outputFileFactory = new CommonOutputFileFactory( baseLocation, table.spec(), fileFormat, table.io(), encryptionManager, partitionId, taskId, transactionId); } - return new ArcticSparkBaseTaskWriter( fileFormat, appenderFactory, outputFileFactory, table.io(), fileSize, mask, schema, diff --git a/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/ArcticSparkCatalogTestGroup.java b/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/ArcticSparkCatalogTestGroup.java index 8c15437c61..9220e0192f 100644 --- a/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/ArcticSparkCatalogTestGroup.java +++ b/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/ArcticSparkCatalogTestGroup.java @@ -31,6 +31,7 @@ import com.netease.arctic.spark.hive.TestUnkeyedHiveInsertOverwriteDynamic; import com.netease.arctic.spark.hive.TestUnkeyedHiveInsertOverwriteStatic; import com.netease.arctic.spark.hive.TestUnkeyedTableDml; +import com.netease.arctic.spark.io.TestConsistentWrite; import com.netease.arctic.spark.source.TestKeyedTableDataFrameAPI; import com.netease.arctic.spark.source.TestUnKeyedTableDataFrameAPI; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -78,7 +79,8 @@ TestHiveTableTruncate.class, TestKeyedTableMergeInto.class, TestUnKeyedTableMergeInto.class, - TestUpsert.class + TestUpsert.class, + TestConsistentWrite.class }) public class ArcticSparkCatalogTestGroup { diff --git a/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/SparkTestContext.java b/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/SparkTestContext.java index f6b89c9124..5b8869f56d 100644 --- a/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/SparkTestContext.java +++ b/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/SparkTestContext.java @@ -59,13 +59,17 @@ 
import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.util.StructLikeMap; import org.apache.spark.SparkConf; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.encoders.RowEncoder; import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; import org.apache.thrift.TException; import org.junit.Assert; import org.junit.rules.ExternalResource; @@ -626,6 +630,13 @@ public static Row recordToRow(Record record) { return RowFactory.create(values); } + public static InternalRow recordToInternalRow(Schema schema, Record record) { + StructType structType = SparkSchemaUtil.convert(schema); + Row row = recordToRow(record); + return RowEncoder.apply(structType).createSerializer().apply(row); + } + + public Map properties(String... kv) { Map props = Maps.newHashMap(); for (int i = 0; i < kv.length; i = i + 2) { diff --git a/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/io/TestConsistentWrite.java b/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/io/TestConsistentWrite.java new file mode 100644 index 0000000000..6490bf2874 --- /dev/null +++ b/spark/v3.1/spark/src/test/java/com/netease/arctic/spark/io/TestConsistentWrite.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.netease.arctic.spark.io; + +import com.netease.arctic.catalog.ArcticCatalog; +import com.netease.arctic.hive.HiveTableProperties; +import com.netease.arctic.hive.io.HiveDataTestHelpers; +import com.netease.arctic.hive.table.SupportHive; +import com.netease.arctic.spark.SparkTestBase; +import com.netease.arctic.table.ArcticTable; +import com.netease.arctic.table.PrimaryKeySpec; +import com.netease.arctic.table.TableIdentifier; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.IOException; +import java.util.List; + +@RunWith(Parameterized.class) +public class TestConsistentWrite extends SparkTestBase { + private final String catalogName = catalogNameHive; + private final String database = "db_def"; + private final String tableName = "tbl"; + public static final Schema schema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "ts", Types.LongType.get()), + Types.NestedField.required(3, "pt", Types.StringType.get()) + ); + + + private final boolean consistentWriteEnabled ; + private final PartitionSpec partitionSpec; + private final PrimaryKeySpec keySpec; + + public TestConsistentWrite(boolean consistentWriteEnabled, PartitionSpec ptSpec, PrimaryKeySpec pkSpec) { + this.consistentWriteEnabled = consistentWriteEnabled; + this.partitionSpec = ptSpec; + this.keySpec = pkSpec; + } + + @Parameterized.Parameters(name = "{0}, {1}, {2}") + public static Object[] parameters() { + final PartitionSpec ptSpec = PartitionSpec.builderFor(schema) + .identity("pt").build(); + + final PrimaryKeySpec pkSpec = PrimaryKeySpec.builderFor(schema) + .addColumn("id") + .build(); + return new Object[][] { + {true, ptSpec, pkSpec}, + {false, ptSpec, pkSpec}, + {true, PartitionSpec.unpartitioned(), pkSpec}, + {false, PartitionSpec.unpartitioned(), pkSpec}, + {true, ptSpec, PrimaryKeySpec.noPrimaryKey()}, + {false, ptSpec, PrimaryKeySpec.noPrimaryKey()} + }; + } + + ArcticCatalog catalog; + @Before + public void before() { + catalog = catalog(catalogName); + catalog.createDatabase(database); + } + + public void after() { + try{ + catalog.dropTable(TableIdentifier.of(catalogName, database, tableName), true); + } catch (Exception e) { + // pass + } + catalog.dropDatabase(database); + } + + + @Test + public void testConsistentWrite() { + ArcticTable table = catalog.newTableBuilder( + TableIdentifier.of(catalogName, database, tableName), schema) + .withProperty(HiveTableProperties.HIVE_CONSISTENT_WRITE_ENABLED, consistentWriteEnabled + "") + .withPrimaryKeySpec(this.keySpec) + .withPartitionSpec(this.partitionSpec) + .create(); + + StructType dsSchema = SparkSchemaUtil.convert(schema); + List records = Lists.newArrayList( + newRecord(schema, 1, 0L, "pt1"), + newRecord(schema, 2, 0L, "pt2"), + newRecord(schema, 3, 0L, "pt3") + ); + TaskWriters taskWriters = TaskWriters.of(table) + .withOrderedWriter(false) + .withDataSourceSchema(dsSchema); + if 
(keySpec.primaryKeyExisted()) { + taskWriters = taskWriters.withTransactionId(1L); + } + + try (TaskWriter writer = taskWriters.newBaseWriter(true)) { + records.stream() + .map(r -> recordToInternalRow(schema, r)) + .forEach( + i -> { + try { + writer.write(i); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + WriteResult result = writer.complete(); + DataFile[] dataFiles = result.dataFiles(); + HiveDataTestHelpers.assertWriteConsistentFilesName( + (SupportHive) table, Lists.newArrayList(dataFiles)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} +
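
The consistent-write mechanism added by this patch spans three pieces: AdaptHiveOutputFileFactory prefixes new Hive-store data files with a dot (so Hive readers treat them as hidden) when base.hive.consistent-write.enabled is true (the default), FileNameGenerator's KEYED_FILE_NAME_PATTERN gains an optional leading "\.?" so the dot-prefixed names still parse, and HiveCommitUtil.commitConsistentWriteFiles renames the hidden files to their visible names during commit, skipping the rename when the target already exists so a retried commit stays idempotent. The sketch below is only an illustration of that write-hidden/rename-on-commit protocol using plain java.nio; it is not the Arctic API, and the directory and file names are invented for the example.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class ConsistentWriteSketch {

  /** Writer side: data lands in a dot-prefixed (hidden) file that Hive readers ignore. */
  static Path writeHidden(Path partitionDir, String fileName, byte[] data) throws IOException {
    Files.createDirectories(partitionDir);
    Path hidden = partitionDir.resolve("." + fileName);
    Files.write(hidden, data);
    return hidden;
  }

  /** Commit side: drop the leading dot, mirroring HiveCommitUtil.commitConsistentWriteFiles. */
  static Path commitHidden(Path hidden) throws IOException {
    String name = hidden.getFileName().toString();
    if (!name.startsWith(".")) {
      return hidden; // already visible, nothing to do
    }
    Path visible = hidden.resolveSibling(name.substring(1));
    if (!Files.exists(visible)) { // idempotent: a retried commit skips the rename
      Files.move(hidden, visible, StandardCopyOption.ATOMIC_MOVE);
    }
    return visible;
  }

  public static void main(String[] args) throws IOException {
    Path partitionDir = Files.createTempDirectory("hive-partition");
    // File name shape loosely follows FileNameGenerator's "%d-%s-%d-%05d-%d-%s-%05d" format.
    Path hidden = writeHidden(partitionDir, "1-B-2-00001-0-uuid-00001.parquet", "rows".getBytes());
    System.out.println("written:   " + hidden);                // hidden until commit
    System.out.println("committed: " + commitHidden(hidden));  // visible to Hive after rename
  }
}

A failed or abandoned write therefore leaves only dot-prefixed files behind, which Hive queries never see; only files that survive to the commit phase are renamed into visibility and registered on the Iceberg side.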