kotlin 2.0.0-RC1, enabled jupyter module. Added basic Sparkify conversion for jupyter.

Disabled HTML renderers in favor of just outputting text; notebooks can render it however they like. RDDs are converted to Datasets before rendering.
Jolanrensen committed Apr 10, 2024
1 parent ab4c455 commit e05feac
Showing 9 changed files with 203 additions and 85 deletions.
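In notebook terms, the net effect of this commit: @Sparkify data classes compile to scala.Product subtypes inside Jupyter cells, and Datasets and all RDD flavors render as plain showString text that the frontend may re-render as it likes. A minimal sketch of a session (the `%use spark` magic and the `dsOf`/`sc` helper names are assumptions based on the integration's usual setup):

// Hypothetical notebook cells, assuming the integration is loaded via `%use spark`:

@Sparkify
data class Person(val name: String, val age: Int)   // plugin generates the scala.Product supertype

val ds = dsOf(Person("Alice", 30), Person("Bob", 25))
ds   // renders as plain text via Dataset.showString, no longer as HTML

val rdd = sc.parallelize(listOf(1, 2, 3))
rdd  // converted to a Dataset first, then rendered with the same text output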
2 changes: 1 addition & 1 deletion buildSrc/src/main/kotlin/Versions.kt
@@ -2,7 +2,7 @@ object Versions : Dsl<Versions> {
const val project = "2.0.0-SNAPSHOT"
const val kotlinSparkApiGradlePlugin = "2.0.0-SNAPSHOT"
const val groupID = "org.jetbrains.kotlinx.spark"
const val kotlin = "2.0.0-Beta5"
const val kotlin = "2.0.0-RC1"
const val jvmTarget = "8"
const val jupyterJvmTarget = "8"
inline val spark get() = System.getProperty("spark") as String
@@ -33,10 +33,10 @@ class DataClassSparkifySuperTypeGenerator(
}
}

context(TypeResolveServiceContainer)
override fun computeAdditionalSupertypes(
classLikeDeclaration: FirClassLikeDeclaration,
resolvedSupertypes: List<FirResolvedTypeRef>
resolvedSupertypes: List<FirResolvedTypeRef>,
typeResolver: TypeResolveService,
): List<FirResolvedTypeRef> = listOf(
buildResolvedTypeRef {
val scalaProduct = productFqNames.first().let {
@@ -48,7 +48,6 @@
isNullable = false,
)
}

)

override fun needTransformSupertypes(declaration: FirClassLikeDeclaration): Boolean =
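For context: this hunk adapts to the Kotlin 2.0.0-RC1 FIR API, where the TypeResolveServiceContainer context receiver on computeAdditionalSupertypes was replaced by an explicit typeResolver parameter. The shape of the new override, following the diff above (a sketch only; the FIR compiler-plugin API is unstable across versions):

// Sketch following the hunk above; not a definitive implementation.
override fun computeAdditionalSupertypes(
    classLikeDeclaration: FirClassLikeDeclaration,
    resolvedSupertypes: List<FirResolvedTypeRef>,
    typeResolver: TypeResolveService, // replaces the context(TypeResolveServiceContainer) receiver
): List<FirResolvedTypeRef> =
    // resolve and return the extra supertypes, e.g. scala.Product, as resolved type refs
    emptyList()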
6 changes: 6 additions & 0 deletions examples/build.gradle.kts
@@ -3,7 +3,13 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
// Needs to be installed in the local maven repository or have the bootstrap jar on the classpath
id("org.jetbrains.kotlinx.spark.api")
java
kotlin("jvm")
kotlin("plugin.noarg") version Versions.kotlin
}

noArg {
annotation("org.jetbrains.kotlinx.spark.examples.NoArg")
}

kotlinSparkApi {
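The new no-arg setup registers a custom annotation; classes in the examples module annotated with it get a synthetic zero-argument constructor. A hedged sketch (the User class is made up; the annotation must match the fqname configured above):

// Hypothetical example in org.jetbrains.kotlinx.spark.examples:
annotation class NoArg

@NoArg
data class User(val name: String, val age: Int)

// The generated zero-arg constructor is only callable via reflection, e.g.:
// User::class.java.getDeclaredConstructor().newInstance()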
Binary file modified gradle/bootstraps/compiler-plugin.jar
Binary file modified gradle/bootstraps/gradle-plugin.jar
@@ -19,21 +19,43 @@
*/
package org.jetbrains.kotlinx.spark.api.jupyter

import kotlinx.serialization.Serializable
import kotlinx.serialization.json.*
import org.apache.spark.api.java.JavaDoubleRDD
import org.apache.spark.api.java.JavaPairRDD
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaRDDLike
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.intellij.lang.annotations.Language
import org.jetbrains.kotlinx.jupyter.api.*
import org.jetbrains.kotlinx.jupyter.api.Code
import org.jetbrains.kotlinx.jupyter.api.FieldValue
import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost
import org.jetbrains.kotlinx.jupyter.api.MimeTypedResult
import org.jetbrains.kotlinx.jupyter.api.Notebook
import org.jetbrains.kotlinx.jupyter.api.VariableDeclaration
import org.jetbrains.kotlinx.jupyter.api.createRendererByCompileTimeType
import org.jetbrains.kotlinx.jupyter.api.declare
import org.jetbrains.kotlinx.jupyter.api.libraries.JupyterIntegration
import org.jetbrains.kotlinx.jupyter.api.textResult
import org.jetbrains.kotlinx.spark.api.SparkSession
import org.jetbrains.kotlinx.spark.api.jupyter.Properties.Companion.displayLimitName
import org.jetbrains.kotlinx.spark.api.jupyter.Properties.Companion.displayTruncateName
import org.jetbrains.kotlinx.spark.api.jupyter.Properties.Companion.scalaName
import org.jetbrains.kotlinx.spark.api.jupyter.Properties.Companion.sparkName
import org.jetbrains.kotlinx.spark.api.jupyter.Properties.Companion.sparkPropertiesName
import org.jetbrains.kotlinx.spark.api.jupyter.Properties.Companion.versionName
import kotlin.reflect.KProperty1
import org.jetbrains.kotlinx.spark.api.kotlinEncoderFor
import org.jetbrains.kotlinx.spark.api.plugin.annotations.ColumnName
import org.jetbrains.kotlinx.spark.api.plugin.annotations.Sparkify
import scala.Tuple2
import kotlin.reflect.KClass
import kotlin.reflect.KMutableProperty
import kotlin.reflect.full.createType
import kotlin.reflect.full.findAnnotation
import kotlin.reflect.full.isSubtypeOf
import kotlin.reflect.full.memberFunctions
import kotlin.reflect.full.memberProperties
import kotlin.reflect.full.primaryConstructor
import kotlin.reflect.full.valueParameters
import kotlin.reflect.typeOf


@@ -46,9 +68,6 @@ abstract class Integration(private val notebook: Notebook, private val options:
protected val sparkVersion = /*$"\""+spark+"\""$*/ /*-*/ ""
protected val version = /*$"\""+version+"\""$*/ /*-*/ ""

protected val displayLimitOld = "DISPLAY_LIMIT"
protected val displayTruncateOld = "DISPLAY_TRUNCATE"

protected val properties: Properties
get() = notebook
.variablesState[sparkPropertiesName]!!
@@ -101,6 +120,7 @@
)

open val imports: Array<String> = arrayOf(
"org.jetbrains.kotlinx.spark.api.plugin.annotations.*",
"org.jetbrains.kotlinx.spark.api.*",
"org.jetbrains.kotlinx.spark.api.tuples.*",
*(1..22).map { "scala.Tuple$it" }.toTypedArray(),
@@ -116,6 +136,9 @@
"org.apache.spark.streaming.*",
)

// Needs to be set by the integration
var spark: SparkSession? = null

override fun Builder.onLoaded() {
dependencies(*dependencies)
import(*imports)
@@ -135,27 +158,6 @@
)
)

@Language("kts")
val _0 = execute(
"""
@Deprecated("Use ${displayLimitName}=${properties.displayLimit} in %use magic or ${sparkPropertiesName}.${displayLimitName} = ${properties.displayLimit} instead", ReplaceWith("${sparkPropertiesName}.${displayLimitName}"))
var $displayLimitOld: Int
get() = ${sparkPropertiesName}.${displayLimitName}
set(value) {
println("$displayLimitOld is deprecated: Use ${sparkPropertiesName}.${displayLimitName} instead")
${sparkPropertiesName}.${displayLimitName} = value
}
@Deprecated("Use ${displayTruncateName}=${properties.displayTruncate} in %use magic or ${sparkPropertiesName}.${displayTruncateName} = ${properties.displayTruncate} instead", ReplaceWith("${sparkPropertiesName}.${displayTruncateName}"))
var $displayTruncateOld: Int
get() = ${sparkPropertiesName}.${displayTruncateName}
set(value) {
println("$displayTruncateOld is deprecated: Use ${sparkPropertiesName}.${displayTruncateName} instead")
${sparkPropertiesName}.${displayTruncateName} = value
}
""".trimIndent()
)

onLoaded()
}

@@ -180,27 +182,119 @@
onShutdown()
}

onClassAnnotation<Sparkify> {
for (klass in it) {
if (klass.isData) {
execute(generateSparkifyClass(klass))
}
}
}

// Render Dataset
render<Dataset<*>> {
with(properties) {
HTML(it.toHtml(limit = displayLimit, truncate = displayTruncate))
}
renderDataset(it)
}

render<RDD<*>> {
with(properties) {
HTML(it.toJavaRDD().toHtml(limit = displayLimit, truncate = displayTruncate))
// Using the compile-time KType, convert this JavaRDDLike to a Dataset and render it
notebook.renderersProcessor.registerWithoutOptimizing(
createRendererByCompileTimeType<JavaRDDLike<*, *>> {
if (spark == null) return@createRendererByCompileTimeType it.value.toString()

val rdd = (it.value as JavaRDDLike<*, *>).rdd()
val type = when {
it.type.isSubtypeOf(typeOf<JavaDoubleRDD>()) ->
typeOf<Double>()

it.type.isSubtypeOf(typeOf<JavaPairRDD<*, *>>()) ->
Tuple2::class.createType(
listOf(
it.type.arguments.first(),
it.type.arguments.last(),
)
)

it.type.isSubtypeOf(typeOf<JavaRDD<*>>()) ->
it.type.arguments.first().type!!

else -> it.type.arguments.first().type!!
}
val ds = spark!!.createDataset(rdd, kotlinEncoderFor(type))
renderDataset(ds)
}
}
)

// Using the compile-time KType, convert this RDD to a Dataset and render it
notebook.renderersProcessor.registerWithoutOptimizing(
createRendererByCompileTimeType<RDD<*>> {
if (spark == null) return@createRendererByCompileTimeType it.value.toString()

render<JavaRDDLike<*, *>> {
with(properties) {
HTML(it.toHtml(limit = displayLimit, truncate = displayTruncate))
val rdd = it.value as RDD<*>
val type = it.type.arguments.first().type!!
val ds = spark!!.createDataset(rdd, kotlinEncoderFor(type))
renderDataset(ds)
}
)

onLoadedAlsoDo()
}

private fun renderDataset(ds: Dataset<*>): MimeTypedResult =
with(properties) {
val showFunction = Dataset::class
.memberFunctions
.firstOrNull { it.name == "showString" && it.valueParameters.size == 3 }

textResult(
if (showFunction != null) {
showFunction.call(ds, displayLimit, displayTruncate, false) as String
} else {
// If showString cannot be called reflectively, fall back to show(), which prints to stdout instead of returning a String
ds.show(displayLimit, displayTruncate)
""
}
)
}

onLoadedAlsoDo()

// TODO wip
private fun generateSparkifyClass(klass: KClass<*>): Code {
// val name = "`${klass.simpleName!!}${'$'}Generated`"
val name = klass.simpleName
val constructorArgs = klass.primaryConstructor!!.parameters
val visibility = klass.visibility?.name?.lowercase() ?: ""
val memberProperties = klass.memberProperties

val properties = constructorArgs.associateWith { param ->
memberProperties.first { it.name == param.name }
}

val constructorParamsCode = properties.entries.joinToString("\n") { (param, prop) ->
// TODO check override
if (param.isOptional) TODO()
val modifier = if (prop is KMutableProperty<*>) "var" else "val"
val paramVisibility = prop.visibility?.name?.lowercase() ?: ""
val columnName = param.findAnnotation<ColumnName>()?.name ?: param.name!!

"| @get:kotlin.jvm.JvmName(\"$columnName\") $paramVisibility $modifier ${param.name}: ${param.type},"
}

val productElementWhenParamsCode = properties.entries.joinToString("\n") { (param, _) ->
"| ${param.index} -> this.${param.name}"
}

@Language("kotlin")
val code = """
|$visibility data class $name(
$constructorParamsCode
|): scala.Product, java.io.Serializable {
| override fun canEqual(that: Any?): Boolean = that is $name
| override fun productArity(): Int = ${constructorArgs.size}
| override fun productElement(n: Int): Any = when (n) {
$productElementWhenParamsCode
| else -> throw IndexOutOfBoundsException()
| }
|}
""".trimMargin()
return code
}
}
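To make the work-in-progress generator above concrete: for a hypothetical @Sparkify public data class Person(val name: String, val age: Int) with no @ColumnName overrides, generateSparkifyClass would emit roughly the following:

public data class Person(
    @get:kotlin.jvm.JvmName("name") public val name: kotlin.String,
    @get:kotlin.jvm.JvmName("age") public val age: kotlin.Int,
): scala.Product, java.io.Serializable {
    override fun canEqual(that: Any?): Boolean = that is Person
    override fun productArity(): Int = 2
    override fun productElement(n: Int): Any = when (n) {
        0 -> this.name
        1 -> this.age
        else -> throw IndexOutOfBoundsException()
    }
}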
@@ -25,6 +25,7 @@ package org.jetbrains.kotlinx.spark.api.jupyter
import org.intellij.lang.annotations.Language
import org.jetbrains.kotlinx.jupyter.api.KotlinKernelHost
import org.jetbrains.kotlinx.jupyter.api.Notebook
import org.jetbrains.kotlinx.spark.api.SparkSession
import org.jetbrains.kotlinx.spark.api.jupyter.Properties.Companion.appNameName
import org.jetbrains.kotlinx.spark.api.jupyter.Properties.Companion.sparkMasterName

@@ -86,7 +87,7 @@ class SparkIntegration(notebook: Notebook, options: MutableMap<String, String?>)
"""
inline fun <reified T> dfOf(vararg arg: T): Dataset<Row> = spark.dfOf(*arg)""".trimIndent(),
"""
inline fun <reified T> emptyDataset(): Dataset<T> = spark.emptyDataset(encoder<T>())""".trimIndent(),
inline fun <reified T> emptyDataset(): Dataset<T> = spark.emptyDataset(kotlinEncoderFor<T>())""".trimIndent(),
"""
inline fun <reified T> dfOf(colNames: Array<String>, vararg arg: T): Dataset<Row> = spark.dfOf(colNames, *arg)""".trimIndent(),
"""
@@ -108,6 +109,8 @@ class SparkIntegration(notebook: Notebook, options: MutableMap<String, String?>)
"""
inline fun <RETURN, reified NAMED_UDF : NamedUserDefinedFunction<RETURN, *>> UserDefinedFunction<RETURN, NAMED_UDF>.register(name: String): NAMED_UDF = spark.udf().register(name = name, udf = this)""".trimIndent(),
).map(::execute)

spark = execute("spark").value as SparkSession
}

override fun KotlinKernelHost.onShutdown() {
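Two things changed in this file: emptyDataset now uses the renamed kotlinEncoderFor<T>() encoder helper, and the integration stores the live spark session so the RDD renderers in Integration can build Datasets. A small usage sketch (the Person class is the hypothetical one from earlier):

// In a notebook cell, once the integration has loaded:
val empty: Dataset<Person> = emptyDataset<Person>()            // backed by kotlinEncoderFor<Person>()
val explicit = spark.emptyDataset(kotlinEncoderFor<Person>())  // equivalent explicit form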