Skip to content

Commit

Permalink
Merge pull request #253 from modelix/MODELIX-552
Browse files Browse the repository at this point in the history
MODELIX-552 ClassCastException in .mapIfNotNull
  • Loading branch information
slisson authored Sep 20, 2023
2 parents 1a45e64 + cf51cc9 commit 3437608
Show file tree
Hide file tree
Showing 58 changed files with 90 additions and 399 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlinx.serialization.Serializable
import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.serializer

class AndOperatorStep() : MonoTransformingStep<IZipOutput<Boolean>, Boolean>() {
class AndOperatorStep() : SimpleMonoTransformingStep<IZipOutput<Boolean>, Boolean>() {

override fun transform(evaluationContext: QueryEvaluationContext, input: IZipOutput<Boolean>): Boolean {
return input.values.all { it == true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlinx.serialization.Serializable
import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.serializer

class CollectionSizeStep : MonoTransformingStep<Collection<*>, Int>() {
class CollectionSizeStep : SimpleMonoTransformingStep<Collection<*>, Int>() {
override fun getOutputSerializer(serializersModule: SerializersModule): KSerializer<out IStepOutput<Int>> {
return serializersModule.serializer<Int>().stepOutputSerializer(this)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,6 @@ class ListCollectorStep<E> : CollectorStep<E, List<E>>() {
val outputList = inputList.map { it.value }
return CollectorStepOutput(inputList, inputList, outputList)
}
override fun aggregate(input: Sequence<IStepOutput<E>>): IStepOutput<List<E>> {
val inputList = input.toList()
val outputList = inputList.map { it.value }
return CollectorStepOutput(inputList, inputList, outputList)
}

@Serializable
@SerialName("toList")
Expand Down Expand Up @@ -132,12 +127,6 @@ class SetCollectorStep<E> : CollectorStep<E, Set<E>>() {
input.collect { if (outputSet.add(it.value)) inputList.add(it) }
return CollectorStepOutput(inputList, inputList, outputSet)
}
override fun aggregate(input: Sequence<IStepOutput<E>>): IStepOutput<Set<E>> {
val inputList = ArrayList<IStepOutput<E>>()
val outputSet = HashSet<E>()
input.forEach { if (outputSet.add(it.value)) inputList.add(it) }
return CollectorStepOutput(inputList, inputList, outputSet)
}

@Serializable
@SerialName("toSet")
Expand Down Expand Up @@ -173,19 +162,6 @@ class MapCollectorStep<K, V> : CollectorStep<IZip2Output<Any?, K, V>, Map<K, V>>
val outputMap: Map<K, V> = internalMap.mapValues { it.value.value }
return CollectorStepOutput(inputList, internalMap, outputMap)
}
override fun aggregate(input: Sequence<IStepOutput<IZip2Output<Any?, K, V>>>): IStepOutput<Map<K, V>> {
val inputList = ArrayList<IStepOutput<IZip2Output<Any?, K, V>>>()
val internalMap = HashMap<K, IStepOutput<V>>()
input.forEach {
val zipStepOutput = it as ZipStepOutput<IZip2Output<Any?, K, V>, Any?>
if (!internalMap.containsKey(it.value.first)) {
inputList.add(it)
internalMap.put(zipStepOutput.values[0].value as K, zipStepOutput.values[1] as IStepOutput<V>)
}
}
val outputMap: Map<K, V> = internalMap.mapValues { it.value.value }
return CollectorStepOutput(inputList, internalMap, outputMap)
}

@Serializable
@SerialName("toMap")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,6 @@ open class ConstantSourceStep<E>(val element: E, val type: KType) : ProducingSte
override fun requiresWriteAccess(): Boolean = false
override fun needsCoroutineScope(): Boolean = false

override fun createSequence(evaluationContext: QueryEvaluationContext, queryInput: Sequence<Any?>): Sequence<E> {
return sequenceOf(element)
}

override fun evaluate(evaluationContext: QueryEvaluationContext, queryInput: Any?): Optional<E> {
return Optional.of(element)
}

override fun evaluateStatically(): E {
return element
}
Expand Down Expand Up @@ -172,3 +164,5 @@ fun String?.asMono() = createConstantSourceStep(this)

@JvmName("asMono_nullable")
fun Set<String?>.asMono() = createConstantSourceStep(this)

inline fun <reified T> nullMono(): IMonoStep<T?> = ConstantSourceStep<T?>(null, typeOf<T?>())
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ class CountingStep() : AggregationStep<Any?, Int>() {
return input.count().asStepOutput(this)
}

override fun aggregate(input: Sequence<IStepOutput<Any?>>): IStepOutput<Int> = input.count().asStepOutput(this)

override fun createDescriptor(context: QueryGraphDescriptorBuilder) = CountDescriptor()

@Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,34 @@
*/
package org.modelix.modelql.core

import kotlinx.coroutines.flow.map
import kotlinx.serialization.KSerializer
import kotlinx.serialization.SerialName
import kotlinx.serialization.Serializable
import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.serializer

class EmptyStringIfNullStep : MonoTransformingStep<String?, String>() {

override fun getOutputSerializer(serializersModule: SerializersModule): KSerializer<out IStepOutput<String>> {
return serializersModule.serializer<String>().stepOutputSerializer(this)
val inputSerializer: KSerializer<IStepOutput<String?>> = getProducer().getOutputSerializer(serializersModule).upcast()
return MultiplexedOutputSerializer<String>(
this,
listOf<KSerializer<IStepOutput<String>>>(
inputSerializer as KSerializer<IStepOutput<String>>,
serializersModule.serializer<String>().stepOutputSerializer(this).upcast(),
),
)
}

override fun transform(evaluationContext: QueryEvaluationContext, input: String?): String {
return input ?: ""
override fun createFlow(input: StepFlow<String?>, context: IFlowInstantiationContext): StepFlow<String> {
return input.map {
if (it.value == null) {
MultiplexedOutput(1, "".asStepOutput(this))
} else {
MultiplexedOutput(0, it.upcast())
}
}
}

override fun createDescriptor(context: QueryGraphDescriptorBuilder): StepDescriptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,6 @@ class FilteringStep<E>(val condition: MonoUnboundQuery<E, Boolean?>) : Transform
// return input.filter { condition.evaluate(it.value).presentAndEqual(true) }
}

override fun createSequence(evaluationContext: QueryEvaluationContext, queryInput: Sequence<Any?>): Sequence<E> {
return getProducer().createSequence(evaluationContext, queryInput).filter {
condition.evaluate(
evaluationContext,
it,
).presentAndEqual(true)
}
}

override fun getOutputSerializer(serializersModule: SerializersModule): KSerializer<out IStepOutput<E>> {
return getProducer().getOutputSerializer(serializersModule)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@ class FirstElementStep<E>() : MonoTransformingStep<E, E>() {

override fun requiresSingularQueryInput(): Boolean = true

override fun transform(evaluationContext: QueryEvaluationContext, input: IStepOutput<E>): IStepOutput<E> {
return input
}

override fun transform(evaluationContext: QueryEvaluationContext, input: E): E {
return input
}

override fun toString(): String {
return getProducer().toString() + ".first()"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ class FirstOrNullStep<E>() : AggregationStep<E, E?>() {
?: MultiplexedOutput(1, null.asStepOutput(this))
}

override fun aggregate(input: Sequence<IStepOutput<E>>): IStepOutput<E?> {
return input.firstOrNull() ?: null.asStepOutput(this)
}

override fun toString(): String {
return "${getProducer()}.firstOrNull()"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ class FlatMapStep<In, Out>(val query: FluxUnboundQuery<In, Out>) : TransformingS
return input.flatMapConcat { query.asFlow(context.evaluationContext, it) }
}

override fun createSequence(evaluationContext: QueryEvaluationContext, queryInput: Sequence<Any?>): Sequence<Out> {
return query.asSequence(evaluationContext, getProducer().createSequence(evaluationContext, queryInput))
}

override fun getOutputSerializer(serializersModule: SerializersModule): KSerializer<out IStepOutput<Out>> {
return query.outputStep.getOutputSerializer(serializersModule)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ abstract class FoldingStep<In, Out>(private val initial: Out) : AggregationStep<
return input.fold(initial) { acc, value -> fold(acc, value.value) }.asStepOutput(this)
}

override fun aggregate(input: Sequence<IStepOutput<In>>): IStepOutput<Out> {
return input.fold(initial) { acc, value -> fold(acc, value.value) }.asStepOutput(this)
}

private var result: Out = initial

protected abstract fun fold(acc: Out, value: In): Out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,6 @@ interface IProducingStep<out E> : IStep {
fun getOutputSerializer(serializersModule: SerializersModule): KSerializer<out IStepOutput<E>>
fun createFlow(context: IFlowInstantiationContext): StepFlow<E>

/**
* Flows usually provide better performance, but if suspending is not possible you can iterate using a sequence.
*/
fun createSequence(evaluationContext: QueryEvaluationContext, queryInput: Sequence<Any?>): Sequence<E>

/**
* Even higher performance for producers that output exactly one element
*/
fun evaluate(evaluationContext: QueryEvaluationContext, queryInput: Any?): Optional<E> {
return createSequence(evaluationContext, sequenceOf(queryInput))
.map { Optional.of(it) }
.ifEmpty { sequenceOf(Optional.empty<E>()) }
.single()
}

fun outputIsConsumedMultipleTimes(): Boolean {
return getConsumers().size > 1 || getConsumers().any { it.inputIsConsumedMultipleTimes() }
}
Expand Down Expand Up @@ -181,30 +166,12 @@ abstract class MonoTransformingStep<In, Out> : TransformingStep<In, Out>(), IMon

fun connectAndDowncast(producer: IMonoStep<In>): IMonoStep<Out> = also { producer.connect(it) }
fun connectAndDowncast(producer: IFluxStep<In>): IFluxStep<Out> = also { producer.connect(it) }
}

abstract class SimpleMonoTransformingStep<In, Out> : MonoTransformingStep<In, Out>() {
override fun createFlow(input: StepFlow<In>, context: IFlowInstantiationContext): StepFlow<Out> {
return input.map { transform(context.evaluationContext, it) }
}

override fun createSequence(evaluationContext: QueryEvaluationContext, queryInput: Sequence<Any?>): Sequence<Out> {
return createTransformingSequence(
evaluationContext,
getProducer().createSequence(evaluationContext, queryInput),
)
}

open fun createTransformingSequence(evaluationContext: QueryEvaluationContext, input: Sequence<In>): Sequence<Out> {
return input.map { transform(evaluationContext, it) }
}

override fun evaluate(evaluationContext: QueryEvaluationContext, queryInput: Any?): Optional<Out> {
return getProducer().evaluate(evaluationContext, queryInput).map { transform(evaluationContext, it) }
return input.map { transform(context.evaluationContext, it.value).asStepOutput(this) }
}

protected open fun transform(evaluationContext: QueryEvaluationContext, input: IStepOutput<In>): IStepOutput<Out> {
return transform(evaluationContext, input.value).asStepOutput(this)
}

abstract fun transform(evaluationContext: QueryEvaluationContext, input: In): Out
}

Expand Down Expand Up @@ -233,30 +200,11 @@ abstract class AggregationStep<In, Out> : MonoTransformingStep<In, Out>() {
}
}

override fun transform(evaluationContext: QueryEvaluationContext, input: IStepOutput<In>): IStepOutput<Out> {
return aggregate(sequenceOf(input))
}

override fun transform(evaluationContext: QueryEvaluationContext, input: In): Out {
return aggregate(sequenceOf(input.asStepOutput(null))).value
}

override fun evaluate(evaluationContext: QueryEvaluationContext, queryInput: Any?): Optional<Out> {
return Optional.of(
aggregate(
getProducer().createSequence(evaluationContext, sequenceOf(queryInput)).map {
it.asStepOutput(null)
},
).value,
)
}

override fun needsCoroutineScope() = outputIsConsumedMultipleTimes()

override fun inputIsConsumedMultipleTimes(): Boolean {
return false
}

protected abstract suspend fun aggregate(input: StepFlow<In>): IStepOutput<Out>
protected abstract fun aggregate(input: Sequence<IStepOutput<In>>): IStepOutput<Out>
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ open class IdentityStep<E> : TransformingStep<E, E>(), IFluxOrMonoStep<E> {
return input
}

override fun createSequence(evaluationContext: QueryEvaluationContext, queryInput: Sequence<Any?>): Sequence<E> {
return getProducer().createSequence(evaluationContext, queryInput)
}

override fun canBeEmpty(): Boolean = getProducer().canBeEmpty()

override fun canBeMultiple(): Boolean = getProducer().canBeMultiple()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,10 @@ class IfEmptyStep<In : Out, Out>(val alternative: UnboundQuery<Unit, *, Out>) :
override fun createFlow(input: StepFlow<In>, context: IFlowInstantiationContext): StepFlow<Out> {
val downCastedInput: StepFlow<Out> = input
return downCastedInput.map { MultiplexedOutput(0, it) }.onEmpty {
emitAll(alternative.asFlow(context.evaluationContext, Unit).map { MultiplexedOutput(1, it) })
emitAll(alternative.asFlow(context.evaluationContext, Unit.asStepOutput(null)).map { MultiplexedOutput(1, it) })
}
}

override fun createSequence(evaluationContext: QueryEvaluationContext, queryInput: Sequence<Any?>): Sequence<Out> {
return getProducer().createSequence(evaluationContext, queryInput)
.ifEmpty { alternative.outputStep.createSequence(evaluationContext, sequenceOf(Unit)) }
}

override fun canBeEmpty(): Boolean = alternative.outputStep.canBeEmpty()

override fun canBeMultiple(): Boolean = getProducer().canBeMultiple() || alternative.outputStep.canBeMultiple()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlinx.serialization.Serializable
import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.serializer

class IntSumStep(val operand: Int) : MonoTransformingStep<Int, Int>() {
class IntSumStep(val operand: Int) : SimpleMonoTransformingStep<Int, Int>() {

override fun transform(evaluationContext: QueryEvaluationContext, input: Int): Int {
return input + operand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ class IsEmptyStep() : AggregationStep<Any?, Boolean>() {
return input.take(1).map { false }.onEmpty { emit(true) }.single().asStepOutput(this)
}

override fun aggregate(input: Sequence<IStepOutput<Any?>>): IStepOutput<Boolean> = input.none().asStepOutput(this)

override fun createDescriptor(context: QueryGraphDescriptorBuilder) = Descriptor()

@Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import kotlinx.serialization.Serializable
import kotlinx.serialization.modules.SerializersModule
import kotlinx.serialization.serializer

class IsNullPredicateStep<In>() : MonoTransformingStep<In, Boolean>() {
class IsNullPredicateStep<In>() : SimpleMonoTransformingStep<In, Boolean>() {

override fun transform(evaluationContext: QueryEvaluationContext, input: In): Boolean {
return input == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ class JoinStep<E>() : ProducingStep<E>(), IConsumingStep<E>, IFluxStep<E> {
.asFlow().flattenConcat()
}

override fun createSequence(evaluationContext: QueryEvaluationContext, queryInput: Sequence<Any?>): Sequence<E> {
return producers.asSequence().flatMap { it.createSequence(evaluationContext, queryInput) }
}

override fun getOutputSerializer(serializersModule: SerializersModule): KSerializer<out IStepOutput<E>> {
return MultiplexedOutputSerializer(this, getProducers().map { it.getOutputSerializer(serializersModule).upcast() })
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.modelix.modelql.core

import kotlinx.coroutines.flow.map
import kotlinx.serialization.KSerializer
import kotlinx.serialization.descriptors.SerialDescriptor
import kotlinx.serialization.encoding.Decoder
Expand All @@ -25,8 +26,8 @@ open class LocalMappingStep<In, Out>(val transformation: (In) -> Out) : MonoTran
return LocalMappingSerializer(this, getProducer().getOutputSerializer(serializersModule)).stepOutputSerializer(this)
}

override fun transform(evaluationContext: QueryEvaluationContext, input: In): Out {
return transformation(input)
override fun createFlow(input: StepFlow<In>, context: IFlowInstantiationContext): StepFlow<Out> {
return input.map { transformation(it.value).asStepOutput(this) }
}

override fun createDescriptor(context: QueryGraphDescriptorBuilder): StepDescriptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,12 @@ class MapIfNotNullStep<In : Any, Out>(val query: MonoUnboundQuery<In, Out>) : Mo
}

override fun createFlow(input: StepFlow<In?>, context: IFlowInstantiationContext): StepFlow<Out?> {
return input.flatMapConcat {
it.value?.let { query.asFlow(context.evaluationContext, it).map { MultiplexedOutput(1, it) } }
?: flowOf(MultiplexedOutput(0, it as IStepOutput<Out?>))
return input.flatMapConcat { stepOutput ->
stepOutput.value?.let { query.asFlow(context.evaluationContext, stepOutput.upcast()).map { MultiplexedOutput(1, it) } }
?: flowOf(MultiplexedOutput(0, stepOutput.upcast()))
}
}

override fun transform(evaluationContext: QueryEvaluationContext, input: IStepOutput<In?>): IStepOutput<Out?> {
throw UnsupportedOperationException("use MapIfNotNullStep.createFlow")
}

override fun transform(evaluationContext: QueryEvaluationContext, input: In?): Out? {
return input?.let { query.outputStep.evaluate(evaluationContext, it).getOrElse(null) }
}

override fun getOutputSerializer(serializersModule: SerializersModule): KSerializer<out IStepOutput<Out?>> {
val inputSerializer: KSerializer<out IStepOutput<In?>> = getProducer().getOutputSerializer(serializersModule)
val mappedSerializer: KSerializer<out IStepOutput<Out>> = query.getElementOutputSerializer(serializersModule)
Expand Down
Loading

0 comments on commit 3437608

Please sign in to comment.