Skip to content

Commit

Permalink
Address latest comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ajantha-bhat committed Jan 22, 2025
1 parent 3bb323b commit bfdd45a
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,9 @@
*/
package org.apache.iceberg.data.parquet;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
Expand Down Expand Up @@ -81,40 +71,40 @@ protected abstract ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, Types.StructType structType);

protected ParquetValueReader<?> fixedReader(ColumnDescriptor desc) {
return new FixedReader(desc);
return new GenericParquetReaders.FixedReader(desc);
}

protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
return new DateReader(desc);
return new GenericParquetReaders.DateReader(desc);
}

protected ParquetValueReader<?> timeReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
switch (unit) {
case MICROS:
return new TimeReader(desc);
return new GenericParquetReaders.TimeReader(desc);
case MILLIS:
return new TimeMillisReader(desc);
return new GenericParquetReaders.TimeMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported Unit: " + unit);
}
}

protected ParquetValueReader<?> timestampReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit, boolean isAdjustedToUTC) {
if (desc.getPrimitiveType().getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
return new GenericParquetReaders.TimestampInt96Reader(desc);
}

switch (unit) {
case MICROS:
return isAdjustedToUTC ? new TimestamptzReader(desc) : new TimestampReader(desc);
return isAdjustedToUTC
? new GenericParquetReaders.TimestamptzReader(desc)
: new GenericParquetReaders.TimestampReader(desc);
case MILLIS:
return isAdjustedToUTC
? new TimestamptzMillisReader(desc)
: new TimestampMillisReader(desc);
case NANOS:
if (isAdjustedToUTC) {
return new TimestampInt96Reader(desc);
} else {
throw new UnsupportedOperationException("Nanos should be adjusted to UTC");
}
? new GenericParquetReaders.TimestamptzMillisReader(desc)
: new GenericParquetReaders.TimestampMillisReader(desc);
default:
throw new UnsupportedOperationException("Unsupported Unit: " + unit);
}
Expand Down Expand Up @@ -443,124 +433,4 @@ MessageType type() {
return type;
}
}

private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();

private static class DateReader extends ParquetValueReaders.PrimitiveReader<LocalDate> {
private DateReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDate read(LocalDate reuse) {
return EPOCH_DAY.plusDays(column.nextInteger());
}
}

private static class TimestampReader extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampMillisReader
extends ParquetValueReaders.PrimitiveReader<LocalDateTime> {
private TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalDateTime read(LocalDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS).toLocalDateTime();
}
}

private static class TimestampInt96Reader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private static final long UNIX_EPOCH_JULIAN = 2_440_588L;

private TimestampInt96Reader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
final ByteBuffer byteBuffer =
column.nextBinary().toByteBuffer().order(ByteOrder.LITTLE_ENDIAN);
final long timeOfDayNanos = byteBuffer.getLong();
final int julianDay = byteBuffer.getInt();

return Instant.ofEpochMilli(TimeUnit.DAYS.toMillis(julianDay - UNIX_EPOCH_JULIAN))
.plusNanos(timeOfDayNanos)
.atOffset(ZoneOffset.UTC);
}
}

private static class TimestamptzReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong(), ChronoUnit.MICROS);
}
}

private static class TimestamptzMillisReader
extends ParquetValueReaders.PrimitiveReader<OffsetDateTime> {
private TimestamptzMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public OffsetDateTime read(OffsetDateTime reuse) {
return EPOCH.plus(column.nextLong() * 1000, ChronoUnit.MICROS);
}
}

private static class TimeMillisReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextInteger() * 1000000L);
}
}

private static class TimeReader extends ParquetValueReaders.PrimitiveReader<LocalTime> {
private TimeReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public LocalTime read(LocalTime reuse) {
return LocalTime.ofNanoOfDay(column.nextLong() * 1000L);
}
}

private static class FixedReader extends ParquetValueReaders.PrimitiveReader<byte[]> {
private FixedReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public byte[] read(byte[] reuse) {
if (reuse != null) {
column.nextBinary().toByteBuffer().duplicate().get(reuse);
return reuse;
} else {
return column.nextBinary().getBytes();
}
}
}
}
Loading

0 comments on commit bfdd45a

Please sign in to comment.