Skip to content

Commit

Permalink
fix test code to compile with Flink 1.19
Browse files Browse the repository at this point in the history
  • Loading branch information
novakov-alexey committed Jun 18, 2024
1 parent 4d8d69e commit 8996992
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 58 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Global / onChangedBuildSource := ReloadOnSourceChanges
Global / excludeLintKeys := Set(git.useGitDescribe)

lazy val rootScalaVersion = "3.3.3"
lazy val flinkVersion = System.getProperty("flinkVersion", "1.16.3")
lazy val flinkVersion = System.getProperty("flinkVersion", "1.17.2")

lazy val root = (project in file("."))
.settings(ReleaseProcess.releaseSettings(flinkVersion) *)
Expand Down
2 changes: 1 addition & 1 deletion release.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
set -ex
RELEASE_VERSION_BUMP=true sbt -DflinkVersion=1.16.3 test 'release with-defaults'
RELEASE_VERSION_BUMP=true sbt -DflinkVersion=1.17.2 test 'release with-defaults'
RELEASE_VERSION_BUMP=true sbt -DflinkVersion=1.18.1 test 'release with-defaults'
RELEASE_VERSION_BUMP=true sbt -DflinkVersion=1.19.0 test 'release with-defaults'
wait
14 changes: 10 additions & 4 deletions src/test/scala/org/apache/flinkx/api/AnyTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,33 @@ import org.apache.flinkx.api.AnyTest._
import org.apache.flinkx.api.AnyTest.FAny.FValueAny.FTerm
import org.apache.flinkx.api.AnyTest.FAny.FValueAny.FTerm.StringTerm
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.ExecutionConfig
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

class AnyTest extends AnyFlatSpec with Matchers with TestUtils {
val ec = ExecutionConfig()

def createSerializer[T: TypeInformation] =
implicitly[TypeInformation[T]].createSerializer(ec)

it should "serialize concrete class" in {
val ser = implicitly[TypeInformation[StringTerm]].createSerializer(null)
val ser = createSerializer[StringTerm]
roundtrip(ser, StringTerm("fo"))
}

it should "serialize ADT" in {
val ser = implicitly[TypeInformation[FAny]].createSerializer(null)
val ser = createSerializer[FAny]
roundtrip(ser, StringTerm("fo"))
}

it should "serialize NEL" in {
val ser = implicitly[TypeInformation[NonEmptyList[FTerm]]].createSerializer(null)
val ser = createSerializer[NonEmptyList[FTerm]]
roundtrip(ser, NonEmptyList.one(StringTerm("fo")))
}

it should "serialize nested nel" in {
val ser = implicitly[TypeInformation[TermFilter]].createSerializer(null)
val ser = createSerializer[TermFilter]
roundtrip(ser, TermFilter("a", NonEmptyList.one(StringTerm("fo"))))
}
}
Expand Down
12 changes: 10 additions & 2 deletions src/test/scala/org/apache/flinkx/api/CatsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,24 @@ import cats.data.NonEmptyList
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.serializers._

class CatsTest extends AnyFlatSpec with Matchers with TestUtils {
implicit val stringListTi: TypeInformation[NonEmptyList[String]] = deriveTypeInformation
implicit val intListTi: TypeInformation[NonEmptyList[Int]] = deriveTypeInformation

def createSerializer[T: TypeInformation] =
implicitly[TypeInformation[T]].createSerializer(ExecutionConfig())

it should "derive for NEL[String]" in {
val ser = deriveTypeInformation[NonEmptyList[String]].createSerializer(null)
val ser = createSerializer[NonEmptyList[String]]
roundtrip(ser, NonEmptyList.one("doo"))
}

it should "derive for NEL[Int]" in {
val ser = deriveTypeInformation[NonEmptyList[Int]].createSerializer(null)
val ser = createSerializer[NonEmptyList[Int]]
roundtrip(ser, NonEmptyList.one(1))
}
}
17 changes: 11 additions & 6 deletions src/test/scala/org/apache/flinkx/api/SchemaEvolutionTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,47 @@ package org.apache.flinkx.api
import org.apache.flinkx.api.SchemaEvolutionTest.{Click, ClickEvent, Event, NoArityTest}
import org.apache.flinkx.api.serializers._
import org.apache.flinkx.api.serializer.ScalaCaseClassSerializer
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.apache.flink.api.common.ExecutionConfig
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.io.ByteArrayOutputStream
import java.nio.file.{Files, Path}

