Skip to content

Commit

Permalink
AVRO-3914: Add nanos support (#2608)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko authored Dec 12, 2023
1 parent 92512b2 commit ff32453
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 7 deletions.
49 changes: 49 additions & 0 deletions lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) {
case TIMESTAMP_MICROS:
logicalType = TIMESTAMP_MICROS_TYPE;
break;
case TIMESTAMP_NANOS:
logicalType = TIMESTAMP_NANOS_TYPE;
break;
case TIME_MILLIS:
logicalType = TIME_MILLIS_TYPE;
break;
Expand All @@ -161,6 +164,9 @@ private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) {
case LOCAL_TIMESTAMP_MILLIS:
logicalType = LOCAL_TIMESTAMP_MILLIS_TYPE;
break;
case LOCAL_TIMESTAMP_NANOS:
logicalType = LOCAL_TIMESTAMP_NANOS_TYPE;
break;
default:
final LogicalTypeFactory typeFactory = REGISTERED_TYPES.get(typeName);
logicalType = (typeFactory == null) ? null : typeFactory.fromSchema(schema);
Expand Down Expand Up @@ -193,8 +199,10 @@ private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) {
private static final String TIME_MICROS = "time-micros";
private static final String TIMESTAMP_MILLIS = "timestamp-millis";
private static final String TIMESTAMP_MICROS = "timestamp-micros";
private static final String TIMESTAMP_NANOS = "timestamp-nanos";
private static final String LOCAL_TIMESTAMP_MILLIS = "local-timestamp-millis";
private static final String LOCAL_TIMESTAMP_MICROS = "local-timestamp-micros";
private static final String LOCAL_TIMESTAMP_NANOS = "local-timestamp-nanos";

/** Create a Decimal LogicalType with the given precision and scale 0 */
public static Decimal decimal(int precision) {
Expand Down Expand Up @@ -255,6 +263,12 @@ public static TimestampMicros timestampMicros() {
return TIMESTAMP_MICROS_TYPE;
}

private static final TimestampNanos TIMESTAMP_NANOS_TYPE = new TimestampNanos();

public static TimestampNanos timestampNanos() {
return TIMESTAMP_NANOS_TYPE;
}

private static final LocalTimestampMillis LOCAL_TIMESTAMP_MILLIS_TYPE = new LocalTimestampMillis();

public static LocalTimestampMillis localTimestampMillis() {
Expand All @@ -267,6 +281,12 @@ public static LocalTimestampMicros localTimestampMicros() {
return LOCAL_TIMESTAMP_MICROS_TYPE;
}

private static final LocalTimestampMicros LOCAL_TIMESTAMP_NANOS_TYPE = new LocalTimestampMicros();

public static LocalTimestampMicros localTimestampNanos() {
return LOCAL_TIMESTAMP_NANOS_TYPE;
}

/** Uuid represents a uuid without a time */
public static class Uuid extends LogicalType {
private Uuid() {
Expand Down Expand Up @@ -502,6 +522,21 @@ public void validate(Schema schema) {
}
}

/** TimestampNanos represents a date and time in nanoseconds */
public static class TimestampNanos extends LogicalType {
private TimestampNanos() {
super(TIMESTAMP_NANOS);
}

@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.LONG) {
throw new IllegalArgumentException("Timestamp (nanos) can only be used with an underlying long type");
}
}
}

public static class LocalTimestampMillis extends LogicalType {
private LocalTimestampMillis() {
super(LOCAL_TIMESTAMP_MILLIS);
Expand Down Expand Up @@ -530,4 +565,18 @@ public void validate(Schema schema) {
}
}

public static class LocalTimestampNanos extends LogicalType {
private LocalTimestampNanos() {
super(LOCAL_TIMESTAMP_NANOS);
}

@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.LONG) {
throw new IllegalArgumentException("Local timestamp (micros) can only be used with an underlying long type");
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,53 @@ public Schema getRecommendedSchema() {
}
}

public static class TimestampNanosConversion extends Conversion<Instant> {
@Override
public Class<Instant> getConvertedType() {
return Instant.class;
}

@Override
public String getLogicalTypeName() {
return "timestamp-nanos";
}

@Override
public String adjustAndSetValue(String varName, String valParamName) {
return varName + " = " + valParamName + ".truncatedTo(java.time.temporal.ChronoUnit.NANOS);";
}

@Override
public Instant fromLong(Long microsFromEpoch, Schema schema, LogicalType type) {
long epochSeconds = microsFromEpoch / 1_000_000_000L;
long nanoAdjustment = microsFromEpoch % 1_000_000_000L;

return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
}

@Override
public Long toLong(Instant instant, Schema schema, LogicalType type) {
long seconds = instant.getEpochSecond();
int nanos = instant.getNano();

if (seconds < 0 && nanos > 0) {
long micros = Math.multiplyExact(seconds + 1, 1_000_000_000L);
long adjustment = nanos - 1_000_000;

return Math.addExact(micros, adjustment);
} else {
long micros = Math.multiplyExact(seconds, 1_000_000_000L);

return Math.addExact(micros, nanos);
}
}

@Override
public Schema getRecommendedSchema() {
return LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
}
}

public static class LocalTimestampMillisConversion extends Conversion<LocalDateTime> {
private final TimestampMillisConversion timestampMillisConversion = new TimestampMillisConversion();

Expand Down Expand Up @@ -265,4 +312,35 @@ public Schema getRecommendedSchema() {
return LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
}
}

public static class LocalTimestampNanosConversion extends Conversion<LocalDateTime> {
private final TimestampNanosConversion timestampNanosConversion = new TimestampNanosConversion();

@Override
public Class<LocalDateTime> getConvertedType() {
return LocalDateTime.class;
}

@Override
public String getLogicalTypeName() {
return "local-timestamp-nanos";
}

@Override
public LocalDateTime fromLong(Long microsFromEpoch, Schema schema, LogicalType type) {
Instant instant = timestampNanosConversion.fromLong(microsFromEpoch, schema, type);
return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
}

@Override
public Long toLong(LocalDateTime timestamp, Schema schema, LogicalType type) {
Instant instant = timestamp.toInstant(ZoneOffset.UTC);
return timestampNanosConversion.toLong(instant, schema, type);
}

@Override
public Schema getRecommendedSchema() {
return LogicalTypes.localTimestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ void addLogicalTypeConversions(SpecificData specificData) {
specificData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimestampNanosConversion());
specificData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
specificData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
specificData.addLogicalTypeConversion(new TimeConversions.LocalTimestampNanosConversion());
specificData.addLogicalTypeConversion(new Conversions.UUIDConversion());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ void javaTypeWithDateTimeTypes() throws Exception {
Schema timeMicrosSchema = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
Schema timestampSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
Schema timestampMicrosSchema = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
Schema timestampNanosSchema = LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));

// Date/time types should always use upper level java classes
assertEquals("java.time.LocalDate", compiler.javaType(dateSchema), "Should use java.time.LocalDate for date type");
Expand All @@ -404,6 +405,8 @@ void javaTypeWithDateTimeTypes() throws Exception {
"Should use java.time.LocalTime for time-micros type");
assertEquals("java.time.Instant", compiler.javaType(timestampMicrosSchema),
"Should use java.time.Instant for timestamp-micros type");
assertEquals("java.time.Instant", compiler.javaType(timestampNanosSchema),
"Should use java.time.Instant for timestamp-nanos type");
}

@Test
Expand Down Expand Up @@ -582,15 +585,18 @@ void getUsedConversionClassesForNullableTimestamps() throws Exception {
// present or added as converters (AVRO-2481).
final Schema tsMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
final Schema tsMicros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
final Schema tsNanos = LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));

final Collection<String> conversions = compiler.getUsedConversionClasses(SchemaBuilder.record("WithTimestamps")
.fields().name("tsMillis").type(tsMillis).noDefault().name("tsMillisOpt").type().unionOf().nullType().and()
.type(tsMillis).endUnion().noDefault().name("tsMicros").type(tsMicros).noDefault().name("tsMicrosOpt").type()
.unionOf().nullType().and().type(tsMicros).endUnion().noDefault().endRecord());
.unionOf().nullType().and().type(tsMicros).endUnion().noDefault().name("tsNanos").type(tsNanos).noDefault()
.name("tsNanosOpt").type().unionOf().nullType().and().type(tsNanos).endUnion().noDefault().endRecord());

assertEquals(2, conversions.size());
assertEquals(3, conversions.size());
assertThat(conversions, hasItem("org.apache.avro.data.TimeConversions.TimestampMillisConversion"));
assertThat(conversions, hasItem("org.apache.avro.data.TimeConversions.TimestampMicrosConversion"));
assertThat(conversions, hasItem("org.apache.avro.data.TimeConversions.TimestampNanosConversion"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@

public class ProtoConversions {

private static final int THOUSAND = 1000;
private static final int MILLION = 1000000;
private static final int THOUSAND = 1_000;
private static final int MILLION = 1_000_000;
private static final int BILLION = 1_000_000_000;

// second value must be from 0001-01-01T00:00:00Z to 9999-12-31T23:59:59Z
// inclusive.
Expand All @@ -39,7 +40,7 @@ public class ProtoConversions {

// timestamp precise of conversion from long
private enum TimestampPrecise {
Millis, Micros
Millis, Micros, Nanos
};

public static class TimestampMillisConversion extends Conversion<Timestamp> {
Expand Down Expand Up @@ -96,6 +97,33 @@ public Schema getRecommendedSchema() {
}
}

public static class TimestampNanosConversion extends Conversion<Timestamp> {
@Override
public Class<Timestamp> getConvertedType() {
return Timestamp.class;
}

@Override
public String getLogicalTypeName() {
return "timestamp-nanos";
}

@Override
public Timestamp fromLong(Long nanosFromEpoch, Schema schema, LogicalType type) throws IllegalArgumentException {
return ProtoConversions.fromLong(nanosFromEpoch, TimestampPrecise.Nanos);
}

@Override
public Long toLong(Timestamp value, Schema schema, LogicalType type) {
return ProtoConversions.toLong(value, TimestampPrecise.Nanos);
}

@Override
public Schema getRecommendedSchema() {
return LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
}
}

private static long toLong(Timestamp value, TimestampPrecise precise) {
long rv = 0L;

Expand All @@ -112,8 +140,8 @@ private static long toLong(Timestamp value, TimestampPrecise precise) {
}

private static Timestamp fromLong(Long elapsedSinceEpoch, TimestampPrecise precise) throws IllegalArgumentException {
long seconds = 0L;
int nanos = 0;
final long seconds;
final int nanos;

switch (precise) {
case Millis:
Expand All @@ -124,6 +152,12 @@ private static Timestamp fromLong(Long elapsedSinceEpoch, TimestampPrecise preci
seconds = Math.floorDiv(elapsedSinceEpoch, (long) MILLION);
nanos = (int) Math.floorMod(elapsedSinceEpoch, (long) MILLION) * THOUSAND;
break;
case Nanos:
seconds = Math.floorDiv(elapsedSinceEpoch, (long) BILLION);
nanos = (int) Math.floorMod(elapsedSinceEpoch, (long) BILLION);
break;
default:
throw new IllegalArgumentException("Unknown precision: " + precise);
}

if (seconds < SECONDS_LOWERLIMIT || seconds > SECONDS_UPPERLIMIT) {
Expand Down

0 comments on commit ff32453

Please sign in to comment.