From 8996992a52ba56af0839b103cb5e05d3a8ea8295 Mon Sep 17 00:00:00 2001 From: Alexey Novakov Date: Tue, 18 Jun 2024 21:22:22 +0200 Subject: [PATCH] fix test code to compile with Flink 1.19 --- build.sbt | 2 +- release.sh | 2 +- .../scala/org/apache/flinkx/api/AnyTest.scala | 14 ++-- .../org/apache/flinkx/api/CatsTest.scala | 12 +++- .../flinkx/api/SchemaEvolutionTest.scala | 17 +++-- .../flinkx/api/SerializerSnapshotTest.scala | 27 +++++--- .../apache/flinkx/api/SerializerTest.scala | 67 ++++++++++--------- 7 files changed, 83 insertions(+), 58 deletions(-) diff --git a/build.sbt b/build.sbt index 4bc08ce..b0f64a8 100644 --- a/build.sbt +++ b/build.sbt @@ -4,7 +4,7 @@ Global / onChangedBuildSource := ReloadOnSourceChanges Global / excludeLintKeys := Set(git.useGitDescribe) lazy val rootScalaVersion = "3.3.3" -lazy val flinkVersion = System.getProperty("flinkVersion", "1.16.3") +lazy val flinkVersion = System.getProperty("flinkVersion", "1.17.2") lazy val root = (project in file(".")) .settings(ReleaseProcess.releaseSettings(flinkVersion) *) diff --git a/release.sh b/release.sh index 99e8eb5..49d9947 100644 --- a/release.sh +++ b/release.sh @@ -1,6 +1,6 @@ #!/bin/bash set -ex -RELEASE_VERSION_BUMP=true sbt -DflinkVersion=1.16.3 test 'release with-defaults' RELEASE_VERSION_BUMP=true sbt -DflinkVersion=1.17.2 test 'release with-defaults' RELEASE_VERSION_BUMP=true sbt -DflinkVersion=1.18.1 test 'release with-defaults' +RELEASE_VERSION_BUMP=true sbt -DflinkVersion=1.19.0 test 'release with-defaults' wait \ No newline at end of file diff --git a/src/test/scala/org/apache/flinkx/api/AnyTest.scala b/src/test/scala/org/apache/flinkx/api/AnyTest.scala index 8562bc2..ef1f674 100644 --- a/src/test/scala/org/apache/flinkx/api/AnyTest.scala +++ b/src/test/scala/org/apache/flinkx/api/AnyTest.scala @@ -6,27 +6,33 @@ import org.apache.flinkx.api.AnyTest._ import org.apache.flinkx.api.AnyTest.FAny.FValueAny.FTerm import org.apache.flinkx.api.AnyTest.FAny.FValueAny.FTerm.StringTerm import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.ExecutionConfig import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers class AnyTest extends AnyFlatSpec with Matchers with TestUtils { + val ec = ExecutionConfig() + + def createSerializer[T: TypeInformation] = + implicitly[TypeInformation[T]].createSerializer(ec) + it should "serialize concrete class" in { - val ser = implicitly[TypeInformation[StringTerm]].createSerializer(null) + val ser = createSerializer[StringTerm] roundtrip(ser, StringTerm("fo")) } it should "serialize ADT" in { - val ser = implicitly[TypeInformation[FAny]].createSerializer(null) + val ser = createSerializer[FAny] roundtrip(ser, StringTerm("fo")) } it should "serialize NEL" in { - val ser = implicitly[TypeInformation[NonEmptyList[FTerm]]].createSerializer(null) + val ser = createSerializer[NonEmptyList[FTerm]] roundtrip(ser, NonEmptyList.one(StringTerm("fo"))) } it should "serialize nested nel" in { - val ser = implicitly[TypeInformation[TermFilter]].createSerializer(null) + val ser = createSerializer[TermFilter] roundtrip(ser, TermFilter("a", NonEmptyList.one(StringTerm("fo")))) } } diff --git a/src/test/scala/org/apache/flinkx/api/CatsTest.scala b/src/test/scala/org/apache/flinkx/api/CatsTest.scala index 7101207..6bc215a 100644 --- a/src/test/scala/org/apache/flinkx/api/CatsTest.scala +++ b/src/test/scala/org/apache/flinkx/api/CatsTest.scala @@ -4,16 +4,24 @@ import cats.data.NonEmptyList import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flinkx.api.serializers._ class CatsTest extends AnyFlatSpec with Matchers with TestUtils { + implicit val stringListTi: TypeInformation[NonEmptyList[String]] = deriveTypeInformation + implicit val intListTi: TypeInformation[NonEmptyList[Int]] = deriveTypeInformation + + def createSerializer[T: TypeInformation] = + implicitly[TypeInformation[T]].createSerializer(ExecutionConfig()) it should "derive for NEL[String]" in { - val ser = deriveTypeInformation[NonEmptyList[String]].createSerializer(null) + val ser = createSerializer[NonEmptyList[String]] roundtrip(ser, NonEmptyList.one("doo")) } + it should "derive for NEL[Int]" in { - val ser = deriveTypeInformation[NonEmptyList[Int]].createSerializer(null) + val ser = createSerializer[NonEmptyList[Int]] roundtrip(ser, NonEmptyList.one(1)) } } diff --git a/src/test/scala/org/apache/flinkx/api/SchemaEvolutionTest.scala b/src/test/scala/org/apache/flinkx/api/SchemaEvolutionTest.scala index e478074..cb989d0 100644 --- a/src/test/scala/org/apache/flinkx/api/SchemaEvolutionTest.scala +++ b/src/test/scala/org/apache/flinkx/api/SchemaEvolutionTest.scala @@ -3,7 +3,9 @@ package org.apache.flinkx.api import org.apache.flinkx.api.SchemaEvolutionTest.{Click, ClickEvent, Event, NoArityTest} import org.apache.flinkx.api.serializers._ import org.apache.flinkx.api.serializer.ScalaCaseClassSerializer +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} +import org.apache.flink.api.common.ExecutionConfig import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -11,34 +13,37 @@ import java.io.ByteArrayOutputStream import java.nio.file.{Files, Path} class SchemaEvolutionTest extends AnyFlatSpec with Matchers { - private val eventTypeInfo = deriveTypeInformation[Event] - private val arityTestInfo = deriveTypeInformation[NoArityTest] + private implicit val eventTypeInfo: TypeInformation[Event] = deriveTypeInformation[Event] + private implicit val arityTestInfo: TypeInformation[NoArityTest] = deriveTypeInformation[NoArityTest] private val clicks = List(ClickEvent("a", "2021-01-01"), ClickEvent("b", "2021-01-01"), ClickEvent("c", "2021-01-01")) + def createSerializer[T: TypeInformation] = + implicitly[TypeInformation[T]].createSerializer(ExecutionConfig()) + ignore should "generate blob for event=click+purchase" in { val buffer = new ByteArrayOutputStream() - val eventSerializer = eventTypeInfo.createSerializer(null) + val eventSerializer = createSerializer[Event] eventSerializer.serialize(Click("p1", clicks), new DataOutputViewStreamWrapper(buffer)) Files.write(Path.of("src/test/resources/click.dat"), buffer.toByteArray) } it should "decode click when we added view" in { val buffer = this.getClass.getResourceAsStream("/click.dat") - val click = eventTypeInfo.createSerializer(null).deserialize(new DataInputViewStreamWrapper(buffer)) + val click = createSerializer[Event].deserialize(new DataInputViewStreamWrapper(buffer)) click shouldBe Click("p1", clicks) } ignore should "generate blob for no arity test" in { val buffer = new ByteArrayOutputStream() - val eventSerializer = arityTestInfo.createSerializer(null) + val eventSerializer = createSerializer[NoArityTest] eventSerializer.serialize(NoArityTest(4, 3, List("test")), new DataOutputViewStreamWrapper(buffer)) Files.write(Path.of("src/test/resources/without-arity-test.dat"), buffer.toByteArray) } it should "decode class without arity info" in { val buffer = this.getClass.getResourceAsStream("/without-arity-test.dat") - val serializer = arityTestInfo.createSerializer(null) match { + val serializer = createSerializer[NoArityTest] match { case s: ScalaCaseClassSerializer[_] => s case s => fail(s"Derived serializer must be of CaseClassSerializer type, but was $s") } diff --git a/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala b/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala index 6d0dc6b..110393e 100644 --- a/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala +++ b/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala @@ -12,11 +12,13 @@ import org.apache.flinkx.api.SerializerSnapshotTest.{ TraitMap } import org.apache.flink.api.common.typeutils.TypeSerializer +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import org.apache.flinkx.api.serializers._ import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.util.ChildFirstClassLoader import org.scalatest.Assertion @@ -24,53 +26,56 @@ import java.net.URLClassLoader class SerializerSnapshotTest extends AnyFlatSpec with Matchers { + def createSerializer[T: TypeInformation] = + implicitly[TypeInformation[T]].createSerializer(ExecutionConfig()) + it should "roundtrip product serializer snapshot" in { - val ser = deriveTypeInformation[SimpleClass1].createSerializer(null) + val ser = createSerializer[SimpleClass1] assertRoundtripSerializer(ser) } it should "roundtrip coproduct serializer snapshot" in { - val ser = deriveTypeInformation[OuterTrait].createSerializer(null) + val ser = createSerializer[OuterTrait] assertRoundtripSerializer(ser) } it should "roundtrip coproduct serializer snapshot with singletons" in { - val ser = deriveTypeInformation[ADT2].createSerializer(null) + val ser = createSerializer[ADT2] assertRoundtripSerializer(ser) } it should "roundtrip serializer snapshot with list of primitives" in { - val ser = deriveTypeInformation[List[Double]].createSerializer(null) + val ser = createSerializer[List[Double]] assertRoundtripSerializer(ser) } it should "roundtrip serializer snapshot with set as array of primitives" in { - val ser = implicitly[TypeInformation[Set[Double]]].createSerializer(null) + val ser = createSerializer[Set[Double]] assertRoundtripSerializer(ser) } it should "do array ser snapshot" in { - val set = deriveTypeInformation[SimpleClassArray].createSerializer(null) + val set = createSerializer[SimpleClassArray] assertRoundtripSerializer(set) } it should "do map ser snapshot" in { - assertRoundtripSerializer(deriveTypeInformation[SimpleClassMap1].createSerializer(null)) - assertRoundtripSerializer(deriveTypeInformation[SimpleClassMap2].createSerializer(null)) + assertRoundtripSerializer(createSerializer[SimpleClassMap1]) + assertRoundtripSerializer(createSerializer[SimpleClassMap2]) } it should "do list ser snapshot" in { - assertRoundtripSerializer(deriveTypeInformation[SimpleClassList].createSerializer(null)) + assertRoundtripSerializer(createSerializer[SimpleClassList]) } it should "do map ser snapshot adt " in { implicit val ti: Typeclass[OuterTrait] = deriveTypeInformation[OuterTrait] drop(ti) - assertRoundtripSerializer(deriveTypeInformation[TraitMap].createSerializer(null)) + assertRoundtripSerializer(createSerializer[TraitMap]) } it should "be compatible after snapshot deserialization in different classloader" in { - val ser = deriveTypeInformation[SimpleClass1].createSerializer(null) + val ser = createSerializer[SimpleClass1] val cl = newClassLoader(classOf[SimpleClass1]) try { val restored = roundtripSerializer(ser, cl) diff --git a/src/test/scala/org/apache/flinkx/api/SerializerTest.scala b/src/test/scala/org/apache/flinkx/api/SerializerTest.scala index 69ccd2d..9678a18 100644 --- a/src/test/scala/org/apache/flinkx/api/SerializerTest.scala +++ b/src/test/scala/org/apache/flinkx/api/SerializerTest.scala @@ -29,6 +29,7 @@ import org.apache.flinkx.api.SerializerTest.{ WrappedADT } import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.ExecutionConfig import org.scalatest.Inspectors import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers @@ -38,68 +39,69 @@ import java.time.{Instant, LocalDate, LocalDateTime} import java.util.UUID class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with TestUtils { + val ec = ExecutionConfig() it should "derive serializer for simple class" in { - val ser = implicitly[TypeInformation[Simple]].createSerializer(null) + val ser = implicitly[TypeInformation[Simple]].createSerializer(ec) all(ser, Simple(1, "foo")) } it should "derive serializer for java classes" in { - val ser = implicitly[TypeInformation[SimpleJava]].createSerializer(null) + val ser = implicitly[TypeInformation[SimpleJava]].createSerializer(ec) all(ser, SimpleJava(1, "foo")) } it should "derive serializer for java.time classes" in { - val ser = implicitly[TypeInformation[JavaTime]].createSerializer(null) + val ser = implicitly[TypeInformation[JavaTime]].createSerializer(ec) all(ser, JavaTime(Instant.now(), LocalDate.now(), LocalDateTime.now())) } it should "derive nested classes" in { - val ser = implicitly[TypeInformation[Nested]].createSerializer(null) + val ser = implicitly[TypeInformation[Nested]].createSerializer(ec) all(ser, Nested(Simple(1, "foo"))) } it should "derive for ADTs" in { - val ser = implicitly[TypeInformation[ADT]].createSerializer(null) + val ser = implicitly[TypeInformation[ADT]].createSerializer(ec) all(ser, Foo("a")) all(ser, Bar(1)) } it should "derive for ADTs with case objects" in { - val ser = implicitly[TypeInformation[ADT2]].createSerializer(null) + val ser = implicitly[TypeInformation[ADT2]].createSerializer(ec) all(ser, Foo2) all(ser, Bar2) } it should "derive for deeply nested classes" in { - val ser = implicitly[TypeInformation[Egg]].createSerializer(null) + val ser = implicitly[TypeInformation[Egg]].createSerializer(ec) all(ser, Egg(1)) } it should "derive for deeply nested adts" in { - val ser = implicitly[TypeInformation[Food]].createSerializer(null) + val ser = implicitly[TypeInformation[Food]].createSerializer(ec) all(ser, Egg(1)) } it should "derive for nested ADTs" in { - val ser = implicitly[TypeInformation[WrappedADT]].createSerializer(null) + val ser = implicitly[TypeInformation[WrappedADT]].createSerializer(ec) all(ser, WrappedADT(Foo("a"))) all(ser, WrappedADT(Bar(1))) } it should "derive for generic ADTs" in { - val ser = implicitly[TypeInformation[Param[Int]]].createSerializer(null) + val ser = implicitly[TypeInformation[Param[Int]]].createSerializer(ec) all(ser, P2(1)) } it should "derive seq" in { - val ser = implicitly[TypeInformation[SimpleSeq]].createSerializer(null) + val ser = implicitly[TypeInformation[SimpleSeq]].createSerializer(ec) noKryo(ser) serializable(ser) } it should "derive list of ADT" in { - val ser = implicitly[TypeInformation[List[ADT]]].createSerializer(null) + val ser = implicitly[TypeInformation[List[ADT]]].createSerializer(ec) all(ser, List(Foo("a"))) roundtrip(ser, ::(Foo("a"), Nil)) roundtrip(ser, Nil) @@ -111,23 +113,23 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test } it should "derive list" in { - val ser = implicitly[TypeInformation[List[Simple]]].createSerializer(null) + val ser = implicitly[TypeInformation[List[Simple]]].createSerializer(ec) all(ser, List(Simple(1, "a"))) } it should "derive nested list" in { - val ser = implicitly[TypeInformation[List[SimpleList]]].createSerializer(null) + val ser = implicitly[TypeInformation[List[SimpleList]]].createSerializer(ec) all(ser, List(SimpleList(List(1)))) } it should "derive seq of seq" in { - val ser = implicitly[TypeInformation[SimpleSeqSeq]].createSerializer(null) + val ser = implicitly[TypeInformation[SimpleSeqSeq]].createSerializer(ec) noKryo(ser) serializable(ser) } it should "derive generic type bounded classes" in { - val ser = implicitly[TypeInformation[BoundADT[Foo]]].createSerializer(null) + val ser = implicitly[TypeInformation[BoundADT[Foo]]].createSerializer(ec) noKryo(ser) } @@ -138,84 +140,83 @@ class SerializerTest extends AnyFlatSpec with Matchers with Inspectors with Test it should "return the same TypeSerializer instance with case classes" in { val ti = implicitly[TypeInformation[Simple]] - val ser1 = ti.createSerializer(null) - val ser2 = ti.createSerializer(null) + val ser1 = ti.createSerializer(ec) + val ser2 = ti.createSerializer(ec) ser1 should be theSameInstanceAs ser2 } it should "be serializable in case of annotations on classes" in { - val ser = implicitly[TypeInformation[Annotated]].createSerializer(null) + val ser = implicitly[TypeInformation[Annotated]].createSerializer(ec) serializable(ser) } it should "be serializable in case of annotations on subtypes" in { - val ser = implicitly[TypeInformation[Ann]].createSerializer(null) + val ser = implicitly[TypeInformation[Ann]].createSerializer(ec) serializable(ser) } it should "serialize Option" in { - val ser = implicitly[TypeInformation[SimpleOption]].createSerializer(null) + val ser = implicitly[TypeInformation[SimpleOption]].createSerializer(ec) all(ser, SimpleOption(None)) roundtrip(ser, SimpleOption(Some("foo"))) } it should "serialize Either" in { - val ser = implicitly[TypeInformation[SimpleEither]].createSerializer(null) + val ser = implicitly[TypeInformation[SimpleEither]].createSerializer(ec) all(ser, SimpleEither(Left("foo"))) roundtrip(ser, SimpleEither(Right(42))) } it should "serialize nested list of ADT" in { - val ser = implicitly[TypeInformation[ListADT]].createSerializer(null) + val ser = implicitly[TypeInformation[ListADT]].createSerializer(ec) all(ser, ListADT(Nil)) roundtrip(ser, ListADT(List(Foo("a")))) } it should "derive multiple instances of generic class" in { - val ser = implicitly[TypeInformation[Generic[SimpleOption]]].createSerializer(null) - val ser2 = implicitly[TypeInformation[Generic[Simple]]].createSerializer(null) + val ser = implicitly[TypeInformation[Generic[SimpleOption]]].createSerializer(ec) + val ser2 = implicitly[TypeInformation[Generic[Simple]]].createSerializer(ec) all(ser, Generic(SimpleOption(None), Bar(0))) all(ser2, Generic(Simple(0, "asd"), Bar(0))) } it should "serialize nil" in { - val ser = implicitly[TypeInformation[NonEmptyList[String]]].createSerializer(null) + val ser = implicitly[TypeInformation[NonEmptyList[String]]].createSerializer(ec) roundtrip(ser, NonEmptyList.one("a")) } it should "serialize unit" in { - val ser = implicitly[TypeInformation[Unit]].createSerializer(null) + val ser = implicitly[TypeInformation[Unit]].createSerializer(ec) roundtrip(ser, ()) } it should "serialize triple-nested case clases" in { - val ser = implicitly[TypeInformation[Seq[NestedBottom]]].createSerializer(null) + val ser = implicitly[TypeInformation[Seq[NestedBottom]]].createSerializer(ec) roundtrip(ser, List(NestedBottom(Some("a"), None))) } it should "serialize classes with type mapper" in { import MappedTypeInfoTest._ - val ser = implicitly[TypeInformation[WrappedString]].createSerializer(null) + val ser = implicitly[TypeInformation[WrappedString]].createSerializer(ec) val str = new WrappedString() str.put("foo") roundtrip(ser, str) } it should "serialize bigint" in { - val ser = implicitly[TypeInformation[BigInt]].createSerializer(null) + val ser = implicitly[TypeInformation[BigInt]].createSerializer(ec) roundtrip(ser, BigInt(123)) } it should "serialize bigdec" in { - val ser = implicitly[TypeInformation[BigDecimal]].createSerializer(null) + val ser = implicitly[TypeInformation[BigDecimal]].createSerializer(ec) roundtrip(ser, BigDecimal(123)) } it should "serialize uuid" in { - val ser = implicitly[TypeInformation[UUID]].createSerializer(null) + val ser = implicitly[TypeInformation[UUID]].createSerializer(ec) roundtrip(ser, UUID.randomUUID()) } - } object SerializerTest {