From 7321cb458a54033f0ea1cbe108b96d3f29fb4fd4 Mon Sep 17 00:00:00 2001 From: Alexey Novakov Date: Sun, 6 Oct 2024 21:23:08 +0200 Subject: [PATCH] remove some deprecated methods, improve scala style, move Java related classes to flinkx package --- .../ScalaCaseClassSerializerSnapshot.java | 1 - .../ScalaEitherSerializerSnapshot.java | 0 .../ScalaOptionSerializerSnapshot.java | 0 .../apache/flinkx/api/LowPrioImplicits.scala | 2 - .../api/serializer/ConstructorCompat.scala | 2 - .../apache/flinkx/api/AllWindowedStream.scala | 4 +- .../apache/flinkx/api/ClosureCleaner.scala | 56 ++++----- .../apache/flinkx/api/ConnectedStreams.scala | 4 +- .../org/apache/flinkx/api/DataStream.scala | 61 +++------- .../apache/flinkx/api/DataStreamUtils.scala | 7 +- .../org/apache/flinkx/api/JoinedStreams.scala | 4 +- .../org/apache/flinkx/api/KeyedStream.scala | 40 +------ .../apache/flinkx/api/ScalaStreamOps.scala | 8 +- .../api/StreamExecutionEnvironment.scala | 108 +++++------------- .../apache/flinkx/api/WindowedStream.scala | 4 +- .../api/function/RichAllWindowFunction.scala | 2 +- .../api/function/RichWindowFunction.scala | 2 +- .../api/function/StatefulFunction.scala | 4 +- .../api/serializer/CaseClassSerializer.scala | 3 +- .../CollectionSerializerSnapshot.scala | 2 +- 20 files changed, 97 insertions(+), 217 deletions(-) rename modules/scala-api/src/main/java/org/apache/{flink => flinkx}/api/serializer/ScalaCaseClassSerializerSnapshot.java (99%) rename modules/scala-api/src/main/java/org/apache/{flink => flinkx}/api/serializer/ScalaEitherSerializerSnapshot.java (100%) rename modules/scala-api/src/main/java/org/apache/{flink => flinkx}/api/serializer/ScalaOptionSerializerSnapshot.java (100%) diff --git a/modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaCaseClassSerializerSnapshot.java b/modules/scala-api/src/main/java/org/apache/flinkx/api/serializer/ScalaCaseClassSerializerSnapshot.java similarity index 99% rename from modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaCaseClassSerializerSnapshot.java rename to modules/scala-api/src/main/java/org/apache/flinkx/api/serializer/ScalaCaseClassSerializerSnapshot.java index a53f910..bce444f 100644 --- a/modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaCaseClassSerializerSnapshot.java +++ b/modules/scala-api/src/main/java/org/apache/flinkx/api/serializer/ScalaCaseClassSerializerSnapshot.java @@ -27,7 +27,6 @@ import org.apache.flink.util.InstantiationUtil; import java.io.IOException; -import java.util.Objects; import java.util.Optional; import static org.apache.flink.util.Preconditions.checkNotNull; diff --git a/modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaEitherSerializerSnapshot.java b/modules/scala-api/src/main/java/org/apache/flinkx/api/serializer/ScalaEitherSerializerSnapshot.java similarity index 100% rename from modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaEitherSerializerSnapshot.java rename to modules/scala-api/src/main/java/org/apache/flinkx/api/serializer/ScalaEitherSerializerSnapshot.java diff --git a/modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaOptionSerializerSnapshot.java b/modules/scala-api/src/main/java/org/apache/flinkx/api/serializer/ScalaOptionSerializerSnapshot.java similarity index 100% rename from modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaOptionSerializerSnapshot.java rename to modules/scala-api/src/main/java/org/apache/flinkx/api/serializer/ScalaOptionSerializerSnapshot.java diff 
--git a/modules/scala-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala b/modules/scala-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala index c4db5bf..227dae1 100644 --- a/modules/scala-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala +++ b/modules/scala-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala @@ -6,12 +6,10 @@ import magnolia1.{CaseClass, Magnolia, SealedTrait} import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.typeinfo.TypeInformation -import scala.annotation.tailrec import scala.collection.mutable import scala.language.experimental.macros import scala.reflect._ import scala.reflect.runtime.universe.{Try => _, _} -import scala.util.{Failure, Success, Try} private[api] trait LowPrioImplicits { type Typeclass[T] = TypeInformation[T] diff --git a/modules/scala-api/src/main/scala-2/org/apache/flinkx/api/serializer/ConstructorCompat.scala b/modules/scala-api/src/main/scala-2/org/apache/flinkx/api/serializer/ConstructorCompat.scala index bf571e7..d0b8ba0 100644 --- a/modules/scala-api/src/main/scala-2/org/apache/flinkx/api/serializer/ConstructorCompat.scala +++ b/modules/scala-api/src/main/scala-2/org/apache/flinkx/api/serializer/ConstructorCompat.scala @@ -5,8 +5,6 @@ import scala.reflect.runtime.{currentMirror => cm} import scala.reflect.runtime.universe import scala.reflect.runtime.universe._ -import org.apache.flinkx.api.serializers.drop - private[serializer] trait ConstructorCompat { @nowarn("msg=(eliminated by erasure)|(explicit array)") diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/AllWindowedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/AllWindowedStream.scala index 74d1fbe..15bb073 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/AllWindowedStream.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/AllWindowedStream.scala @@ -484,7 +484,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, field) private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = { - val position = fieldNames2Indices(getInputType(), Array(field))(0) + val position = fieldNames2Indices(getInputType, Array(field))(0) aggregate(aggregationType, position) } @@ -522,5 +522,5 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { /** Gets the output type. 
*/ - private def getInputType(): TypeInformation[T] = javaStream.getInputType + private def getInputType: TypeInformation[T] = javaStream.getInputType } diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala index 31cdf26..2426eae 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala @@ -22,14 +22,14 @@ import org.apache.commons.io.IOUtils import java.lang.invoke.{MethodHandleInfo, SerializedLambda} import java.lang.reflect.{Field, Modifier} import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream} -import scala.collection.mutable.{Map, Set, Stack} -import org.apache.commons.lang3.{ClassUtils, JavaVersion, SystemUtils} +import org.apache.commons.lang3.ClassUtils import org.apache.flink.shaded.asm9.org.objectweb.asm.{ClassReader, ClassVisitor, Handle, MethodVisitor, Type} import org.apache.flink.shaded.asm9.org.objectweb.asm.Opcodes._ import org.apache.flink.shaded.asm9.org.objectweb.asm.tree.{ClassNode, MethodNode} -import org.apache.flink.util.{FlinkException, InstantiationUtil} +import org.apache.flink.util.FlinkException import org.slf4j.LoggerFactory +import scala.collection.mutable import scala.jdk.CollectionConverters._ /** A cleaner that renders closures serializable if they can be done so safely. @@ -82,12 +82,12 @@ object ClosureCleaner { /** Return a list of classes that represent closures enclosed in the given closure object. */ private def getInnerClosureClasses(obj: AnyRef): List[Class[_]] = { - val seen = Set[Class[_]](obj.getClass) - val stack = Stack[Class[_]](obj.getClass) - while (!stack.isEmpty) { + val seen = mutable.Set[Class[_]](obj.getClass) + val stack = mutable.Stack[Class[_]](obj.getClass) + while (stack.nonEmpty) { val cr = getClassReader(stack.pop()) if (cr != null) { - val set = Set.empty[Class[_]] + val set = mutable.Set.empty[Class[_]] cr.accept(new InnerClosureFinder(set), 0) for (cls <- set -- seen) { seen += cls @@ -99,14 +99,14 @@ object ClosureCleaner { } /** Initializes the accessed fields for outer classes and their super classes. 
*/ - private def initAccessedFields(accessedFields: Map[Class[_], Set[String]], outerClasses: Seq[Class[_]]): Unit = { + private def initAccessedFields(accessedFields: mutable.Map[Class[_], mutable.Set[String]], outerClasses: Seq[Class[_]]): Unit = { for (cls <- outerClasses) { var currentClass = cls assert(currentClass != null, "The outer class can't be null.") while (currentClass != null) { - accessedFields(currentClass) = Set.empty[String] - currentClass = currentClass.getSuperclass() + accessedFields(currentClass) = mutable.Set.empty[String] + currentClass = currentClass.getSuperclass } } } @@ -116,7 +116,7 @@ object ClosureCleaner { outerClass: Class[_], clone: AnyRef, obj: AnyRef, - accessedFields: Map[Class[_], Set[String]] + accessedFields: mutable.Map[Class[_], mutable.Set[String]] ): Unit = { for (fieldName <- accessedFields(outerClass)) { val field = outerClass.getDeclaredField(fieldName) @@ -131,7 +131,7 @@ object ClosureCleaner { parent: AnyRef, obj: AnyRef, outerClass: Class[_], - accessedFields: Map[Class[_], Set[String]] + accessedFields: mutable.Map[Class[_], mutable.Set[String]] ): AnyRef = { val clone = instantiateClass(outerClass, parent) @@ -140,7 +140,7 @@ object ClosureCleaner { while (currentClass != null) { setAccessedFields(currentClass, clone, obj, accessedFields) - currentClass = currentClass.getSuperclass() + currentClass = currentClass.getSuperclass } clone @@ -159,7 +159,7 @@ object ClosureCleaner { * whether to clean enclosing closures transitively */ def clean(closure: AnyRef, checkSerializable: Boolean = true, cleanTransitively: Boolean = true): Unit = { - clean(closure, checkSerializable, cleanTransitively, Map.empty) + clean(closure, checkSerializable, cleanTransitively, mutable.Map.empty) } def scalaClean[T <: AnyRef](fun: T, checkSerializable: Boolean = true, cleanTransitively: Boolean = true): T = { @@ -204,7 +204,7 @@ object ClosureCleaner { func: AnyRef, checkSerializable: Boolean, cleanTransitively: Boolean, - accessedFields: Map[Class[_], Set[String]] + accessedFields: mutable.Map[Class[_], mutable.Set[String]] ): Unit = { // indylambda check. Most likely to be the case with 2.12, 2.13 @@ -447,7 +447,7 @@ object ClosureCleaner { private def instantiateClass(cls: Class[_], enclosingObject: AnyRef): AnyRef = { // Use reflection to instantiate object without calling constructor - val rf = sun.reflect.ReflectionFactory.getReflectionFactory() + val rf = sun.reflect.ReflectionFactory.getReflectionFactory val parentCtor = classOf[Object].getDeclaredConstructor() val newCtor = rf.newConstructorForSerialization(cls, parentCtor) val obj = newCtor.newInstance().asInstanceOf[AnyRef] @@ -627,14 +627,14 @@ object IndylambdaScalaClosures { def findAccessedFields( lambdaProxy: SerializedLambda, lambdaClassLoader: ClassLoader, - accessedFields: Map[Class[_], Set[String]], + accessedFields: mutable.Map[Class[_], mutable.Set[String]], findTransitively: Boolean ): Unit = { // We may need to visit the same class multiple times for different methods on it, and we'll // need to lookup by name. So we use ASM's Tree API and cache the ClassNode/MethodNode. 
- val classInfoByInternalName = Map.empty[String, (Class[_], ClassNode)] - val methodNodeById = Map.empty[MethodIdentifier[_], MethodNode] + val classInfoByInternalName = mutable.Map.empty[String, (Class[_], ClassNode)] + val methodNodeById = mutable.Map.empty[MethodIdentifier[_], MethodNode] def getOrUpdateClassInfo(classInternalName: String): (Class[_], ClassNode) = { val classInfo = classInfoByInternalName.getOrElseUpdate( classInternalName, { @@ -672,19 +672,19 @@ object IndylambdaScalaClosures { // inner closure // we need to track calls from "inner closure" to outer classes relative to it (class T, A, B) // to better find and track field accesses. - val trackedClassInternalNames = Set[String](implClassInternalName) + val trackedClassInternalNames = mutable.Set[String](implClassInternalName) // Depth-first search for inner closures and track the fields that were accessed in them. // Start from the lambda body's implementation method, follow method invocations - val visited = Set.empty[MethodIdentifier[_]] - val stack = Stack[MethodIdentifier[_]](implMethodId) + val visited = mutable.Set.empty[MethodIdentifier[_]] + val stack = mutable.Stack[MethodIdentifier[_]](implMethodId) def pushIfNotVisited(methodId: MethodIdentifier[_]): Unit = { if (!visited.contains(methodId)) { stack.push(methodId) } } - while (!stack.isEmpty) { + while (stack.nonEmpty) { val currentId = stack.pop() visited += currentId @@ -816,10 +816,10 @@ private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String * a set of visited methods to avoid cycles */ private class FieldAccessFinder( - fields: Map[Class[_], Set[String]], + fields: mutable.Map[Class[_], mutable.Set[String]], findTransitively: Boolean, specificMethod: Option[MethodIdentifier[_]] = None, - visitedMethods: Set[MethodIdentifier[_]] = Set.empty + visitedMethods: mutable.Set[MethodIdentifier[_]] = mutable.Set.empty ) extends ClassVisitor(ASM9) { override def visitMethod( @@ -868,7 +868,7 @@ private class FieldAccessFinder( ClosureCleaner .getClassReader(currentClass) .accept(new FieldAccessFinder(fields, findTransitively, Some(m), visitedMethods), 0) - currentClass = currentClass.getSuperclass() + currentClass = currentClass.getSuperclass } } } @@ -878,7 +878,7 @@ private class FieldAccessFinder( } } -private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM9) { +private class InnerClosureFinder(output: mutable.Set[Class[_]]) extends ClassVisitor(ASM9) { var myName: String = null // TODO: Recursively find inner closures that we indirectly reference, e.g. @@ -908,7 +908,7 @@ private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM override def visitMethodInsn(op: Int, owner: String, name: String, desc: String, itf: Boolean): Unit = { val argTypes = Type.getArgumentTypes(desc) if ( - op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0 + op == INVOKESPECIAL && name == "<init>" && argTypes.nonEmpty && argTypes(0).toString.startsWith("L") // is it an object?
&& argTypes(0).getInternalName == myName ) { diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala index 5f9c4a5..4e49346 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala @@ -198,8 +198,8 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) { val cleanFun2 = clean(fun2) val flatMapper = new CoFlatMapFunction[IN1, IN2, R] { - def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value).foreach(out.collect _) } - def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value).foreach(out.collect _) } + def flatMap1(value: IN1, out: Collector[R]): Unit = { cleanFun1(value).foreach(out.collect) } + def flatMap2(value: IN2, out: Collector[R]): Unit = { cleanFun2(value).foreach(out.collect) } } flatMap(flatMapper) diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala index 1107a8f..52e3c30 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala @@ -53,30 +53,12 @@ class DataStream[T](stream: JavaStream[T]) { */ @deprecated @PublicEvolving - def getType(): TypeInformation[T] = stream.getType() - - /** Returns the parallelism of this operation. - * - * @deprecated - * Use [[parallelism]] instead. - */ - @deprecated - @PublicEvolving - def getParallelism = stream.getParallelism - - /** Returns the execution config. - * - * @deprecated - * Use [[executionConfig]] instead. - */ - @deprecated - @PublicEvolving - def getExecutionConfig = stream.getExecutionConfig + def getType: TypeInformation[T] = stream.getType /** Returns the ID of the DataStream. */ @Internal - private[flinkx] def getId = stream.getId() + private[flinkx] def getId = stream.getId // -------------------------------------------------------------------------- // Scalaesk accessors @@ -88,20 +70,20 @@ class DataStream[T](stream: JavaStream[T]) { /** Returns the TypeInformation for the elements of this DataStream. */ - def dataType: TypeInformation[T] = stream.getType() + def dataType: TypeInformation[T] = stream.getType /** Returns the execution config. */ - def executionConfig: ExecutionConfig = stream.getExecutionConfig() + def executionConfig: ExecutionConfig = stream.getExecutionConfig /** Returns the [[StreamExecutionEnvironment]] associated with this data stream */ def executionEnvironment: StreamExecutionEnvironment = - new StreamExecutionEnvironment(stream.getExecutionEnvironment()) + new StreamExecutionEnvironment(stream.getExecutionEnvironment) /** Returns the parallelism of this operation. */ - def parallelism: Int = stream.getParallelism() + def parallelism: Int = stream.getParallelism /** Sets the parallelism of this operation. This must be at least 1. */ @@ -130,23 +112,12 @@ class DataStream[T](stream: JavaStream[T]) { /** Returns the minimum resources of this operation. */ @PublicEvolving - def minResources: ResourceSpec = stream.getMinResources() + def minResources: ResourceSpec = stream.getMinResources /** Returns the preferred resources of this operation. */ @PublicEvolving - def preferredResources: ResourceSpec = stream.getPreferredResources() - - /** Gets the name of the current data stream. 
This name is used by the visualization and logging during runtime. - * - * @return - * Name of the stream. - * @deprecated - * Use [[name]] instead - */ - @deprecated - @PublicEvolving - def getName: String = name + def preferredResources: ResourceSpec = stream.getPreferredResources /** Gets the name of the current data stream. This name is used by the visualization and logging during runtime. * @@ -365,7 +336,7 @@ class DataStream[T](stream: JavaStream[T]) { val keyType: TypeInformation[K] = implicitly[TypeInformation[K]] val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] { - def getKey(in: T) = cleanFun(in) + def getKey(in: T): K = cleanFun(in) override def getProducedType: TypeInformation[K] = keyType } asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType)) @@ -411,8 +382,8 @@ class DataStream[T](stream: JavaStream[T]) { val cleanFun = clean(fun) val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] { - def getKey(in: T) = cleanFun(in) - override def getProducedType(): TypeInformation[K] = keyType + def getKey(in: T): K = cleanFun(in) + override def getProducedType: TypeInformation[K] = keyType } asScalaStream(stream.partitionCustom(partitioner, keyExtractor)) @@ -582,7 +553,7 @@ class DataStream[T](stream: JavaStream[T]) { } val cleanFun = clean(fun) val flatMapper = new FlatMapFunction[T, R] { - def flatMap(in: T, out: Collector[R]) = { cleanFun(in, out) } + def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in, out) } } flatMap(flatMapper) } @@ -595,7 +566,7 @@ class DataStream[T](stream: JavaStream[T]) { } val cleanFun = clean(fun) val flatMapper = new FlatMapFunction[T, R] { - def flatMap(in: T, out: Collector[R]) = { cleanFun(in).foreach(out.collect _) } + def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in).foreach(out.collect) } } flatMap(flatMapper) } @@ -634,7 +605,7 @@ class DataStream[T](stream: JavaStream[T]) { } val cleanFun = clean(fun) val filterFun = new FilterFunction[T] { - def filter(in: T) = cleanFun(in) + def filter(in: T): Boolean = cleanFun(in) } filter(filterFun) } @@ -757,7 +728,7 @@ class DataStream[T](stream: JavaStream[T]) { * The closed DataStream. */ @PublicEvolving - def printToErr() = stream.printToErr() + def printToErr(): DataStreamSink[T] = stream.printToErr() /** Writes a DataStream to the standard output stream (stdout). For each element of the DataStream the result of * [[AnyRef.toString()]] is written. @@ -780,7 +751,7 @@ class DataStream[T](stream: JavaStream[T]) { * The closed DataStream. */ @PublicEvolving - def printToErr(sinkIdentifier: String) = stream.printToErr(sinkIdentifier) + def printToErr(sinkIdentifier: String): DataStreamSink[T] = stream.printToErr(sinkIdentifier) /** Writes a DataStream using the given [[OutputFormat]]. */ diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStreamUtils.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStreamUtils.scala index acd6f4c..7ca470c 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStreamUtils.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStreamUtils.scala @@ -24,12 +24,9 @@ class DataStreamUtils[T: TypeInformation: ClassTag](val self: DataStream[T]) { * @return * The iterator * - * @deprecated - * Replaced with [[DataStream#executeAndCollect]]. 
*/ - def collect(): Iterator[T] = { - JavaStreamUtils.collect(self.javaStream).asScala - } + def collect(): Iterator[T] = + self.javaStream.executeAndCollect().asScala /** Reinterprets the given [[DataStream]] as a [[KeyedStream]], which extracts keys with the given [[KeySelector]]. * diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/JoinedStreams.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/JoinedStreams.scala index c9e9c15..28d0bc5 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/JoinedStreams.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/JoinedStreams.scala @@ -49,7 +49,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) { val cleanFun = clean(keySelector) val keyType = implicitly[TypeInformation[KEY]] val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] { - def getKey(in: T1) = cleanFun(in) + def getKey(in: T1): KEY = cleanFun(in) override def getProducedType: TypeInformation[KEY] = keyType } new Where[KEY](javaSelector, keyType) @@ -71,7 +71,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) { val cleanFun = clean(keySelector) val localKeyType = keyType val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] { - def getKey(in: T2) = cleanFun(in) + def getKey(in: T2): KEY = cleanFun(in) override def getProducedType: TypeInformation[KEY] = localKeyType } new EqualTo(javaSelector) diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/KeyedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/KeyedStream.scala index 0ce941b..2780e66 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/KeyedStream.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/KeyedStream.scala @@ -32,7 +32,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] /** Gets the type of the key by which this stream is keyed. */ @Internal - def getKeyType = javaStream.getKeyType() + private def getKeyType = javaStream.getKeyType // ------------------------------------------------------------------------ // basic transformations @@ -204,42 +204,6 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] // Windowing // ------------------------------------------------------------------------ - /** Windows this [[KeyedStream]] into tumbling time windows. - * - * This is a shortcut for either `.window(TumblingEventTimeWindows.of(size))` or - * `.window(TumblingProcessingTimeWindows.of(size))` depending on the time characteristic set using - * [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]] - * - * @param size - * The size of the window. - * - * @deprecated - * Please use [[window()]] with either [[TumblingEventTimeWindows]] or [[TumblingProcessingTimeWindows]]. For more - * information, see the deprecation notice on [[org.apache.flink.streaming.api.TimeCharacteristic]]. - */ - @deprecated - def timeWindow(size: Time): WindowedStream[T, K, TimeWindow] = { - new WindowedStream(javaStream.timeWindow(size)) - } - - /** Windows this [[KeyedStream]] into sliding time windows. - * - * This is a shortcut for either `.window(SlidingEventTimeWindows.of(size))` or - * `.window(SlidingProcessingTimeWindows.of(size))` depending on the time characteristic set using - * [[StreamExecutionEnvironment.setStreamTimeCharacteristic()]] - * - * @param size - * The size of the window. 
- * - * @deprecated - * Please use [[window()]] with either [[SlidingEventTimeWindows]] or [[SlidingProcessingTimeWindows]]. For more - * information, see the deprecation notice on [[org.apache.flink.streaming.api.TimeCharacteristic]]. - */ - @deprecated - def timeWindow(size: Time, slide: Time): WindowedStream[T, K, TimeWindow] = { - new WindowedStream(javaStream.timeWindow(size, slide)) - } - /** Windows this [[KeyedStream]] into sliding count windows. * * @param size @@ -511,7 +475,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T] override val stateSerializer: TypeSerializer[S] = serializer override def flatMap(in: T, out: Collector[R]): Unit = { - applyWithState(in, cleanFun) foreach out.collect + applyWithState(in, cleanFun).foreach(out.collect) } } diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/ScalaStreamOps.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ScalaStreamOps.scala index b7449dd..e44105c 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/ScalaStreamOps.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ScalaStreamOps.scala @@ -12,21 +12,21 @@ import language.experimental.macros object ScalaStreamOps { - /** Converts an [[org.apache.flink.streaming.api.datastream.DataStream]] to a [[org.apache.flink.api.DataStream]]. + /** Converts an [[org.apache.flink.streaming.api.datastream.DataStream]] to a [[org.apache.flinkx.api.DataStream]]. */ def asScalaStream[R](stream: JavaStream[R]) = new DataStream[R](stream) - /** Converts an [[org.apache.flink.streaming.api.datastream.KeyedStream]] to a [[org.apache.flink.api.KeyedStream]]. + /** Converts an [[org.apache.flink.streaming.api.datastream.KeyedStream]] to a [[org.apache.flinkx.api.KeyedStream]]. */ def asScalaStream[R, K](stream: KeyedJavaStream[R, K]) = new KeyedStream[R, K](stream) /** Converts an [[org.apache.flink.streaming.api.datastream.ConnectedStreams]] to a - * [[org.apache.flink.api.ConnectedStreams]]. + * [[org.apache.flinkx.api.ConnectedStreams]]. */ def asScalaStream[IN1, IN2](stream: ConnectedJavaStreams[IN1, IN2]) = new ConnectedStreams[IN1, IN2](stream) /** Converts an [[org.apache.flink.streaming.api.datastream.BroadcastConnectedStream]] to a - * [[org.apache.flink.api.BroadcastConnectedStream]]. + * [[org.apache.flinkx.api.BroadcastConnectedStream]]. 
*/ def asScalaStream[IN1, IN2](stream: BroadcastConnectedJavaStreams[IN1, IN2]) = new BroadcastConnectedStream[IN1, IN2](stream) diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala index ac17828..9964366 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala @@ -17,35 +17,37 @@ */ package org.apache.flinkx.api +import com.esotericsoftware.kryo.Serializer import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving} -import org.apache.flink.api.common.{ExecutionConfig, RuntimeExecutionMode} import org.apache.flink.api.common.ExecutionConfig.ClosureCleanerLevel +import org.apache.flink.api.common.cache.DistributedCache import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat} import org.apache.flink.api.common.operators.SlotSharingGroup import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.connector.source.{Source, SourceSplit} +import org.apache.flink.api.common.{ExecutionConfig, JobExecutionResult, RuntimeExecutionMode} import org.apache.flink.api.connector.source.lib.NumberSequenceSource +import org.apache.flink.api.connector.source.{Source, SourceSplit} +import org.apache.flink.api.java.tuple import org.apache.flink.api.java.typeutils.ResultTypeQueryable import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer import org.apache.flink.configuration.{Configuration, ReadableConfig} import org.apache.flink.core.execution.{JobClient, JobListener} import org.apache.flink.core.fs.Path import org.apache.flink.runtime.state.StateBackend -import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic} +import org.apache.flink.streaming.api.CheckpointingMode import org.apache.flink.streaming.api.environment.{CheckpointConfig, StreamExecutionEnvironment => JavaEnv} -import org.apache.flink.streaming.api.functions.source._ import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.streaming.api.functions.source._ import org.apache.flink.streaming.api.graph.StreamGraph import org.apache.flink.util.{SplittableIterator, TernaryBoolean} -import ScalaStreamOps._ - -import scala.jdk.CollectionConverters._ -import language.implicitConversions -import com.esotericsoftware.kryo.Serializer +import org.apache.flinkx.api.ScalaStreamOps._ import java.net.URI +import java.util +import scala.jdk.CollectionConverters._ +import scala.language.implicitConversions @Public class StreamExecutionEnvironment(javaEnv: JavaEnv) { @@ -57,11 +59,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { def getConfig = javaEnv.getConfig /** Gets cache files. */ - def getCachedFiles = javaEnv.getCachedFiles + def getCachedFiles: util.List[tuple.Tuple2[String, DistributedCache.DistributedCacheEntry]] = javaEnv.getCachedFiles /** Gets the config JobListeners. */ @PublicEvolving - def getJobListeners = javaEnv.getJobListeners + def getJobListeners: util.List[JobListener] = javaEnv.getJobListeners /** Sets the parallelism for operations executed through this environment. 
Setting a parallelism of x here will cause * all operators (such as join, map, reduce) to run with x parallel instances. This value can be overridden by @@ -122,7 +124,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also defines the number of key * groups used for partitioned state. */ - def getMaxParallelism = javaEnv.getMaxParallelism + def getMaxParallelism: Int = javaEnv.getMaxParallelism /** Sets the maximum time frequency (milliseconds) for the flushing of the output buffers. By default the output * buffers flush frequently to provide low latency and to aid smooth developer experience. Setting the parameter can @@ -138,7 +140,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { } /** Gets the default buffer timeout set for this environment */ - def getBufferTimeout = javaEnv.getBufferTimeout + def getBufferTimeout: Long = javaEnv.getBufferTimeout /** Disables operator chaining for streaming operators. Operator chaining allows non-shuffle operations to be * co-located in the same thread fully avoiding serialization and de-serialization. @@ -155,7 +157,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** Gets the checkpoint config, which defines values like checkpoint interval, delay between checkpoints, etc. */ - def getCheckpointConfig = javaEnv.getCheckpointConfig() + def getCheckpointConfig: CheckpointConfig = javaEnv.getCheckpointConfig /** Enables checkpointing for the streaming job. The distributed state of the streaming dataflow will be periodically * snapshotted. In case of a failure, the streaming dataflow will be restarted from the latest completed checkpoint. @@ -194,13 +196,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE) } - def getCheckpointingMode = javaEnv.getCheckpointingMode() + def getCheckpointingMode = javaEnv.getCheckpointingMode /** Sets the state backend that describes how to store operator. It defines the data structures that hold state during * execution (for example hash tables, RocksDB, or other data stores). * * State managed by the state backend includes both keyed state that is accessible on - * [[org.apache.flink.api.KeyedStream]], as well as state maintained directly by the user code that implements + * [[org.apache.flinkx.api.KeyedStream]], as well as state maintained directly by the user code that implements * [[org.apache.flink.streaming.api.checkpoint.CheckpointedFunction]]. * * The [[org.apache.flink.runtime.state.hashmap.HashMapStateBackend]] maintains state in heap memory, as objects. It @@ -226,7 +228,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** Returns the state backend that defines how to store and checkpoint state. */ @PublicEvolving - def getStateBackend: StateBackend = javaEnv.getStateBackend() + def getStateBackend: StateBackend = javaEnv.getStateBackend /** Enable the change log for current state backend. This change log allows operators to persist state changes in a * very fine-grained manner. Currently, the change log only applies to keyed state, so non-keyed operator state and @@ -339,31 +341,8 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * The restart strategy configuration to be used */ @PublicEvolving - def getRestartStrategy: RestartStrategyConfiguration = { - javaEnv.getRestartStrategy() - } - - /** Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. 
- * A value of "-1" indicates that the system default value (as defined in the configuration) should be used. - * - * @deprecated - * This method will be replaced by [[setRestartStrategy()]]. The FixedDelayRestartStrategyConfiguration contains - * the number of execution retries. - */ - @PublicEvolving - def setNumberOfExecutionRetries(numRetries: Int): Unit = { - javaEnv.setNumberOfExecutionRetries(numRetries) - } - - /** Gets the number of times the system will try to re-execute failed tasks. A value of "-1" indicates that the system - * default value (as defined in the configuration) should be used. - * - * @deprecated - * This method will be replaced by [[getRestartStrategy]]. The FixedDelayRestartStrategyConfiguration contains the - * number of execution retries. - */ - @PublicEvolving - def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries + def getRestartStrategy: RestartStrategyConfiguration = + javaEnv.getRestartStrategy // -------------------------------------------------------------------------------------------- // Registry for types and serializers @@ -387,7 +366,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * @param serializerClass * The class of the serializer to use. */ - def addDefaultKryoSerializer(`type`: Class[_], serializerClass: Class[_ <: Serializer[_]]) = { + def addDefaultKryoSerializer(`type`: Class[_], serializerClass: Class[_ <: Serializer[_]]): Unit = { javaEnv.addDefaultKryoSerializer(`type`, serializerClass) } @@ -401,7 +380,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { } /** Registers the given type with the serializer at the [[KryoSerializer]]. */ - def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]) = { + def registerTypeWithKryoSerializer(clazz: Class[_], serializer: Class[_ <: Serializer[_]]): Unit = { javaEnv.registerTypeWithKryoSerializer(clazz, serializer) } @@ -409,24 +388,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be * registered at Kryo to make sure that only tags are written. */ - def registerType(typeClass: Class[_]) = { + def registerType(typeClass: Class[_]): Unit = { javaEnv.registerType(typeClass) } - // -------------------------------------------------------------------------------------------- - // Time characteristic - // -------------------------------------------------------------------------------------------- - - /** Gets the time characteristic/ - * - * @see - * #setStreamTimeCharacteristic - * @return - * The time characteristic. - */ - @PublicEvolving - def getStreamTimeCharacteristic = javaEnv.getStreamTimeCharacteristic() - /** Sets all relevant options contained in the [[ReadableConfig]] such as e.g. * [[org.apache.flink.streaming.api.environment.StreamPipelineOptions#TIME_CHARACTERISTIC]]. It will reconfigure * [[StreamExecutionEnvironment]], [[org.apache.flink.api.common.ExecutionConfig]] and @@ -464,17 +429,6 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { // -------------------------------------------------------------------------------------------- // Data stream creations // -------------------------------------------------------------------------------------------- - /** Creates a new DataStream that contains a sequence of numbers. This source is a parallel source. If you manually - * set the parallelism to `1` the emitted elements are in order. 
- * - * @deprecated - * Use [[fromSequence(long, long)]] instead to create a new data stream that contains [[NumberSequenceSource]]. - */ - @deprecated - def generateSequence(from: Long, to: Long): DataStream[Long] = { - new DataStream[java.lang.Long](javaEnv.generateSequence(from, to)) - .asInstanceOf[DataStream[Long]] - } /** Creates a new data stream that contains a sequence of numbers (longs) and is useful for testing and for cases that * just need a stream of N events of any kind. @@ -624,10 +578,10 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { require(function != null, "Function must not be null.") val sourceFunction = new SourceFunction[T] { val cleanFun = scalaClean(function) - override def run(ctx: SourceContext[T]) = { + override def run(ctx: SourceContext[T]): Unit = { cleanFun(ctx) } - override def cancel() = {} + override def cancel(): Unit = {} } addSource(sourceFunction) } @@ -652,7 +606,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * @return * The result of the job execution, containing elapsed time and accumulators. */ - def execute() = javaEnv.execute() + def execute(): JobExecutionResult = javaEnv.execute() /** Triggers the program execution. The environment will execute all parts of the program that have resulted in a * "sink" operation. Sink operations are for example printing results or forwarding them to a message queue. @@ -662,7 +616,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * @return * The result of the job execution, containing elapsed time and accumulators. */ - def execute(jobName: String) = javaEnv.execute(jobName) + def execute(jobName: String): JobExecutionResult = javaEnv.execute(jobName) /** Register a [[JobListener]] in this environment. The [[JobListener]] will be notified on specific job status * changed. @@ -712,7 +666,7 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { /** Creates the plan with which the system will execute the program, and returns it as a String using a JSON * representation of the execution data flow graph. Note that this needs to be called, before the plan is executed. */ - def getExecutionPlan = javaEnv.getExecutionPlan + def getExecutionPlan: String = javaEnv.getExecutionPlan /** Getter of the [[org.apache.flink.streaming.api.graph.StreamGraph]] of the streaming job. This call clears * previously registered [[org.apache.flink.api.dag.Transformation transformations]]. @@ -754,14 +708,14 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) { * The encased ExecutionEnvironment */ @Internal - def getWrappedStreamExecutionEnvironment = javaEnv + def getWrappedStreamExecutionEnvironment: JavaEnv = javaEnv /** Returns a "closure-cleaned" version of the given function. 
Cleans only if closure cleaning is not disabled in the * [[org.apache.flink.api.common.ExecutionConfig]] */ private[flinkx] def scalaClean[F <: AnyRef](f: F): F = { if (getConfig.isClosureCleanerEnabled) { - ClosureCleaner.scalaClean(f, true, getConfig.getClosureCleanerLevel == ClosureCleanerLevel.RECURSIVE) + ClosureCleaner.scalaClean(f, checkSerializable = true, cleanTransitively = getConfig.getClosureCleanerLevel == ClosureCleanerLevel.RECURSIVE) } else { ClosureCleaner.ensureSerializable(f) } diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/WindowedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/WindowedStream.scala index 3637d3c..2cb969f 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/WindowedStream.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/WindowedStream.scala @@ -531,7 +531,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, field) private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = { - val position = fieldNames2Indices(getInputType(), Array(field))(0) + val position = fieldNames2Indices(getInputType, Array(field))(0) aggregate(aggregationType, position) } @@ -569,5 +569,5 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { /** Gets the output type. */ - private def getInputType(): TypeInformation[T] = javaStream.getInputType + private def getInputType: TypeInformation[T] = javaStream.getInputType } diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichAllWindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichAllWindowFunction.scala index 46d9cd0..4cc0528 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichAllWindowFunction.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichAllWindowFunction.scala @@ -21,7 +21,7 @@ package org.apache.flinkx.api.function import org.apache.flink.api.common.functions.AbstractRichFunction import org.apache.flink.streaming.api.windowing.windows.Window -/** Rich variant of the [[org.apache.flink.api.function.AllWindowFunction]]. +/** Rich variant of the [[org.apache.flinkx.api.function.AllWindowFunction]]. * * As a [[org.apache.flink.api.common.functions.RichFunction]], it gives access to the * [[org.apache.flink.api.common.functions.RuntimeContext]] and provides setup and tear-down methods. diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichWindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichWindowFunction.scala index 6985d63..a870303 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichWindowFunction.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichWindowFunction.scala @@ -21,7 +21,7 @@ package org.apache.flinkx.api.function import org.apache.flink.api.common.functions.AbstractRichFunction import org.apache.flink.streaming.api.windowing.windows.Window -/** Rich variant of the [[org.apache.flink.api.function.WindowFunction]]. +/** Rich variant of the [[org.apache.flinkx.api.function.WindowFunction]]. * * As a [[org.apache.flink.api.common.functions.RichFunction]], it gives access to the * [[org.apache.flink.api.common.functions.RuntimeContext]] and provides setup and tear-down methods. 
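Note on the Scaladoc link fixes in RichAllWindowFunction.scala and RichWindowFunction.scala above: user code extends these rich variants from the org.apache.flinkx.api.function package. A minimal sketch, assuming RichWindowFunction keeps the [IN, OUT, KEY, W <: Window] type parameters and the (key, window, input, out) apply signature of the legacy Flink Scala API; the class name CountPerWindow is illustrative only:

import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import org.apache.flinkx.api.function.RichWindowFunction

// Hypothetical example: emit the number of elements seen per key and window.
// Assumes the trait mirrors the legacy org.apache.flink.streaming.api.scala.function API.
class CountPerWindow extends RichWindowFunction[String, (String, Int), String, TimeWindow] {
  override def apply(key: String, window: TimeWindow, input: Iterable[String], out: Collector[(String, Int)]): Unit =
    out.collect((key, input.size))
}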
diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/StatefulFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/StatefulFunction.scala index 0f0117a..2016280 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/StatefulFunction.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/StatefulFunction.scala @@ -43,8 +43,8 @@ trait StatefulFunction[I, O, S] extends RichFunction { o } - override def open(c: Configuration) = { + override def open(c: Configuration): Unit = { val info = new ValueStateDescriptor[S]("state", stateSerializer) - state = getRuntimeContext().getState(info) + state = getRuntimeContext.getState(info) } } diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala index b597c7f..62b5e7c 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala @@ -80,11 +80,10 @@ abstract class CaseClassSerializer[T <: Product]( private def isClassArityUsageDisabled = sys.env .get("DISABLE_CASE_CLASS_ARITY_USAGE") - .map(v => + .exists(v => Try(v.toBoolean) .getOrElse(false) ) - .exists(_ == true) def serialize(value: T, target: DataOutputView): Unit = { if (arity > 0 && !isClassArityUsageDisabled) diff --git a/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala index ec49272..8352dd6 100644 --- a/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala +++ b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala @@ -4,7 +4,7 @@ import org.apache.flink.api.common.typeutils.{TypeSerializer, TypeSerializerSche import org.apache.flink.core.memory.{DataInputView, DataOutputView} import org.apache.flink.util.InstantiationUtil -class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]]() extends TypeSerializerSnapshot[F[T]] { +class CollectionSerializerSnapshot[F[_], T, S <: TypeSerializer[F[T]]] extends TypeSerializerSnapshot[F[T]] { def this(ser: TypeSerializer[T], serClass: Class[S], valueClass: Class[T]) = { this() nestedSerializer = ser
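Regarding the CaseClassSerializer.scala hunk above: the .map(...).exists(_ == true) chain over the optional DISABLE_CASE_CLASS_ARITY_USAGE variable is collapsed into a single .exists(...). A small self-contained sketch showing the two forms agree on representative inputs; it uses an explicit Map in place of sys.env, and the helper names disabledOld/disabledNew are illustrative only:

import scala.util.Try

object ArityFlagCheck extends App {
  // Old form: map the value to a Boolean, then test the Option for true.
  def disabledOld(env: Map[String, String]): Boolean =
    env.get("DISABLE_CASE_CLASS_ARITY_USAGE").map(v => Try(v.toBoolean).getOrElse(false)).exists(_ == true)

  // New form: apply the predicate directly with exists.
  def disabledNew(env: Map[String, String]): Boolean =
    env.get("DISABLE_CASE_CLASS_ARITY_USAGE").exists(v => Try(v.toBoolean).getOrElse(false))

  val samples = Seq(
    Map.empty[String, String],
    Map("DISABLE_CASE_CLASS_ARITY_USAGE" -> "true"),
    Map("DISABLE_CASE_CLASS_ARITY_USAGE" -> "false"),
    Map("DISABLE_CASE_CLASS_ARITY_USAGE" -> "not-a-boolean")
  )
  assert(samples.forall(m => disabledOld(m) == disabledNew(m))) // both forms behave identically
}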