
Commit

Handle remaining comments
ajantha-bhat committed Jan 23, 2025
1 parent bfdd45a commit 354f847
Showing 8 changed files with 61 additions and 109 deletions.
----- changed file 1 -----
@@ -583,7 +583,7 @@ private void checkColumnarBatch(
columnSet,
"uuid",
(records, i) -> records.get(i).getField("uuid"),
ColumnVector::getBinary);
(array, i) -> UUIDUtil.convert(array.getBinary(i)));

checkColumnarArrayValues(
expectedNumRows,
@@ -593,7 +593,7 @@ private void checkColumnarBatch(
columnSet,
"uuid_nullable",
(records, i) -> records.get(i).getField("uuid_nullable"),
ColumnVector::getBinary);
(array, i) -> UUIDUtil.convert(array.getBinary(i)));

checkColumnarArrayValues(
expectedNumRows,
@@ -820,8 +820,7 @@ private List<GenericRecord> createIncrementalRecordsForDate(
rec.setField("int_promotion", i);
rec.setField("time", LocalTime.of(11, i));
rec.setField("time_nullable", LocalTime.of(11, i));
ByteBuffer bb = UUIDUtil.convertToByteBuffer(UUID.randomUUID());
byte[] uuid = bb.array();
UUID uuid = UUID.randomUUID();
rec.setField("uuid", uuid);
rec.setField("uuid_nullable", uuid);
rec.setField("decimal", new BigDecimal("14.0" + i % 10));
@@ -858,9 +857,7 @@ private List<GenericRecord> createConstantRecordsForDate(Schema schema, LocalDat
rec.setField("int_promotion", 1);
rec.setField("time", LocalTime.of(11, 30));
rec.setField("time_nullable", LocalTime.of(11, 30));
ByteBuffer bb =
UUIDUtil.convertToByteBuffer(UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f"));
byte[] uuid = bb.array();
UUID uuid = UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f");
rec.setField("uuid", uuid);
rec.setField("uuid_nullable", uuid);
rec.setField("decimal", new BigDecimal("14.20"));
@@ -1140,7 +1137,7 @@ private void checkAllVectorValues(
columnSet,
"uuid",
(records, i) -> records.get(i).getField("uuid"),
(vector, i) -> ((FixedSizeBinaryVector) vector).get(i));
(vector, i) -> UUIDUtil.convert(((FixedSizeBinaryVector) vector).get(i)));

checkVectorValues(
expectedNumRows,
@@ -1149,7 +1146,7 @@ private void checkAllVectorValues(
columnSet,
"uuid_nullable",
(records, i) -> records.get(i).getField("uuid_nullable"),
(vector, i) -> ((FixedSizeBinaryVector) vector).get(i));
(vector, i) -> UUIDUtil.convert(((FixedSizeBinaryVector) vector).get(i)));

checkVectorValues(
expectedNumRows,
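
Note on the test changes above: the UUID columns are now asserted as java.util.UUID values via UUIDUtil.convert instead of as raw byte arrays. A minimal sketch of the 16-byte big-endian round trip assumed to sit behind that conversion (plain java.nio/java.util stand-ins, not the Iceberg utility itself):

import java.nio.ByteBuffer;
import java.util.UUID;

class UuidBytesSketch {
  // UUID -> 16 bytes, most-significant long first (what convertToByteBuffer is assumed to produce)
  static byte[] toBytes(UUID uuid) {
    ByteBuffer bb = ByteBuffer.allocate(16);
    bb.putLong(uuid.getMostSignificantBits());
    bb.putLong(uuid.getLeastSignificantBits());
    return bb.array();
  }

  // 16 bytes -> UUID (what UUIDUtil.convert is assumed to do with the vector's binary value)
  static UUID fromBytes(byte[] bytes) {
    ByteBuffer bb = ByteBuffer.wrap(bytes);
    return new UUID(bb.getLong(), bb.getLong());
  }

  public static void main(String[] args) {
    UUID original = UUID.fromString("abcd91cf-08d0-4223-b145-f64030b3077f");
    System.out.println(original.equals(fromBytes(toBytes(original)))); // prints true
  }
}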
----- changed file 2 -----
@@ -86,7 +86,7 @@ protected ParquetValueReader<?> timeReader(
case MILLIS:
return new GenericParquetReaders.TimeMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported Unit: " + unit);
throw new UnsupportedOperationException("Unsupported unit for time: " + unit);
}
}

@@ -106,7 +106,7 @@ protected ParquetValueReader<?> timestampReader(
? new GenericParquetReaders.TimestamptzMillisReader(desc)
: new GenericParquetReaders.TimestampMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported Unit: " + unit);
throw new UnsupportedOperationException("Unsupported unit for timestamp: " + unit);
}
}

----- changed file 3 -----
@@ -66,11 +66,6 @@ protected ParquetValueWriter<?> timestampWriter(ColumnDescriptor desc, boolean i
}
}

protected ParquetValueWriter<?> uuidWriter(ColumnDescriptor desc) {
// Use primitive-type writer (as FIXED_LEN_BYTE_ARRAY); no special writer needed.
return null;
}

private class WriteBuilder extends ParquetTypeVisitor<ParquetValueWriter<?>> {
private final MessageType type;

@@ -212,12 +207,7 @@ public Optional<ParquetValueWriter<?>> visit(
@Override
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.DateLogicalTypeAnnotation dateType) {
ParquetValueWriter<?> dateWriter = dateWriter(desc);
if (dateWriter != null) {
return Optional.of(dateWriter(desc));
}

return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(dateType);
return Optional.of(dateWriter(desc));
}

@Override
@@ -227,12 +217,7 @@ public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.TimeUnit.MICROS.equals(timeType.getUnit()),
"Cannot write time in %s, only MICROS is supported",
timeType.getUnit());
ParquetValueWriter<?> timeWriter = timeWriter(desc);
if (timeWriter != null) {
return Optional.of(timeWriter);
}

return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timeType);
return Optional.of(timeWriter(desc));
}

@Override
@@ -242,13 +227,7 @@ public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.TimeUnit.MICROS.equals(timestampType.getUnit()),
"Cannot write timestamp in %s, only MICROS is supported",
timestampType.getUnit());
ParquetValueWriter<?> timestampWriter =
timestampWriter(desc, timestampType.isAdjustedToUTC());
if (timestampWriter != null) {
return Optional.of(timestampWriter);
}

return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(timestampType);
return Optional.of(timestampWriter(desc, timestampType.isAdjustedToUTC()));
}

@Override
@@ -279,12 +258,7 @@ public Optional<ParquetValueWriter<?>> visit(
@Override
public Optional<ParquetValueWriter<?>> visit(
LogicalTypeAnnotation.UUIDLogicalTypeAnnotation uuidLogicalType) {
ParquetValueWriter<?> uuidWriter = uuidWriter(desc);
if (uuidWriter != null) {
return Optional.of(uuidWriter);
}

return LogicalTypeAnnotation.LogicalTypeAnnotationVisitor.super.visit(uuidLogicalType);
return Optional.of(ParquetValueWriters.uuids(desc));
}
}
}
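
Note on the visitor changes above: previously a subclass hook could return null and the visit method fell back to the default logical-type handling; after this change every hook is expected to produce a writer, so the wrapping collapses to a single Optional.of. A small illustration of that before/after shape (hypothetical names, not the Iceberg classes):

import java.util.Optional;

class VisitorSimplificationSketch {
  interface Writer {}

  // Hypothetical subclass hook; after this commit it is assumed to never return null.
  static Writer dateWriter(String column) {
    return new Writer() {};
  }

  // Before: a null result meant "defer to the superclass visitor" (effectively Optional.empty()).
  static Optional<Writer> visitDateBefore(String column) {
    Writer writer = dateWriter(column);
    return writer != null ? Optional.of(writer) : Optional.empty();
  }

  // After: the hook always yields a writer, so it is wrapped directly.
  static Optional<Writer> visitDateAfter(String column) {
    return Optional.of(dateWriter(column));
  }
}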
----- changed file 4 -----
@@ -70,8 +70,8 @@ protected Object convertConstant(org.apache.iceberg.types.Type type, Object valu
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

protected static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
protected DateReader(ColumnDescriptor desc) {
static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
DateReader(ColumnDescriptor desc) {
super(desc);
}

@@ -81,9 +81,8 @@ public LocalDate read(LocalDate reuse) {
}
}

protected static class TimestampReader
extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
protected TimestampReader(ColumnDescriptor desc) {
static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
TimestampReader(ColumnDescriptor desc) {
super(desc);
}

@@ -93,9 +92,8 @@ public LocalDateTime read(LocalDateTime reuse) {
}
}

protected static class TimestampMillisReader
extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
protected TimestampMillisReader(ColumnDescriptor desc) {
static class TimestampMillisReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}

@@ -105,11 +103,10 @@ public LocalDateTime read(LocalDateTime reuse) {
}
}

protected static class TimestampInt96Reader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
static class TimestampInt96Reader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

protected TimestampInt96Reader(ColumnDescriptor desc) {
TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@@ -126,9 +123,8 @@ public OffsetDateTime read(OffsetDateTime reuse) {
}
}

protected static class TimestamptzReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
protected TimestamptzReader(ColumnDescriptor desc) {
static class TimestamptzReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
TimestamptzReader(ColumnDescriptor desc) {
super(desc);
}

@@ -138,9 +134,8 @@ public OffsetDateTime read(OffsetDateTime reuse) {
}
}

protected static class TimestamptzMillisReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
protected TimestamptzMillisReader(ColumnDescriptor desc) {
static class TimestamptzMillisReader extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
TimestamptzMillisReader(ColumnDescriptor desc) {
super(desc);
}

@@ -150,8 +145,8 @@ public OffsetDateTime read(OffsetDateTime reuse) {
}
}

protected static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
protected TimeMillisReader(ColumnDescriptor desc) {
static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}

@@ -161,8 +156,8 @@ public LocalTime read(LocalTime reuse) {
}
}

protected static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
protected TimeReader(ColumnDescriptor desc) {
static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
TimeReader(ColumnDescriptor desc) {
super(desc);
}

@@ -172,8 +167,8 @@ public LocalTime read(LocalTime reuse) {
}
}

protected static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
protected FixedReader(ColumnDescriptor desc) {
static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
FixedReader(ColumnDescriptor desc) {
super(desc);
}

----- changed file 5 -----
@@ -52,8 +52,8 @@ protected StructWriter<Record> createStructWriter(List<ParquetValueWriter<?>> wr
private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

protected static class DateWriter extends ParquetValueWriters.PrimitiveWriter<LocalDate> {
protected DateWriter(ColumnDescriptor desc) {
static class DateWriter extends ParquetValueWriters.PrimitiveWriter<LocalDate> {
DateWriter(ColumnDescriptor desc) {
super(desc);
}

@@ -63,8 +63,8 @@ public void write(int repetitionLevel, LocalDate value) {
}
}

protected static class TimeWriter extends ParquetValueWriters.PrimitiveWriter<LocalTime> {
protected TimeWriter(ColumnDescriptor desc) {
static class TimeWriter extends ParquetValueWriters.PrimitiveWriter<LocalTime> {
TimeWriter(ColumnDescriptor desc) {
super(desc);
}

@@ -74,9 +74,8 @@ public void write(int repetitionLevel, LocalTime value) {
}
}

protected static class TimestampWriter
extends ParquetValueWriters.PrimitiveWriter<LocalDateTime> {
protected TimestampWriter(ColumnDescriptor desc) {
static class TimestampWriter extends ParquetValueWriters.PrimitiveWriter<LocalDateTime> {
TimestampWriter(ColumnDescriptor desc) {
super(desc);
}

@@ -87,9 +86,8 @@ public void write(int repetitionLevel, LocalDateTime value) {
}
}

protected static class TimestamptzWriter
extends ParquetValueWriters.PrimitiveWriter<OffsetDateTime> {
protected TimestamptzWriter(ColumnDescriptor desc) {
static class TimestamptzWriter extends ParquetValueWriters.PrimitiveWriter<OffsetDateTime> {
TimestamptzWriter(ColumnDescriptor desc) {
super(desc);
}

@@ -99,10 +97,10 @@ public void write(int repetitionLevel, OffsetDateTime value) {
}
}

protected static class FixedWriter extends ParquetValueWriters.PrimitiveWriter<byte[]> {
static class FixedWriter extends ParquetValueWriters.PrimitiveWriter<byte[]> {
private final int length;

protected FixedWriter(ColumnDescriptor desc) {
FixedWriter(ColumnDescriptor desc) {
super(desc);
this.length = desc.getPrimitiveType().getTypeLength();
}
----- changed file 6 -----
@@ -21,7 +21,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.types.Types.StructType;
@@ -31,24 +31,23 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class InternalReader extends BaseParquetReaders<StructLike> {
public class InternalReader extends BaseParquetReaders<Record> {

private static final InternalReader INSTANCE = new InternalReader();

private InternalReader() {}

public static ParquetValueReader<StructLike> create(
Schema expectedSchema, MessageType fileSchema) {
public static ParquetValueReader<Record> create(Schema expectedSchema, MessageType fileSchema) {
return INSTANCE.createReader(expectedSchema, fileSchema);
}

public static ParquetValueReader<StructLike> create(
public static ParquetValueReader<Record> create(
Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
return INSTANCE.createReader(expectedSchema, fileSchema, idToConstant);
}

@Override
protected ParquetValueReader<StructLike> createStructReader(
protected ParquetValueReader<Record> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return ParquetValueReaders.recordReader(types, fieldReaders, structType);
}
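
Note on the reader changes above: the type parameter narrows from StructLike to the more specific Record. A hedged usage sketch of the resulting call site, assuming it lives alongside InternalReader and that callers already consume Records:

import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.parquet.schema.MessageType;

class InternalReaderUsageSketch {
  static ParquetValueReader<Record> readerFor(
      Schema expected, MessageType fileSchema, Map<Integer, ?> idToConstant) {
    // With the narrower Record type parameter, callers no longer need to cast from StructLike.
    return InternalReader.create(expected, fileSchema, idToConstant);
  }
}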
----- changed file 7 -----
@@ -54,24 +54,16 @@ protected ParquetValueWriter<?> fixedWriter(ColumnDescriptor desc) {

@Override
protected ParquetValueWriter<?> dateWriter(ColumnDescriptor desc) {
// Use primitive-type writer; no special writer needed.
return null;
return ParquetValueWriters.ints(desc);
}

@Override
protected ParquetValueWriter<?> timeWriter(ColumnDescriptor desc) {
// Use primitive-type writer; no special writer needed.
return null;
return ParquetValueWriters.longs(desc);
}

@Override
protected ParquetValueWriter<?> timestampWriter(ColumnDescriptor desc, boolean isAdjustedToUTC) {
// Use primitive-type writer; no special writer needed.
return null;
}

@Override
protected ParquetValueWriter<?> uuidWriter(ColumnDescriptor desc) {
return ParquetValueWriters.uuids(desc);
return ParquetValueWriters.longs(desc);
}
}
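
Note on the InternalWriter overrides above: returning plain int/long writers only works if the internal record model already carries temporal values in Iceberg's primitive form. A sketch of that assumed representation (epoch days for dates, microseconds for times and timestamps):

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;

class InternalTemporalSketch {
  // date -> days since 1970-01-01, matching a writer created by ParquetValueWriters.ints
  static int toEpochDay(LocalDate date) {
    return (int) date.toEpochDay();
  }

  // time -> microseconds since midnight, matching ParquetValueWriters.longs
  static long toMicrosOfDay(LocalTime time) {
    return time.toNanoOfDay() / 1_000;
  }

  // timestamp -> microseconds since the epoch, matching ParquetValueWriters.longs
  static long toEpochMicros(LocalDateTime timestamp) {
    Instant instant = timestamp.toInstant(ZoneOffset.UTC);
    return Math.multiplyExact(instant.getEpochSecond(), 1_000_000L) + instant.getNano() / 1_000;
  }
}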
(diff for the eighth changed file not rendered)
