Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AVRO-3914: Add nanos support for the Java SDK #2608

Merged
merged 1 commit into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) {
case TIMESTAMP_MICROS:
logicalType = TIMESTAMP_MICROS_TYPE;
break;
case TIMESTAMP_NANOS:
logicalType = TIMESTAMP_NANOS_TYPE;
break;
case TIME_MILLIS:
logicalType = TIME_MILLIS_TYPE;
break;
Expand All @@ -161,6 +164,9 @@ private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) {
case LOCAL_TIMESTAMP_MILLIS:
logicalType = LOCAL_TIMESTAMP_MILLIS_TYPE;
break;
case LOCAL_TIMESTAMP_NANOS:
logicalType = LOCAL_TIMESTAMP_NANOS_TYPE;
break;
default:
final LogicalTypeFactory typeFactory = REGISTERED_TYPES.get(typeName);
logicalType = (typeFactory == null) ? null : typeFactory.fromSchema(schema);
Expand Down Expand Up @@ -193,8 +199,10 @@ private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) {
private static final String TIME_MICROS = "time-micros";
private static final String TIMESTAMP_MILLIS = "timestamp-millis";
private static final String TIMESTAMP_MICROS = "timestamp-micros";
private static final String TIMESTAMP_NANOS = "timestamp-nanos";
private static final String LOCAL_TIMESTAMP_MILLIS = "local-timestamp-millis";
private static final String LOCAL_TIMESTAMP_MICROS = "local-timestamp-micros";
private static final String LOCAL_TIMESTAMP_NANOS = "local-timestamp-nanos";

/** Create a Decimal LogicalType with the given precision and scale 0 */
public static Decimal decimal(int precision) {
Expand Down Expand Up @@ -255,6 +263,12 @@ public static TimestampMicros timestampMicros() {
return TIMESTAMP_MICROS_TYPE;
}

private static final TimestampNanos TIMESTAMP_NANOS_TYPE = new TimestampNanos();

public static TimestampNanos timestampNanos() {
return TIMESTAMP_NANOS_TYPE;
}

private static final LocalTimestampMillis LOCAL_TIMESTAMP_MILLIS_TYPE = new LocalTimestampMillis();

public static LocalTimestampMillis localTimestampMillis() {
Expand All @@ -267,6 +281,12 @@ public static LocalTimestampMicros localTimestampMicros() {
return LOCAL_TIMESTAMP_MICROS_TYPE;
}

private static final LocalTimestampMicros LOCAL_TIMESTAMP_NANOS_TYPE = new LocalTimestampMicros();

public static LocalTimestampMicros localTimestampNanos() {
return LOCAL_TIMESTAMP_NANOS_TYPE;
}

Comment on lines +284 to +289
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shoudln't it be LocalTimestampNanos class here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you're correct! Do you want to raise the PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done at #2706

/** Uuid represents a uuid without a time */
public static class Uuid extends LogicalType {
private Uuid() {
Expand Down Expand Up @@ -502,6 +522,21 @@ public void validate(Schema schema) {
}
}

/** TimestampNanos represents a date and time in nanoseconds */
public static class TimestampNanos extends LogicalType {
private TimestampNanos() {
super(TIMESTAMP_NANOS);
}

@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.LONG) {
throw new IllegalArgumentException("Timestamp (nanos) can only be used with an underlying long type");
}
}
}

public static class LocalTimestampMillis extends LogicalType {
private LocalTimestampMillis() {
super(LOCAL_TIMESTAMP_MILLIS);
Expand Down Expand Up @@ -530,4 +565,18 @@ public void validate(Schema schema) {
}
}

public static class LocalTimestampNanos extends LogicalType {
private LocalTimestampNanos() {
super(LOCAL_TIMESTAMP_NANOS);
}

@Override
public void validate(Schema schema) {
super.validate(schema);
if (schema.getType() != Schema.Type.LONG) {
throw new IllegalArgumentException("Local timestamp (micros) can only be used with an underlying long type");
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,53 @@ public Schema getRecommendedSchema() {
}
}

public static class TimestampNanosConversion extends Conversion<Instant> {
@Override
public Class<Instant> getConvertedType() {
return Instant.class;
}

@Override
public String getLogicalTypeName() {
return "timestamp-nanos";
}

@Override
public String adjustAndSetValue(String varName, String valParamName) {
return varName + " = " + valParamName + ".truncatedTo(java.time.temporal.ChronoUnit.NANOS);";
}

@Override
public Instant fromLong(Long microsFromEpoch, Schema schema, LogicalType type) {
long epochSeconds = microsFromEpoch / 1_000_000_000L;
long nanoAdjustment = microsFromEpoch % 1_000_000_000L;

return Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
}

@Override
public Long toLong(Instant instant, Schema schema, LogicalType type) {
long seconds = instant.getEpochSecond();
int nanos = instant.getNano();

if (seconds < 0 && nanos > 0) {
long micros = Math.multiplyExact(seconds + 1, 1_000_000_000L);
long adjustment = nanos - 1_000_000;

return Math.addExact(micros, adjustment);
} else {
long micros = Math.multiplyExact(seconds, 1_000_000_000L);

return Math.addExact(micros, nanos);
}
}

@Override
public Schema getRecommendedSchema() {
return LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
}
}

public static class LocalTimestampMillisConversion extends Conversion<LocalDateTime> {
private final TimestampMillisConversion timestampMillisConversion = new TimestampMillisConversion();

Expand Down Expand Up @@ -265,4 +312,35 @@ public Schema getRecommendedSchema() {
return LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
}
}

public static class LocalTimestampNanosConversion extends Conversion<LocalDateTime> {
private final TimestampNanosConversion timestampNanosConversion = new TimestampNanosConversion();

@Override
public Class<LocalDateTime> getConvertedType() {
return LocalDateTime.class;
}

@Override
public String getLogicalTypeName() {
return "local-timestamp-nanos";
}

@Override
public LocalDateTime fromLong(Long microsFromEpoch, Schema schema, LogicalType type) {
Instant instant = timestampNanosConversion.fromLong(microsFromEpoch, schema, type);
return LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
}

@Override
public Long toLong(LocalDateTime timestamp, Schema schema, LogicalType type) {
Instant instant = timestamp.toInstant(ZoneOffset.UTC);
return timestampNanosConversion.toLong(instant, schema, type);
}

@Override
public Schema getRecommendedSchema() {
return LogicalTypes.localTimestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ void addLogicalTypeConversions(SpecificData specificData) {
specificData.addLogicalTypeConversion(new TimeConversions.TimeMicrosConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimestampMicrosConversion());
specificData.addLogicalTypeConversion(new TimeConversions.TimestampNanosConversion());
specificData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMicrosConversion());
specificData.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
specificData.addLogicalTypeConversion(new TimeConversions.LocalTimestampNanosConversion());
specificData.addLogicalTypeConversion(new Conversions.UUIDConversion());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ void javaTypeWithDateTimeTypes() throws Exception {
Schema timeMicrosSchema = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
Schema timestampSchema = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
Schema timestampMicrosSchema = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
Schema timestampNanosSchema = LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));

// Date/time types should always use upper level java classes
assertEquals("java.time.LocalDate", compiler.javaType(dateSchema), "Should use java.time.LocalDate for date type");
Expand All @@ -404,6 +405,8 @@ void javaTypeWithDateTimeTypes() throws Exception {
"Should use java.time.LocalTime for time-micros type");
assertEquals("java.time.Instant", compiler.javaType(timestampMicrosSchema),
"Should use java.time.Instant for timestamp-micros type");
assertEquals("java.time.Instant", compiler.javaType(timestampNanosSchema),
"Should use java.time.Instant for timestamp-nanos type");
}

@Test
Expand Down Expand Up @@ -582,15 +585,18 @@ void getUsedConversionClassesForNullableTimestamps() throws Exception {
// present or added as converters (AVRO-2481).
final Schema tsMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
final Schema tsMicros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
final Schema tsNanos = LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));

final Collection<String> conversions = compiler.getUsedConversionClasses(SchemaBuilder.record("WithTimestamps")
.fields().name("tsMillis").type(tsMillis).noDefault().name("tsMillisOpt").type().unionOf().nullType().and()
.type(tsMillis).endUnion().noDefault().name("tsMicros").type(tsMicros).noDefault().name("tsMicrosOpt").type()
.unionOf().nullType().and().type(tsMicros).endUnion().noDefault().endRecord());
.unionOf().nullType().and().type(tsMicros).endUnion().noDefault().name("tsNanos").type(tsNanos).noDefault()
.name("tsNanosOpt").type().unionOf().nullType().and().type(tsNanos).endUnion().noDefault().endRecord());

assertEquals(2, conversions.size());
assertEquals(3, conversions.size());
assertThat(conversions, hasItem("org.apache.avro.data.TimeConversions.TimestampMillisConversion"));
assertThat(conversions, hasItem("org.apache.avro.data.TimeConversions.TimestampMicrosConversion"));
assertThat(conversions, hasItem("org.apache.avro.data.TimeConversions.TimestampNanosConversion"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@

public class ProtoConversions {

private static final int THOUSAND = 1000;
private static final int MILLION = 1000000;
private static final int THOUSAND = 1_000;
private static final int MILLION = 1_000_000;
private static final int BILLION = 1_000_000_000;

// second value must be from 0001-01-01T00:00:00Z to 9999-12-31T23:59:59Z
// inclusive.
Expand All @@ -39,7 +40,7 @@

// timestamp precise of conversion from long
private enum TimestampPrecise {
Millis, Micros
Millis, Micros, Nanos
};

public static class TimestampMillisConversion extends Conversion<Timestamp> {
Expand Down Expand Up @@ -96,10 +97,37 @@
}
}

public static class TimestampNanosConversion extends Conversion<Timestamp> {
@Override
public Class<Timestamp> getConvertedType() {
return Timestamp.class;
}

@Override
public String getLogicalTypeName() {
return "timestamp-nanos";
}

@Override
public Timestamp fromLong(Long nanosFromEpoch, Schema schema, LogicalType type) throws IllegalArgumentException {
return ProtoConversions.fromLong(nanosFromEpoch, TimestampPrecise.Nanos);
}

@Override
public Long toLong(Timestamp value, Schema schema, LogicalType type) {
return ProtoConversions.toLong(value, TimestampPrecise.Nanos);
}

@Override
public Schema getRecommendedSchema() {
return LogicalTypes.timestampNanos().addToSchema(Schema.create(Schema.Type.LONG));
}
}

private static long toLong(Timestamp value, TimestampPrecise precise) {
long rv = 0L;

switch (precise) {

Check warning

Code scanning / CodeQL

Missing enum case in switch Warning

Switch statement does not have a case for
Nanos
.
case Millis:
rv = value.getSeconds() * THOUSAND + value.getNanos() / MILLION;
break;
Expand All @@ -112,8 +140,8 @@
}

private static Timestamp fromLong(Long elapsedSinceEpoch, TimestampPrecise precise) throws IllegalArgumentException {
long seconds = 0L;
int nanos = 0;
final long seconds;
final int nanos;

switch (precise) {
case Millis:
Expand All @@ -124,6 +152,12 @@
seconds = Math.floorDiv(elapsedSinceEpoch, (long) MILLION);
nanos = (int) Math.floorMod(elapsedSinceEpoch, (long) MILLION) * THOUSAND;
break;
case Nanos:
seconds = Math.floorDiv(elapsedSinceEpoch, (long) BILLION);
nanos = (int) Math.floorMod(elapsedSinceEpoch, (long) BILLION);
break;
default:
throw new IllegalArgumentException("Unknown precision: " + precise);
}

if (seconds < SECONDS_LOWERLIMIT || seconds > SECONDS_UPPERLIMIT) {
Expand Down
Loading