From 211f5d550b1d505d0c2da1e190551919448e0605 Mon Sep 17 00:00:00 2001
From: ggershinsky
Date: Fri, 12 Jan 2024 02:20:50 +0200
Subject: [PATCH] Spark 3.5: Support encrypted output files (#9435)

---
 .../spark/source/SparkAppenderFactory.java    | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
index 6372edde0782..9df12fc060ae 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkAppenderFactory.java
@@ -31,6 +31,7 @@
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.DeleteSchemaUtil;
@@ -162,6 +163,11 @@ private StructType lazyPosDeleteSparkType() {
 
   @Override
   public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
+    return newAppender(EncryptionUtil.plainAsEncryptedOutput(file), fileFormat);
+  }
+
+  @Override
+  public FileAppender<InternalRow> newAppender(EncryptedOutputFile file, FileFormat fileFormat) {
     MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
     try {
       switch (fileFormat) {
@@ -203,7 +209,7 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFor
   public DataWriter<InternalRow> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new DataWriter<>(
-        newAppender(file.encryptingOutputFile(), format),
+        newAppender(file, format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -224,7 +230,7 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(
     try {
       switch (format) {
         case PARQUET:
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
               .createWriterFunc(
                   msgType -> SparkParquetWriters.buildWriter(lazyEqDeleteSparkType(), msgType))
               .overwrite()
@@ -236,7 +242,7 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(
               .buildEqualityWriter();
 
         case AVRO:
-          return Avro.writeDeletes(file.encryptingOutputFile())
+          return Avro.writeDeletes(file)
               .createWriterFunc(ignored -> new SparkAvroWriter(lazyEqDeleteSparkType()))
               .overwrite()
               .rowSchema(eqDeleteRowSchema)
@@ -247,7 +253,7 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(
               .buildEqualityWriter();
 
         case ORC:
-          return ORC.writeDeletes(file.encryptingOutputFile())
+          return ORC.writeDeletes(file)
               .createWriterFunc(SparkOrcWriter::new)
               .overwrite()
               .rowSchema(eqDeleteRowSchema)
@@ -274,7 +280,7 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(
         case PARQUET:
           StructType sparkPosDeleteSchema =
               SparkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
               .createWriterFunc(
                   msgType -> SparkParquetWriters.buildWriter(sparkPosDeleteSchema, msgType))
               .overwrite()
@@ -286,7 +292,7 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(
               .buildPositionWriter();
 
         case AVRO:
-          return Avro.writeDeletes(file.encryptingOutputFile())
+          return Avro.writeDeletes(file)
               .createWriterFunc(ignored -> new SparkAvroWriter(lazyPosDeleteSparkType()))
               .overwrite()
               .rowSchema(posDeleteRowSchema)
@@ -296,7 +302,7 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(
               .buildPositionWriter();
 
         case ORC:
-          return ORC.writeDeletes(file.encryptingOutputFile())
+          return ORC.writeDeletes(file)
               .createWriterFunc(SparkOrcWriter::new)
               .overwrite()
               .rowSchema(posDeleteRowSchema)
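
Note (not part of the patch): a minimal sketch of how the new EncryptedOutputFile-based newAppender overload might be exercised by a caller. The names `table`, `location`, `writeSchema`, and `dsSchema` are illustrative assumptions, not taken from the patch, and the snippet assumes a Table whose EncryptionManager is configured for encryption.

  // Encrypt a plain output file through the table's EncryptionManager, then
  // hand the EncryptedOutputFile directly to the new overload so the
  // encryption key metadata stays attached to the file for the commit path.
  OutputFile plainFile = table.io().newOutputFile(location);
  EncryptedOutputFile encryptedFile = table.encryption().encrypt(plainFile);

  SparkAppenderFactory factory =
      SparkAppenderFactory.builderFor(table, writeSchema, dsSchema).build();
  FileAppender<InternalRow> appender =
      factory.newAppender(encryptedFile, FileFormat.PARQUET);

Callers that still pass a plain OutputFile go through the old signature, which now wraps the file with EncryptionUtil.plainAsEncryptedOutput and delegates to the new overload, so both paths share one implementation.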