Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix type params and time reader. #74

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.types.Types.StructType;
Expand All @@ -31,25 +31,28 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

public class InternalReader extends BaseParquetReaders<Record> {
public class InternalReader<T extends StructLike> extends BaseParquetReaders<T> {

private static final InternalReader INSTANCE = new InternalReader();
private static final InternalReader<?> INSTANCE = new InternalReader<>();

private InternalReader() {}

public static ParquetValueReader<Record> create(Schema expectedSchema, MessageType fileSchema) {
return INSTANCE.createReader(expectedSchema, fileSchema);
@SuppressWarnings("unchecked")
public static <T extends StructLike> ParquetValueReader<T> create(Schema expectedSchema, MessageType fileSchema) {
return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema);
}

public static ParquetValueReader<Record> create(
@SuppressWarnings("unchecked")
public static <T extends StructLike> ParquetValueReader<T> create(
Schema expectedSchema, MessageType fileSchema, Map<Integer, ?> idToConstant) {
return INSTANCE.createReader(expectedSchema, fileSchema, idToConstant);
return (ParquetValueReader<T>) INSTANCE.createReader(expectedSchema, fileSchema, idToConstant);
}

@Override
protected ParquetValueReader<Record> createStructReader(
@SuppressWarnings("unchecked")
protected ParquetValueReader<T> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return ParquetValueReaders.recordReader(types, fieldReaders, structType);
return (ParquetValueReader<T>) ParquetValueReaders.recordReader(types, fieldReaders, structType);
}

@Override
Expand All @@ -66,7 +69,7 @@ protected ParquetValueReader<?> dateReader(ColumnDescriptor desc) {
protected ParquetValueReader<?> timeReader(
ColumnDescriptor desc, LogicalTypeAnnotation.TimeUnit unit) {
if (unit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
return ParquetValueReaders.millisAsTimestamps(desc);
return ParquetValueReaders.millisAsTimes(desc);
}

return new ParquetValueReaders.UnboxedReader<>(desc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,18 @@
* <p>Iceberg's internal in-memory object model produces the types defined in {@link
* Type.TypeID#javaClass()}.
*/
public class InternalWriter extends BaseParquetWriter<StructLike> {
private static final InternalWriter INSTANCE = new InternalWriter();
public class InternalWriter<T extends StructLike> extends BaseParquetWriter<T> {
private static final InternalWriter<?> INSTANCE = new InternalWriter<>();

private InternalWriter() {}

public static ParquetValueWriter<StructLike> create(MessageType type) {
return INSTANCE.createWriter(type);
@SuppressWarnings("unchecked")
public static <T extends StructLike> ParquetValueWriter<T> create(MessageType type) {
return (ParquetValueWriter<T>) INSTANCE.createWriter(type);
}

@Override
protected StructWriter<StructLike> createStructWriter(List<ParquetValueWriter<?>> writers) {
protected StructWriter<T> createStructWriter(List<ParquetValueWriter<?>> writers) {
return ParquetValueWriters.recordWriter(writers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ public static ParquetValueReader<Long> int96Timestamps(ColumnDescriptor desc) {
return new ParquetValueReaders.TimestampInt96Reader(desc);
}

public static ParquetValueReader<Long> millisAsTimes(ColumnDescriptor desc) {
return new ParquetValueReaders.TimeMillisReader(desc);
}

public static ParquetValueReader<Long> millisAsTimestamps(ColumnDescriptor desc) {
return new ParquetValueReaders.TimestampMillisReader(desc);
}
Expand Down Expand Up @@ -465,6 +469,22 @@ public long readLong() {
}
}

private static class TimeMillisReader extends UnboxedReader<Long> {
private TimeMillisReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Long read(Long ignored) {
return readLong();
}

@Override
public long readLong() {
return 1000L * column.nextInteger();
}
}

private static class TimestampMillisReader extends UnboxedReader<Long> {
private TimestampMillisReader(ColumnDescriptor desc) {
super(desc);
Expand All @@ -477,7 +497,7 @@ public Long read(Long ignored) {

@Override
public long readLong() {
return 1000L * column.nextInteger();
return 1000L * column.nextLong();
}
}

Expand Down