Skip to content

Commit

Permalink
Add joda-time and ByteBuffer support in avro (#740)
Browse files Browse the repository at this point in the history
* Add joda-time and ByteBuffer support in avro

* Move joda-time to common

* Add avro duration logical-type

* Handle unsigned integer values properly

* Add all logical types tests

* Fix constant naming

* Fix default bytes behavior

* Use deepEquals for DefaultBytes test

* Import org.joda.time package
  • Loading branch information
RustedBones committed Sep 12, 2023
1 parent 69e2024 commit 808f103
Show file tree
Hide file tree
Showing 6 changed files with 333 additions and 87 deletions.
48 changes: 38 additions & 10 deletions avro/src/main/scala/magnolify/avro/AvroType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@

package magnolify.avro

import java.nio.ByteBuffer
import java.time._
import java.{util => ju}
import magnolia1._
import magnolify.shared._
import magnolify.shims.FactoryCompat
import org.apache.avro.generic.GenericData.EnumSymbol
import org.apache.avro.generic._
import org.apache.avro.{JsonProperties, LogicalType, LogicalTypes, Schema}
import org.joda.{time => joda}

import java.nio.{ByteBuffer, ByteOrder}
import java.time._
import java.{util => ju}
import scala.annotation.{implicitNotFound, nowarn}
import scala.collection.concurrent
import scala.reflect.ClassTag
import scala.jdk.CollectionConverters._
import scala.collection.compat._
import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

sealed trait AvroType[T] extends Converter[T, GenericRecord, GenericRecord] {
val schema: Schema
Expand Down Expand Up @@ -200,14 +201,14 @@ object AvroField {
// Unit is carried by the Avro NULL schema type: one direction always produces `()`,
// the other always produces `JsonProperties.NULL_VALUE`.
implicit val afUnit =
  aux2[Unit, JsonProperties.Null](Schema.Type.NULL) { _ =>
    ()
  } { _ =>
    JsonProperties.NULL_VALUE
  }

implicit val afBytes = new Aux[Array[Byte], ByteBuffer, ByteBuffer] {
implicit val afByteBuffer: AvroField[ByteBuffer] = new Aux[ByteBuffer, ByteBuffer, ByteBuffer] {
override protected def buildSchema(cm: CaseMapper): Schema = Schema.create(Schema.Type.BYTES)
// `JacksonUtils.toJson` expects `Array[Byte]` for `BYTES` defaults
override def makeDefault(d: Array[Byte])(cm: CaseMapper): Array[Byte] = d
override def from(v: ByteBuffer)(cm: CaseMapper): Array[Byte] =
ju.Arrays.copyOfRange(v.array(), v.position(), v.limit())
override def to(v: Array[Byte])(cm: CaseMapper): ByteBuffer = ByteBuffer.wrap(v)
// Defaults are handed over as a plain byte array. Copy only the readable window
// [position, limit) of the buffer: `d.array()` would return the whole backing array
// (ignoring position, limit and array offset) and throws for read-only or direct buffers.
// Reading through `duplicate()` leaves the caller's position untouched.
override def makeDefault(d: ByteBuffer)(cm: CaseMapper): Array[Byte] = {
  val bytes = new Array[Byte](d.remaining())
  d.duplicate().get(bytes)
  bytes
}
// Both sides of this field are ByteBuffer, so the conversions are the identity.
override def from(v: ByteBuffer)(cm: CaseMapper): ByteBuffer = v
override def to(v: ByteBuffer)(cm: CaseMapper): ByteBuffer = v
}
// Array[Byte] field derived from the ByteBuffer field.
// Reading copies exactly the readable window [position, limit) of the buffer. Using the raw
// backing array (`_.array()`) would leak bytes outside that window whenever the buffer is a
// slice or has more capacity than payload, and would throw for read-only or direct buffers.
// `duplicate()` keeps the source buffer's position unchanged.
implicit val afBytes: AvroField[Array[Byte]] = from[ByteBuffer] { bb =>
  val bytes = new Array[Byte](bb.remaining())
  bb.duplicate().get(bytes)
  bytes
}(ByteBuffer.wrap)

@nowarn("msg=parameter value lp in method afEnum is never used")
implicit def afEnum[T](implicit et: EnumType[T], lp: shapeless.LowPriority): AvroField[T] =
Expand Down Expand Up @@ -295,6 +296,33 @@ object AvroField {
logicalType[String](LogicalTypes.uuid())(ju.UUID.fromString)(_.toString)
// java.time.LocalDate <-> Avro `date` logical type, stored as an Int number of epoch days
implicit val afDate: AvroField[LocalDate] =
  logicalType[Int](LogicalTypes.date()) { daysFromEpoch =>
    LocalDate.ofEpochDay(daysFromEpoch.toLong)
  } { date =>
    date.toEpochDay.toInt
  }
// Reference day (1970-01-01) from which the Avro `date` value counts days
private lazy val EpochJodaDate = new joda.LocalDate(1970, 1, 1)
// joda LocalDate <-> Avro `date` logical type, stored as an Int day offset from EpochJodaDate
implicit val afJodaDate: AvroField[joda.LocalDate] =
  logicalType[Int](LogicalTypes.date())(days => EpochJodaDate.plusDays(days))(d =>
    joda.Days.daysBetween(EpochJodaDate, d).getDays
  )

// duration, as in the avro spec. Not implicit on purpose: there is no specific type for it,
// so callers have to opt in explicitly.
// A duration logical type annotates an Avro fixed type of size 12, which stores three
// little-endian unsigned 32-bit integers representing durations at different granularities:
// the first is a number of months, the second a number of days, the third a number of
// milliseconds. Each component is exposed as a Long so the whole unsigned range
// [0, 4294967295] can be represented.
val afDuration: AvroField[(Long, Long, Long)] = {
  // Narrows a component to its 32-bit wire form. Values outside the unsigned 32-bit range are
  // rejected with an IllegalArgumentException instead of being silently wrapped by `.toInt`.
  def toUnsignedInt(name: String, value: Long): Int = {
    require(
      value >= 0L && value <= 0xffffffffL,
      s"duration $name must fit in an unsigned 32-bit integer, got $value"
    )
    value.toInt
  }

  logicalType[ByteBuffer](new LogicalType("duration")) { bs =>
    // Read through a duplicate so the incoming buffer keeps its own position and byte order
    val le = bs.duplicate().order(ByteOrder.LITTLE_ENDIAN)
    val months = java.lang.Integer.toUnsignedLong(le.getInt)
    val days = java.lang.Integer.toUnsignedLong(le.getInt)
    val millis = java.lang.Integer.toUnsignedLong(le.getInt)
    (months, days, millis)
  } { case (months, days, millis) =>
    ByteBuffer
      .allocate(12)
      .order(ByteOrder.LITTLE_ENDIAN)
      .putInt(toUnsignedInt("months", months))
      .putInt(toUnsignedInt("days", days))
      .putInt(toUnsignedInt("millis", millis))
  }(AvroField.fixed(12)(ByteBuffer.wrap)(_.array()))
}

def fixed[T: ClassTag](
size: Int
Expand Down
170 changes: 140 additions & 30 deletions avro/src/main/scala/magnolify/avro/logical/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,121 @@
package magnolify.avro

import org.apache.avro.LogicalTypes.LogicalTypeFactory
import org.apache.avro.{LogicalType, LogicalTypes, Schema}
import org.joda.{time => joda}

import java.time.{Instant, LocalDateTime, LocalTime, ZoneOffset}
import java.time._
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import org.apache.avro.{LogicalType, LogicalTypes, Schema}
import java.util.concurrent.TimeUnit

package object logical {
// Duplicate implementation from org.apache.avro.data.TimeConversions
// to support both 1.8 (joda-time based) and 1.9+ (java-time based)
object micros {
// Converts a count of microseconds since the epoch into an Instant.
// The value is split into whole seconds and a sub-second remainder expressed in nanoseconds;
// `Instant.ofEpochSecond` normalises the pair, so instants before the epoch come out right too.
private def toTimestampMicros(microsFromEpoch: Long): Instant = {
  val microsPerSecond = 1000000L
  val wholeSeconds = Math.floorDiv(microsFromEpoch, microsPerSecond)
  val remainderNanos = Math.floorMod(microsFromEpoch, microsPerSecond) * 1000L
  Instant.ofEpochSecond(wholeSeconds, remainderNanos)
}

// Converts an Instant into a count of microseconds since the epoch, truncating below 1µs.
// Throws ArithmeticException (from the `Math.*Exact` calls) if the result does not fit a Long.
private def fromTimestampMicros(instant: Instant): Long = {
  val microsPerSecond = 1000000L
  val subSecondMicros = instant.getNano / 1000L
  // For instants before the epoch with a fractional part, count from the next whole second
  // and subtract the complement, so the intermediate product stays as small as possible.
  val (baseSeconds, microOffset) =
    if (instant.getEpochSecond < 0 && instant.getNano > 0)
      (instant.getEpochSecond + 1, subSecondMicros - microsPerSecond)
    else
      (instant.getEpochSecond, subSecondMicros)
  Math.addExact(Math.multiplyExact(baseSeconds, microsPerSecond), microOffset)
}

implicit val afTimestampMicros: AvroField[Instant] =
AvroField.logicalType[Long](LogicalTypes.timestampMicros())(us =>
Instant.ofEpochMilli(us / 1000)
)(_.toEpochMilli * 1000)
AvroField.logicalType[Long](LogicalTypes.timestampMicros())(toTimestampMicros)(
fromTimestampMicros
)

implicit val afTimeMicros: AvroField[LocalTime] =
AvroField.logicalType[Long](LogicalTypes.timeMicros())(us =>
LocalTime.ofNanoOfDay(us * 1000)
)(_.toNanoOfDay / 1000)
AvroField.logicalType[Long](LogicalTypes.timeMicros()) { us =>
LocalTime.ofNanoOfDay(TimeUnit.MICROSECONDS.toNanos(us))
} { time =>
TimeUnit.NANOSECONDS.toMicros(time.toNanoOfDay)
}

// `LogicalTypes.localTimestampMicros` is Avro 1.10.0+
// `LogicalTypes.localTimestampMicros()` is Avro 1.10
implicit val afLocalTimestampMicros: AvroField[LocalDateTime] =
AvroField.logicalType[Long](new LogicalType("local-timestamp-micros"))(us =>
LocalDateTime.ofInstant(Instant.ofEpochMilli(us / 1000), ZoneOffset.UTC)
)(_.toInstant(ZoneOffset.UTC).toEpochMilli * 1000)
AvroField.logicalType[Long](new LogicalType("local-timestamp-micros")) { microsFromEpoch =>
val instant = toTimestampMicros(microsFromEpoch)
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
} { timestamp =>
val instant = timestamp.toInstant(ZoneOffset.UTC)
fromTimestampMicros(instant)
}

// avro 1.8 uses joda-time
// The conversion goes through epoch milliseconds, so anything below 1ms is dropped on read
// and written back as zero.
implicit val afJodaTimestampMicros: AvroField[joda.DateTime] =
  AvroField.logicalType[Long](LogicalTypes.timestampMicros())(us =>
    new joda.DateTime(us / 1000, joda.DateTimeZone.UTC)
  )(dt => 1000 * dt.getMillis)

// joda LocalTime <-> Avro `time-micros`; goes through milliseconds of the day, so the
// sub-millisecond part is lost (same approach as LossyTimeMicrosConversion).
implicit val afJodaTimeMicros: AvroField[joda.LocalTime] =
  AvroField.logicalType[Long](LogicalTypes.timeMicros())(us =>
    joda.LocalTime.fromMillisOfDay(us / 1000)
  )(t => 1000L * t.getMillisOfDay)
}

object millis {
implicit val afTimestampMillis: AvroField[Instant] =
AvroField.logicalType[Long](LogicalTypes.timestampMillis())(Instant.ofEpochMilli)(
_.toEpochMilli
)
AvroField.logicalType[Long](LogicalTypes.timestampMillis()) { millisFromEpoch =>
Instant.ofEpochMilli(millisFromEpoch)
} { timestamp =>
timestamp.toEpochMilli
}

implicit val afTimeMillis: AvroField[LocalTime] =
AvroField.logicalType[Int](LogicalTypes.timeMillis())(ms =>
LocalTime.ofNanoOfDay(ms * 1000000L)
)(t => (t.toNanoOfDay / 1000000).toInt)
AvroField.logicalType[Int](LogicalTypes.timeMillis()) { millisFromMidnight =>
LocalTime.ofNanoOfDay(TimeUnit.MILLISECONDS.toNanos(millisFromMidnight.toLong))
} { time =>
TimeUnit.NANOSECONDS.toMillis(time.toNanoOfDay).toInt
}

// `LogicalTypes.localTimestampMillis` is Avro 1.10.0+
implicit val afLocalTimestampMillis: AvroField[LocalDateTime] =
AvroField.logicalType[Long](new LogicalType("local-timestamp-millis"))(ms =>
LocalDateTime.ofInstant(Instant.ofEpochMilli(ms), ZoneOffset.UTC)
)(_.toInstant(ZoneOffset.UTC).toEpochMilli)
AvroField.logicalType[Long](new LogicalType("local-timestamp-millis")) { millisFromEpoch =>
val instant = Instant.ofEpochMilli(millisFromEpoch)
LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
} { timestamp =>
val instant = timestamp.toInstant(ZoneOffset.UTC)
instant.toEpochMilli
}

// avro 1.8 uses joda-time
// joda DateTime (read back in UTC) <-> Avro `timestamp-millis`, a Long of epoch milliseconds
implicit val afJodaTimestampMillis: AvroField[joda.DateTime] =
  AvroField.logicalType[Long](LogicalTypes.timestampMillis())(ms =>
    new joda.DateTime(ms, joda.DateTimeZone.UTC)
  )(_.getMillis)

// joda LocalTime <-> Avro `time-millis`, an Int of milliseconds since midnight
implicit val afJodaTimeMillis: AvroField[joda.LocalTime] =
  AvroField.logicalType[Int](LogicalTypes.timeMillis())(ms =>
    joda.LocalTime.fromMillisOfDay(ms.toLong)
  )(_.getMillisOfDay)
}

object bigquery {
// datetime is a custom logical type and must be registered
private final val DateTimeTypeName = "datetime"
private final val DateTimeLogicalTypeFactory: LogicalTypeFactory = (_: Schema) =>
new org.apache.avro.LogicalType(DateTimeTypeName)
new LogicalType(DateTimeTypeName)

/**
* Register custom logical types with avro, which is necessary to correctly parse a custom
Expand All @@ -72,38 +140,80 @@ package object logical {
* on the type name.
*/
def registerLogicalTypes(): Unit =
org.apache.avro.LogicalTypes.register(DateTimeTypeName, DateTimeLogicalTypeFactory)
LogicalTypes.register(DateTimeTypeName, DateTimeLogicalTypeFactory)

// DATETIME
// YYYY-[M]M-[D]D[ [H]H:[M]M:[S]S[.DDDDDD]]
private val DatetimePrinter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS")
// Building blocks of the DATETIME text format; kept separate so the parsers can make the
// time and fractional-second parts optional while the printers always emit the full form.
private val DatePattern = "yyyy-MM-dd"
private val TimePattern = "HH:mm:ss"
private val DecimalPattern = "SSSSSS"
// Full pattern: date, a space, time, a dot, then six fractional-second digits
private val DatetimePattern = DatePattern + " " + TimePattern + "." + DecimalPattern
private val DatetimePrinter = DateTimeFormatter.ofPattern(DatetimePattern)
private val DatetimeParser = new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ofPattern("yyyy-MM-dd"))
.appendPattern(DatePattern)
.appendOptional(
new DateTimeFormatterBuilder()
.append(DateTimeFormatter.ofPattern(" HH:mm:ss"))
.appendOptional(DateTimeFormatter.ofPattern(".SSSSSS"))
.appendLiteral(' ')
.append(new DateTimeFormatterBuilder().appendPattern(TimePattern).toFormatter)
.appendOptional(
new DateTimeFormatterBuilder()
.appendLiteral('.')
.appendPattern(DecimalPattern)
.toFormatter
)
.toFormatter
)
.toFormatter
.withZone(ZoneOffset.UTC)

// joda-time printer for the full DATETIME pattern (date, time and fractional seconds)
private val JodaDatetimePrinter = joda.format.DateTimeFormat.forPattern(DatetimePattern)

// joda-time parser for DATETIME strings, built inside-out:
// the date is mandatory, the " HH:mm:ss" part is optional, and within it the ".SSSSSS"
// fraction is optional as well. The resulting formatter is pinned to UTC.
private val JodaDatetimeParser = {
  val fractionParser = new joda.format.DateTimeFormatterBuilder()
    .appendLiteral('.')
    .appendPattern(DecimalPattern)
    .toParser

  val timeParser = new joda.format.DateTimeFormatterBuilder()
    .appendLiteral(' ')
    .appendPattern(TimePattern)
    .appendOptional(fractionParser)
    .toParser

  new joda.format.DateTimeFormatterBuilder()
    .appendPattern(DatePattern)
    .appendOptional(timeParser)
    .toFormatter
    .withZone(joda.DateTimeZone.UTC)
}

// NUMERIC
// https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#numeric-type
// Built with `AvroField.bigDecimal(38, 9)`; the two arguments are presumably the precision
// and scale of BigQuery NUMERIC as described at the link above — TODO confirm
implicit val afBigQueryNumeric: AvroField[BigDecimal] = AvroField.bigDecimal(38, 9)

// TIMESTAMP
// Aliases of the microsecond timestamp fields defined in `micros` (java.time and joda-time)
implicit val afBigQueryTimestamp: AvroField[Instant] = micros.afTimestampMicros
implicit val afBigQueryJodaTimestamp: AvroField[joda.DateTime] =
micros.afJodaTimestampMicros

// DATE: `AvroField.afDate`

// TIME
// Aliases of the microsecond time-of-day fields defined in `micros` (java.time and joda-time)
implicit val afBigQueryTime: AvroField[LocalTime] = micros.afTimeMicros
implicit val afBigQueryJodaTime: AvroField[joda.LocalTime] = micros.afJodaTimeMicros

// DATETIME -> sqlType: DATETIME
implicit val afBigQueryDatetime: AvroField[LocalDateTime] =
AvroField.logicalType[String](new org.apache.avro.LogicalType(DateTimeTypeName))(s =>
LocalDateTime.from(DatetimeParser.parse(s))
)(DatetimePrinter.format)
AvroField.logicalType[String](new org.apache.avro.LogicalType(DateTimeTypeName)) { str =>
LocalDateTime.parse(str, DatetimeParser)
} { datetime =>
DatetimePrinter.format(datetime)
}
// joda LocalDateTime <-> the custom `datetime` logical type, carried as a String:
// parsed with JodaDatetimeParser and rendered with JodaDatetimePrinter.
implicit val afBigQueryJodaDatetime: AvroField[joda.LocalDateTime] =
  AvroField.logicalType[String](new LogicalType(DateTimeTypeName))(
    joda.LocalDateTime.parse(_, JodaDatetimeParser)
  )(JodaDatetimePrinter.print(_))
}
}
Loading

0 comments on commit 808f103

Please sign in to comment.