
Commit

removed all spark-sql dependencies from kotlin-spark-api so it can work with spark-connect
Jolanrensen committed Jun 17, 2024
1 parent 22fa5ae commit 726edfd
Showing 21 changed files with 203 additions and 2,517 deletions.
4 changes: 2 additions & 2 deletions gradle.properties
@@ -7,10 +7,10 @@ GROUP=org.jetbrains.kotlinx.spark
# can also be defined like ./gradlew -Pspark=X.X.X -Pscala=X.X.X build
spark=3.5.1
#spark=3.4.2
scala=2.13.13
scala=2.13.14
#scala=2.12.19
skipScalaOnlyDependent=false
sparkConnect=false
sparkConnect=true
org.gradle.caching=true
org.gradle.parallel=false
#kotlin.incremental.useClasspathSnapshot=true
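
As the comment at the top of this hunk notes, each flag can also be overridden per invocation, e.g. `./gradlew -PsparkConnect=true build`. As a minimal sketch (assuming Gradle's standard provider API; the repository's actual `Versions` wiring may differ), build logic can read these flags like so:

```kotlin
// Hedged sketch: reading the flags above via Gradle's provider API in build.gradle.kts.
val sparkVersion: String = providers.gradleProperty("spark").get()   // e.g. "3.5.1"
val scalaVersion: String = providers.gradleProperty("scala").get()   // e.g. "2.13.14"
val sparkConnect: Boolean = providers.gradleProperty("sparkConnect")
    .map(String::toBoolean)
    .getOrElse(false) // an absent flag falls back to the classic spark-sql build
```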
Binary file modified gradle/bootstraps/compiler-plugin.jar
Binary file modified gradle/bootstraps/gradle-plugin.jar
25 changes: 17 additions & 8 deletions kotlin-spark-api/build.gradle.kts
@@ -17,7 +17,6 @@ plugins {
group = Versions.groupID
version = Versions.project


repositories {
mavenCentral()
mavenLocal()
@@ -33,7 +32,7 @@ dependencies {
Projects {
api(
scalaHelpers,
scalaTuplesInKotlin
scalaTuplesInKotlin,
)
}

@@ -42,14 +41,18 @@ dependencies {
// https://github.com/FasterXML/jackson-bom/issues/52
if (Versions.spark == "3.3.1") implementation(jacksonDatabind)

// if (Versions.sparkConnect) TODO("unsupported for now")
if (Versions.sparkConnect) {
// IMPORTANT!
compileOnly(sparkSqlApi)
implementation(sparkConnectClient)
} else {
implementation(sparkSql)
}

implementation(
hadoopClient,
kotlinStdLib,
reflect,
sparkSql,
sparkStreaming,
hadoopClient,
kotlinDateTime,
)
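
This hunk is the heart of the change: with `sparkConnect` enabled, `spark-sql-api` is needed only at compile time and the Spark Connect client supplies the runtime, so the full `spark-sql` engine drops out of the dependency list entirely. A hedged sketch of what that could look like from a consumer build (the coordinates below are assumptions for illustration, not verified artifact names):

```kotlin
// Hypothetical consumer build.gradle.kts — coordinates are illustrative only.
dependencies {
    implementation("org.jetbrains.kotlinx.spark:kotlin-spark-api_2.13:<version>")

    // Spark Connect variant: a thin client that talks to a remote Spark server.
    implementation("org.apache.spark:spark-connect-client-jvm_2.13:3.5.1")

    // Classic variant instead: the full in-process SQL engine.
    // implementation("org.apache.spark:spark-sql_2.13:3.5.1")
}
```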

@@ -68,7 +71,10 @@ dependencies {

// Setup preprocessing with JCP for main sources

val kotlinMainSources = kotlin.sourceSets.main.get().kotlin.sourceDirectories
val kotlinMainSources =
kotlin.sourceSets.main
.get()
.kotlin.sourceDirectories

val preprocessMain by tasks.creating(JcpTask::class) {
sources = kotlinMainSources
@@ -107,7 +113,10 @@ tasks.compileKotlin {

// Setup preprocessing with JCP for test sources

val kotlinTestSources = kotlin.sourceSets.test.get().kotlin.sourceDirectories
val kotlinTestSources =
kotlin.sourceSets.test
.get()
.kotlin.sourceDirectories

val preprocessTest by tasks.creating(JcpTask::class) {
sources = kotlinTestSources
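
Both source sets are piped through JCP (the Java Comment Preprocessor) before compilation; that is how a single codebase serves the spark-sql and spark-connect flavors. The directive pattern, as a sketch (the real usage appears in the `applyEncoder` hunk below):

```kotlin
//#if sparkConnect == false
// Lines here are compiled only for the classic spark-sql build.
//#else
//$// Lines prefixed with `//$` are uncommented by JCP for the spark-connect build.
//#endif
```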
@@ -27,8 +27,7 @@

package org.jetbrains.kotlinx.spark.api

import org.apache.spark.api.java.Optional
import scala.*
import scala.*
import java.util.*
import java.util.Enumeration
import java.util.concurrent.ConcurrentMap
@@ -43,14 +42,6 @@ import scala.collection.mutable.Buffer as ScalaMutableBuffer
import scala.collection.mutable.Map as ScalaMutableMap
import scala.collection.mutable.Seq as ScalaMutableSeq
import scala.collection.mutable.Set as ScalaMutableSet
import org.apache.spark.streaming.State

/** Returns state value if it exists, else `null`. */
fun <T> State<T>.getOrNull(): T? = if (exists()) get() else null

/** Returns state value if it exists, else [other]. */
fun <T> State<T>.getOrElse(other: T): T = if (exists()) get() else other


/** Converts Scala [Option] to Kotlin nullable. */
fun <T> Option<T>.getOrNull(): T? = getOrElse(null)
@@ -59,20 +50,20 @@ fun <T> Option<T>.getOrNull(): T? = getOrElse(null)
fun <T> Option<T>.getOrElse(other: T): T = getOrElse { other }

/** Converts nullable value to Scala [Option]. */
fun <T> T?.toOption(): Option<T> = Option.apply(this)
fun <T> T.toOption(): Option<T> = Option.apply(this)

/** Converts Scala [Option] to Java [Optional]. */
fun <T> Option<T>.toOptional(): Optional<T> = Optional.ofNullable(getOrNull())
fun <T> Option<T>.toOptional(): Optional<T & Any> = Optional.ofNullable(getOrNull())


/** Converts [Optional] to Kotlin nullable. */
fun <T> Optional<T>.getOrNull(): T? = orNull()
fun <T> Optional<T>.getOrNull(): T? = orElse(null)

/** Get if available else [other]. */
fun <T> Optional<T>.getOrElse(other: T): T = orElse(other)

/** Converts nullable value to [Optional]. */
fun <T> T?.toOptional(): Optional<T> = Optional.ofNullable(this)
fun <T> T.toOptional(): Optional<T & Any> = Optional.ofNullable(this)

/** Converts Java [Optional] to Scala [Option]. */
fun <T> Optional<T>.toOption(): Option<T> = Option.apply(getOrNull())
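
Worth noting in this hunk: the extensions now target `java.util.Optional` instead of Spark's `org.apache.spark.api.java.Optional` (hence `orElse(null)` replacing `orNull()`), and `Optional`'s non-null guarantee is expressed with Kotlin's `T & Any`. A quick usage sketch of the conversions:

```kotlin
import scala.Option
import java.util.Optional

val some: Option<String> = "value".toOption()      // Kotlin value -> Scala Option
some.getOrNull()                                   // "value"
some.getOrElse("fallback")                         // "value"

val opt: Optional<String> = some.toOptional()      // Scala Option -> java.util.Optional
opt.getOrNull()                                    // "value"
opt.toOption()                                     // and back to Scala Option
```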
@@ -29,14 +29,12 @@

package org.jetbrains.kotlinx.spark.api

import org.apache.spark.api.java.JavaRDDLike
import org.apache.spark.api.java.function.FlatMapFunction
import org.apache.spark.api.java.function.ForeachFunction
import org.apache.spark.api.java.function.ForeachPartitionFunction
import org.apache.spark.api.java.function.MapFunction
import org.apache.spark.api.java.function.MapPartitionsFunction
import org.apache.spark.api.java.function.ReduceFunction
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.*
import scala.Tuple2
import scala.Tuple3
@@ -100,33 +98,6 @@ inline fun <reified T> Array<T>.toDS(spark: SparkSession): Dataset<T> =
inline fun <reified T> Array<T>.toDF(spark: SparkSession, vararg colNames: String): Dataset<Row> =
toDS(spark).run { if (colNames.isEmpty()) toDF() else toDF(*colNames) }
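
The `Array`-based creators above survive, while the `RDD` and `JavaRDDLike` variants below are removed: the RDD API belongs to the classic runtime, which the Spark Connect client does not expose. Usage of the kept functions, as a sketch (assumes an active `SparkSession` named `spark`):

```kotlin
data class Person(val name: String, val age: Int)

// Typed Dataset from a Kotlin array:
val people: Dataset<Person> = arrayOf(Person("Ada", 36), Person("Bob", 41)).toDS(spark)

// Untyped DataFrame with explicit column names:
val df: Dataset<Row> = arrayOf(Person("Ada", 36), Person("Bob", 41)).toDF(spark, "name", "age")
```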

/**
* Utility method to create dataset from RDD
*/
inline fun <reified T> RDD<T>.toDS(spark: SparkSession): Dataset<T> =
spark.createDataset(this, kotlinEncoderFor<T>())

/**
* Utility method to create dataset from JavaRDD
*/
inline fun <reified T> JavaRDDLike<T, *>.toDS(spark: SparkSession): Dataset<T> =
spark.createDataset(this.rdd(), kotlinEncoderFor<T>())

/**
* Utility method to create Dataset<Row> (Dataframe) from JavaRDD.
* NOTE: [T] must be [Serializable].
*/
inline fun <reified T> JavaRDDLike<T, *>.toDF(spark: SparkSession, vararg colNames: String): Dataset<Row> =
toDS(spark).run { if (colNames.isEmpty()) toDF() else toDF(*colNames) }

/**
* Utility method to create Dataset<Row> (Dataframe) from RDD.
* NOTE: [T] must be [Serializable].
*/
inline fun <reified T> RDD<T>.toDF(spark: SparkSession, vararg colNames: String): Dataset<Row> =
toDS(spark).run { if (colNames.isEmpty()) toDF() else toDF(*colNames) }


/**
* (Kotlin-specific)
* Returns a new Dataset that contains the result of applying [func] to each element.
@@ -271,21 +242,6 @@ inline fun <reified T> Dataset<T>.forEach(noinline func: (T) -> Unit): Unit = foreach(ForeachFunction(func))
inline fun <reified T> Dataset<T>.forEachPartition(noinline func: (Iterator<T>) -> Unit): Unit =
foreachPartition(ForeachPartitionFunction(func))
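
These wrappers route Kotlin lambdas through Spark's SAM interfaces (`ForeachFunction`, `ForeachPartitionFunction`) so call sites stay idiomatic. Continuing the hypothetical `people` dataset from the sketch above:

```kotlin
people.forEach { println(it.name) }                  // action per element

people.forEachPartition { rows: Iterator<Person> ->  // action per partition
    println("partition size: ${rows.asSequence().count()}")
}
```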

/**
* It's hard to call `Dataset.debugCodegen` from kotlin, so here is utility for that
*/
fun <T> Dataset<T>.debugCodegen(): Dataset<T> = also {
org.apache.spark.sql.execution.debug.`package$`.`MODULE$`.DebugQuery(it).debugCodegen()
}

/**
* It's hard to call `Dataset.debug` from kotlin, so here is utility for that
*/
fun <T> Dataset<T>.debug(): Dataset<T> = also {
org.apache.spark.sql.execution.debug.`package$`.`MODULE$`.DebugQuery(it).debug()
}


/**
* Alias for [Dataset.joinWith] which passes "left" argument
* and respects the fact that in result of left join right relation is nullable
@@ -116,9 +116,9 @@ fun <T> kotlinEncoderFor(kType: KType): Encoder<T> =
*/
private fun <T> applyEncoder(agnosticEncoder: AgnosticEncoder<T>): Encoder<T> {
//#if sparkConnect == false
return org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.apply(agnosticEncoder)
//$return org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.apply(agnosticEncoder)
//#else
//$return agnosticEncoder
return agnosticEncoder
//#endif
}
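
Here the JCP directives from the build setup do their work: the classic build wraps the `AgnosticEncoder` in a catalyst `ExpressionEncoder`, a class the connect client cannot see, while the spark-connect build returns the agnostic encoder as-is. Callers are unaffected either way; a hedged usage sketch:

```kotlin
// Illustrative data class; kotlinEncoderFor is the API shown above.
data class Event(val id: Long, val label: String)

val encoder: Encoder<Event> = kotlinEncoderFor<Event>()
// The encoder can then feed Dataset-producing APIs, e.g. spark.createDataset(...).
```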


This file was deleted.

This file was deleted.
