Spark 3.5: Support encrypted output files (apache#9435)
ggershinsky authored Jan 12, 2024
1 parent 53a1c86 commit 211f5d5
Showing 1 changed file with 13 additions and 7 deletions.
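This commit routes Spark 3.5 appender and delete-writer creation through EncryptedOutputFile. The existing newAppender(OutputFile, FileFormat) overload is kept for compatibility and now delegates to a new EncryptedOutputFile overload by wrapping the plain file with EncryptionUtil.plainAsEncryptedOutput, and the Parquet, Avro, and ORC delete-writer builders receive the EncryptedOutputFile itself instead of the unwrapped encryptingOutputFile().

Below is a minimal caller-side sketch of the wrapping step. It is illustrative rather than part of the commit: it assumes iceberg-core on the classpath, and the class name and local path are made up.

import java.io.File;
import org.apache.iceberg.Files;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionUtil;
import org.apache.iceberg.io.OutputFile;

public class PlainAsEncryptedExample {
  public static void main(String[] args) {
    // A plain, unencrypted output file on the local filesystem (path is illustrative).
    OutputFile plain = Files.localOutput(new File("/tmp/data.parquet"));

    // Adapt it to the EncryptedOutputFile-based writer API, as the retained
    // newAppender(OutputFile, FileFormat) overload now does internally; for a
    // plain file, encryptingOutputFile() returns the underlying file unchanged.
    EncryptedOutputFile wrapped = EncryptionUtil.plainAsEncryptedOutput(plain);
    System.out.println(wrapped.encryptingOutputFile().location());
  }
}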
@@ -31,6 +31,7 @@
 import org.apache.iceberg.deletes.EqualityDeleteWriter;
 import org.apache.iceberg.deletes.PositionDeleteWriter;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.encryption.EncryptionUtil;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.io.DataWriter;
 import org.apache.iceberg.io.DeleteSchemaUtil;
@@ -162,6 +163,11 @@ private StructType lazyPosDeleteSparkType() {
 
   @Override
   public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
+    return newAppender(EncryptionUtil.plainAsEncryptedOutput(file), fileFormat);
+  }
+
+  @Override
+  public FileAppender<InternalRow> newAppender(EncryptedOutputFile file, FileFormat fileFormat) {
     MetricsConfig metricsConfig = MetricsConfig.fromProperties(properties);
     try {
       switch (fileFormat) {
@@ -203,7 +209,7 @@ public FileAppender<InternalRow> newAppender(OutputFile file, FileFormat fileFormat) {
   public DataWriter<InternalRow> newDataWriter(
       EncryptedOutputFile file, FileFormat format, StructLike partition) {
     return new DataWriter<>(
-        newAppender(file.encryptingOutputFile(), format),
+        newAppender(file, format),
         format,
         file.encryptingOutputFile().location(),
         spec,
@@ -224,7 +230,7 @@ public EqualityDeleteWriter<InternalRow> newEqDeleteWriter(
     try {
       switch (format) {
         case PARQUET:
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
               .createWriterFunc(
                   msgType -> SparkParquetWriters.buildWriter(lazyEqDeleteSparkType(), msgType))
               .overwrite()
@@ -236,7 +242,7 @@
               .buildEqualityWriter();
 
         case AVRO:
-          return Avro.writeDeletes(file.encryptingOutputFile())
+          return Avro.writeDeletes(file)
               .createWriterFunc(ignored -> new SparkAvroWriter(lazyEqDeleteSparkType()))
               .overwrite()
               .rowSchema(eqDeleteRowSchema)
@@ -247,7 +253,7 @@
               .buildEqualityWriter();
 
         case ORC:
-          return ORC.writeDeletes(file.encryptingOutputFile())
+          return ORC.writeDeletes(file)
               .createWriterFunc(SparkOrcWriter::new)
               .overwrite()
               .rowSchema(eqDeleteRowSchema)
@@ -274,7 +280,7 @@ public PositionDeleteWriter<InternalRow> newPosDeleteWriter(
         case PARQUET:
           StructType sparkPosDeleteSchema =
               SparkSchemaUtil.convert(DeleteSchemaUtil.posDeleteSchema(posDeleteRowSchema));
-          return Parquet.writeDeletes(file.encryptingOutputFile())
+          return Parquet.writeDeletes(file)
               .createWriterFunc(
                   msgType -> SparkParquetWriters.buildWriter(sparkPosDeleteSchema, msgType))
               .overwrite()
@@ -286,7 +292,7 @@
               .buildPositionWriter();
 
         case AVRO:
-          return Avro.writeDeletes(file.encryptingOutputFile())
+          return Avro.writeDeletes(file)
               .createWriterFunc(ignored -> new SparkAvroWriter(lazyPosDeleteSparkType()))
               .overwrite()
               .rowSchema(posDeleteRowSchema)
@@ -296,7 +302,7 @@
               .buildPositionWriter();
 
         case ORC:
-          return ORC.writeDeletes(file.encryptingOutputFile())
+          return ORC.writeDeletes(file)
               .createWriterFunc(SparkOrcWriter::new)
               .overwrite()
               .rowSchema(posDeleteRowSchema)
