diff --git a/.gitignore b/.gitignore
index 97fb008..b712623 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,4 +10,8 @@ target
 .metals
 .vscode
 metals.sbt
-.scala-build
\ No newline at end of file
+.scala-build
+.ammonite/*
+*.jar
+*checkpoint
+sink-table
\ No newline at end of file
diff --git a/README.md b/README.md
index 0e0ba96..7cbba97 100644
--- a/README.md
+++ b/README.md
@@ -50,6 +50,17 @@ If you want to create new project easily check this __Giter8 template__ out: [no
 We suggest removing the official `flink-scala` and `flink-streaming-scala` dependencies altogether, both to simplify the migration and to avoid mixing two flavors of the API in the same project; mixing them is technically possible, however, and removal is not required.
 
+## Examples
+
+There is a wide range of [code examples](https://github.com/flink-extended/flink-scala-api/tree/master/modules/examples) to introduce you to flink-scala-api, both as standalone Scala scripts and as a multi-module application. These examples include:
+
+- Flink jobs built with Scala 3 using Ammonite and Scala CLI;
+- A complete fraud-detection application;
+- Examples using the DataStream and Table APIs;
+- A simple job developed interactively in a Jupyter notebook;
+- A word count that reads text from a web socket;
+- Example usage of the DataGen connector and a Kafka sink;
+- And more.
 
 ## Differences with the Official Flink Scala API
diff --git a/build.sbt b/build.sbt
index 491dac0..546436c 100644
--- a/build.sbt
+++ b/build.sbt
@@ -7,6 +7,12 @@ lazy val rootScalaVersion = "3.3.3"
 lazy val flinkVersion = System.getProperty("flinkVersion", "1.18.1")
 
 lazy val root = (project in file("."))
+  .aggregate(`scala-api`, `examples`)
+  .settings(
+    publish / skip := true
+  )
+
+lazy val `scala-api` = (project in file("modules/scala-api"))
   .settings(ReleaseProcess.releaseSettings(flinkVersion) *)
   .settings(
     name := "flink-scala-api",
@@ -108,3 +114,36 @@ lazy val root = (project in file("."))
     mdocIn := new File("README.md")
   )
   .enablePlugins(MdocPlugin)
+
+val flinkMajorAndMinorVersion =
+  flinkVersion.split("\\.").toList.take(2).mkString(".")
+
+lazy val `examples` = (project in file("modules/examples"))
+  .settings(
+    scalaVersion := rootScalaVersion,
+    Test / fork := true,
+    libraryDependencies ++= Seq(
+      "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.1.6",
+      "org.apache.flink" % "flink-runtime-web" % flinkVersion % Provided,
+      "org.apache.flink" % "flink-clients" % flinkVersion % Provided,
+      "org.apache.flink" % "flink-state-processor-api" % flinkVersion % Provided,
+      "org.apache.flink" % "flink-connector-kafka" % s"3.0.2-$flinkMajorAndMinorVersion" % Provided,
+      "org.apache.flink" % "flink-connector-files" % flinkVersion % Provided,
+      "org.apache.flink" % "flink-table-runtime" % flinkVersion % Provided,
+      "org.apache.flink" % "flink-table-planner-loader" % flinkVersion % Provided,
+      "io.bullet" %% "borer-core" % "1.14.0" % Provided,
+      "ch.qos.logback" % "logback-classic" % "1.4.14" % Provided,
+      "org.apache.flink" % "flink-test-utils" % flinkVersion % Test,
+      "org.apache.flink" % "flink-streaming-java" % flinkVersion % Test classifier "tests",
+      "org.scalatest" %% "scalatest" % "3.2.15" % Test
+    ),
+    Compile / run := Defaults
+      .runTask(
+        Compile / fullClasspath,
+        Compile / run / mainClass,
+        Compile / run / runner
+      )
+      .evaluated,
+    Compile / run / fork := true
+  )
+  .enablePlugins(ProtobufPlugin)
diff --git a/modules/examples/notebooks/word_count_example.ipynb b/modules/examples/notebooks/word_count_example.ipynb
new file mode 100644
index 0000000..a6d3309
--- /dev/null
+++ b/modules/examples/notebooks/word_count_example.ipynb
@@ -0,0 +1,435 @@
+{
+ 
"cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$\u001b[39m\n", + "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$\u001b[39m\n", + "\u001b[32mimport \u001b[39m\u001b[36morg.apache.flinkx.api._\u001b[39m\n", + "\u001b[32mimport \u001b[39m\u001b[36morg.apache.flinkx.api.serializers._\u001b[39m\n", + "\u001b[32mimport \u001b[39m\u001b[36morg.apache.flink.configuration.Configuration\u001b[39m" + ] + }, + "execution_count": 1, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import $ivy.`org.flinkextended::flink-scala-api:1.18.1_1.1.6`\n", + "import $ivy.`org.apache.flink:flink-clients:1.18.1`\n", + "\n", + "import org.apache.flinkx.api._\n", + "import org.apache.flinkx.api.serializers._\n", + "import org.apache.flink.configuration.Configuration" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "SLF4J(W): No SLF4J providers were found.\n", + "SLF4J(W): Defaulting to no-operation (NOP) logger implementation\n", + "SLF4J(W): See https://www.slf4j.org/codes.html#noProviders for further details.\n" + ] + }, + { + "data": { + "text/plain": [ + "\u001b[36menv\u001b[39m: \u001b[32mStreamExecutionEnvironment\u001b[39m = org.apache.flinkx.api.StreamExecutionEnvironment@fd2988a" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "\u001b[36mtext\u001b[39m: \u001b[32mDataStream\u001b[39m[\u001b[32mString\u001b[39m] = org.apache.flinkx.api.DataStream@4da98f07" + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "val text = env.fromElements(\n", + " \"To be, or not to be,--that is the question:--\",\n", + " \"Whether 'tis nobler in the mind to suffer\",\n", + " \"The slings and arrows of outrageous fortune\",\n", + " \"Or to take arms against a sea of troubles,\"\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "ename": "org.apache.flink.util.FlinkException", + "evalue": "Failed to execute job 'wordCount'.", + "output_type": "error", + "traceback": [ + "\u001b[31morg.apache.flink.util.FlinkException: Failed to execute job 'wordCount'.\u001b[39m", + " org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(\u001b[32mStreamExecutionEnvironment.java\u001b[39m:\u001b[32m2253\u001b[39m)", + " org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(\u001b[32mStreamExecutionEnvironment.java\u001b[39m:\u001b[32m2219\u001b[39m)", + " org.apache.flink.streaming.api.datastream.DataStream.executeAndCollectWithClient(\u001b[32mDataStream.java\u001b[39m:\u001b[32m1471\u001b[39m)", + " org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(\u001b[32mDataStream.java\u001b[39m:\u001b[32m1336\u001b[39m)", + " org.apache.flinkx.api.DataStream.executeAndCollect(\u001b[32mDataStream.scala\u001b[39m:\u001b[32m851\u001b[39m)", + " ammonite.$sess.cell11$Helper.(\u001b[32mcell11.sc\u001b[39m:\u001b[32m6\u001b[39m)", + " ammonite.$sess.cell11$.(\u001b[32mcell11.sc\u001b[39m:\u001b[32m7\u001b[39m)", + 
"\u001b[31mjava.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.\u001b[39m", + " org.apache.flink.util.ExceptionUtils.rethrow(\u001b[32mExceptionUtils.java\u001b[39m:\u001b[32m321\u001b[39m)", + " org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(\u001b[32mFunctionUtils.java\u001b[39m:\u001b[32m75\u001b[39m)", + " java.util.concurrent.CompletableFuture$UniApply.tryFire(\u001b[32mCompletableFuture.java\u001b[39m:\u001b[32m642\u001b[39m)", + " java.util.concurrent.CompletableFuture$Completion.exec(\u001b[32mCompletableFuture.java\u001b[39m:\u001b[32m479\u001b[39m)", + " java.util.concurrent.ForkJoinTask.doExec(\u001b[32mForkJoinTask.java\u001b[39m:\u001b[32m290\u001b[39m)", + " java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(\u001b[32mForkJoinPool.java\u001b[39m:\u001b[32m1020\u001b[39m)", + " java.util.concurrent.ForkJoinPool.scan(\u001b[32mForkJoinPool.java\u001b[39m:\u001b[32m1656\u001b[39m)", + " java.util.concurrent.ForkJoinPool.runWorker(\u001b[32mForkJoinPool.java\u001b[39m:\u001b[32m1594\u001b[39m)", + " java.util.concurrent.ForkJoinWorkerThread.run(\u001b[32mForkJoinWorkerThread.java\u001b[39m:\u001b[32m183\u001b[39m)", + "\u001b[31morg.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.\u001b[39m", + " org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(\u001b[32mDefaultJobMasterServiceProcess.java\u001b[39m:\u001b[32m97\u001b[39m)", + " java.util.concurrent.CompletableFuture.uniWhenComplete(\u001b[32mCompletableFuture.java\u001b[39m:\u001b[32m859\u001b[39m)", + " java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(\u001b[32mCompletableFuture.java\u001b[39m:\u001b[32m837\u001b[39m)", + " java.util.concurrent.CompletableFuture.postComplete(\u001b[32mCompletableFuture.java\u001b[39m:\u001b[32m506\u001b[39m)", + " java.util.concurrent.CompletableFuture$AsyncSupply.run(\u001b[32mCompletableFuture.java\u001b[39m:\u001b[32m1705\u001b[39m)", + " java.util.concurrent.ThreadPoolExecutor.runWorker(\u001b[32mThreadPoolExecutor.java\u001b[39m:\u001b[32m1128\u001b[39m)", + " java.util.concurrent.ThreadPoolExecutor$Worker.run(\u001b[32mThreadPoolExecutor.java\u001b[39m:\u001b[32m628\u001b[39m)", + " java.lang.Thread.run(\u001b[32mThread.java\u001b[39m:\u001b[32m829\u001b[39m)", + "\u001b[31mjava.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig\u001b[39m", + " java.util.concurrent.CompletableFuture.encodeThrowable(\u001b[32mCompletableFuture.java\u001b[39m:\u001b[32m314\u001b[39m)", + " java.util.concurrent.CompletableFuture.completeThrowable(\u001b[32mCompletableFuture.java\u001b[39m:\u001b[32m319\u001b[39m)", + " java.util.concurrent.CompletableFuture$AsyncSupply.run(\u001b[32mCompletableFuture.java\u001b[39m:\u001b[32m1702\u001b[39m)", + " java.util.concurrent.ThreadPoolExecutor.runWorker(\u001b[32mThreadPoolExecutor.java\u001b[39m:\u001b[32m1128\u001b[39m)", + " java.util.concurrent.ThreadPoolExecutor$Worker.run(\u001b[32mThreadPoolExecutor.java\u001b[39m:\u001b[32m628\u001b[39m)", + " java.lang.Thread.run(\u001b[32mThread.java\u001b[39m:\u001b[32m829\u001b[39m)", + "\u001b[31mjava.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig\u001b[39m", + " org.apache.flink.util.ExceptionUtils.rethrow(\u001b[32mExceptionUtils.java\u001b[39m:\u001b[32m321\u001b[39m)", + " 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(\u001b[32mFunctionUtils.java\u001b[39m:\u001b[32m114\u001b[39m)", + " java.util.concurrent.CompletableFuture$AsyncSupply.run(\u001b[32mCompletableFuture.java\u001b[39m:\u001b[32m1700\u001b[39m)", + " java.util.concurrent.ThreadPoolExecutor.runWorker(\u001b[32mThreadPoolExecutor.java\u001b[39m:\u001b[32m1128\u001b[39m)", + " java.util.concurrent.ThreadPoolExecutor$Worker.run(\u001b[32mThreadPoolExecutor.java\u001b[39m:\u001b[32m628\u001b[39m)", + " java.lang.Thread.run(\u001b[32mThread.java\u001b[39m:\u001b[32m829\u001b[39m)", + "\u001b[31mjava.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig\u001b[39m", + " jdk.internal.loader.BuiltinClassLoader.loadClass(\u001b[32mBuiltinClassLoader.java\u001b[39m:\u001b[32m581\u001b[39m)", + " jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(\u001b[32mClassLoaders.java\u001b[39m:\u001b[32m178\u001b[39m)", + " java.lang.ClassLoader.loadClass(\u001b[32mClassLoader.java\u001b[39m:\u001b[32m527\u001b[39m)", + " java.lang.Class.forName0(\u001b[32mNative Method\u001b[39m)", + " java.lang.Class.forName(\u001b[32mClass.java\u001b[39m:\u001b[32m398\u001b[39m)", + " org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(\u001b[32mInstantiationUtil.java\u001b[39m:\u001b[32m78\u001b[39m)", + " java.io.ObjectInputStream.readNonProxyDesc(\u001b[32mObjectInputStream.java\u001b[39m:\u001b[32m2003\u001b[39m)", + " java.io.ObjectInputStream.readClassDesc(\u001b[32mObjectInputStream.java\u001b[39m:\u001b[32m1870\u001b[39m)", + " java.io.ObjectInputStream.readOrdinaryObject(\u001b[32mObjectInputStream.java\u001b[39m:\u001b[32m2201\u001b[39m)", + " java.io.ObjectInputStream.readObject0(\u001b[32mObjectInputStream.java\u001b[39m:\u001b[32m1687\u001b[39m)", + " java.io.ObjectInputStream.readObject(\u001b[32mObjectInputStream.java\u001b[39m:\u001b[32m489\u001b[39m)", + " java.io.ObjectInputStream.readObject(\u001b[32mObjectInputStream.java\u001b[39m:\u001b[32m447\u001b[39m)", + " org.apache.flink.util.InstantiationUtil.deserializeObject(\u001b[32mInstantiationUtil.java\u001b[39m:\u001b[32m539\u001b[39m)", + " org.apache.flink.util.InstantiationUtil.deserializeObject(\u001b[32mInstantiationUtil.java\u001b[39m:\u001b[32m527\u001b[39m)", + " org.apache.flink.util.SerializedValue.deserializeValue(\u001b[32mSerializedValue.java\u001b[39m:\u001b[32m67\u001b[39m)", + " org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(\u001b[32mDefaultSchedulerFactory.java\u001b[39m:\u001b[32m101\u001b[39m)", + " org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(\u001b[32mDefaultSlotPoolServiceSchedulerFactory.java\u001b[39m:\u001b[32m122\u001b[39m)", + " org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(\u001b[32mJobMaster.java\u001b[39m:\u001b[32m379\u001b[39m)", + " org.apache.flink.runtime.jobmaster.JobMaster.(\u001b[32mJobMaster.java\u001b[39m:\u001b[32m356\u001b[39m)", + " org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(\u001b[32mDefaultJobMasterServiceFactory.java\u001b[39m:\u001b[32m128\u001b[39m)", + " org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(\u001b[32mDefaultJobMasterServiceFactory.java\u001b[39m:\u001b[32m100\u001b[39m)", + " org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(\u001b[32mFunctionUtils.java\u001b[39m:\u001b[32m112\u001b[39m)", + 
" java.util.concurrent.CompletableFuture$AsyncSupply.run(\u001b[32mCompletableFuture.java\u001b[39m:\u001b[32m1700\u001b[39m)", + " java.util.concurrent.ThreadPoolExecutor.runWorker(\u001b[32mThreadPoolExecutor.java\u001b[39m:\u001b[32m1128\u001b[39m)", + " java.util.concurrent.ThreadPoolExecutor$Worker.run(\u001b[32mThreadPoolExecutor.java\u001b[39m:\u001b[32m628\u001b[39m)", + " java.lang.Thread.run(\u001b[32mThread.java\u001b[39m:\u001b[32m829\u001b[39m)" + ] + } + ], + "source": [ + "val words = text\n", + " .flatMap(_.toLowerCase.split(\"\\\\W+\"))\n", + " .map((_, 1))\n", + " .keyBy(_._1)\n", + " .sum(1)\n", + " .executeAndCollect(\"wordCount\").toList" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + " \n", + " " + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "application/vnd.plotly.v1+json": { + "config": {}, + "data": [ + { + "type": "bar", + "x": [ + "whether", + "take", + "sea", + "troubles", + "is", + "mind", + "be", + "and", + "outrageous", + "the", + "the", + "the", + "slings", + "or", + "be", + "that", + "question", + "arrows", + "or", + "a", + "to", + "to", + "tis", + "in", + "to", + "suffer", + "of", + "to", + "arms", + "of", + "against", + "not", + "nobler", + "fortune" + ], + "y": [ + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 2, + 3, + 1, + 1, + 2, + 1, + 1, + 1, + 2, + 1, + 1, + 2, + 1, + 1, + 3, + 1, + 1, + 4, + 1, + 2, + 1, + 1, + 1, + 1 + ] + } + ], + "layout": {} + }, + "text/html": [ + "
\n", + "\n", + " " + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/plain": [ + "\u001b[32mimport \u001b[39m\u001b[36m$ivy.$\u001b[39m\n", + "\u001b[32mimport \u001b[39m\u001b[36mplotly._\u001b[39m\n", + "\u001b[32mimport \u001b[39m\u001b[36mplotly.element._\u001b[39m\n", + "\u001b[32mimport \u001b[39m\u001b[36mplotly.layout._\u001b[39m\n", + "\u001b[32mimport \u001b[39m\u001b[36mplotly.Almond._\u001b[39m\n", + "\u001b[36mx\u001b[39m: \u001b[32mList\u001b[39m[\u001b[32mString\u001b[39m] = \u001b[33mList\u001b[39m(\n", + " \u001b[32m\"whether\"\u001b[39m,\n", + " \u001b[32m\"take\"\u001b[39m,\n", + " \u001b[32m\"sea\"\u001b[39m,\n", + " \u001b[32m\"troubles\"\u001b[39m,\n", + " \u001b[32m\"is\"\u001b[39m,\n", + " \u001b[32m\"mind\"\u001b[39m,\n", + " \u001b[32m\"be\"\u001b[39m,\n", + " \u001b[32m\"and\"\u001b[39m,\n", + " \u001b[32m\"outrageous\"\u001b[39m,\n", + " \u001b[32m\"the\"\u001b[39m,\n", + " \u001b[32m\"the\"\u001b[39m,\n", + " \u001b[32m\"the\"\u001b[39m,\n", + " \u001b[32m\"slings\"\u001b[39m,\n", + " \u001b[32m\"or\"\u001b[39m,\n", + " \u001b[32m\"be\"\u001b[39m,\n", + " \u001b[32m\"that\"\u001b[39m,\n", + " \u001b[32m\"question\"\u001b[39m,\n", + " \u001b[32m\"arrows\"\u001b[39m,\n", + " \u001b[32m\"or\"\u001b[39m,\n", + " \u001b[32m\"a\"\u001b[39m,\n", + " \u001b[32m\"to\"\u001b[39m,\n", + " \u001b[32m\"to\"\u001b[39m,\n", + " \u001b[32m\"tis\"\u001b[39m,\n", + " \u001b[32m\"in\"\u001b[39m,\n", + " \u001b[32m\"to\"\u001b[39m,\n", + " \u001b[32m\"suffer\"\u001b[39m,\n", + " \u001b[32m\"of\"\u001b[39m,\n", + " \u001b[32m\"to\"\u001b[39m,\n", + " \u001b[32m\"arms\"\u001b[39m,\n", + " \u001b[32m\"of\"\u001b[39m,\n", + " \u001b[32m\"against\"\u001b[39m,\n", + " \u001b[32m\"not\"\u001b[39m,\n", + " \u001b[32m\"nobler\"\u001b[39m,\n", + " \u001b[32m\"fortune\"\u001b[39m\n", + ")\n", + "\u001b[36my\u001b[39m: \u001b[32mList\u001b[39m[\u001b[32mInt\u001b[39m] = \u001b[33mList\u001b[39m(\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m2\u001b[39m,\n", + " \u001b[32m3\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m2\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m2\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m2\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m3\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m4\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m2\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m,\n", + " \u001b[32m1\u001b[39m\n", + ")\n", + "\u001b[36mres13_6\u001b[39m: \u001b[32mString\u001b[39m = \u001b[32m\"plot-bf228d5c-de91-4b68-a885-98733b917ad2\"\u001b[39m" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "import $ivy.`org.plotly-scala::plotly-almond:0.8.1`\n", + "\n", + "import plotly._\n", + "import plotly.element._\n", + "import plotly.layout._\n", + "import plotly.Almond._\n", + "\n", + "val (x, y) = words.unzip\n", + "\n", + "Bar(x, y).plot()" + ] + } + ], + "metadata": { + 
"kernelspec": { + "display_name": "Scala", + "language": "scala", + "name": "scala" + }, + "language_info": { + "codemirror_mode": "text/x-scala", + "file_extension": ".sc", + "mimetype": "text/x-scala", + "name": "scala", + "nbconvert_exporter": "script", + "version": "2.13.11" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/modules/examples/scripts/debug-sql.sc b/modules/examples/scripts/debug-sql.sc new file mode 100644 index 0000000..93e6bd2 --- /dev/null +++ b/modules/examples/scripts/debug-sql.sc @@ -0,0 +1,55 @@ +import $ivy.`org.flinkextended::flink-scala-api:1.18.1_1.1.6` + +import $ivy.`org.apache.flink:flink-clients:1.18.1` + +import $ivy.`org.apache.flink:flink-streaming-scala_2.12:1.18.1` + +import $ivy.`org.apache.flink:flink-table-api-java:1.18.1` +import $ivy.`org.apache.flink:flink-table-api-java-bridge:1.18.1` +import $ivy.`org.apache.flink:flink-table-runtime:1.18.1` +import $ivy.`org.apache.flink:flink-table-planner_2.12:1.18.1` + +import org.apache.flink.table.api._ +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment +import org.apache.flink.connector.datagen.table.DataGenConnectorOptions + +import org.apache.flinkx.api._ +import org.apache.flinkx.api.serializers._ + +import java.lang.{Long => JLong} + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = StreamTableEnvironment.create(env.getJavaEnv) + +val settings = EnvironmentSettings.newInstance().inStreamingMode().build() + +val table = TableEnvironment.create(settings) + +table.createTemporaryTable( + "SourceTable", + TableDescriptor + .forConnector("datagen") + .schema( + Schema.newBuilder + .column("BookId", DataTypes.INT()) + .build + ) + .option(DataGenConnectorOptions.ROWS_PER_SECOND, new JLong(1)) + .build +) + +val tableDescriptor = TableDescriptor + .forConnector("datagen") + .schema( + Schema.newBuilder + .column("id", DataTypes.INT.notNull) + .column("a", DataTypes.ROW(DataTypes.FIELD("np", DataTypes.INT.notNull())).notNull()) + .build) + .build +table.createTemporaryTable("t1", tableDescriptor) +table.createTemporaryTable("t2", tableDescriptor) +// table.dropTemporaryTable("t1") +// table.dropTemporaryTable("t2") + +val res = table.executeSql("EXPLAIN SELECT a.id, COALESCE(a.a.np, b.a.np) c1, IFNULL(a.a.np, b.a.np) c2 FROM t1 a left JOIN t2 b ON a.id=b.id where a.a is null or a.a.np is null") +res.print diff --git a/modules/examples/scripts/flink-amm.sc b/modules/examples/scripts/flink-amm.sc new file mode 100644 index 0000000..326a0cd --- /dev/null +++ b/modules/examples/scripts/flink-amm.sc @@ -0,0 +1,52 @@ +import $cp.lib.`flink-faker-0.4.0.jar` + +import $ivy.`org.flinkextended::flink-scala-api:1.15.4_1.0.0` +import $ivy.`org.apache.flink:flink-clients:1.15.2` +import $ivy.`org.apache.flink:flink-csv:1.15.2` +import $ivy.`org.apache.flink:flink-table-api-java:1.15.2` +import $ivy.`org.apache.flink:flink-table-api-java-bridge:1.15.2` +import $ivy.`org.apache.flink:flink-table-runtime:1.15.2` +import $ivy.`org.apache.flink:flink-table-planner-loader:1.15.2` + +import org.apache.flink.table.api._ +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment +import org.apache.flink.connector.datagen.table.DataGenConnectorOptions + +import org.apache.flink.api._ +import org.apache.flink.api.serializers._ + +import _root_.java.lang.{Long => JLong} + +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = StreamTableEnvironment.create(env.getJavaEnv) +val settings = 
EnvironmentSettings.newInstance().inStreamingMode().build()
+val table = TableEnvironment.create(settings)
+
+val tableDescriptor = TableDescriptor
+  .forConnector("faker")
+  .schema(
+    Schema.newBuilder
+      .column(
+        "id",
+        DataTypes.INT // .notNull
+      )
+      .column(
+        "a",
+        DataTypes.ROW(DataTypes.FIELD("np", DataTypes.INT))
+      )
+      .build
+  )
+  .option("fields.id.expression", "#{number.numberBetween '0','10'}")
+  .option("fields.a.np.expression", "#{number.numberBetween '20','30'}")
+  // .option("fields.a.np.null-rate", "0.5")
+  .option("fields.a.null-rate", "0.5")
+  .option("rows-per-second", "50")
+  .build
+table.createTemporaryTable("t1", tableDescriptor)
+// table.dropTemporaryTable("t1")
+
+val res = table.executeSql(
+  "SELECT a.id, COALESCE(a.a.np, a.id) c1, IFNULL(a.a.np, a.id) c2, a.a.np FROM t1 a"
+  // "show create table t1"
+)
+res.print
diff --git a/modules/examples/scripts/flink-scala-cli.scala b/modules/examples/scripts/flink-scala-cli.scala
new file mode 100644
index 0000000..2ac7ed8
--- /dev/null
+++ b/modules/examples/scripts/flink-scala-cli.scala
@@ -0,0 +1,20 @@
+//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6"
+//> using dep "org.apache.flink:flink-clients:1.18.1"
+
+import org.apache.flinkx.api.*
+import org.apache.flinkx.api.serializers.*
+import org.slf4j.LoggerFactory
+import java.io.File
+
+@main def wordCountExample =
+  val logger = LoggerFactory.getLogger(this.getClass())
+  val files = File(".").listFiles ++ Option(File("/flink/lib/").listFiles)
+    .getOrElse(Array.empty[File])
+  val elems = files.filter(_.isFile).map(_.getAbsolutePath())
+
+  val env = StreamExecutionEnvironment.getExecutionEnvironment
+  val text = env.fromElements(elems*)
+
+  text.addSink(logger.info(_))
+
+  env.execute("wordCount")
diff --git a/modules/examples/scripts/gen-csv-file.sc b/modules/examples/scripts/gen-csv-file.sc
new file mode 100644
index 0000000..a8e6f02
--- /dev/null
+++ b/modules/examples/scripts/gen-csv-file.sc
@@ -0,0 +1,49 @@
+//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6"
+//> using dep "org.apache.flink:flink-clients:1.18.1"
+//> using dep "org.apache.flink:flink-csv:1.18.1"
+//> using dep "org.apache.flink:flink-connector-files:1.18.1"
+//> using dep "org.apache.flink:flink-table-runtime:1.18.1"
+//> using dep "org.apache.flink:flink-table-planner-loader:1.18.1"
+
+import org.apache.flink.table.api._
+import org.apache.flink.connector.datagen.table.DataGenConnectorOptions
+import org.apache.flinkx.api._
+import org.apache.flinkx.api.serializers._
+
+import java.lang.{Long => JLong}
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val settings = EnvironmentSettings.newInstance.inStreamingMode.build
+val table = TableEnvironment.create(settings)
+val schema = Schema.newBuilder
+  .column("id", DataTypes.INT())
+  .column("bid_price", DataTypes.DOUBLE())
+  .column("order_time", DataTypes.TIMESTAMP(2))
+  .build
+
+table.createTemporaryTable(
+  "SourceTable",
+  TableDescriptor
+    .forConnector("datagen")
+    .schema(schema)
+    .option(DataGenConnectorOptions.NUMBER_OF_ROWS, JLong(1000))
+    .option("fields.id.kind", "sequence")
+    .option("fields.id.start", "1")
+    .option("fields.id.end", "10000")
+    .build
+)
+
+val currentDirectory = java.io.File(".").getCanonicalPath
+
+table.createTemporaryTable(
+  "SinkTable",
+  TableDescriptor
+    .forConnector("filesystem")
+    .schema(schema)
+    .option("format", "csv")
+    .option("sink.rolling-policy.file-size", "124 kb")
+    .option("path", s"file://$currentDirectory/sink-table")
+    .build
+)
+
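+// Optional sanity check: DESCRIBE prints the resolved sink schema before the
+// streaming insert below starts writing CSV files.
+table.executeSql("describe SinkTable").print
+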
+table.executeSql("insert into SinkTable select * from SourceTable").print
diff --git a/modules/examples/scripts/gen-kafka-data.sc b/modules/examples/scripts/gen-kafka-data.sc
new file mode 100644
index 0000000..f82cd62
--- /dev/null
+++ b/modules/examples/scripts/gen-kafka-data.sc
@@ -0,0 +1,51 @@
+//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6"
+//> using dep "org.apache.flink:flink-clients:1.18.1"
+//> using dep "org.apache.flink:flink-csv:1.18.1"
+//> using dep "org.apache.flink:flink-connector-files:1.18.1"
+//> using dep "org.apache.flink:flink-connector-kafka:3.0.2-1.18"
+//> using dep "org.apache.flink:flink-table-runtime:1.18.1"
+//> using dep "org.apache.flink:flink-table-planner-loader:1.18.1"
+
+import org.apache.flink.table.api._
+import org.apache.flink.connector.datagen.table.DataGenConnectorOptions
+import org.apache.flinkx.api._
+import org.apache.flinkx.api.serializers._
+
+import java.lang.{Long => JLong}
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val settings = EnvironmentSettings.newInstance.inStreamingMode.build
+val table = TableEnvironment.create(settings)
+val schema = Schema.newBuilder
+  .column("id", DataTypes.INT())
+  .column("bid_price", DataTypes.DOUBLE())
+  .column("order_time", DataTypes.TIMESTAMP(2))
+  .build
+
+table.createTemporaryTable(
+  "SourceTable",
+  TableDescriptor
+    .forConnector("datagen")
+    .schema(schema)
+    .option(DataGenConnectorOptions.NUMBER_OF_ROWS, JLong(1000))
+    .option("fields.id.kind", "sequence")
+    .option("fields.id.start", "10001")
+    .option("fields.id.end", "20000")
+    .build
+)
+
+val brokers = "confluentkafka-cp-kafka:9092"
+
+table.createTemporaryTable(
+  "SinkTable",
+  TableDescriptor
+    .forConnector("kafka")
+    .schema(schema)
+    .option("properties.bootstrap.servers", brokers)
+    .option("topic", "bids")
+    // 'format' and 'value.format' are mutually exclusive aliases in the Kafka
+    // connector, so only one of them is set here.
+    .option("value.format", "csv")
+    .build
+)
+
+table.executeSql("insert into SinkTable select * from SourceTable").print
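+
+// The same table can also be read back as a Kafka source, e.g. to verify what
+// was produced (assumes the broker above is reachable; uncomment to run, it is
+// an unbounded streaming query):
+// table.executeSql(
+//   "select * from SinkTable /*+ OPTIONS('scan.startup.mode'='earliest-offset') */"
+// ).print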
diff --git a/modules/examples/scripts/hybrid-source.sc b/modules/examples/scripts/hybrid-source.sc
new file mode 100644
index 0000000..0cc360b
--- /dev/null
+++ b/modules/examples/scripts/hybrid-source.sc
@@ -0,0 +1,49 @@
+//> using dep "org.flinkextended::flink-scala-api:1.18.1_1.1.6"
+//> using dep "org.apache.flink:flink-clients:1.18.1"
+//> using dep "org.apache.flink:flink-csv:1.18.1"
+//> using dep "org.apache.flink:flink-connector-files:1.18.1"
+//> using dep "org.apache.flink:flink-connector-kafka:3.0.2-1.18"
+
+import org.apache.flinkx.api.*
+import org.apache.flinkx.api.serializers.*
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat
+import org.apache.flink.connector.file.src.impl.StreamFormatAdapter
+import org.apache.flink.connector.kafka.source.KafkaSource
+import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
+import org.apache.flink.connector.base.source.hybrid.HybridSource
+import org.apache.flink.api.common.serialization.SimpleStringSchema
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.core.fs.Path
+
+val currentDirectory = java.io.File(".").getCanonicalPath
+
+val fileSource = FileSource
+  .forBulkFileFormat(
+    StreamFormatAdapter(TextLineInputFormat()),
+    Path(s"$currentDirectory/sink-table")
+  )
+  .build
+
+val switchTimestamp = -1L
+val brokers = "confluentkafka-cp-kafka:9092"
+
+val kafkaSource = KafkaSource
+  .builder[String]()
+  .setBootstrapServers(brokers)
+  .setTopics("bids")
+  .setStartingOffsets(OffsetsInitializer.timestamp(switchTimestamp + 1))
+  .setValueOnlyDeserializer(SimpleStringSchema())
+  .build
+
+val hybridSource = HybridSource
+  .builder(fileSource)
+  .addSource(kafkaSource)
+  .build
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env
+  .fromSource(hybridSource, WatermarkStrategy.noWatermarks(), "combined")
+  .print()
+
+env.execute()
diff --git a/src/test/resources/logback.xml b/modules/examples/scripts/logback.xml
similarity index 100%
rename from src/test/resources/logback.xml
rename to modules/examples/scripts/logback.xml
diff --git a/modules/examples/src/main/protobuf/simple.proto b/modules/examples/src/main/protobuf/simple.proto
new file mode 100644
index 0000000..87c1753
--- /dev/null
+++ b/modules/examples/src/main/protobuf/simple.proto
@@ -0,0 +1,12 @@
+syntax = "proto2";
+package com.example;
+option java_package = "com.example";
+option java_multiple_files = true;
+
+message SimpleTest {
+  optional int64 uid = 1;
+  optional string name = 2;
+  optional int32 category_type = 3;
+  optional bytes content = 4;
+  optional double price = 5;
+}
\ No newline at end of file
diff --git a/modules/examples/src/main/resources/logback.xml b/modules/examples/src/main/resources/logback.xml
new file mode 100755
index 0000000..70e5059
--- /dev/null
+++ b/modules/examples/src/main/resources/logback.xml
@@ -0,0 +1,24 @@
+<configuration>
+    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+        <withJansi>true</withJansi>
+        <encoder>
+            <pattern>%date{yyyy-MM-dd HH:mm:ss.SSSZ, UTC} %-16level %-43thread %-24logger{24} %message%n%xException</pattern>
+        </encoder>
+    </appender>
+    <root level="INFO">
+        <appender-ref ref="console"/>
+    </root>
+</configuration>
diff --git a/modules/examples/src/main/scala/org/example/Job.scala b/modules/examples/src/main/scala/org/example/Job.scala
new file mode 100644
index 0000000..e9d5b8e
--- /dev/null
+++ b/modules/examples/src/main/scala/org/example/Job.scala
@@ -0,0 +1,29 @@
+package org.example
+
+import org.apache.flinkx.api.*
+import org.apache.flinkx.api.serializers.*
+
+class JobFailed(cause: Exception) extends Exception(cause)
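+
+// Shows where failures surface: an exception thrown while building the
+// pipeline happens in the main method, while JobFailed wraps anything thrown
+// by env.execute(), i.e. a failure of the submitted Flink job itself.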
+
+@main def job =
+  try {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env
+      .fromElements(1, 2, 3, 4, 5, 6)
+      .filter(_ % 2 == 1)
+      .map(i => i * i)
+      .print()
+    // fails before env.execute() is reached, i.e. before the job is submitted
+    throw new RuntimeException("boom")
+    try env.execute()
+    catch case e: Exception => throw JobFailed(e)
+  } catch
+    case e: JobFailed =>
+      throw e.getCause
+    case e: Throwable =>
+      e.printStackTrace()
+      // failure in main method, not in the Flink job
+      val env = StreamExecutionEnvironment.getExecutionEnvironment
+      env
+        .fromElements("printing stacktrace")
+        .print()
+      env.execute()
diff --git a/modules/examples/src/main/scala/org/example/SocketTextStreamWordCount.scala b/modules/examples/src/main/scala/org/example/SocketTextStreamWordCount.scala
new file mode 100644
index 0000000..51acc00
--- /dev/null
+++ b/modules/examples/src/main/scala/org/example/SocketTextStreamWordCount.scala
@@ -0,0 +1,67 @@
+package org.example
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flinkx.api._
+import org.apache.flinkx.api.serializers._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.configuration.ConfigConstants
+import org.apache.flink.configuration.RestOptions.BIND_PORT
+import scala.jdk.CollectionConverters.*
+
+/** This example shows an implementation of WordCount with data from a text
+  * socket. To run the example make sure that the service providing the text
+  * data is already up and running.
+  *
+  * To start an example socket text stream on your local machine run netcat
+  * from a command line, where the parameter specifies the port number:
+  *
+  * {{{
+  * nc -lk 9999
+  * }}}
+  *
+  * Usage:
+  * {{{
+  * SocketTextStreamWordCount <hostname> <port>
+  * }}}
+  *
+  * This example shows how to:
+  *
+  *   - use StreamExecutionEnvironment.socketTextStream
+  *   - write a simple Flink Streaming program in Scala.
+  *   - write and use user-defined functions.
+  */
+@main def SocketTextStreamWordCount(hostName: String, port: Int) =
+  val config = Configuration.fromMap(
+    Map(
+//      ConfigConstants.LOCAL_START_WEBSERVER -> "true",
+      BIND_PORT.key -> "8080"
+    ).asJava
+  )
+  val flink = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)
+
+  flink
+    .socketTextStream(hostName, port)
+    .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
+    .map((_, 1))
+    .keyBy(_._1)
+    .sum(1)
+    .print()
+
+  flink.execute("Scala SocketTextStreamWordCount Example")
diff --git a/modules/examples/src/main/scala/org/example/TransactionIOs.scala b/modules/examples/src/main/scala/org/example/TransactionIOs.scala
new file mode 100644
index 0000000..b867c66
--- /dev/null
+++ b/modules/examples/src/main/scala/org/example/TransactionIOs.scala
@@ -0,0 +1,104 @@
+package org.example
+
+import org.apache.flink.streaming.api.functions.source.FromIteratorFunction
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
+import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context
+
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.duration.*
+import scala.jdk.CollectionConverters.*
+
+import java.sql.Timestamp
+
+case class Transaction(
+    accountId: Long,
+    timestamp: Long,
+    amount: Double,
+    loc: String = ""
+)
+
+object TransactionsSource:
+  private val data =
+    Array(
+      Transaction(1, 0L, 188.23),
+      Transaction(2, 0L, 374.79),
+      Transaction(3, 0L, 112.15),
+      Transaction(4, 0L, 478.75),
+      Transaction(5, 0L, 208.85),
+      Transaction(1, 0L, 379.64),
+      Transaction(2, 0L, 351.44),
+      Transaction(3, 0L, 320.75),
+      Transaction(4, 0L, 259.42),
+      Transaction(5, 0L, 273.44),
+      Transaction(1, 0L, 267.25),
+      Transaction(2, 0L, 397.15),
+      Transaction(3, 0L, 0.219),
+      Transaction(4, 0L, 231.94),
+      Transaction(5, 0L, 384.73),
+      Transaction(1, 0L, 419.62),
+      Transaction(2, 0L, 412.91),
+      Transaction(3, 0L, 0.77),
+      Transaction(4, 0L, 22.10),
+      Transaction(5, 0L, 377.54),
+      Transaction(1, 0L, 375.44),
+      Transaction(2, 0L, 230.18),
+      Transaction(3, 0L, 0.80),
+      Transaction(4, 0L, 350.89),
+      Transaction(5, 0L, 127.55),
+      Transaction(1, 0L, 483.91),
+      Transaction(2, 0L, 228.22),
+      Transaction(3, 0L, 871.15),
+      Transaction(4, 0L, 64.19),
+      Transaction(5, 0L, 79.43),
+      Transaction(1, 0L, 56.12),
+      Transaction(2, 0L, 256.48),
+      Transaction(3, 0L, 148.16),
+      Transaction(4, 0L, 199.95),
+      Transaction(5, 0L, 252.37),
+      Transaction(1, 0L, 274.73),
+      Transaction(2, 0L, 473.54),
+      Transaction(3, 0L, 119.92),
+      Transaction(4, 0L, 323.59),
+      Transaction(5, 0L, 353.16),
+      Transaction(1, 0L, 211.90),
+      Transaction(2, 0L, 280.93),
+      Transaction(3, 0L, 347.89),
+      Transaction(4, 0L, 459.86),
+      Transaction(5, 0L, 82.31),
+      Transaction(1, 0L, 373.26),
+      Transaction(2, 0L, 479.83),
+      Transaction(3, 0L, 454.25),
+      Transaction(4, 0L, 83.64),
+      Transaction(5, 0L, 292.44)
+    )
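+
+  // Account ids cycle through 1..5, so keyBy(_.accountId) spreads the sample
+  // over five keys. The amounts below 1.00 and above 500.00 are the ones the
+  // fraud detector later treats as "small" and "large".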
+
+  def iterator(sleepBeforeEmit: Long = 100): FromIteratorFunction[Transaction] =
+    FromIteratorFunction[Transaction](
+      (new Iterator[Transaction] with Serializable:
+        var rows = data.iterator
+        var timestamp = Timestamp.valueOf("2019-01-01 00:00:00").getTime
+        val sixMinutes = 6.minutes.toMillis
+
+        override def hasNext: Boolean = rows.hasNext
+
+        override def next(): Transaction =
+          Thread.sleep(sleepBeforeEmit)
+          val next = rows.next
+
+          // Going to the first element again
+          if !hasNext then rows = data.iterator
+          // Moving timestamp further
+          timestamp += sixMinutes
+
+          next.copy(timestamp = timestamp)
+      ).asJava
+    )
+
+case class Alert(id: Long)
+
+class AlertSink extends SinkFunction[Alert]:
+  private val logger = LoggerFactory.getLogger(classOf[AlertSink])
+
+  override def invoke(value: Alert, context: Context): Unit =
+    logger.info(value.toString)
diff --git a/modules/examples/src/main/scala/org/example/connectedStreams.scala b/modules/examples/src/main/scala/org/example/connectedStreams.scala
new file mode 100644
index 0000000..0b8847a
--- /dev/null
+++ b/modules/examples/src/main/scala/org/example/connectedStreams.scala
@@ -0,0 +1,58 @@
+package org.example
+
+import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flinkx.api.*
+import org.apache.flinkx.api.serializers.*
+
+@main def ConnectedStreams =
+  val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+  given tranTypeInfo: TypeInformation[Transaction] = deriveTypeInformation
+
+  val control = env
+    .addSource(TransactionsSource.iterator())
+    .keyBy(_.accountId)
+
+  val streamOfWords = env
+    .addSource(TransactionsSource.iterator())
+    .keyBy(_.accountId)
+
+  control
+    .connect(streamOfWords)
+    .flatMap(ControlFunction())
+    .print()
+
+  env.execute()
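+
+/** Shared flatMap logic for both inputs: the first amount seen for a key is
+  * parked in state; the next one is emitted with both amounts summed, after
+  * which the state is cleared.
+  */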
+class ControlFunction
+    extends RichCoFlatMapFunction[Transaction, Transaction, Transaction]:
+
+  @transient lazy val state: ValueState[Double] = getRuntimeContext.getState(
+    new ValueStateDescriptor(
+      "joined-transaction",
+      classOf[Double]
+    )
+  )
+
+  override def flatMap1(
+      t: Transaction,
+      out: Collector[Transaction]
+  ): Unit =
+    sumUp(t, out)
+
+  override def flatMap2(
+      t: Transaction,
+      out: Collector[Transaction]
+  ): Unit =
+    sumUp(t, out)
+
+  private def sumUp(t: Transaction, out: Collector[Transaction]): Unit =
+    Option(state.value()) match
+      case Some(v) =>
+        out.collect(t.copy(amount = t.amount + v))
+        state.clear()
+      case None =>
+        state.update(t.amount)
diff --git a/modules/examples/src/main/scala/org/example/fileFilter.scala b/modules/examples/src/main/scala/org/example/fileFilter.scala
new file mode 100644
index 0000000..483b69c
--- /dev/null
+++ b/modules/examples/src/main/scala/org/example/fileFilter.scala
@@ -0,0 +1,56 @@
+package org.example
+
+import org.apache.flinkx.api.*
+import org.apache.flinkx.api.serializers.*
+
+import org.apache.flink.connector.file.src.FileSource
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat
+import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator
+import org.apache.flink.core.fs.Path
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.configuration.Configuration
+
+import java.io.File
+import java.time.Duration
+import java.util.function.Predicate
+
+class MyDefaultFileFilter extends Predicate[Path]:
+  override def test(path: Path): Boolean =
+    print(s"File path: $path")
+
+    val fileName = path.getName
+    println(s", name: $fileName")
+
+    fileName.headOption match
+      case Some(first) =>
+        first != '.' && first != '_' && !fileName.startsWith("GRP")
+      case None => false
+
+@main def filterFiles =
+  val currentDirectory = File(".").getCanonicalPath
+  val inputBasePath = Path(s"$currentDirectory/input-table")
+  val fileSourceBuilder =
+    FileSource.forRecordStreamFormat(
+      TextLineInputFormat(),
+      inputBasePath
+    )
+
+  val fileSource = fileSourceBuilder
+    .monitorContinuously(Duration.ofSeconds(2))
+    .setFileEnumerator(() =>
+      NonSplittingRecursiveEnumerator(MyDefaultFileFilter())
+    )
+    .build()
+  val env =
+    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(Configuration())
+
+  env
+    .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "csvs")
+    .map((_, 1))
+    .keyBy(_ => "count")
+    .sum(1)
+    .map { case (w, c) =>
+      println(s"count: $c")
+    }
+
+  env.execute("Filter Files")
diff --git a/modules/examples/src/main/scala/org/example/fraud/FraudDetectionJob.scala b/modules/examples/src/main/scala/org/example/fraud/FraudDetectionJob.scala
new file mode 100644
index 0000000..bbb72df
--- /dev/null
+++ b/modules/examples/src/main/scala/org/example/fraud/FraudDetectionJob.scala
@@ -0,0 +1,142 @@
+package org.example.fraud
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
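+
+// Four entry points share this file: `runningAvg` (stateful running average),
+// `FraudDetectionJob` (the checkpointed detector pipeline),
+// `fraudDetectionState` (reads detector state back from a savepoint via the
+// State Processor API) and `maxAmount` (windowed maximum, further below).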
+
+import java.io.File
+
+import org.apache.flinkx.api.*
+import org.apache.flinkx.api.serializers.*
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.functions.AggregateFunction
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.functions.source.FromIteratorFunction
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.runtime.state.hashmap.HashMapStateBackend
+import org.apache.flink.state.api.SavepointReader
+
+import org.example.Transaction
+import org.example.TransactionsSource
+import org.example.Alert
+import org.example.AlertSink
+
+import Givens.given
+
+@main def runningAvg(sleepBeforeEmit: Long) =
+  val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+  val transactions = env
+    .addSource(TransactionsSource.iterator(sleepBeforeEmit))
+    .name("transactions")
+
+  transactions
+    .flatMap(t => if t.amount < 1.0d then List(t, t) else List(t))
+    .keyBy(_.accountId)
+    .map(RunningAverage())
+    .keyBy(_ => "all")
+    .reduce { (a, b) =>
+      val runningAvg = (a._2 + b._2) / 2
+      println(s"average ${Thread.currentThread.getName}: $runningAvg")
+      b._1 -> runningAvg
+    }
+    .name("fraud-detector")
+
+  env.execute("Fraud Detection")
+
+@main def FraudDetectionJob(sleepBeforeEmit: Long) =
+  val conf = Configuration()
+  conf.setString("state.savepoints.dir", "file:///tmp/savepoints")
+  conf.setString(
+    "execution.checkpointing.externalized-checkpoint-retention",
+    "RETAIN_ON_CANCELLATION"
+  )
+  conf.setString("execution.checkpointing.interval", "3s")
+  conf.setString("execution.checkpointing.min-pause", "3s")
+  conf.setString("state.backend", "filesystem")
+
+  val env = StreamExecutionEnvironment.getExecutionEnvironment // .createLocalEnvironmentWithWebUI(conf)
+
+  val transactions = env
+    .addSource(TransactionsSource.iterator(sleepBeforeEmit))
+    .name("transactions")
+
+  val alerts = transactions
+    .keyBy(_.accountId)
+    .process(FraudDetector())
+    .uid("fraud-state")
+    .name("fraud-detector")
+
+  alerts
+    .addSink(AlertSink())
+    .name("send-alerts")
+
+  env.execute("Fraud Detection")
+
+@main def fraudDetectionState() =
+  val env = StreamExecutionEnvironment.getExecutionEnvironment
+  val savepoint = SavepointReader.read(
+    env.getJavaEnv,
+    "/tmp/savepoints/savepoint-827976-a94a8feb6c07",
+    HashMapStateBackend()
+  )
+  val keyedState = savepoint.readKeyedState(
+    "fraud-state",
+    ReaderFunction(),
+    TypeInformation.of(classOf[Long]),
+    keyedStateInfo
+  )
+  keyedState.print()
+  env.execute()
+
+case class MaxTransaction(amount: Double, timestamp: Long)
+
+class MaxAggregate
+    extends AggregateFunction[Transaction, MaxTransaction, MaxTransaction]:
+  override def createAccumulator(): MaxTransaction = MaxTransaction(0d, 0L)
+
+  override def add(value: Transaction, accumulator: MaxTransaction): MaxTransaction =
+    if value.amount > accumulator.amount then
+      MaxTransaction(value.amount, value.timestamp)
+    else accumulator
+
+  override def getResult(accumulator: MaxTransaction): MaxTransaction =
+    accumulator
+
+  override def merge(a: MaxTransaction, b: MaxTransaction): MaxTransaction =
+    if a.amount >= b.amount then a else b
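+
+// `maxAmount` keeps a checkpointed 5-second tumbling window per account and
+// emits the largest transaction in each window; MaxAggregate above is the
+// equivalent AggregateFunction variant (see the commented-out .aggregate).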
+@main def maxAmount =
+  val env =
+    StreamExecutionEnvironment.getExecutionEnvironment.enableCheckpointing(
+      10_000L
+    )
+
+  env.getCheckpointConfig.setCheckpointStorage(
+    s"file://${File(".").getCanonicalPath}/max-amount-checkpoint"
+  )
+
+  val transactions = env
+    .addSource(TransactionsSource.iterator(100))
+    .name("transactions")
+
+  transactions
+    .keyBy(_.accountId)
+    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
+    .reduce((a, b) => if a.amount < b.amount then b else a)
+    // .aggregate(MaxAggregate())
+    .name("windowed-max")
+    .print()
+
+  env.execute("Max Amount Transaction")
diff --git a/modules/examples/src/main/scala/org/example/fraud/FraudDetector.scala b/modules/examples/src/main/scala/org/example/fraud/FraudDetector.scala
new file mode 100644
index 0000000..6876deb
--- /dev/null
+++ b/modules/examples/src/main/scala/org/example/fraud/FraudDetector.scala
@@ -0,0 +1,148 @@
+package org.example.fraud
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flinkx.api.*
+import org.apache.flinkx.api.serializers.*
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.state.api.functions.KeyedStateReaderFunction
+import org.apache.flink.state.api.functions.KeyedStateReaderFunction.Context
+import org.apache.flink.util.Collector
+
+import org.example.{Alert, Transaction}
+
+import org.slf4j.LoggerFactory
+import scala.concurrent.duration.*
+
+import FraudDetector.*
+
+object Givens:
+  given tranTypeInfo: TypeInformation[Transaction] =
+    deriveTypeInformation[Transaction]
+  given alertTypeInfo: TypeInformation[Alert] =
+    TypeInformation.of(classOf[Alert])
+  given keyedStateInfo: TypeInformation[KeyedFraudState] =
+    TypeInformation.of(classOf[KeyedFraudState])
+
+case class FraudStateVars(
+    flagState: ValueState[Boolean],
+    timerState: ValueState[Long],
+    lastTransaction: ValueState[Transaction]
+):
+  def clear(): Unit =
+    flagState.clear()
+    timerState.clear()
+
+object FraudDetector:
+  val SmallAmount = 1.00
+  val LargeAmount = 500.00
+  val OneMinute: Long = 1.minute.toMillis
+
+  def readState(context: RuntimeContext): FraudStateVars =
+    FraudStateVars(
+      context.getState(
+        ValueStateDescriptor("flag", boolInfo)
+      ),
+      context.getState(
+        ValueStateDescriptor("timer-state", longInfo)
+      ),
+      context.getState(
+        ValueStateDescriptor("last-transaction", Givens.tranTypeInfo)
+      )
+    )
+
+case class KeyedFraudState(key: Long, state: FraudStateVars)
+
+class ReaderFunction extends KeyedStateReaderFunction[Long, KeyedFraudState]:
+  var fraudState: FraudStateVars = _
+
+  override def open(parameters: Configuration): Unit =
+    fraudState = readState(getRuntimeContext)
+
+  override def readKey(
+      key: Long,
+      ctx: Context,
+      out: Collector[KeyedFraudState]
+  ): Unit =
+    out.collect(KeyedFraudState(key, fraudState))
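+
+// Detection rule: a small transaction (< SmallAmount) arms a per-key flag and
+// a one-minute timer; if a large one (> LargeAmount) follows before the timer
+// fires, an Alert is emitted. The timer clears the flag so slow sequences do
+// not produce false positives.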
+@SerialVersionUID(1L)
+class FraudDetector extends KeyedProcessFunction[Long, Transaction, Alert]:
+  @transient lazy val logger = LoggerFactory.getLogger(classOf[FraudDetector])
+
+  @transient var fraudState: FraudStateVars = _
+
+  override def open(parameters: Configuration): Unit =
+    fraudState = readState(getRuntimeContext)
+    logger.info(s"Loaded last transaction: ${fraudState.lastTransaction}")
+
+  @throws[Exception]
+  def processElement(
+      transaction: Transaction,
+      context: KeyedProcessFunction[Long, Transaction, Alert]#Context,
+      collector: Collector[Alert]
+  ): Unit =
+    // Get the current state for the current key
+    Option(fraudState.flagState.value).foreach { _ =>
+      if transaction.amount > FraudDetector.LargeAmount then
+        // Output an alert downstream
+        val alert = Alert(transaction.accountId)
+        collector.collect(alert)
+        logger.info(s"Fraudulent transaction: $transaction")
+
+      // Clean up our state
+      cleanUp(context)
+    }
+
+    if transaction.amount < FraudDetector.SmallAmount then
+      // set the flag to true
+      fraudState.flagState.update(true)
+
+      // set the timer and timer state
+      val timer =
+        context.timerService.currentProcessingTime + FraudDetector.OneMinute
+      context.timerService.registerProcessingTimeTimer(timer)
+      fraudState.timerState.update(timer)
+      logger.info(s"small amount: ${transaction.amount}")
+
+    fraudState.lastTransaction.update(transaction)
+
+  override def onTimer(
+      timestamp: Long,
+      ctx: KeyedProcessFunction[Long, Transaction, Alert]#OnTimerContext,
+      out: Collector[Alert]
+  ): Unit =
+    // remove flag after 1 minute, assuming that attacker makes fraudulent transactions within a minute
+    fraudState.clear()
+
+  @throws[Exception]
+  private def cleanUp(
+      ctx: KeyedProcessFunction[Long, Transaction, Alert]#Context
+  ): Unit =
+    // delete timer
+    val timer = fraudState.timerState.value
+    ctx.timerService.deleteProcessingTimeTimer(timer)
+
+    // clean up all states
+    fraudState.clear()
diff --git a/modules/examples/src/main/scala/org/example/fraud/RunningAverage.scala b/modules/examples/src/main/scala/org/example/fraud/RunningAverage.scala
new file mode 100644
index 0000000..189ac1c
--- /dev/null
+++ b/modules/examples/src/main/scala/org/example/fraud/RunningAverage.scala
@@ -0,0 +1,43 @@
+package org.example.fraud
+
+import org.apache.flinkx.api._
+import org.apache.flinkx.api.serializers._
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.configuration.Configuration
+import org.example.Transaction
+
+class RunningAverage
+    extends RichMapFunction[Transaction, (Transaction, Double)]:
+
+  given tranTypeInfo: TypeInformation[Transaction] =
+    TypeInformation.of(classOf[Transaction])
+
+  @transient lazy val runningAvg = getRuntimeContext.getState(
+    ValueStateDescriptor(
+      "running-average",
+      classOf[Double],
+      0d
+    )
+  )
+
+  @transient lazy val count = getRuntimeContext.getState(
+    ValueStateDescriptor("count", classOf[Int], 0)
+  )
+
+  private def threadName = Thread.currentThread.getName
+
+  override def open(config: Configuration): Unit =
+    println(s"open map: $threadName")
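+
+  // Incremental mean: newAvg = avg + (amount - avg) / n, which needs only the
+  // previous average and the running count, no separate sum.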
+  override def map(t: Transaction): (Transaction, Double) =
+    Option(count.value) match
+      case Some(cnt) => count.update(cnt + 1)
+      case _         => ()
+
+    Option(runningAvg.value) match
+      case Some(avg) =>
+        runningAvg.update(avg + (t.amount - avg) / count.value)
+      case _ => ()
+
+    (t, runningAvg.value)
diff --git a/modules/examples/src/main/scala/org/example/runningSum.scala b/modules/examples/src/main/scala/org/example/runningSum.scala
new file mode 100644
index 0000000..9f0f359
--- /dev/null
+++ b/modules/examples/src/main/scala/org/example/runningSum.scala
@@ -0,0 +1,215 @@
+package org.example
+
+import org.apache.flinkx.api.*
+import org.apache.flinkx.api.serializers.*
+
+import org.apache.flink.streaming.api.windowing.triggers.Trigger
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult
+import org.apache.flink.streaming.api.windowing.triggers.Trigger.OnMergeContext
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
+import org.apache.flink.streaming.api.functions.KeyedProcessFunction
+import org.apache.flink.util.Collector
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.api.common.eventtime.WatermarkStrategy
+import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.MapStateDescriptor
+import org.apache.flink.api.common.state.MapState
+
+import scala.jdk.CollectionConverters.*
+
+import java.util.concurrent.TimeUnit
+import java.time.Duration
+
+final case class TestEvent(
+    key: Long,
+    timestamp: Long,
+    runningCount: Long,
+    windowStart: Long = -1, // no window assigned yet
+    bag: List[Int] = Nil
+)
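+
+// Wraps EventTimeTrigger but fires on every element, so the sliding window
+// below emits a fresh running count per record instead of only when the
+// watermark passes the window end.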
end: ${window.getEnd}, count: ${output.runningCount} \ninput: " ) + println(input.mkString(" ", "\n", "")) + + out.collect(output) + println(s"output: $output }") + +def reduceEvents(a: TestEvent, b: TestEvent) = + val (latest, prev) = + if b.timestamp > a.timestamp then (b, a) + else (a, b) + val prevCount = if prev.runningCount == 0 then 1 else prev.runningCount + latest.copy(runningCount = prevCount + 1) + +/* +'runningWindowedSum' uses a window function and a custom trigger to emit the running count on every element + */ +@main def runningWindowedSum = + val env = StreamExecutionEnvironment.getExecutionEnvironment + + val windowSize = Time.of(10, TimeUnit.SECONDS) + val windowSlide = Time.of(2, TimeUnit.SECONDS) + val watermarkStrategy = WatermarkStrategy + .forBoundedOutOfOrderness[TestEvent](Duration.ofSeconds(1000)) + .withTimestampAssigner((event: TestEvent, streamRecordTimestamp: Long) => + event.timestamp + ) + + env + .fromElements( + TestEvent(2L, 6000, 0), + TestEvent(2L, 5000, 0), + TestEvent(2L, 12000, 0) + ) + .assignTimestampsAndWatermarks(watermarkStrategy) + .keyBy(_.key) + .window(SlidingEventTimeWindows.of(windowSize, windowSlide)) + .trigger(CustomEventTimeTrigger(EventTimeTrigger.create())) + .reduce(reduceEvents, windowAction) + + env.execute() + +class RunningCountFunc(windowSize: Duration) + extends KeyedProcessFunction[Long, TestEvent, TestEvent]: + + val oldEntriesCleanupInterval = 1000L + var minTimestamp: ValueState[Long] = _ + var timeToCount: MapState[Long, Long] = _ + override def open(parameters: Configuration): Unit = + timeToCount = getRuntimeContext.getMapState( + MapStateDescriptor("timestamp2count", classOf[Long], classOf[Long]) + ) + + minTimestamp = getRuntimeContext.getState( + new ValueStateDescriptor( + "min-timestamp", + classOf[Long], + Long.MinValue + ) + ) + + override def processElement( + event: TestEvent, + ctx: KeyedProcessFunction[Long, TestEvent, TestEvent]#Context, + out: Collector[TestEvent] + ): Unit = + val currentCount = + if timeToCount.contains(event.timestamp) then + timeToCount.get(event.timestamp) + else 0 + timeToCount.put(event.timestamp, currentCount + 1) + + val windowStart = event.timestamp - windowSize.getSeconds * 1000 + val windowCount = + timeToCount.entries().asScala.foldLeft(0L) { (acc, entry) => + if (windowStart < entry.getKey) && (entry.getKey <= event.timestamp) + then acc + entry.getValue + else acc + } + + minTimestamp.update( + windowStart // TODO: subtract some out-of-orderness threshold + ) + out.collect( + event.copy(runningCount = windowCount, windowStart = windowStart) + ) + + ctx.timerService.registerProcessingTimeTimer( + System.currentTimeMillis + oldEntriesCleanupInterval + ) + + override def onTimer( + timestamp: Long, + ctx: KeyedProcessFunction[Long, TestEvent, TestEvent]#OnTimerContext, + out: Collector[TestEvent] + ): Unit = + println(s"Clean up for ${minTimestamp.value()}") + + val oldEntries = timeToCount + .entries() + .asScala + .collect { + case entry if entry.getKey < minTimestamp.value() => entry.getKey + } + oldEntries.foreach(timeToCount.remove) + +/* +The 'runningSum' example uses a ProcessFunction with state variables to report the running count within the specified time window + */ +@main def runningSum = + val env = StreamExecutionEnvironment.getExecutionEnvironment + env + .fromElements( + TestEvent(2L, 6000, 0), + TestEvent(2L, 5000, 0), + TestEvent(2L, 15000, 0) + ) + .keyBy(_.key) + .process(RunningCountFunc(Duration.ofSeconds(10))) + .print() + + env.execute()
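+ +/* Hypothetical sanity check for the 'runningSum' pipeline above, added for illustration: with a 10-second window the three events yield running counts 1, 1 and 2 (for the 15000 event the 5000 entry is excluded because the window start bound is exclusive). Assumes local execution and that executeAndCollect(n) returns the first n results as a Scala List. */ +@main def runningSumCheck = + val env = StreamExecutionEnvironment.getExecutionEnvironment + val results = env + .fromElements( + TestEvent(2L, 6000, 0), + TestEvent(2L, 5000, 0), + TestEvent(2L, 15000, 0) + ) + .keyBy(_.key) + .process(RunningCountFunc(Duration.ofSeconds(10))) + .executeAndCollect(3) + assert(results.map(_.runningCount) == List(1L, 1L, 2L)) diff --git 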
a/modules/examples/src/main/scala/org/example/troubleshooting/fakeKafkaSource.scala b/modules/examples/src/main/scala/org/example/troubleshooting/fakeKafkaSource.scala new file mode 100644 index 0000000..8da0eb8 --- /dev/null +++ b/modules/examples/src/main/scala/org/example/troubleshooting/fakeKafkaSource.scala @@ -0,0 +1,93 @@ +package org.example + +import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext +import org.apache.flink.configuration.Configuration + +import scala.util.Random +import scala.util.Try +import scala.util.Failure +import scala.util.Success + +import java.util.Arrays + +case class FakeKafkaRecord( + timestamp: Long, + key: Array[Byte], + value: Array[Byte], + partition: Int +) + +object FakeKafkaSource: + val NO_OF_PARTITIONS = 8 + +class FakeKafkaSource( + seed: Int, + idlePartitions: Set[Int], + serializedMeasurements: Array[Array[Byte]], + poisonPillRate: Double +) extends RichParallelSourceFunction[FakeKafkaRecord]: + + lazy val indexOfThisSubtask = getRuntimeContext.getIndexOfThisSubtask + + lazy val numberOfParallelSubtasks = + getRuntimeContext.getNumberOfParallelSubtasks + + lazy val assignedPartitions = + (0 until FakeKafkaSource.NO_OF_PARTITIONS).filter( + _ % numberOfParallelSubtasks == indexOfThisSubtask + ) + + val rand = Random(seed) + + @transient @volatile var cancelled = false + + override def open(parameters: Configuration): Unit = + println(s"Now reading from partitions: $assignedPartitions") + + override def run(ctx: SourceContext[FakeKafkaRecord]): Unit = + if assignedPartitions.nonEmpty then + while !cancelled do { + val nextPartition = assignedPartitions( + rand.nextInt(assignedPartitions.length) + ) + + if idlePartitions.contains(nextPartition) then + // noinspection BusyWait + Thread.sleep(1000) // avoid spinning wait + else + val nextTimestamp = getTimestampForPartition(nextPartition) + + var serializedMeasurement = + serializedMeasurements(rand.nextInt(serializedMeasurements.length)) + + if rand.nextFloat() > 1 - poisonPillRate then + serializedMeasurement = Arrays.copyOf(serializedMeasurement, 10) + + (ctx.getCheckpointLock()).synchronized { + ctx.collect( + FakeKafkaRecord( + nextTimestamp, + Array.empty, + serializedMeasurement, + nextPartition + ) + ) + } + } + else + ctx.markAsTemporarilyIdle() + val waitLock = Object() + + while !cancelled do { + Try(waitLock.synchronized(waitLock.wait())) match + case Failure(e: InterruptedException) if cancelled => + Thread.currentThread().interrupt() + case _ => () + } + + private def getTimestampForPartition(partition: Int) = + System.currentTimeMillis() - (partition * 50L) + + override def cancel(): Unit = + cancelled = true diff --git a/modules/examples/src/main/scala/org/example/troubleshooting/measurements.scala b/modules/examples/src/main/scala/org/example/troubleshooting/measurements.scala new file mode 100644 index 0000000..31a9c82 --- /dev/null +++ b/modules/examples/src/main/scala/org/example/troubleshooting/measurements.scala @@ -0,0 +1,105 @@ +package org.example + +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram +import org.apache.flink.util.Collector +import org.apache.flink.configuration.Configuration +import org.apache.flinkx.api.function.ProcessWindowFunction +import org.apache.flink.api.common.functions.AggregateFunction + +class MeasurementWindowProcessFunction + extends 
ProcessWindowFunction[ + WindowedMeasurements, + WindowedMeasurements, + String, + TimeWindow + ]: + val EVENT_TIME_LAG_WINDOW_SIZE = 10_000 + + @transient lazy val eventTimeLag = getRuntimeContext.getMetricGroup + .histogram( + "eventTimeLag", + DescriptiveStatisticsHistogram(EVENT_TIME_LAG_WINDOW_SIZE) + ) + + override def process( + key: String, + context: Context, + elements: Iterable[WindowedMeasurements], + out: Collector[WindowedMeasurements] + ): Unit = + val aggregate = elements.iterator.next + val window = context.window + val res = aggregate.copy( + windowStart = window.getStart, + windowEnd = window.getEnd, + location = key + ) + + eventTimeLag.update(System.currentTimeMillis - window.getEnd) + out.collect(res) + +class MeasurementWindowAggregatingPerLocation + extends AggregateFunction[ + Measurement, + WindowedMeasurements, + WindowedMeasurements + ]: + + override def add( + record: Measurement, + accumulator: WindowedMeasurements + ): WindowedMeasurements = + accumulator.addMeasurement(record) + + override def createAccumulator(): WindowedMeasurements = + WindowedMeasurements() + + override def getResult( + accumulator: WindowedMeasurements + ): WindowedMeasurements = + accumulator + + override def merge( + a: WindowedMeasurements, + b: WindowedMeasurements + ): WindowedMeasurements = + b.copy( + eventsPerWindow = a.eventsPerWindow + b.eventsPerWindow, + sumPerWindow = a.sumPerWindow + b.sumPerWindow + ) + +class MeasurementWindowAggregatingPerArea + extends AggregateFunction[ + WindowedMeasurements, + WindowedMeasurementsForArea, + WindowedMeasurementsForArea + ]: + + override def createAccumulator() = WindowedMeasurementsForArea() + + override def add( + value: WindowedMeasurements, + accumulator: WindowedMeasurementsForArea + ): WindowedMeasurementsForArea = + accumulator.copy( + sumPerWindow = accumulator.sumPerWindow + value.sumPerWindow, + eventsPerWindow = accumulator.eventsPerWindow + value.eventsPerWindow, + locations = accumulator.locations :+ value.location + ) + + override def getResult( + accumulator: WindowedMeasurementsForArea + ): WindowedMeasurementsForArea = + val l = accumulator.locations(0) + accumulator.copy(area = WindowedMeasurementsForArea.getArea(l)) + + override def merge( + a: WindowedMeasurementsForArea, + b: WindowedMeasurementsForArea + ): WindowedMeasurementsForArea = + b.copy( + eventsPerWindow = a.eventsPerWindow + b.eventsPerWindow, + sumPerWindow = a.sumPerWindow + b.sumPerWindow, + locations = a.locations ++ b.locations + ) diff --git a/modules/examples/src/main/scala/org/example/troubleshooting/troubleshootingExample.scala b/modules/examples/src/main/scala/org/example/troubleshooting/troubleshootingExample.scala new file mode 100644 index 0000000..e330212 --- /dev/null +++ b/modules/examples/src/main/scala/org/example/troubleshooting/troubleshootingExample.scala @@ -0,0 +1,191 @@ +package org.example + +import java.time.Duration + +import org.apache.flinkx.api._ +import org.apache.flinkx.api.serializers._ +import org.apache.flinkx.api.function.ProcessWindowFunction + +import org.apache.commons.lang3.RandomStringUtils + +import io.bullet.borer.{Codec, Decoder, Encoder, Cbor} + +import org.apache.flink.api.common.eventtime.WatermarkStrategy +import org.apache.flink.api.common.functions.RichFlatMapFunction +import org.apache.flink.api.common.functions.AggregateFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.configuration.Configuration +import org.apache.flink.configuration.RestOptions.BIND_PORT 
+import org.apache.flink.configuration.TaskManagerOptions.{ + CPU_CORES, + MANAGED_MEMORY_SIZE, + TASK_HEAP_MEMORY, + TASK_OFF_HEAP_MEMORY +} +import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.functions.sink.DiscardingSink +import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram +import org.apache.flink.util.Collector +import org.apache.flink.configuration.MemorySize + +import scala.util.{Success, Failure, Random, Using} +import scala.io.Source + +import Measurement.given +import WindowedMeasurements.given + +case class Measurement( + sensorId: Int, + value: Double, + location: String, + measurementInformation: String +) + +object Measurement: + given codec: Codec[Measurement] = Codec( + Encoder.forProduct[Measurement], + Decoder.forProduct[Measurement] + ) + + given measurementTypeInformation: TypeInformation[Measurement] = + TypeInformation.of(classOf[Measurement]) + +case class WindowedMeasurements( + windowStart: Long = 0, + windowEnd: Long = 0, + location: String = "", + eventsPerWindow: Long = 0, + sumPerWindow: Double = 0 +): + def addMeasurement(m: Measurement) = + copy( + sumPerWindow = sumPerWindow + m.value, + eventsPerWindow = eventsPerWindow + 1 + ) + +object WindowedMeasurements: + given wmTypeInformation: TypeInformation[WindowedMeasurements] = + TypeInformation.of(classOf[WindowedMeasurements]) + +case class WindowedMeasurementsForArea( + windowStart: Long = 0, + windowEnd: Long = 0, + area: String = "", + locations: Array[String] = Array.empty, + eventsPerWindow: Long = 0, + sumPerWindow: Double = 0 +) + +object WindowedMeasurementsForArea: + def getArea(location: String): String = + if location.length > 0 then location.substring(0, 1) else "" + +class MeasurementDeserializer + extends RichFlatMapFunction[FakeKafkaRecord, Measurement]: + + lazy val numInvalidRecords = + getRuntimeContext.getMetricGroup.counter("numInvalidRecords") + + override def flatMap( + value: FakeKafkaRecord, + out: Collector[Measurement] + ): Unit = + val m = Cbor.decode(value.value).to[Measurement].valueTry + m match + case Failure(_) => numInvalidRecords.inc + case Success(value) => out.collect(value) + +val RANDOM_SEED = 1 +val NUM_OF_MEASUREMENTS = 100_000 + +def createSerializedMeasurements: Array[Array[Byte]] = + val rand = Random(RANDOM_SEED) + val locations = + Using.resource(Source.fromResource("cities.csv"))(_.getLines().toArray) + + (0 until NUM_OF_MEASUREMENTS).map { _ => + val m = Measurement( + rand.nextInt(100), + rand.nextDouble * 100, + locations(rand.nextInt(locations.length)), + RandomStringUtils.randomAlphabetic(30) + ) + Cbor.encode(m).toByteArray + }.toArray + +@main def main = + val flinkConfig = Configuration() + flinkConfig.set(BIND_PORT, "8080") + flinkConfig.set(CPU_CORES, 4.0) + flinkConfig.set(TASK_HEAP_MEMORY, MemorySize.ofMebiBytes(1024)) + flinkConfig.set(TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(256)) + flinkConfig.set(MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(1024)) + + val env = + StreamExecutionEnvironment + // .createLocalEnvironmentWithWebUI(flinkConfig) + .getExecutionEnvironment + .setBufferTimeout(10) + + env.getConfig.setAutoWatermarkInterval(100) + env.enableCheckpointing(5000) + env.getConfig.disableForceKryo() + env.getCheckpointConfig.setMinPauseBetweenCheckpoints(4000) + + val FAILURE_RATE = 0.0001f + + val sourceStream = env + .addSource( 
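+ // Set(0, 4) marks partitions 0 and 4 as idle, exercising the withIdleness handling of the watermark strategy below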
FakeKafkaSource( + RANDOM_SEED, + Set(0, 4), + createSerializedMeasurements, + FAILURE_RATE + ) + ) + .name("FakeKafkaSource") + .uid("FakeKafkaSource") + .assignTimestampsAndWatermarks( + WatermarkStrategy + .forBoundedOutOfOrderness[FakeKafkaRecord](Duration.ofMillis(250)) + .withTimestampAssigner((rec, _) => rec.timestamp) + .withIdleness(Duration.ofSeconds(1)) + ) + .name("Watermarks") + .uid("Watermarks") + .flatMap(MeasurementDeserializer()) + .name("Deserialization") + .uid("Deserialization") + + val aggregatedPerLocation = + sourceStream + .keyBy(_.location) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .aggregate( + MeasurementWindowAggregatingPerLocation(), + MeasurementWindowProcessFunction() + ) + .name("WindowedAggregationPerLocation") + .uid("WindowedAggregationPerLocation") + + val aggregatedPerArea = + aggregatedPerLocation + .keyBy(m => WindowedMeasurementsForArea.getArea(m.location)) + .window(TumblingEventTimeWindows.of(Time.seconds(1))) + .aggregate(MeasurementWindowAggregatingPerArea()) + + aggregatedPerLocation + .addSink(DiscardingSink()) + .name("OutputPerLocation") + .uid("OutputPerLocation") + .disableChaining + + aggregatedPerArea + .print() + .name("OutputPerArea") + .uid("OutputPerArea") + .disableChaining + + env.execute("troubleshootingExample.scala") diff --git a/modules/examples/src/main/scala/org/example/wordCount.scala b/modules/examples/src/main/scala/org/example/wordCount.scala new file mode 100644 index 0000000..d876ee5 --- /dev/null +++ b/modules/examples/src/main/scala/org/example/wordCount.scala @@ -0,0 +1,22 @@ +package org.example + +import org.apache.flinkx.api.* +import org.apache.flinkx.api.serializers.* + +@main def wordCountExample = + val env = StreamExecutionEnvironment.getExecutionEnvironment + val text = env.fromElements( + "To be, or not to be,--that is the question:--", + "Whether 'tis nobler in the mind to suffer", + "The slings and arrows of outrageous fortune", + "Or to take arms against a sea of troubles," + ) + + text + .flatMap(_.toLowerCase.split("\\W+")) + .map((_, 1)) + .keyBy(_._1) + .sum(1) + .print() + + env.execute("wordCount") diff --git a/modules/examples/src/test/scala/org/example/ConnectedStreamsTest.scala b/modules/examples/src/test/scala/org/example/ConnectedStreamsTest.scala new file mode 100644 index 0000000..ebf5f7d --- /dev/null +++ b/modules/examples/src/test/scala/org/example/ConnectedStreamsTest.scala @@ -0,0 +1,80 @@ +package org.example + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import org.apache.flinkx.api.* +import org.apache.flinkx.api.serializers.* +import org.apache.flink.api.common.eventtime.WatermarkStrategy +import org.apache.flink.streaming.api.functions.co.CoMapFunction +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.util.Collector + +import java.util.concurrent.TimeUnit +import java.time.Duration + +case class TestCommand(timestamp: Long, bag: List[String] = Nil) + +class ConnectedStreamsTest extends AnyFlatSpec with Matchers: + + it should "process in random order" in { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + + val commands = Seq(TestCommand(1), TestCommand(2), TestCommand(3)) + val events = Seq( + TestEvent(2L, 1, 0), + TestEvent(2L, 2, 0), + TestEvent(2L, 3, 0) + ) + def strategy[T] = WatermarkStrategy + .forBoundedOutOfOrderness[T](Duration.ofSeconds(1000)) + + env + .fromCollection(commands) + .assignTimestampsAndWatermarks( + strategy 
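+ // the same bounded out-of-orderness strategy serves both inputs; only the timestamp extractor differs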
+ .withTimestampAssigner( + (cmd: TestCommand, streamRecordTimestamp: Long) => cmd.timestamp + ) + ) + .connect( + env + .fromCollection(events) + .assignTimestampsAndWatermarks( + strategy + .withTimestampAssigner( + (event: TestEvent, streamRecordTimestamp: Long) => + event.timestamp + ) + ) + ) + .process { + new CoProcessFunction[TestCommand, TestEvent, String]: + + override def processElement1( + value: TestCommand, + ctx: CoProcessFunction[ + org.example.TestCommand, + org.example.TestEvent, + String + ]#Context, + out: Collector[String] + ): Unit = + out.collect(s"cmd: ${value.timestamp}") + + override def processElement2( + value: TestEvent, + ctx: CoProcessFunction[ + org.example.TestCommand, + org.example.TestEvent, + String + ]#Context, + out: Collector[String] + ): Unit = + out.collect(s"event: ${value.timestamp}") + } + .print() + + env.execute() + } diff --git a/modules/examples/src/test/scala/org/example/CustomTriggerTests.scala b/modules/examples/src/test/scala/org/example/CustomTriggerTests.scala new file mode 100644 index 0000000..7b6ec28 --- /dev/null +++ b/modules/examples/src/test/scala/org/example/CustomTriggerTests.scala @@ -0,0 +1,113 @@ +package org.example + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.BeforeAndAfter +import org.scalatest.matchers.should.Matchers +import org.scalatest.concurrent.Eventually +import org.scalatest.time.Seconds +import org.scalatest.time.Span +import org.scalatest.time.Millis +import org.scalatest.Inspectors + +import org.apache.flinkx.api.* +import org.apache.flinkx.api.serializers.* + +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.streaming.api.windowing.time.Time +import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.streaming.api.windowing.windows.TimeWindow +import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows +import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger +import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator +import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord +import org.apache.flink.streaming.api.watermark.Watermark +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.ExecutionConfig +import org.apache.flink.api.common.state.ReducingStateDescriptor +import org.apache.flink.util.Collector + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters.* + +class CustomTriggerTest extends AnyFlatSpec with Matchers with Inspectors: + + it should "test custom trigger" in { + val cfg = ExecutionConfig() + val serializer = deriveTypeInformation[TestEvent].createSerializer( + cfg + ) + + val stateDesc = + ReducingStateDescriptor( + "window-contents", + reduceEvents, + serializer + ) + + val windowSize = Time.of(10, TimeUnit.SECONDS) + val windowSlide = Time.of(2, TimeUnit.SECONDS) + + val operator = WindowOperator( + SlidingEventTimeWindows.of(windowSize, windowSlide), + TimeWindow.Serializer(), + (e: TestEvent) => e.key, + TypeInformation.of(classOf[Long]).createSerializer(cfg), + stateDesc, + InternalSingleValueWindowFunction(windowActionJ _), + CustomEventTimeTrigger(EventTimeTrigger.create()), + 0, + null /* late data 
output tag */ + ) + + val testHarness = + KeyedOneInputStreamOperatorTestHarness[Long, TestEvent, TestEvent]( + operator, + e => e.key, + TypeInformation.of(classOf[Long]) + ) + + testHarness.setup(serializer) + testHarness.open() + + // closing all empty pre-generated windows + testHarness.processWatermark( + Watermark( + windowSize.toMilliseconds - windowSlide.toMilliseconds + ) + ) + testHarness.getOutput.size should be(1) + testHarness.getRecordOutput.size should be(0) + + testHarness.processElement( + StreamRecord(TestEvent(2L, 6000, 1, windowStart = -1, bag = List(3)), 6000) + ) + testHarness.processElement( + StreamRecord(TestEvent(2L, 5000, 0), 5000) + ) + testHarness.processElement( + StreamRecord(TestEvent(2L, 10000, 2), 10000) + ) + + println("content:") + testHarness.getOutput.asScala.foreach(println) + + val records = testHarness.getRecordOutput.asScala + records.size should be(12) + val windows = records.groupBy(_.getValue.windowStart) + + // checking the [0 - 10000] window, which contains 2 as the running count + windows(0) + .map(_.getValue.runningCount) + .max should be(2) + + // checking the windows [2000 - 12000], [4000 - ..] etc., which contain 3 as the running count + windows + .view + .filterKeys(_ > 0) + .mapValues(_.map(_.getValue.runningCount).max) + .values + .max should be(3) + } diff --git a/modules/examples/src/test/scala/org/example/MyKeyedProcessFunctionTest.scala b/modules/examples/src/test/scala/org/example/MyKeyedProcessFunctionTest.scala new file mode 100644 index 0000000..7d4fed6 --- /dev/null +++ b/modules/examples/src/test/scala/org/example/MyKeyedProcessFunctionTest.scala @@ -0,0 +1,45 @@ +package org.example + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers + +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.streaming.api.operators.KeyedProcessOperator +import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.util.Collector + +@SerialVersionUID(1L) +class MyKeyedProcessFunction + extends KeyedProcessFunction[Long, TestEvent, Long]: + + @throws[Exception] + override def processElement( + e: TestEvent, + context: KeyedProcessFunction[Long, TestEvent, Long]#Context, + out: Collector[Long] + ): Unit = + out.collect(e.timestamp) + +class MyKeyedProcessFunctionTest extends AnyFlatSpec with Matchers: + + it should "test state" in { + val operator = + KeyedProcessOperator(MyKeyedProcessFunction()) + val testHarness = + KeyedOneInputStreamOperatorTestHarness[Long, TestEvent, Long]( + operator, + e => e.key, + TypeInformation.of(classOf[Long]) + ) + + testHarness.getExecutionConfig().setAutoWatermarkInterval(50) + testHarness.setup() + testHarness.open() + + testHarness.getOutput().size() should be(0) + + testHarness.processElement(TestEvent(2L, 1, 0), 100L) + testHarness.getOutput().size() shouldNot be(0) + } diff --git a/modules/examples/src/test/scala/org/example/fraud/FakeRuntimeContext.scala b/modules/examples/src/test/scala/org/example/fraud/FakeRuntimeContext.scala new file mode 100644 index 0000000..d870328 --- /dev/null +++ b/modules/examples/src/test/scala/org/example/fraud/FakeRuntimeContext.scala @@ -0,0 +1,116 @@ +package org.example.fraud + +import org.apache.flink.api.common.{ExecutionConfig, JobID} +import org.apache.flink.api.common.accumulators.{ + Accumulator, + DoubleCounter, + Histogram, + IntCounter, + 
LongCounter +} +import org.apache.flink.api.common.cache.DistributedCache +import org.apache.flink.api.common.externalresource.ExternalResourceInfo +import org.apache.flink.api.common.functions.{ + BroadcastVariableInitializer, + RuntimeContext +} +import org.apache.flink.api.common.state.{ + AggregatingState, + AggregatingStateDescriptor, + ListState, + ListStateDescriptor, + MapState, + MapStateDescriptor, + ReducingState, + ReducingStateDescriptor, + ValueState, + ValueStateDescriptor +} +import org.apache.flink.metrics.groups.OperatorMetricGroup + +import java.util + +class FakeRuntimeContext extends RuntimeContext: + + override def getJobId: JobID = ??? + + override def getTaskName: String = ??? + + override def getMetricGroup: OperatorMetricGroup = ??? + + override def getNumberOfParallelSubtasks: Int = ??? + + override def getMaxNumberOfParallelSubtasks: Int = ??? + + override def getIndexOfThisSubtask: Int = ??? + + override def getAttemptNumber: Int = ??? + + override def getTaskNameWithSubtasks: String = ??? + + override def getExecutionConfig: ExecutionConfig = ??? + + override def getUserCodeClassLoader: ClassLoader = ??? + + override def registerUserCodeClassLoaderReleaseHookIfAbsent( + releaseHookName: String, + releaseHook: Runnable + ): Unit = ??? + + override def addAccumulator[V, A <: Serializable]( + name: String, + accumulator: Accumulator[V, A] + ): Unit = ??? + + override def getAccumulator[V, A <: Serializable]( + name: String + ): Accumulator[V, A] = ??? + + override def getIntCounter(name: String): IntCounter = ??? + + override def getLongCounter(name: String): LongCounter = ??? + + override def getDoubleCounter(name: String): DoubleCounter = ??? + + override def getHistogram(name: String): Histogram = ??? + + override def getExternalResourceInfos( + resourceName: String + ): util.Set[ExternalResourceInfo] = ??? + + override def hasBroadcastVariable(name: String): Boolean = ??? + + override def getBroadcastVariable[RT](name: String): util.List[RT] = ??? + + override def getBroadcastVariableWithInitializer[T, C]( + name: String, + initializer: BroadcastVariableInitializer[T, C] + ): C = ??? + + override def getDistributedCache: DistributedCache = ??? + + override def getListState[T]( + stateProperties: ListStateDescriptor[T] + ): ListState[T] = ??? + + override def getReducingState[T]( + stateProperties: ReducingStateDescriptor[T] + ): ReducingState[T] = ??? + + override def getAggregatingState[IN, ACC, OUT]( + stateProperties: AggregatingStateDescriptor[IN, ACC, OUT] + ): AggregatingState[IN, OUT] = ??? + + override def getMapState[UK, UV]( + stateProperties: MapStateDescriptor[UK, UV] + ): MapState[UK, UV] = ??? 
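+ + // FraudDetectorTest only touches ValueState, so getState below returns a simple in-memory fake and every other accessor can safely stay unimplemented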
+ + override def getState[T]( + stateProperties: ValueStateDescriptor[T] + ): ValueState[T] = + new ValueState[T] { + var v: T = null.asInstanceOf[T] + override def clear(): Unit = v = null.asInstanceOf[T] + override def update(value: T): Unit = v = value + override def value(): T = v + } diff --git a/modules/examples/src/test/scala/org/example/fraud/FraudDetectorTest.scala b/modules/examples/src/test/scala/org/example/fraud/FraudDetectorTest.scala new file mode 100644 index 0000000..42463f3 --- /dev/null +++ b/modules/examples/src/test/scala/org/example/fraud/FraudDetectorTest.scala @@ -0,0 +1,59 @@ +package org.example.fraud + +import org.example.Transaction +import org.example.Alert + +import org.scalatest.matchers.should.Matchers +import org.scalatest.flatspec.AnyFlatSpec +import org.apache.flink.util.Collector +import org.apache.flink.streaming.api.functions.KeyedProcessFunction +import org.apache.flink.util.OutputTag +import org.apache.flink.streaming.api.TimerService +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext +import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext +import org.apache.flink.streaming.api.operators.StreamFlatMap +import org.apache.flink.configuration.Configuration + +import scala.collection.mutable.ListBuffer +import org.example.fraud.FraudDetector + +class FraudDetectorTest extends AnyFlatSpec with Matchers: + + class FakeCollector extends Collector[Alert]: + val state = ListBuffer.empty[Alert] + + override def collect(record: Alert): Unit = + state += record + override def close(): Unit = + state.clear + + def makeContext(detector: FraudDetector) = + new detector.Context: + override def getCurrentKey(): Long = ??? + override def output[X](outputTag: OutputTag[X], value: X): Unit = ??? + override def timestamp(): java.lang.Long = 0L + override def timerService(): TimerService = new TimerService: + override def currentProcessingTime(): Long = 0L + override def registerProcessingTimeTimer(time: Long): Unit = () + override def currentWatermark(): Long = ??? + override def deleteProcessingTimeTimer(time: Long): Unit = () + override def registerEventTimeTimer(time: Long): Unit = ??? + override def deleteEventTimeTimer(time: Long): Unit = ??? 
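+ + // The fake context pins processing time to 0 and accepts timer (de)registration as no-ops, so the detector can be driven directly through processElement without a running job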
+ + it should "detect fraud" in { + // given + val detector = FraudDetector() + detector.setRuntimeContext(FakeRuntimeContext()) + detector.open(new Configuration()) + + val ctx = makeContext(detector) + val collector = FakeCollector() + // when + detector.processElement(Transaction(1, 1, 0.1), ctx, collector) + // then + collector.state should be(empty) + // when + detector.processElement(Transaction(1, 1, 500.1), ctx, collector) + // then + collector.state shouldNot be(empty) + } diff --git a/src/main/java/org/apache/flink/api/serializer/ScalaCaseClassSerializerSnapshot.java b/modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaCaseClassSerializerSnapshot.java similarity index 100% rename from src/main/java/org/apache/flink/api/serializer/ScalaCaseClassSerializerSnapshot.java rename to modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaCaseClassSerializerSnapshot.java diff --git a/src/main/java/org/apache/flink/api/serializer/ScalaEitherSerializerSnapshot.java b/modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaEitherSerializerSnapshot.java similarity index 100% rename from src/main/java/org/apache/flink/api/serializer/ScalaEitherSerializerSnapshot.java rename to modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaEitherSerializerSnapshot.java diff --git a/src/main/java/org/apache/flink/api/serializer/ScalaOptionSerializerSnapshot.java b/modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaOptionSerializerSnapshot.java similarity index 100% rename from src/main/java/org/apache/flink/api/serializer/ScalaOptionSerializerSnapshot.java rename to modules/scala-api/src/main/java/org/apache/flink/api/serializer/ScalaOptionSerializerSnapshot.java diff --git a/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala b/modules/scala-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala similarity index 100% rename from src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala rename to modules/scala-api/src/main/scala-2/org/apache/flinkx/api/LowPrioImplicits.scala diff --git a/src/main/scala-2/org/apache/flinkx/api/serializer/ConstructorCompat.scala b/modules/scala-api/src/main/scala-2/org/apache/flinkx/api/serializer/ConstructorCompat.scala similarity index 100% rename from src/main/scala-2/org/apache/flinkx/api/serializer/ConstructorCompat.scala rename to modules/scala-api/src/main/scala-2/org/apache/flinkx/api/serializer/ConstructorCompat.scala diff --git a/src/main/scala-3/org/apache/flinkx/api/LowPrioImplicits.scala b/modules/scala-api/src/main/scala-3/org/apache/flinkx/api/LowPrioImplicits.scala similarity index 100% rename from src/main/scala-3/org/apache/flinkx/api/LowPrioImplicits.scala rename to modules/scala-api/src/main/scala-3/org/apache/flinkx/api/LowPrioImplicits.scala diff --git a/src/main/scala-3/org/apache/flinkx/api/TaggedDerivation.scala b/modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TaggedDerivation.scala similarity index 100% rename from src/main/scala-3/org/apache/flinkx/api/TaggedDerivation.scala rename to modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TaggedDerivation.scala diff --git a/src/main/scala-3/org/apache/flinkx/api/TypeTag.scala b/modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TypeTag.scala similarity index 100% rename from src/main/scala-3/org/apache/flinkx/api/TypeTag.scala rename to modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TypeTag.scala diff --git a/src/main/scala-3/org/apache/flinkx/api/TypeTagMacro.scala 
b/modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TypeTagMacro.scala similarity index 100% rename from src/main/scala-3/org/apache/flinkx/api/TypeTagMacro.scala rename to modules/scala-api/src/main/scala-3/org/apache/flinkx/api/TypeTagMacro.scala diff --git a/src/main/scala-3/org/apache/flinkx/api/serializer/ConstructorCompat.scala b/modules/scala-api/src/main/scala-3/org/apache/flinkx/api/serializer/ConstructorCompat.scala similarity index 100% rename from src/main/scala-3/org/apache/flinkx/api/serializer/ConstructorCompat.scala rename to modules/scala-api/src/main/scala-3/org/apache/flinkx/api/serializer/ConstructorCompat.scala diff --git a/src/main/scala/org/apache/flink/streaming/util/typeutils/DefaultScalaProductFieldAccessorFactory.scala b/modules/scala-api/src/main/scala/org/apache/flink/streaming/util/typeutils/DefaultScalaProductFieldAccessorFactory.scala similarity index 100% rename from src/main/scala/org/apache/flink/streaming/util/typeutils/DefaultScalaProductFieldAccessorFactory.scala rename to modules/scala-api/src/main/scala/org/apache/flink/streaming/util/typeutils/DefaultScalaProductFieldAccessorFactory.scala diff --git a/src/main/scala/org/apache/flinkx/api/AllWindowedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/AllWindowedStream.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/AllWindowedStream.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/AllWindowedStream.scala diff --git a/src/main/scala/org/apache/flinkx/api/AsyncDataStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/AsyncDataStream.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/AsyncDataStream.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/AsyncDataStream.scala diff --git a/src/main/scala/org/apache/flinkx/api/BroadcastConnectedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/BroadcastConnectedStream.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/BroadcastConnectedStream.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/BroadcastConnectedStream.scala diff --git a/src/main/scala/org/apache/flinkx/api/CloseableIterator.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/CloseableIterator.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/CloseableIterator.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/CloseableIterator.scala diff --git a/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/ClosureCleaner.scala diff --git a/src/main/scala/org/apache/flinkx/api/CoGroupedStreams.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/CoGroupedStreams.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/CoGroupedStreams.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/CoGroupedStreams.scala diff --git a/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/ConnectedStreams.scala diff --git 
a/src/main/scala/org/apache/flinkx/api/DataStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/DataStream.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStream.scala diff --git a/src/main/scala/org/apache/flinkx/api/DataStreamUtils.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStreamUtils.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/DataStreamUtils.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/DataStreamUtils.scala diff --git a/src/main/scala/org/apache/flinkx/api/JoinedStreams.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/JoinedStreams.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/JoinedStreams.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/JoinedStreams.scala diff --git a/src/main/scala/org/apache/flinkx/api/KeyedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/KeyedStream.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/KeyedStream.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/KeyedStream.scala diff --git a/src/main/scala/org/apache/flinkx/api/OutputTag.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/OutputTag.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/OutputTag.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/OutputTag.scala diff --git a/src/main/scala/org/apache/flinkx/api/ScalaStreamOps.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/ScalaStreamOps.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/ScalaStreamOps.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/ScalaStreamOps.scala diff --git a/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/StreamExecutionEnvironment.scala diff --git a/src/main/scala/org/apache/flinkx/api/WindowedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/WindowedStream.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/WindowedStream.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/WindowedStream.scala diff --git a/src/main/scala/org/apache/flinkx/api/async/AsyncFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/async/AsyncFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/async/AsyncFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/async/AsyncFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/async/JavaResultFutureWrapper.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/async/JavaResultFutureWrapper.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/async/JavaResultFutureWrapper.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/async/JavaResultFutureWrapper.scala diff --git a/src/main/scala/org/apache/flinkx/api/async/ResultFuture.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/async/ResultFuture.scala similarity index 100% rename from 
src/main/scala/org/apache/flinkx/api/async/ResultFuture.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/async/ResultFuture.scala diff --git a/src/main/scala/org/apache/flinkx/api/async/RichAsyncFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/async/RichAsyncFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/async/RichAsyncFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/async/RichAsyncFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/async/ScalaRichAsyncFunctionWrapper.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/async/ScalaRichAsyncFunctionWrapper.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/async/ScalaRichAsyncFunctionWrapper.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/async/ScalaRichAsyncFunctionWrapper.scala diff --git a/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnConnectedStream.scala diff --git a/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnDataStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnDataStream.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnDataStream.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnDataStream.scala diff --git a/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnJoinedStream.scala diff --git a/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnKeyedStream.scala diff --git a/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala diff --git a/src/main/scala/org/apache/flinkx/api/extensions/ops.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/ops.scala similarity index 100% rename from 
src/main/scala/org/apache/flinkx/api/extensions/ops.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/extensions/ops.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/AllWindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/AllWindowFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/AllWindowFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/AllWindowFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/ProcessAllWindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/ProcessAllWindowFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/ProcessAllWindowFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/ProcessAllWindowFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/ProcessWindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/ProcessWindowFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/ProcessWindowFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/ProcessWindowFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/RichAllWindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichAllWindowFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/RichAllWindowFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichAllWindowFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/RichWindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichWindowFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/RichWindowFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/RichWindowFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/StatefulFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/StatefulFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/StatefulFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/StatefulFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/WindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/WindowFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/WindowFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/WindowFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunctionWrapper.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunctionWrapper.scala similarity index 100% rename from 
src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunctionWrapper.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaAllWindowFunctionWrapper.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/util/ScalaProcessWindowFunctionWrapper.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaProcessWindowFunctionWrapper.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/util/ScalaProcessWindowFunctionWrapper.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaProcessWindowFunctionWrapper.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/util/ScalaReduceFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaReduceFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/util/ScalaReduceFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaReduceFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunction.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunction.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunction.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunction.scala diff --git a/src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunctionWrapper.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunctionWrapper.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunctionWrapper.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/function/util/ScalaWindowFunctionWrapper.scala diff --git a/src/main/scala/org/apache/flinkx/api/mapper/BigDecMapper.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/mapper/BigDecMapper.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/mapper/BigDecMapper.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/mapper/BigDecMapper.scala diff --git a/src/main/scala/org/apache/flinkx/api/mapper/BigIntMapper.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/mapper/BigIntMapper.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/mapper/BigIntMapper.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/mapper/BigIntMapper.scala diff --git a/src/main/scala/org/apache/flinkx/api/mapper/UuidMapper.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/mapper/UuidMapper.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/mapper/UuidMapper.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/mapper/UuidMapper.scala diff --git a/src/main/scala/org/apache/flinkx/api/serializer/ArraySerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/ArraySerializer.scala similarity index 100% rename from src/main/scala/org/apache/flinkx/api/serializer/ArraySerializer.scala rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/ArraySerializer.scala diff --git a/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala similarity index 100% rename from 
src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CaseClassSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CollectionSerializerSnapshot.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/CoproductSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/EitherSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/EitherSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/EitherSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/EitherSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/ListCCSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/ListCCSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/ListCCSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/ListCCSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/ListSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/ListSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/ListSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/ListSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/MapSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/MappedSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/NothingSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/NothingSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/NothingSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/NothingSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/OptionSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/OptionSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/OptionSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/OptionSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseClassSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseClassSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseClassSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseClassSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseObjectSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseObjectSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseObjectSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/ScalaCaseObjectSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/SeqSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/SetSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/SetSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/SetSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/SetSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/SimpleSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/SimpleSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/SimpleSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/SimpleSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/UnitSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/UnitSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/UnitSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/UnitSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializer/VectorSerializer.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/VectorSerializer.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializer/VectorSerializer.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializer/VectorSerializer.scala
diff --git a/src/main/scala/org/apache/flinkx/api/serializers.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/serializers.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/serializers.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/serializers.scala
diff --git a/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassComparator.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassComparator.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassComparator.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassComparator.scala
diff --git a/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassTypeInfo.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassTypeInfo.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassTypeInfo.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CaseClassTypeInfo.scala
diff --git a/src/main/scala/org/apache/flinkx/api/typeinfo/CollectionTypeInformation.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CollectionTypeInformation.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/typeinfo/CollectionTypeInformation.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CollectionTypeInformation.scala
diff --git a/src/main/scala/org/apache/flinkx/api/typeinfo/CoproductTypeInformation.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CoproductTypeInformation.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/typeinfo/CoproductTypeInformation.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/CoproductTypeInformation.scala
diff --git a/src/main/scala/org/apache/flinkx/api/typeinfo/EitherTypeInfo.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/EitherTypeInfo.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/typeinfo/EitherTypeInfo.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/EitherTypeInfo.scala
diff --git a/src/main/scala/org/apache/flinkx/api/typeinfo/MappedTypeInformation.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/MappedTypeInformation.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/typeinfo/MappedTypeInformation.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/MappedTypeInformation.scala
diff --git a/src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeComparator.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeComparator.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeComparator.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeComparator.scala
diff --git a/src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeInfo.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeInfo.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeInfo.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/OptionTypeInfo.scala
diff --git a/src/main/scala/org/apache/flinkx/api/typeinfo/ProductTypeInformation.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/ProductTypeInformation.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/typeinfo/ProductTypeInformation.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/ProductTypeInformation.scala
diff --git a/src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInformation.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInformation.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInformation.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/SimpleTypeInformation.scala
diff --git a/src/main/scala/org/apache/flinkx/api/typeinfo/UnitTypeInformation.scala b/modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/UnitTypeInformation.scala
similarity index 100%
rename from src/main/scala/org/apache/flinkx/api/typeinfo/UnitTypeInformation.scala
rename to modules/scala-api/src/main/scala/org/apache/flinkx/api/typeinfo/UnitTypeInformation.scala
diff --git a/src/test/resources/click.dat b/modules/scala-api/src/test/resources/click.dat
similarity index 100%
rename from src/test/resources/click.dat
rename to modules/scala-api/src/test/resources/click.dat
diff --git a/modules/scala-api/src/test/resources/logback.xml b/modules/scala-api/src/test/resources/logback.xml
new file mode 100755
index 0000000..70e5059
--- /dev/null
+++ b/modules/scala-api/src/test/resources/logback.xml
@@ -0,0 +1,24 @@
+<configuration>
+    <!-- markup reconstructed: the original XML tags were stripped in extraction;
+         only the "true" flag and the encoder pattern survive verbatim -->
+    <contextListener class="ch.qos.logback.classic.jul.LevelChangePropagator">
+        <resetJUL>true</resetJUL>
+    </contextListener>
+    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%date{yyyy-MM-dd HH:mm:ss.SSSZ, UTC} %-16level %-43thread %-24logger{24} %message%n%xException</pattern>
+        </encoder>
+    </appender>
+    <root level="INFO">
+        <appender-ref ref="console"/>
+    </root>
+</configuration>
diff --git a/src/test/resources/without-arity-test.dat b/modules/scala-api/src/test/resources/without-arity-test.dat
similarity index 100%
rename from src/test/resources/without-arity-test.dat
rename to modules/scala-api/src/test/resources/without-arity-test.dat
diff --git a/src/test/scala-3/org/apache/flinkx/api/Scala3EnumTest.scala b/modules/scala-api/src/test/scala-3/org/apache/flinkx/api/Scala3EnumTest.scala
similarity index 100%
rename from src/test/scala-3/org/apache/flinkx/api/Scala3EnumTest.scala
rename to modules/scala-api/src/test/scala-3/org/apache/flinkx/api/Scala3EnumTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/AnyTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/AnyTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/AnyTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/AnyTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/AsyncDataStreamTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/AsyncDataStreamTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/AsyncDataStreamTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/AsyncDataStreamTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/CatsTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/CatsTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/CatsTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/CatsTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/CoGroupedStreamsTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/CoGroupedStreamsTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/CoGroupedStreamsTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/CoGroupedStreamsTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/DataStreamTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/DataStreamTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/DataStreamTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/DataStreamTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/ExampleTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/ExampleTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/ExampleTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/ExampleTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/IntegrationTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/IntegrationTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/IntegrationTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/IntegrationTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/IntegrationTestSink.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/IntegrationTestSink.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/IntegrationTestSink.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/IntegrationTestSink.scala
diff --git a/src/test/scala/org/apache/flinkx/api/JoinedStreamsTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/JoinedStreamsTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/JoinedStreamsTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/JoinedStreamsTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/MappedTypeInfoTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/MappedTypeInfoTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/MappedTypeInfoTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/MappedTypeInfoTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/ProcessFunctionTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/ProcessFunctionTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/ProcessFunctionTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/ProcessFunctionTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/SchemaEvolutionTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/SchemaEvolutionTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/SchemaEvolutionTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/SchemaEvolutionTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/SerializerSnapshotTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/SerializerTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/SerializerTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/StreamExecutionEnvironmentTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/StreamExecutionEnvironmentTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/StreamExecutionEnvironmentTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/StreamExecutionEnvironmentTest.scala
diff --git a/src/test/scala/org/apache/flinkx/api/TestUtils.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/TestUtils.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/TestUtils.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/TestUtils.scala
diff --git a/src/test/scala/org/apache/flinkx/api/TypeInfoTest.scala b/modules/scala-api/src/test/scala/org/apache/flinkx/api/TypeInfoTest.scala
similarity index 100%
rename from src/test/scala/org/apache/flinkx/api/TypeInfoTest.scala
rename to modules/scala-api/src/test/scala/org/apache/flinkx/api/TypeInfoTest.scala
diff --git a/project/plugins.sbt b/project/plugins.sbt
index a84ce01..e9e1734 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -4,3 +4,4 @@ addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2")
 addSbtPlugin("org.scalameta" % "sbt-mdoc" % "2.5.4")
 addSbtPlugin("com.github.sbt" % "sbt-release" % "1.4.0")
 addSbtPlugin("com.github.sbt" % "sbt-git" % "2.0.1")
+addSbtPlugin("com.github.sbt" % "sbt-protobuf" % "0.7.1")
\ No newline at end of file