Enum -> Class for dynamic FileFormat handling
pvary committed Jan 28, 2025
1 parent 5b890ae commit d3875fb
Showing 17 changed files with 161 additions and 114 deletions.
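The core of the change is in FileFormat.java (the first file below): the enum becomes a final Serializable class backed by a static registry, so engines can register additional formats at runtime instead of being limited to the built-in constants. A minimal usage sketch follows, assuming a hypothetical "CSV" format and calling code in the org.apache.iceberg package (the constructor in this commit is package-private); none of these names come from the commit itself:

// Hypothetical registration of a new format; "CSV" is illustrative only.
FileFormat csv = new FileFormat("CSV", "csv", true /* splittable */);
FileFormat.register(csv);

// Lookups then behave the same way they did for the built-in enum constants.
FileFormat byName = FileFormat.valueOf("CSV");               // returns the registered instance
FileFormat fromFile = FileFormat.fromFileName("part-0.csv"); // resolved via the ".csv" extension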
69 changes: 58 additions & 11 deletions api/src/main/java/org/apache/iceberg/FileFormat.java
@@ -18,28 +18,57 @@
*/
package org.apache.iceberg;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;

/** Enum of supported file formats. */
public enum FileFormat {
PUFFIN("puffin", false),
ORC("orc", true),
PARQUET("parquet", true),
AVRO("avro", true),
METADATA("metadata.json", false);
/**
* Enum like class of supported file formats which allows registering new FileFormats dynamically.
*/
public final class FileFormat implements Serializable {
public static final FileFormat PUFFIN = new FileFormat("PUFFIN", "puffin", false);
public static final FileFormat ORC = new FileFormat("ORC", "orc", true);
public static final FileFormat PARQUET = new FileFormat("PARQUET", "parquet", true);
public static final FileFormat AVRO = new FileFormat("AVRO", "avro", true);
public static final FileFormat METADATA = new FileFormat("METADATA", "metadata.json", false);

private static final Map<String, FileFormat> VALUES = Maps.newLinkedHashMap();

static {
Arrays.stream(FileFormat.class.getDeclaredFields())
.filter(declaredField -> declaredField.getType() == FileFormat.class)
.forEach(
f -> {
try {
VALUES.putIfAbsent(f.getName(), (FileFormat) f.get(null));
} catch (IllegalAccessException e) {
throw new RuntimeException("Initialization error: " + f.getName() + " " + e);
}
});
}

public static void register(FileFormat format) {
VALUES.putIfAbsent(format.name, format);
}

private final String name;
private final String ext;
private final boolean splittable;

private static final FileFormat[] VALUES = values();

FileFormat(String ext, boolean splittable) {
FileFormat(String name, String ext, boolean splittable) {
this.name = name;
this.ext = "." + ext;
this.splittable = splittable;
}

public String name() {
return name;
}

public boolean isSplittable() {
return splittable;
}
@@ -58,7 +87,7 @@ public String addExtension(String filename) {
}

public static FileFormat fromFileName(CharSequence filename) {
for (FileFormat format : VALUES) {
for (FileFormat format : VALUES.values()) {
int extStart = filename.length() - format.ext.length();
if (Comparators.charSequences()
.compare(format.ext, filename.subSequence(extStart, filename.length()))
@@ -78,4 +107,22 @@ public static FileFormat fromString(String fileFormat) {
throw new IllegalArgumentException(String.format("Invalid file format: %s", fileFormat), e);
}
}

public static FileFormat valueOf(String name) {
FileFormat format = VALUES.get(name);
if (format == null) {
throw new IllegalArgumentException("No FileFormat by the name " + name + " found");
}

return format;
}

@Override
public String toString() {
return name;
}

private Object readResolve() {
return FileFormat.valueOf(name);
}
}
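Because FileFormat is now a regular Serializable class rather than an enum, Java serialization would otherwise produce duplicate instances; the readResolve() hook above routes a deserialized copy back to the registered instance so reference equality keeps working. A rough standalone sketch of that behavior, assuming the iceberg-api module is on the classpath (the class and variable names here are illustrative, not part of the commit):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import org.apache.iceberg.FileFormat;

public class FileFormatSerializationSketch {
  public static void main(String[] args) throws Exception {
    // Serialize one of the built-in formats.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(FileFormat.PARQUET);
    }

    // Deserialize it again; readResolve() replaces the fresh copy with the
    // canonical registered instance, preserving the old enum identity semantics.
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      FileFormat copy = (FileFormat) in.readObject();
      System.out.println(copy == FileFormat.PARQUET); // expected: true
    }
  }
}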
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
@@ -258,8 +258,8 @@ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {
fields.addAll(projection.asStruct().fields());
fields.add(MetadataColumns.ROW_POSITION);

switch (format) {
case AVRO:
switch (format.name()) {
case "AVRO":
AvroIterable<ManifestEntry<F>> reader =
Avro.read(file)
.project(ManifestEntry.wrapFileSchema(Types.StructType.of(fields)))
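The remaining files follow one mechanical pattern: a Java switch can only dispatch on enum, String, or integral values, so once FileFormat stops being an enum, switch (format) no longer compiles and each call site switches on the format.name() string instead, with the case labels quoted. A generic sketch of the pattern, not a literal excerpt from any of the files:

// Before: switch (format) { case AVRO: ... }
// After: dispatch on the format's name string.
switch (format.name()) {
  case "AVRO":
    // build the Avro reader/writer
    break;
  case "PARQUET":
    // build the Parquet reader/writer
    break;
  default:
    throw new UnsupportedOperationException("Unsupported file format: " + format);
}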
@@ -97,8 +97,8 @@ public DataWriter<T> newDataWriter(
MetricsConfig metricsConfig = MetricsConfig.forTable(table);

try {
switch (dataFileFormat) {
case AVRO:
switch (dataFileFormat.name()) {
case "AVRO":
Avro.DataWriteBuilder avroBuilder =
Avro.writeData(file)
.schema(dataSchema)
@@ -114,7 +114,7 @@

return avroBuilder.build();

case PARQUET:
case "PARQUET":
Parquet.DataWriteBuilder parquetBuilder =
Parquet.writeData(file)
.schema(dataSchema)
@@ -130,7 +130,7 @@

return parquetBuilder.build();

case ORC:
case "ORC":
ORC.DataWriteBuilder orcBuilder =
ORC.writeData(file)
.schema(dataSchema)
@@ -163,8 +163,8 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(
MetricsConfig metricsConfig = MetricsConfig.forTable(table);

try {
switch (deleteFileFormat) {
case AVRO:
switch (deleteFileFormat.name()) {
case "AVRO":
Avro.DeleteWriteBuilder avroBuilder =
Avro.writeDeletes(file)
.setAll(properties)
@@ -181,7 +181,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(

return avroBuilder.buildEqualityWriter();

case PARQUET:
case "PARQUET":
Parquet.DeleteWriteBuilder parquetBuilder =
Parquet.writeDeletes(file)
.setAll(properties)
@@ -198,7 +198,7 @@ public EqualityDeleteWriter<T> newEqualityDeleteWriter(

return parquetBuilder.buildEqualityWriter();

case ORC:
case "ORC":
ORC.DeleteWriteBuilder orcBuilder =
ORC.writeDeletes(file)
.setAll(properties)
@@ -232,8 +232,8 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(
MetricsConfig metricsConfig = MetricsConfig.forPositionDelete(table);

try {
switch (deleteFileFormat) {
case AVRO:
switch (deleteFileFormat.name()) {
case "AVRO":
Avro.DeleteWriteBuilder avroBuilder =
Avro.writeDeletes(file)
.setAll(properties)
@@ -248,7 +248,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(

return avroBuilder.buildPositionWriter();

case PARQUET:
case "PARQUET":
Parquet.DeleteWriteBuilder parquetBuilder =
Parquet.writeDeletes(file)
.setAll(properties)
@@ -263,7 +263,7 @@ public PositionDeleteWriter<T> newPositionDeleteWriter(

return parquetBuilder.buildPositionWriter();

case ORC:
case "ORC":
ORC.DeleteWriteBuilder orcBuilder =
ORC.writeDeletes(file)
.setAll(properties)
@@ -93,8 +93,8 @@ public FileAppender<Record> newAppender(
EncryptedOutputFile encryptedOutputFile, FileFormat fileFormat) {
MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);
try {
switch (fileFormat) {
case AVRO:
switch (fileFormat.name()) {
case "AVRO":
return Avro.write(encryptedOutputFile)
.schema(schema)
.createWriterFunc(DataWriter::create)
@@ -103,7 +103,7 @@ public FileAppender<Record> newAppender(
.overwrite()
.build();

case PARQUET:
case "PARQUET":
return Parquet.write(encryptedOutputFile)
.schema(schema)
.createWriterFunc(GenericParquetWriter::buildWriter)
@@ -112,7 +112,7 @@ public FileAppender<Record> newAppender(
.overwrite()
.build();

case ORC:
case "ORC":
return ORC.write(encryptedOutputFile)
.schema(schema)
.createWriterFunc(GenericOrcWriter::buildWriter)
@@ -155,8 +155,8 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);

try {
switch (format) {
case AVRO:
switch (format.name()) {
case "AVRO":
return Avro.writeDeletes(file)
.createWriterFunc(DataWriter::create)
.withPartition(partition)
@@ -168,7 +168,7 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
.equalityFieldIds(equalityFieldIds)
.buildEqualityWriter();

case ORC:
case "ORC":
return ORC.writeDeletes(file)
.createWriterFunc(GenericOrcWriter::buildWriter)
.withPartition(partition)
@@ -181,7 +181,7 @@ public EqualityDeleteWriter<Record> newEqDeleteWriter(
.equalityFieldIds(equalityFieldIds)
.buildEqualityWriter();

case PARQUET:
case "PARQUET":
return Parquet.writeDeletes(file)
.createWriterFunc(GenericParquetWriter::buildWriter)
.withPartition(partition)
@@ -209,8 +209,8 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(
MetricsConfig metricsConfig = MetricsConfig.fromProperties(config);

try {
switch (format) {
case AVRO:
switch (format.name()) {
case "AVRO":
return Avro.writeDeletes(file)
.createWriterFunc(DataWriter::create)
.withPartition(partition)
@@ -221,7 +221,7 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(
.withKeyMetadata(file.keyMetadata())
.buildPositionWriter();

case ORC:
case "ORC":
return ORC.writeDeletes(file)
.createWriterFunc(GenericOrcWriter::buildWriter)
.withPartition(partition)
@@ -232,7 +232,7 @@ public PositionDeleteWriter<Record> newPosDeleteWriter(
.withKeyMetadata(file.keyMetadata())
.buildPositionWriter();

case PARQUET:
case "PARQUET":
return Parquet.writeDeletes(file)
.createWriterFunc(GenericParquetWriter::buildWriter)
.withPartition(partition)
@@ -160,11 +160,11 @@ public static List<Object> parameters() {

@BeforeEach
public void createInputFile() throws IOException {
switch (format) {
case ORC:
switch (format.name()) {
case "ORC":
createOrcInputFile();
break;
case PARQUET:
case "PARQUET":
createParquetInputFile();
break;
default:
@@ -358,13 +358,13 @@ public void testIsNaN() {
.isTrue();

shouldRead = shouldRead(isNaN("no_nans"));
switch (format) {
case ORC:
switch (format.name()) {
case "ORC":
assertThat(shouldRead)
.as("Should read 0 rows due to the ORC filter push-down feature")
.isFalse();
break;
case PARQUET:
case "PARQUET":
assertThat(shouldRead)
.as("Should read: NaN counts are not tracked in Parquet metrics")
.isTrue();
@@ -995,10 +995,10 @@ private boolean shouldRead(Expression expression) {
}

private boolean shouldRead(Expression expression, boolean caseSensitive) {
switch (format) {
case ORC:
switch (format.name()) {
case "ORC":
return shouldReadOrc(expression, caseSensitive);
case PARQUET:
case "PARQUET":
return shouldReadParquet(expression, caseSensitive, parquetSchema, rowGroupMetadata);
default:
throw new UnsupportedOperationException(
@@ -164,11 +164,11 @@ public void createInputFile() throws IOException {
record.setField("_fixed_decimal", new BigDecimal("99.99"));
records.add(record);
}
switch (format) {
case ORC:
switch (format.name()) {
case "ORC":
createOrcInputFile(records);
break;
case PARQUET:
case "PARQUET":
createParquetInputFile(records);
break;
default:
@@ -309,10 +309,10 @@ public void testEq() {
}

private boolean shouldRead(Object value) {
switch (format) {
case ORC:
switch (format.name()) {
case "ORC":
return shouldReadOrc(value);
case PARQUET:
case "PARQUET":
return shouldReadParquet(value);
default:
throw new UnsupportedOperationException(
@@ -329,20 +329,20 @@ public void testPosDeleteWriterWithRowSchema() throws IOException {
}

private CloseableIterable<Record> createReader(Schema schema, InputFile inputFile) {
switch (format) {
case PARQUET:
switch (format.name()) {
case "PARQUET":
return Parquet.read(inputFile)
.project(schema)
.createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
.build();

case AVRO:
case "AVRO":
return Avro.read(inputFile)
.project(schema)
.createResolvingReader(PlannedDataReader::create)
.build();

case ORC:
case "ORC":
return ORC.read(inputFile)
.project(schema)
.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(schema, fileSchema))
(Remaining changed files not shown.)