class SchemaEvolutionTest extends AnyFlatSpec with Matchers {
private val eventTypeInfo = deriveTypeInformation[Event]
private val arityTestInfo = deriveTypeInformation[NoArityTest]
private implicit val eventTypeInfo: TypeInformation[Event] = deriveTypeInformation[Event]
private implicit val arityTestInfo: TypeInformation[NoArityTest] = deriveTypeInformation[NoArityTest]
private val clicks =
List(ClickEvent("a", "2021-01-01"), ClickEvent("b", "2021-01-01"), ClickEvent("c", "2021-01-01"))

def createSerializer[T: TypeInformation] =
implicitly[TypeInformation[T]].createSerializer(ExecutionConfig())

ignore should "generate blob for event=click+purchase" in {
val buffer = new ByteArrayOutputStream()
val eventSerializer = eventTypeInfo.createSerializer(null)
val eventSerializer = createSerializer[Event]
eventSerializer.serialize(Click("p1", clicks), new DataOutputViewStreamWrapper(buffer))
Files.write(Path.of("src/test/resources/click.dat"), buffer.toByteArray)
}

it should "decode click when we added view" in {
val buffer = this.getClass.getResourceAsStream("/click.dat")
val click = eventTypeInfo.createSerializer(null).deserialize(new DataInputViewStreamWrapper(buffer))
val click = createSerializer[Event].deserialize(new DataInputViewStreamWrapper(buffer))
click shouldBe Click("p1", clicks)
}

ignore should "generate blob for no arity test" in {
val buffer = new ByteArrayOutputStream()
val eventSerializer = arityTestInfo.createSerializer(null)
val eventSerializer = createSerializer[NoArityTest]
eventSerializer.serialize(NoArityTest(4, 3, List("test")), new DataOutputViewStreamWrapper(buffer))
Files.write(Path.of("src/test/resources/without-arity-test.dat"), buffer.toByteArray)
}

it should "decode class without arity info" in {
val buffer = this.getClass.getResourceAsStream("/without-arity-test.dat")
val serializer = arityTestInfo.createSerializer(null) match {
val serializer = createSerializer[NoArityTest] match {
case s: ScalaCaseClassSerializer[_] => s
case s => fail(s"Derived serializer must be of CaseClassSerializer type, but was $s")
}
Expand Down
27 changes: 16 additions & 11 deletions src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,65 +12,70 @@ import org.apache.flinkx.api.SerializerSnapshotTest.{
TraitMap
}
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.apache.flinkx.api.serializers._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.util.ChildFirstClassLoader
import org.scalatest.Assertion

import java.net.URLClassLoader

class SerializerSnapshotTest extends AnyFlatSpec with Matchers {

def createSerializer[T: TypeInformation] =
implicitly[TypeInformation[T]].createSerializer(ExecutionConfig())

it should "roundtrip product serializer snapshot" in {
val ser = deriveTypeInformation[SimpleClass1].createSerializer(null)
val ser = createSerializer[SimpleClass1]
assertRoundtripSerializer(ser)
}

it should "roundtrip coproduct serializer snapshot" in {
val ser = deriveTypeInformation[OuterTrait].createSerializer(null)
val ser = createSerializer[OuterTrait]
assertRoundtripSerializer(ser)
}

it should "roundtrip coproduct serializer snapshot with singletons" in {
val ser = deriveTypeInformation[ADT2].createSerializer(null)
val ser = createSerializer[ADT2]
assertRoundtripSerializer(ser)
}

it should "roundtrip serializer snapshot with list of primitives" in {
val ser = deriveTypeInformation[List[Double]].createSerializer(null)
val ser = createSerializer[List[Double]]
assertRoundtripSerializer(ser)
}

it should "roundtrip serializer snapshot with set as array of primitives" in {
val ser = implicitly[TypeInformation[Set[Double]]].createSerializer(null)
val ser = createSerializer[Set[Double]]
assertRoundtripSerializer(ser)
}

it should "do array ser snapshot" in {
val set = deriveTypeInformation[SimpleClassArray].createSerializer(null)
val set = createSerializer[SimpleClassArray]
assertRoundtripSerializer(set)
}

it should "do map ser snapshot" in {
assertRoundtripSerializer(deriveTypeInformation[SimpleClassMap1].createSerializer(null))
assertRoundtripSerializer(deriveTypeInformation[SimpleClassMap2].createSerializer(null))
assertRoundtripSerializer(createSerializer[SimpleClassMap1])
assertRoundtripSerializer(createSerializer[SimpleClassMap2])
}

it should "do list ser snapshot" in {
assertRoundtripSerializer(deriveTypeInformation[SimpleClassList].createSerializer(null))
assertRoundtripSerializer(createSerializer[SimpleClassList])
}

it should "do map ser snapshot adt " in {
implicit val ti: Typeclass[OuterTrait] = deriveTypeInformation[OuterTrait]
drop(ti)
assertRoundtripSerializer(deriveTypeInformation[TraitMap].createSerializer(null))
assertRoundtripSerializer(createSerializer[TraitMap])
}

it should "be compatible after snapshot deserialization in different classloader" in {
val ser = deriveTypeInformation[SimpleClass1].createSerializer(null)
val ser = createSerializer[SimpleClass1]
val cl = newClassLoader(classOf[SimpleClass1])
try {
val restored = roundtripSerializer(ser, cl)
Expand Down
Loading

0 comments on commit 8996992

Please sign in to comment.