Skip to content

Commit

Permalink
Merge pull request #74 from rdblue/parquet_internal_writer
Browse files (browse the repository at this point in the history)
Fix type params and time reader.
  • Loading branch information
ajantha-bhat authored Jan 24, 2025
2 parents 354f847 + a116756 commit 36063d6
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.types.Types.StructType;
Expand All @@ -31,25 +31,28 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class InternalReader extends BaseParquetReaders<Record> {
public class InternalReader<T extends StructLike> extends BaseParquetReaders<T> {

private static final InternalReader INSTANCE = new InternalReader();
private static final InternalReader<?> INSTANCE = new InternalReader<>();

private InternalReader() {}

public static ParquetValueReader<Record> create(Schema expectedSchema, MessageType fileSchema) {
return INSTANCE.createReader(expectedSchema, fileSchema);
@SuppressWarnings("unchecked")
public static <T extends StructLike> ParquetValueReader<T> create(Schema expectedSchema, MessageType fileSchema) {
return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema);
}

public static ParquetValueReader<Record> create(
@SuppressWarnings("unchecked")
public static <T extends StructLike> ParquetValueReader<T> create(
Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
return INSTANCE.createReader(expectedSchema, fileSchema, idToConstant);
return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema, idToConstant);
}

@Override
protected ParquetValueReader<Record> createStructReader(
@SuppressWarnings("unchecked")
protected ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return ParquetValueReaders.recordReader(types, fieldReaders, structType);
return (ParquetValueReader<T>) ParquetValueReaders.recordReader(types, fieldReaders, structType);
}

@Override
Expand All @@ -66,7 +69,7 @@ protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
protected ParquetValueReader<?> timeReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return ParquetValueReaders.millisAsTimestamps(desc);
return ParquetValueReaders.millisAsTimes(desc);
}

return new ParquetValueReaders.UnboxedReader<>(desc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@
* <p>Iceberg's internal in-memory object model produces the types defined in {@link
* Type.TypeID#javaClass()}.
*/
public class InternalWriter extends BaseParquetWriter<StructLike> {
private static final InternalWriter INSTANCE = new InternalWriter();
public class InternalWriter<T extends StructLike> extends BaseParquetWriter<T> {
private static final InternalWriter<?> INSTANCE = new InternalWriter<>();

private InternalWriter() {}

public static ParquetValueWriter<StructLike> create(MessageType type) {
return INSTANCE.createWriter(type);
@SuppressWarnings("unchecked")
public static <T extends StructLike> ParquetValueWriter<T> create(MessageType type) {
return (ParquetValueWriter<T>) INSTANCE.createWriter(type);
}

@Override
protected StructWriter<StructLike> createStructWriter(List<ParquetValueWriter<?>> writers) {
protected StructWriter<T> createStructWriter(List<ParquetValueWriter<?>> writers) {
return ParquetValueWriters.recordWriter(writers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public static ParquetValueReader<Long> int96Timestamps(ColumnDescriptor desc) {
return new ParquetValueReaders.TimestampInt96Reader(desc);
}

/**
 * Creates a reader that returns the values of a millisecond time column as {@code Long}s.
 *
 * <p>Returns a new {@code TimeMillisReader}, which reads each stored value as an int and
 * multiplies it by 1000. {@code InternalReader.timeReader} uses this factory when the time unit
 * is {@code MILLIS}.
 *
 * @param desc descriptor of the column to read
 * @return a reader producing the scaled values as {@code Long}s
 */
public static ParquetValueReader<Long> millisAsTimes(ColumnDescriptor desc) {
return new ParquetValueReaders.TimeMillisReader(desc);
}

/**
 * Creates a reader that returns the values of a millisecond timestamp column as {@code Long}s.
 *
 * <p>Returns a new {@code TimestampMillisReader} for the given column.
 *
 * @param desc descriptor of the column to read
 * @return a reader producing {@code Long} values
 */
public static ParquetValueReader<Long> millisAsTimestamps(ColumnDescriptor desc) {
return new ParquetValueReaders.TimestampMillisReader(desc);
}
Expand Down Expand Up @@ -465,6 +469,22 @@ public long readLong() {
}
}

/**
 * Reader for time values stored as ints; each value is multiplied by 1000 and returned as a long.
 *
 * <p>NOTE(review): the factor of 1000 presumably converts milliseconds to microseconds; confirm
 * against the time representation expected by callers.
 */
private static class TimeMillisReader extends UnboxedReader<Long> {
private TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}

/**
 * Returns the next value boxed as a {@code Long}; the argument is not used.
 */
@Override
public Long read(Long ignored) {
return readLong();
}

/**
 * Reads the next value as an int and scales it by 1000.
 */
@Override
public long readLong() {
// 1000L forces the multiplication to be done in long arithmetic.
return 1000L * column.nextInteger();
}
}

private static class TimestampMillisReader extends UnboxedReader<Long> {
private TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
Expand All @@ -477,7 +497,7 @@ public Long read(Long ignored) {

@Override
public long readLong() {
return 1000L * column.nextInteger();
return 1000L * column.nextLong();
}
}

Expand Down

0 comments on commit 36063d6

Please sign in to comment.