Commit

remove some deprecated methods, improve scala style, move Java related classes to flinkx package
novakov-alexey committed Oct 6, 2024
1 parent 486aa60 commit 7321cb4
Showing 20 changed files with 97 additions and 217 deletions.
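The edits below apply the same small set of Scala style conventions across all touched files. A minimal sketch of the before/after shape, written for this summary rather than taken from the commit itself (all names are illustrative):

import scala.collection.mutable

object StyleSketch {
  // Before: `import scala.collection.mutable.{Map, Set, Stack}` shadowing the immutable defaults,
  // `!stack.isEmpty`, and empty parentheses on pure accessors such as `getSuperclass()`.
  // After: qualify mutable collections, prefer `nonEmpty`, drop the redundant parentheses,
  // and spell out result types on public and overriding members.
  def superclassNames(root: Class[_]): mutable.Set[String] = {
    val seen  = mutable.Set.empty[String]
    val stack = mutable.Stack[Class[_]](root)
    while (stack.nonEmpty) {
      var current: Class[_] = stack.pop()
      while (current != null) {
        seen += current.getName          // accessor call without parentheses
        current = current.getSuperclass  // getSuperclass, not getSuperclass()
      }
    }
    seen
  }
}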
@@ -27,7 +27,6 @@
import org.apache.flink.util.InstantiationUtil;

import java.io.IOException;
import java.util.Objects;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -6,12 +6,10 @@ import magnolia1.{CaseClass, Magnolia, SealedTrait}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation

import scala.annotation.tailrec
import scala.collection.mutable
import scala.language.experimental.macros
import scala.reflect._
import scala.reflect.runtime.universe.{Try => _, _}
import scala.util.{Failure, Success, Try}

private[api] trait LowPrioImplicits {
type Typeclass[T] = TypeInformation[T]
@@ -5,8 +5,6 @@ import scala.reflect.runtime.{currentMirror => cm}
import scala.reflect.runtime.universe
import scala.reflect.runtime.universe._

import org.apache.flinkx.api.serializers.drop

private[serializer] trait ConstructorCompat {

@nowarn("msg=(eliminated by erasure)|(explicit array)")
@@ -484,7 +484,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, field)

private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
val position = fieldNames2Indices(getInputType(), Array(field))(0)
val position = fieldNames2Indices(getInputType, Array(field))(0)
aggregate(aggregationType, position)
}

@@ -522,5 +522,5 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {

/** Gets the output type.
*/
private def getInputType(): TypeInformation[T] = javaStream.getInputType
private def getInputType: TypeInformation[T] = javaStream.getInputType
}
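The aggregate helper above is unchanged in behaviour: it resolves the field name to a position against the stream's input TypeInformation (via fieldNames2Indices) and delegates to the positional overload. A standalone, single-field sketch of that resolution step, using Flink's CompositeType rather than the helper shown above:

import org.apache.flink.api.common.typeutils.CompositeType

object FieldIndexSketch {
  // Resolve a field name to its position; CompositeType.getFieldIndex returns -1 when missing.
  def fieldPosition(tpe: CompositeType[_], field: String): Int = {
    val index = tpe.getFieldIndex(field)
    require(index >= 0, s"Field '$field' not found in $tpe")
    index
  }
}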
@@ -22,14 +22,14 @@ import org.apache.commons.io.IOUtils
import java.lang.invoke.{MethodHandleInfo, SerializedLambda}
import java.lang.reflect.{Field, Modifier}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable.{Map, Set, Stack}
import org.apache.commons.lang3.{ClassUtils, JavaVersion, SystemUtils}
import org.apache.commons.lang3.ClassUtils
import org.apache.flink.shaded.asm9.org.objectweb.asm.{ClassReader, ClassVisitor, Handle, MethodVisitor, Type}
import org.apache.flink.shaded.asm9.org.objectweb.asm.Opcodes._
import org.apache.flink.shaded.asm9.org.objectweb.asm.tree.{ClassNode, MethodNode}
import org.apache.flink.util.{FlinkException, InstantiationUtil}
import org.apache.flink.util.FlinkException
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.jdk.CollectionConverters._

/** A cleaner that renders closures serializable if they can be done so safely.
@@ -82,12 +82,12 @@ object ClosureCleaner {
/** Return a list of classes that represent closures enclosed in the given closure object.
*/
private def getInnerClosureClasses(obj: AnyRef): List[Class[_]] = {
val seen = Set[Class[_]](obj.getClass)
val stack = Stack[Class[_]](obj.getClass)
while (!stack.isEmpty) {
val seen = mutable.Set[Class[_]](obj.getClass)
val stack = mutable.Stack[Class[_]](obj.getClass)
while (stack.nonEmpty) {
val cr = getClassReader(stack.pop())
if (cr != null) {
val set = Set.empty[Class[_]]
val set = mutable.Set.empty[Class[_]]
cr.accept(new InnerClosureFinder(set), 0)
for (cls <- set -- seen) {
seen += cls
@@ -99,14 +99,14 @@ object ClosureCleaner {
}

/** Initializes the accessed fields for outer classes and their super classes. */
private def initAccessedFields(accessedFields: Map[Class[_], Set[String]], outerClasses: Seq[Class[_]]): Unit = {
private def initAccessedFields(accessedFields: mutable.Map[Class[_], mutable.Set[String]], outerClasses: Seq[Class[_]]): Unit = {
for (cls <- outerClasses) {
var currentClass = cls
assert(currentClass != null, "The outer class can't be null.")

while (currentClass != null) {
accessedFields(currentClass) = Set.empty[String]
currentClass = currentClass.getSuperclass()
accessedFields(currentClass) = mutable.Set.empty[String]
currentClass = currentClass.getSuperclass
}
}
}
@@ -116,7 +116,7 @@ object ClosureCleaner {
outerClass: Class[_],
clone: AnyRef,
obj: AnyRef,
accessedFields: Map[Class[_], Set[String]]
accessedFields: mutable.Map[Class[_], mutable.Set[String]]
): Unit = {
for (fieldName <- accessedFields(outerClass)) {
val field = outerClass.getDeclaredField(fieldName)
@@ -131,7 +131,7 @@ object ClosureCleaner {
parent: AnyRef,
obj: AnyRef,
outerClass: Class[_],
accessedFields: Map[Class[_], Set[String]]
accessedFields: mutable.Map[Class[_], mutable.Set[String]]
): AnyRef = {
val clone = instantiateClass(outerClass, parent)

@@ -140,7 +140,7 @@ object ClosureCleaner {

while (currentClass != null) {
setAccessedFields(currentClass, clone, obj, accessedFields)
currentClass = currentClass.getSuperclass()
currentClass = currentClass.getSuperclass
}

clone
@@ -159,7 +159,7 @@ object ClosureCleaner {
* whether to clean enclosing closures transitively
*/
def clean(closure: AnyRef, checkSerializable: Boolean = true, cleanTransitively: Boolean = true): Unit = {
clean(closure, checkSerializable, cleanTransitively, Map.empty)
clean(closure, checkSerializable, cleanTransitively, mutable.Map.empty)
}

def scalaClean[T <: AnyRef](fun: T, checkSerializable: Boolean = true, cleanTransitively: Boolean = true): T = {
@@ -204,7 +204,7 @@ object ClosureCleaner {
func: AnyRef,
checkSerializable: Boolean,
cleanTransitively: Boolean,
accessedFields: Map[Class[_], Set[String]]
accessedFields: mutable.Map[Class[_], mutable.Set[String]]
): Unit = {

// indylambda check. Most likely to be the case with 2.12, 2.13
@@ -447,7 +447,7 @@ object ClosureCleaner {

private def instantiateClass(cls: Class[_], enclosingObject: AnyRef): AnyRef = {
// Use reflection to instantiate object without calling constructor
val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
val rf = sun.reflect.ReflectionFactory.getReflectionFactory
val parentCtor = classOf[Object].getDeclaredConstructor()
val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
val obj = newCtor.newInstance().asInstanceOf[AnyRef]
@@ -627,14 +627,14 @@ object IndylambdaScalaClosures {
def findAccessedFields(
lambdaProxy: SerializedLambda,
lambdaClassLoader: ClassLoader,
accessedFields: Map[Class[_], Set[String]],
accessedFields: mutable.Map[Class[_], mutable.Set[String]],
findTransitively: Boolean
): Unit = {

// We may need to visit the same class multiple times for different methods on it, and we'll
// need to lookup by name. So we use ASM's Tree API and cache the ClassNode/MethodNode.
val classInfoByInternalName = Map.empty[String, (Class[_], ClassNode)]
val methodNodeById = Map.empty[MethodIdentifier[_], MethodNode]
val classInfoByInternalName = mutable.Map.empty[String, (Class[_], ClassNode)]
val methodNodeById = mutable.Map.empty[MethodIdentifier[_], MethodNode]
def getOrUpdateClassInfo(classInternalName: String): (Class[_], ClassNode) = {
val classInfo = classInfoByInternalName.getOrElseUpdate(
classInternalName, {
@@ -672,19 +672,19 @@
// inner closure
// we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
// to better find and track field accesses.
val trackedClassInternalNames = Set[String](implClassInternalName)
val trackedClassInternalNames = mutable.Set[String](implClassInternalName)

// Depth-first search for inner closures and track the fields that were accessed in them.
// Start from the lambda body's implementation method, follow method invocations
val visited = Set.empty[MethodIdentifier[_]]
val stack = Stack[MethodIdentifier[_]](implMethodId)
val visited = mutable.Set.empty[MethodIdentifier[_]]
val stack = mutable.Stack[MethodIdentifier[_]](implMethodId)
def pushIfNotVisited(methodId: MethodIdentifier[_]): Unit = {
if (!visited.contains(methodId)) {
stack.push(methodId)
}
}

while (!stack.isEmpty) {
while (stack.nonEmpty) {
val currentId = stack.pop()
visited += currentId

@@ -816,10 +816,10 @@ private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String
* a set of visited methods to avoid cycles
*/
private class FieldAccessFinder(
fields: Map[Class[_], Set[String]],
fields: mutable.Map[Class[_], mutable.Set[String]],
findTransitively: Boolean,
specificMethod: Option[MethodIdentifier[_]] = None,
visitedMethods: Set[MethodIdentifier[_]] = Set.empty
visitedMethods: mutable.Set[MethodIdentifier[_]] = mutable.Set.empty
) extends ClassVisitor(ASM9) {

override def visitMethod(
@@ -868,7 +868,7 @@ private class FieldAccessFinder(
ClosureCleaner
.getClassReader(currentClass)
.accept(new FieldAccessFinder(fields, findTransitively, Some(m), visitedMethods), 0)
currentClass = currentClass.getSuperclass()
currentClass = currentClass.getSuperclass
}
}
}
@@ -878,7 +878,7 @@ private class FieldAccessFinder(
}
}

private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM9) {
private class InnerClosureFinder(output: mutable.Set[Class[_]]) extends ClassVisitor(ASM9) {
var myName: String = null

// TODO: Recursively find inner closures that we indirectly reference, e.g.
@@ -908,7 +908,7 @@ private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM
override def visitMethodInsn(op: Int, owner: String, name: String, desc: String, itf: Boolean): Unit = {
val argTypes = Type.getArgumentTypes(desc)
if (
op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
op == INVOKESPECIAL && name == "<init>" && argTypes.nonEmpty
&& argTypes(0).toString.startsWith("L") // is it an object?
&& argTypes(0).getInternalName == myName
) {
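For orientation, the collection-style changes above do not alter ClosureCleaner's behaviour; its public entry point is the scalaClean method shown in the hunk above. A rough usage sketch (the package in the import is assumed, since the diff view hides file paths):

import org.apache.flinkx.api.ClosureCleaner // assumed package; not shown in this view

object ClosureCleanerSketch {
  // Sketch only: clean a capturing function before handing it to Flink. Per the scaladoc
  // above, the cleaner renders the closure serializable if it can do so safely;
  // checkSerializable = true additionally verifies that the cleaned closure serializes.
  val threshold = 10
  val keep: Int => Boolean = _ > threshold

  val cleaned: Int => Boolean =
    ClosureCleaner.scalaClean(keep, checkSerializable = true, cleanTransitively = true)
}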
@@ -198,8 +198,8 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
val cleanFun2 = clean(fun2)

val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value).foreach(out.collect _) }
def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value).foreach(out.collect _) }
def flatMap1(value: IN1, out: Collector[R]): Unit = { cleanFun1(value).foreach(out.collect) }
def flatMap2(value: IN2, out: Collector[R]): Unit = { cleanFun2(value).foreach(out.collect) }
}

flatMap(flatMapper)
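The CoFlatMapFunction above only gains explicit Unit result types and loses the trailing underscore on out.collect: when a function type is expected, a method value eta-expands on its own, so the underscore is redundant. The same pattern in isolation (names are illustrative):

import org.apache.flink.util.Collector

object CollectSketch {
  // `out.collect` eta-expands to `R => Unit` here, so `foreach(out.collect _)`
  // and `foreach(out.collect)` are equivalent; the commit prefers the latter.
  def emitAll[R](values: Iterable[R], out: Collector[R]): Unit =
    values.foreach(out.collect)
}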
@@ -53,30 +53,12 @@ class DataStream[T](stream: JavaStream[T]) {
*/
@deprecated
@PublicEvolving
def getType(): TypeInformation[T] = stream.getType()

/** Returns the parallelism of this operation.
*
* @deprecated
* Use [[parallelism]] instead.
*/
@deprecated
@PublicEvolving
def getParallelism = stream.getParallelism

/** Returns the execution config.
*
* @deprecated
* Use [[executionConfig]] instead.
*/
@deprecated
@PublicEvolving
def getExecutionConfig = stream.getExecutionConfig
def getType: TypeInformation[T] = stream.getType

/** Returns the ID of the DataStream.
*/
@Internal
private[flinkx] def getId = stream.getId()
private[flinkx] def getId = stream.getId

// --------------------------------------------------------------------------
// Scalaesk accessors
@@ -88,20 +70,20 @@ class DataStream[T](stream: JavaStream[T]) {

/** Returns the TypeInformation for the elements of this DataStream.
*/
def dataType: TypeInformation[T] = stream.getType()
def dataType: TypeInformation[T] = stream.getType

/** Returns the execution config.
*/
def executionConfig: ExecutionConfig = stream.getExecutionConfig()
def executionConfig: ExecutionConfig = stream.getExecutionConfig

/** Returns the [[StreamExecutionEnvironment]] associated with this data stream
*/
def executionEnvironment: StreamExecutionEnvironment =
new StreamExecutionEnvironment(stream.getExecutionEnvironment())
new StreamExecutionEnvironment(stream.getExecutionEnvironment)

/** Returns the parallelism of this operation.
*/
def parallelism: Int = stream.getParallelism()
def parallelism: Int = stream.getParallelism

/** Sets the parallelism of this operation. This must be at least 1.
*/
@@ -130,23 +112,12 @@ class DataStream[T](stream: JavaStream[T]) {
/** Returns the minimum resources of this operation.
*/
@PublicEvolving
def minResources: ResourceSpec = stream.getMinResources()
def minResources: ResourceSpec = stream.getMinResources

/** Returns the preferred resources of this operation.
*/
@PublicEvolving
def preferredResources: ResourceSpec = stream.getPreferredResources()

/** Gets the name of the current data stream. This name is used by the visualization and logging during runtime.
*
* @return
* Name of the stream.
* @deprecated
* Use [[name]] instead
*/
@deprecated
@PublicEvolving
def getName: String = name
def preferredResources: ResourceSpec = stream.getPreferredResources

/** Gets the name of the current data stream. This name is used by the visualization and logging during runtime.
*
@@ -365,7 +336,7 @@ class DataStream[T](stream: JavaStream[T]) {
val keyType: TypeInformation[K] = implicitly[TypeInformation[K]]

val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
def getKey(in: T): K = cleanFun(in)
override def getProducedType: TypeInformation[K] = keyType
}
asScalaStream(new JavaKeyedStream(stream, keyExtractor, keyType))
@@ -411,8 +382,8 @@ class DataStream[T](stream: JavaStream[T]) {
val cleanFun = clean(fun)

val keyExtractor = new KeySelector[T, K] with ResultTypeQueryable[K] {
def getKey(in: T) = cleanFun(in)
override def getProducedType(): TypeInformation[K] = keyType
def getKey(in: T): K = cleanFun(in)
override def getProducedType: TypeInformation[K] = keyType
}

asScalaStream(stream.partitionCustom(partitioner, keyExtractor))
@@ -582,7 +553,7 @@ class DataStream[T](stream: JavaStream[T]) {
}
val cleanFun = clean(fun)
val flatMapper = new FlatMapFunction[T, R] {
def flatMap(in: T, out: Collector[R]) = { cleanFun(in, out) }
def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in, out) }
}
flatMap(flatMapper)
}
@@ -595,7 +566,7 @@ class DataStream[T](stream: JavaStream[T]) {
}
val cleanFun = clean(fun)
val flatMapper = new FlatMapFunction[T, R] {
def flatMap(in: T, out: Collector[R]) = { cleanFun(in).foreach(out.collect _) }
def flatMap(in: T, out: Collector[R]): Unit = { cleanFun(in).foreach(out.collect) }
}
flatMap(flatMapper)
}
@@ -634,7 +605,7 @@ class DataStream[T](stream: JavaStream[T]) {
}
val cleanFun = clean(fun)
val filterFun = new FilterFunction[T] {
def filter(in: T) = cleanFun(in)
def filter(in: T): Boolean = cleanFun(in)
}
filter(filterFun)
}
@@ -757,7 +728,7 @@ class DataStream[T](stream: JavaStream[T]) {
* The closed DataStream.
*/
@PublicEvolving
def printToErr() = stream.printToErr()
def printToErr(): DataStreamSink[T] = stream.printToErr()

/** Writes a DataStream to the standard output stream (stdout). For each element of the DataStream the result of
* [[AnyRef.toString()]] is written.
@@ -780,7 +751,7 @@ class DataStream[T](stream: JavaStream[T]) {
* The closed DataStream.
*/
@PublicEvolving
def printToErr(sinkIdentifier: String) = stream.printToErr(sinkIdentifier)
def printToErr(sinkIdentifier: String): DataStreamSink[T] = stream.printToErr(sinkIdentifier)

/** Writes a DataStream using the given [[OutputFormat]].
*/
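Taken together, the DataStream changes above drop the deprecated Java-style accessors in favour of the Scala-idiomatic ones the class already had. A rough migration sketch (the flinkx package in the import is assumed, and `name` is used as the parameterless getter that the removed getName deprecation note points to):

import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flinkx.api.DataStream // assumed package; the diff view hides file paths

object DataStreamAccessorsSketch {
  def describe[T](stream: DataStream[T]): String = {
    val tpe: TypeInformation[T] = stream.dataType        // was stream.getType()
    val cfg: ExecutionConfig    = stream.executionConfig // was stream.getExecutionConfig
    val par: Int                = stream.parallelism     // was stream.getParallelism
    val name: String            = stream.name            // was stream.getName
    s"$name: type=$tpe, parallelism=$par, objectReuseEnabled=${cfg.isObjectReuseEnabled}"
  }
}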

0 comments on commit 7321cb4
