Add joda-time and ByteBuffer support in avro #740

Merged: 9 commits, May 12, 2023
50 changes: 40 additions & 10 deletions avro/src/main/scala/magnolify/avro/AvroType.scala
@@ -16,21 +16,22 @@

package magnolify.avro

import java.nio.ByteBuffer
import java.time._
import java.{util => ju}
import magnolia1._
import magnolify.shared._
import magnolify.shims.FactoryCompat
import org.apache.avro.generic.GenericData.EnumSymbol
import org.apache.avro.generic._
import org.apache.avro.{JsonProperties, LogicalType, LogicalTypes, Schema}
import org.joda.{time => joda}

import java.nio.{ByteBuffer, ByteOrder}
import java.time._
import java.{util => ju}
import scala.annotation.{implicitNotFound, nowarn}
import scala.collection.concurrent
import scala.reflect.ClassTag
import scala.jdk.CollectionConverters._
import scala.collection.compat._
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

sealed trait AvroType[T] extends Converter[T, GenericRecord, GenericRecord] {
val schema: Schema
@@ -196,14 +197,14 @@ object AvroField {
implicit val afLong: AvroField[Long] = id[Long](Schema.Type.LONG)
implicit val afFloat: AvroField[Float] = id[Float](Schema.Type.FLOAT)
implicit val afDouble: AvroField[Double] = id[Double](Schema.Type.DOUBLE)
implicit val afBytes: AvroField[Array[Byte]] = new Aux[Array[Byte], ByteBuffer, ByteBuffer] {
implicit val afByteBuffer: AvroField[ByteBuffer] = new Aux[ByteBuffer, ByteBuffer, ByteBuffer] {
Contributor

Looks like it could be declared as id[ByteBuffer](Schema.Type.BYTES), but id would need an override to accommodate d.array().

override protected def buildSchema(cm: CaseMapper): Schema = Schema.create(Schema.Type.BYTES)
// `JacksonUtils.toJson` expects `Array[Byte]` for `BYTES` defaults
override def makeDefault(d: Array[Byte])(cm: CaseMapper): Array[Byte] = d
override def from(v: ByteBuffer)(cm: CaseMapper): Array[Byte] =
ju.Arrays.copyOfRange(v.array(), v.position(), v.limit())
override def to(v: Array[Byte])(cm: CaseMapper): ByteBuffer = ByteBuffer.wrap(v)
Comment on lines -203 to -205
Contributor Author

Here we have an asymmetric definition:

  • in from, we copy the data
  • in to, we only wrap, meaning that a change to the converted Avro object will modify the Scala array.

Contributor

interesting--yeah, I can't see why we would need to do an array-copy here, your change makes sense to me.
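
The asymmetry discussed in this thread can be reproduced with the JDK alone. The following is a minimal sketch, not code from the PR: the object name `WrapVsCopy` and its helpers are hypothetical, and `copied` assumes a heap buffer whose `arrayOffset()` is 0, as the removed code implicitly did.

```scala
import java.nio.ByteBuffer
import java.util.Arrays

// Hypothetical helper object, for illustration only.
object WrapVsCopy {
  // Wrapping shares the backing array: no bytes are copied.
  def wrapped(bytes: Array[Byte]): ByteBuffer = ByteBuffer.wrap(bytes)

  // Copying the window [position, limit) yields an independent array.
  // Assumes a heap buffer with arrayOffset() == 0.
  def copied(buf: ByteBuffer): Array[Byte] =
    Arrays.copyOfRange(buf.array(), buf.position(), buf.limit())

  def main(args: Array[String]): Unit = {
    val source = Array[Byte](1, 2, 3)
    val buf = wrapped(source)
    buf.put(0, 9.toByte) // absolute put writes through to `source`
    assert(source(0) == 9)

    val copy = copied(buf)
    copy(1) = 7 // the copy is detached, so `source` is unchanged
    assert(source(1) == 2)
  }
}
```

Wrapping in both directions, as the PR does, avoids the copy at the cost of sharing mutable state between the Avro and Scala values.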

override def makeDefault(d: ByteBuffer)(cm: CaseMapper): Array[Byte] = d.array()
override def from(v: ByteBuffer)(cm: CaseMapper): ByteBuffer = v
override def to(v: ByteBuffer)(cm: CaseMapper): ByteBuffer = v
}
implicit val afBytes: AvroField[Array[Byte]] = from[ByteBuffer](_.array())(ByteBuffer.wrap)
Contributor Author

Changed the behavior to pass the same mutable array from Avro to Scala.
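
One property of `ByteBuffer.array()` worth keeping in mind with this change: it returns the whole backing array and ignores the buffer's position and limit. The sketch below only illustrates that JDK behavior; `BackingArrayCaveat` and `viaRemaining` are hypothetical names, and whether buffers with a non-zero position ever reach `afBytes` in practice is not established by this PR.

```scala
import java.nio.ByteBuffer

// Hypothetical helper object, for illustration only.
object BackingArrayCaveat {
  // Returns the entire backing array, regardless of position and limit.
  def viaArray(buf: ByteBuffer): Array[Byte] = buf.array()

  // Reads only the remaining bytes; duplicate() keeps the caller's position intact.
  def viaRemaining(buf: ByteBuffer): Array[Byte] = {
    val out = new Array[Byte](buf.remaining())
    buf.duplicate().get(out)
    out
  }

  def main(args: Array[String]): Unit = {
    val buf = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
    buf.position(1)
    buf.limit(3)
    assert(viaArray(buf).length == 4)                  // all four bytes
    assert(viaRemaining(buf).toSeq == Seq[Byte](2, 3)) // only the window
    assert(buf.position() == 1)                        // position untouched
  }
}
```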

implicit val afCharSequence: AvroField[CharSequence] = id[CharSequence](Schema.Type.STRING)
implicit val afString: AvroField[String] = new Aux[String, String, String] {
override protected def buildSchema(cm: CaseMapper): Schema = {
@@ -316,8 +317,37 @@ object AvroField {
logicalType[CharSequence](LogicalTypes.uuid())(cs => ju.UUID.fromString(cs.toString))(
_.toString
)

// date
implicit val afDate: AvroField[LocalDate] =
logicalType[Int](LogicalTypes.date())(x => LocalDate.ofEpochDay(x.toLong))(_.toEpochDay.toInt)
private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1)
implicit val afJodaDate: AvroField[joda.LocalDate] =
logicalType[Int](LogicalTypes.date()) { daysFromEpoch =>
EpochJodaDate.plusDays(daysFromEpoch)
} { date =>
joda.Days.daysBetween(EpochJodaDate, date).getDays
}

// duration, as in the avro spec. do not make implicit as there is not a specific type for it
// A duration logical type annotates Avro fixed type of size 12, which stores three little-endian unsigned integers
// that represent durations at different granularities of time.
// The first stores a number in months, the second stores a number in days, and the third stores a number in milliseconds.
val afDuration: AvroField[(Long, Long, Long)] =
Contributor

Could we use java.time.Duration for this? It supports a nanosecond granularity which we could use for encoding in the ByteBuffer

Contributor Author

Duration is not designed to hold months, but Period is, so we could define it as (Period, Duration).
I was unsure where to store the days, though, as both can hold that value. I think we can split it like this:

  • Period: months + days
  • Duration: milliseconds

Contributor Author

Actually, Period is defined as (years: Int, months: Int, days: Int), so we could overflow when converting from Avro, since the specification says the values are unsigned integers (handled here as Long).

Contributor

ok, got it. This is probably the least painful solution, then -- can you add documentation (both on how to import it, and on what it represents) to https://github.com/spotify/magnolify/blob/main/docs/avro.md + https://github.com/spotify/magnolify/blob/main/docs/mapping.md ?
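
The overflow concern in this thread can be shown with a small round trip over the 12-byte layout described in the code comment: three little-endian unsigned 32-bit integers for months, days, and milliseconds. This is a sketch using only the JDK; `DurationCodec` is a hypothetical name, and unlike the PR it works on a plain `Array[Byte]` rather than an Avro fixed value.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Hypothetical stand-alone codec, for illustration only.
object DurationCodec {
  // Writes (months, days, millis) as three little-endian 32-bit integers.
  // `toInt` keeps the low 32 bits, so values up to 2^32 - 1 survive the round trip.
  def encode(months: Long, days: Long, millis: Long): Array[Byte] =
    ByteBuffer
      .allocate(12)
      .order(ByteOrder.LITTLE_ENDIAN)
      .putInt(months.toInt)
      .putInt(days.toInt)
      .putInt(millis.toInt)
      .array()

  // Reads the three integers back, widening each one as unsigned.
  def decode(bytes: Array[Byte]): (Long, Long, Long) = {
    val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
    val months = java.lang.Integer.toUnsignedLong(buf.getInt())
    val days = java.lang.Integer.toUnsignedLong(buf.getInt())
    val millis = java.lang.Integer.toUnsignedLong(buf.getInt())
    (months, days, millis)
  }

  def main(args: Array[String]): Unit = {
    val maxUnsigned = 4294967295L // 2^32 - 1, does not fit in a signed Int
    assert(decode(encode(maxUnsigned, 1L, 2L)) == ((maxUnsigned, 1L, 2L)))
    // Little-endian: the least significant byte comes first.
    assert(encode(1L, 0L, 0L)(0) == 1)
  }
}
```

A value such as 4294967295 months is valid under this layout but cannot be stored in the `Int` fields of `java.time.Period`, which is why a tuple of `Long` is the less lossy choice.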

logicalType[ByteBuffer](new LogicalType("duration")) { bs =>
bs.order(ByteOrder.LITTLE_ENDIAN)
val months = java.lang.Integer.toUnsignedLong(bs.getInt)
val days = java.lang.Integer.toUnsignedLong(bs.getInt)
val millis = java.lang.Integer.toUnsignedLong(bs.getInt)
(months, days, millis)
} { case (months, days, millis) =>
ByteBuffer
.allocate(12)
.order(ByteOrder.LITTLE_ENDIAN)
.putInt(months.toInt)
.putInt(days.toInt)
.putInt(millis.toInt)
}(AvroField.fixed(12)(ByteBuffer.wrap)(_.array()))

def fixed[T: ClassTag](
size: Int
170 changes: 140 additions & 30 deletions avro/src/main/scala/magnolify/avro/logical/package.scala
@@ -17,53 +17,121 @@
package magnolify.avro

import org.apache.avro.LogicalTypes.LogicalTypeFactory
import org.apache.avro.{LogicalType, LogicalTypes, Schema}
import org.joda.{time => joda}

import java.time.{Instant, LocalDateTime, LocalTime, ZoneOffset}
import java.time._
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import org.apache.avro.{LogicalType, LogicalTypes, Schema}
import java.util.concurrent.TimeUnit

package object logical {
// Duplicate implementation from org.apache.avro.data.TimeConversions
// to support both 1.8 (joda-time based) and 1.9+ (java-time based)
object micros {
private def toTimestampMicros(microsFromEpoch: Long): Instant = {
val epochSeconds = microsFromEpoch / 1000000L
val nanoAdjustment = (microsFromEpoch % 1000000L) * 1000L;
Instant.ofEpochSecond(epochSeconds, nanoAdjustment)
}

private def fromTimestampMicros(instant: Instant): Long = {
val seconds = instant.getEpochSecond
val nanos = instant.getNano
if (seconds < 0 && nanos > 0) {
val micros = Math.multiplyExact(seconds + 1, 1000000L)
val adjustment = (nanos / 1000L) - 1000000
Math.addExact(micros, adjustment)
} else {
val micros = Math.multiplyExact(seconds, 1000000L)
Math.addExact(micros, nanos / 1000L)
}
}

implicit val afTimestampMicros: AvroField[Instant] =
AvroField.logicalType[Long](LogicalTypes.timestampMicros())(us =>
Instant.ofEpochMilli(us / 1000)
)(_.toEpochMilli * 1000)
AvroField.logicalType[Long](LogicalTypes.timestampMicros())(toTimestampMicros)(
fromTimestampMicros
)
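
To see what the switch from a millisecond-based conversion to the seconds-plus-nanos conversion changes, the two can be compared side by side. The sketch below mirrors the logic shown in this diff but is standalone; the object name `MicrosPrecision` and the method names are hypothetical.

```scala
import java.time.Instant

// Hypothetical stand-alone comparison, for illustration only.
object MicrosPrecision {
  // Earlier approach in the diff: goes through milliseconds, dropping the last three digits.
  def viaMillis(micros: Long): Instant = Instant.ofEpochMilli(micros / 1000)

  // Newer approach: whole seconds plus a nanosecond adjustment.
  def toInstant(micros: Long): Instant =
    Instant.ofEpochSecond(micros / 1000000L, (micros % 1000000L) * 1000L)

  // Inverse conversion; the first branch handles instants before the epoch,
  // where getNano is still positive while getEpochSecond is negative.
  def toMicros(instant: Instant): Long = {
    val seconds = instant.getEpochSecond
    val nanos = instant.getNano
    if (seconds < 0 && nanos > 0)
      Math.addExact(Math.multiplyExact(seconds + 1, 1000000L), nanos / 1000L - 1000000L)
    else
      Math.addExact(Math.multiplyExact(seconds, 1000000L), nanos / 1000L)
  }

  def main(args: Array[String]): Unit = {
    val micros = 1683878400123456L
    assert(viaMillis(micros).getNano == 123000000) // microsecond digits lost
    assert(toInstant(micros).getNano == 123456000) // microsecond digits kept
    assert(toMicros(toInstant(micros)) == micros)
    assert(toMicros(toInstant(-1L)) == -1L)        // one microsecond before the epoch
  }
}
```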

implicit val afTimeMicros: AvroField[LocalTime] =
AvroField.logicalType[Long](LogicalTypes.timeMicros())(us =>
LocalTime.ofNanoOfDay(us * 1000)
)(_.toNanoOfDay / 1000)
AvroField.logicalType[Long](LogicalTypes.timeMicros()) { us =>
LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(us))
} { time =>
TimeUnit.NANOSECONDS.toMicros(time.toNanoOfDay)
}

// `LogicalTypes.localTimestampMicros` is Avro 1.10.0+
// `LogicalTypes.localTimestampMicros()` is Avro 1.10
implicit val afLocalTimestampMicros: AvroField[LocalDateTime] =
AvroField.logicalType[Long](new LogicalType("local-timestamp-micros"))(us =>
LocalDateTime.ofInstant(Instant.ofEpochMilli(us / 1000), ZoneOffset.UTC)
)(_.toInstant(ZoneOffset.UTC).toEpochMilli * 1000)
AvroField.logicalType[Long](new LogicalType("local-timestamp-micros")) { microsFromEpoch =>
val instant = toTimestampMicros(microsFromEpoch)
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
} { timestamp =>
val instant = timestamp.toInstant(ZoneOffset.UTC)
fromTimestampMicros(instant)
}

// avro 1.8 uses joda-time
implicit val afJodaTimestampMicros: AvroField[joda.DateTime] =
AvroField.logicalType[Long](LogicalTypes.timestampMicros()) { microsFromEpoch =>
new joda.DateTime(microsFromEpoch / 1000, joda.DateTimeZone.UTC)
} { timestamp =>
1000 * timestamp.getMillis
}

implicit val afJodaTimeMicros: AvroField[joda.LocalTime] =
AvroField.logicalType[Long](LogicalTypes.timeMicros()) { microsFromMidnight =>
joda.LocalTime.fromMillisOfDay(microsFromMidnight / 1000)
} { time =>
// from LossyTimeMicrosConversion
1000L * time.millisOfDay().get()
}
}
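
The joda-time fields in `micros` are lossy because Joda-Time stores times at millisecond resolution, as the `LossyTimeMicrosConversion` comment indicates. The sketch below models that truncation with plain `Long` arithmetic rather than Joda-Time itself, and contrasts it with the `java.time.LocalTime` path; `LossyMicros` and its methods are hypothetical names.

```scala
import java.time.LocalTime
import java.util.concurrent.TimeUnit

// Hypothetical model of the two round trips, for illustration only.
object LossyMicros {
  // Models a millisecond-resolution type: micros -> millis -> micros.
  def roundTripViaMillis(microsOfDay: Long): Long =
    TimeUnit.MILLISECONDS.toMicros(TimeUnit.MICROSECONDS.toMillis(microsOfDay))

  // java.time.LocalTime has nanosecond resolution, so micros survive.
  def roundTripViaLocalTime(microsOfDay: Long): Long = {
    val time = LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(microsOfDay))
    TimeUnit.NANOSECONDS.toMicros(time.toNanoOfDay)
  }

  def main(args: Array[String]): Unit = {
    val micros = 37230123456L // 10:20:30.123456
    assert(roundTripViaMillis(micros) == 37230123000L) // sub-millisecond part dropped
    assert(roundTripViaLocalTime(micros) == micros)
  }
}
```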

object millis {
implicit val afTimestampMillis: AvroField[Instant] =
AvroField.logicalType[Long](LogicalTypes.timestampMillis())(Instant.ofEpochMilli)(
_.toEpochMilli
)
AvroField.logicalType[Long](LogicalTypes.timestampMillis()) { millisFromEpoch =>
Instant.ofEpochMilli(millisFromEpoch)
} { timestamp =>
timestamp.toEpochMilli
}

implicit val afTimeMillis: AvroField[LocalTime] =
AvroField.logicalType[Int](LogicalTypes.timeMillis())(ms =>
LocalTime.ofNanoOfDay(ms * 1000000L)
)(t => (t.toNanoOfDay / 1000000).toInt)
AvroField.logicalType[Int](LogicalTypes.timeMillis()) { millisFromMidnight =>
LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(millisFromMidnight.toLong))
} { time =>
TimeUnit.NANOSECONDS.toMillis(time.toNanoOfDay).toInt
}

// `LogicalTypes.localTimestampMillis` is Avro 1.10.0+
implicit val afLocalTimestampMillis: AvroField[LocalDateTime] =
AvroField.logicalType[Long](new LogicalType("local-timestamp-millis"))(ms =>
LocalDateTime.ofInstant(Instant.ofEpochMilli(ms), ZoneOffset.UTC)
)(_.toInstant(ZoneOffset.UTC).toEpochMilli)
AvroField.logicalType[Long](new LogicalType("local-timestamp-millis")) { millisFromEpoch =>
val instant = Instant.ofEpochMilli(millisFromEpoch)
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
} { timestamp =>
val instant = timestamp.toInstant(ZoneOffset.UTC)
instant.toEpochMilli
}

// avro 1.8 uses joda-time
implicit val afJodaTimestampMillis: AvroField[joda.DateTime] =
AvroField.logicalType[Long](LogicalTypes.timestampMillis()) { millisFromEpoch =>
new joda.DateTime(millisFromEpoch, joda.DateTimeZone.UTC)
} { timestamp =>
timestamp.getMillis
}

implicit val afJodaTimeMillis: AvroField[joda.LocalTime] =
AvroField.logicalType[Int](LogicalTypes.timeMillis()) { millisFromMidnight =>
joda.LocalTime.fromMillisOfDay(millisFromMidnight.toLong)
} { time =>
time.millisOfDay().get()
}
}

object bigquery {
// datetime is a custom logical type and must be registered
private final val DateTimeTypeName = "datetime"
private final val DateTimeLogicalTypeFactory: LogicalTypeFactory = (_: Schema) =>
new org.apache.avro.LogicalType(DateTimeTypeName)
new LogicalType(DateTimeTypeName)

/**
* Register custom logical types with avro, which is necessary to correctly parse a custom
@@ -72,38 +140,80 @@ package object logical {
* on the type name.
*/
def registerLogicalTypes(): Unit =
org.apache.avro.LogicalTypes.register(DateTimeTypeName, DateTimeLogicalTypeFactory)
LogicalTypes.register(DateTimeTypeName, DateTimeLogicalTypeFactory)

// DATETIME
// YYYY-[M]M-[D]D[ [H]H:[M]M:[S]S[.DDDDDD]]
private val DatetimePrinter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
private val DatePattern = "yyyy-MM-dd"
private val TimePattern = "HH:mm:ss"
private val DecimalPattern = "SSSSSS"
private val DatetimePattern = s"$DatePattern $TimePattern.$DecimalPattern"
private val DatetimePrinter = DateTimeFormatter.ofPattern(DatetimePattern)
private val DatetimeParser = new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
.appendPattern(DatePattern)
.appendOptional(
new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ofPattern(" HH:mm:ss"))
.appendOptional(DateTimeFormatter.ofPattern(".SSSSSS"))
.appendLiteral(' ')
.append(new DateTimeFormatterBuilder().appendPattern(TimePattern).toFormatter)
.appendOptional(
new DateTimeFormatterBuilder()
.appendLiteral('.')
.appendPattern(DecimalPattern)
.toFormatter
)
.toFormatter
)
.toFormatter
.withZone(ZoneOffset.UTC)
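
The nested `appendOptional` calls make the fractional seconds optional on input while the printer always emits six fractional digits. The following standalone sketch reproduces that structure with `java.time` only; `BigQueryDatetimeSketch` is a hypothetical name, and the `withZone` call is omitted since it is not needed to obtain a `LocalDateTime`. Only inputs that include a time component are exercised here.

```scala
import java.time.LocalDateTime
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}

// Hypothetical stand-alone version of the parser/printer pair, for illustration only.
object BigQueryDatetimeSketch {
  private val printer = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")

  // Date, then an optional " HH:mm:ss", itself followed by an optional ".SSSSSS".
  private val parser = new DateTimeFormatterBuilder()
    .appendPattern("yyyy-MM-dd")
    .appendOptional(
      new DateTimeFormatterBuilder()
        .appendLiteral(' ')
        .appendPattern("HH:mm:ss")
        .appendOptional(
          new DateTimeFormatterBuilder()
            .appendLiteral('.')
            .appendPattern("SSSSSS")
            .toFormatter()
        )
        .toFormatter()
    )
    .toFormatter()

  def parse(text: String): LocalDateTime = LocalDateTime.parse(text, parser)

  def print(datetime: LocalDateTime): String = printer.format(datetime)

  def main(args: Array[String]): Unit = {
    val withFraction = parse("2023-05-12 10:20:30.123456")
    assert(withFraction == LocalDateTime.of(2023, 5, 12, 10, 20, 30, 123456000))

    val withoutFraction = parse("2023-05-12 10:20:30")
    assert(withoutFraction == LocalDateTime.of(2023, 5, 12, 10, 20, 30))

    // The printer always writes six fractional digits.
    assert(print(withoutFraction) == "2023-05-12 10:20:30.000000")
  }
}
```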

private val JodaDatetimePrinter = new joda.format.DateTimeFormatterBuilder()
.appendPattern(DatetimePattern)
.toFormatter

private val JodaDatetimeParser = new joda.format.DateTimeFormatterBuilder()
.appendPattern(DatePattern)
.appendOptional(
new joda.format.DateTimeFormatterBuilder()
.appendLiteral(' ')
.appendPattern(TimePattern)
.appendOptional(
new joda.format.DateTimeFormatterBuilder()
.appendLiteral('.')
.appendPattern(DecimalPattern)
.toParser
)
.toParser
)
.toFormatter
.withZone(joda.DateTimeZone.UTC)

// NUMERIC
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type
implicit val afBigQueryNumeric: AvroField[BigDecimal] = AvroField.bigDecimal(38, 9)

// TIMESTAMP
implicit val afBigQueryTimestamp: AvroField[Instant] = micros.afTimestampMicros
implicit val afBigQueryJodaTimestamp: AvroField[joda.DateTime] =
micros.afJodaTimestampMicros

// DATE: `AvroField.afDate`

// TIME
implicit val afBigQueryTime: AvroField[LocalTime] = micros.afTimeMicros
implicit val afBigQueryJodaTime: AvroField[joda.LocalTime] = micros.afJodaTimeMicros

// DATETIME -> sqlType: DATETIME
implicit val afBigQueryDatetime: AvroField[LocalDateTime] =
AvroField.logicalType[CharSequence](new org.apache.avro.LogicalType(DateTimeTypeName))(cs =>
LocalDateTime.from(DatetimeParser.parse(cs))
)(DatetimePrinter.format)
AvroField.logicalType[CharSequence](new org.apache.avro.LogicalType(DateTimeTypeName)) { cs =>
LocalDateTime.parse(cs.toString, DatetimeParser)
} { datetime =>
DatetimePrinter.format(datetime)
}
implicit val afBigQueryJodaDatetime: AvroField[joda.LocalDateTime] =
AvroField.logicalType[CharSequence](new org.apache.avro.LogicalType(DateTimeTypeName)) { cs =>
joda.LocalDateTime.parse(cs.toString, JodaDatetimeParser)
} { datetime =>
JodaDatetimePrinter.print(datetime)
}
}
}