
In Predicate Expression

In is a predicate expression (i.e. the result data type is always boolean).

In is created when:


Use Catalyst DSL’s in operator to create an In expression.

in(list: Expression*): Expression

// Using Catalyst DSL to create an In expression
import org.apache.spark.sql.catalyst.dsl.expressions._

// HACK: Disable symbolToColumn implicit conversion
// It is imported automatically in spark-shell (and makes demos impossible)
// implicit def symbolToColumn(s: Symbol): org.apache.spark.sql.ColumnName
trait ThatWasABadIdea
implicit def symbolToColumn(ack: ThatWasABadIdea) = ack

val value = 'a.long
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.types.StringType
val list: Seq[Expression] = Seq(1, Literal.create(null, StringType), true)

val e = value in (list: _*)

scala> :type e
org.apache.spark.sql.catalyst.expressions.Expression

scala> println(e.dataType)
BooleanType

scala> println(e.sql)
(`a` IN (1, CAST(NULL AS STRING), true))

An In expression evaluates to a boolean value (i.e. true or false) or to the special value null.

import org.apache.spark.sql.functions.lit
val value = lit(null)
val list = Seq(lit(1))
val in = (value isin (list: _*)).expr

scala> println(in.sql)
(NULL IN (1))

import org.apache.spark.sql.catalyst.InternalRow
val input = InternalRow(1, "hello")

// Case 1: value.eval(input) was null => null
val evaluatedValue = in.eval(input)
assert(evaluatedValue == null)

// Case 2: a list expression evaluates to a non-null v and ordering.equiv(v, evaluatedValue) => true
val value = lit(1)
val list = Seq(lit(1))
val in = (value isin (list: _*)).expr
val evaluatedValue = in.eval(input)
assert(evaluatedValue.asInstanceOf[Boolean])

// Case 3: no list expression is equivalent to the value, but at least one evaluates to null => null
val value = lit(1)
val list = Seq(lit(null), lit(2))
val in = (value isin (list: _*)).expr
scala> println(in.sql)
(1 IN (NULL, 2))

val evaluatedValue = in.eval(input)
assert(evaluatedValue == null)

// Case 4: false
val value = lit(1)
val list = Seq(0, 2, 3).map(lit)
val in = (value isin (list: _*)).expr
scala> println(in.sql)
(1 IN (0, 2, 3))

val evaluatedValue = in.eval(input)
assert(evaluatedValue.asInstanceOf[Boolean] == false)

In takes the following when created:

Value expression
Expression list

Expression list must not be null (but can have expressions that can be evaluated to null).

In uses the following text representation (i.e. toString):

[value] IN [list]

import org.apache.spark.sql.catalyst.expressions.{In, Literal}
import org.apache.spark.sql.{functions => f}
val in = In(value = Literal(1), list = Seq(f.array("1", "2", "3").expr))
scala> println(in)
1 IN (array('1, '2, '3))

In has the following SQL representation:

([valueSQL] IN ([listSQL]))

import org.apache.spark.sql.catalyst.expressions.{In, Literal}
import org.apache.spark.sql.{functions => f}
val in = In(value = Literal(1), list = Seq(f.array("1", "2", "3").expr))
scala> println(in.sql)
(1 IN (array(`1`, `2`, `3`)))
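The SQL template can be illustrated without Spark. The following is a minimal sketch, assuming the list part is the comma-separated SQL of the list expressions (as the `(1 IN (0, 2, 3))` output earlier suggests). `InSqlSketch` and `inSql` are hypothetical names used only for illustration and are not part of Spark.

```scala
object InSqlSketch {
  // Builds "([valueSQL] IN ([listSQL]))" from already-rendered SQL fragments.
  // Assumption: list items are joined with ", " as in the example outputs.
  def inSql(valueSQL: String, listItemsSQL: Seq[String]): String = {
    val listSQL = listItemsSQL.mkString(", ")
    s"($valueSQL IN ($listSQL))"
  }
}
```

For instance, `InSqlSketch.inSql("1", Seq("0", "2", "3"))` gives the same text as the Case 4 output shown earlier.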

In expression is inSetConvertible when the list contains Literal expressions only.

// FIXME Example 1: inSetConvertible true

// FIXME Example 2: inSetConvertible false
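Until the examples above are filled in, the rule can be modelled in plain Scala. This is a sketch only: `Expr`, `Lit` and `Attr` are hypothetical stand-ins for Catalyst's Expression, Literal and attribute types, and the check simply restates "the list contains Literal expressions only".

```scala
object InSetConvertibleSketch {
  // Hypothetical, simplified stand-ins for Catalyst expressions
  sealed trait Expr
  final case class Lit(value: Any) extends Expr
  final case class Attr(name: String) extends Expr

  // True when every expression in the list is a literal
  def inSetConvertible(list: Seq[Expr]): Boolean =
    list.forall(_.isInstanceOf[Lit])
}
```

With these stand-ins, a list of `Lit(1), Lit(2)` is convertible, while a list that mixes in `Attr("a")` is not.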

In expressions are analyzed using the following rules:

In expression has custom support in the InMemoryTableScanExec physical operator.

// Demo: InMemoryTableScanExec and In expression
// 1. Create an In(a: AttributeReference, list: Seq[Literal]) with the list.nonEmpty
// 2. Use InMemoryTableScanExec.buildFilter partial function to produce the expression
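One plausible shape of that support, stated here as an assumption rather than something the text above spells out, is min/max pruning of cached batches: a batch can only contain a match if some literal in the list falls within the column's lower and upper bounds for that batch. The sketch below models this in plain Scala; `ColumnStats` and `mightMatchIn` are hypothetical names, not Spark APIs.

```scala
object BatchPruningSketch {
  // Hypothetical per-batch statistics for a single column
  final case class ColumnStats(lowerBound: Long, upperBound: Long)

  // Assumed pruning rule: keep the batch if any literal lies in [lowerBound, upperBound].
  // The list must be non-empty, mirroring step 1 of the demo outline.
  def mightMatchIn(stats: ColumnStats, literals: Seq[Long]): Boolean = {
    require(literals.nonEmpty, "the list of literals must not be empty")
    literals.exists(l => stats.lowerBound <= l && l <= stats.upperBound)
  }
}
```

Under this model, a batch whose values range from 10 to 20 is kept for the list (1, 15) and skipped for the list (1, 2, 30).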
Table 1. In’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

ordering

Scala’s Ordering instance that represents a strategy for sorting instances of a type.

Lazily-instantiated using TypeUtils.getInterpretedOrdering for the data type of the value expression.

Used exclusively when In is requested to evaluate a value for a given input row.
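The lazy instantiation and the equivalence check can be sketched in plain Scala. This is an illustration under assumptions: `InLike` is a hypothetical class, `Ordering.Long` stands in for whatever TypeUtils.getInterpretedOrdering would return for a long value, and the counter exists only to make the laziness observable.

```scala
object LazyOrderingSketch {
  // Counts how many times an ordering has been built (demonstration only)
  var orderingsBuilt = 0

  final class InLike(value: Long) {
    // Built on first access, not at construction time
    lazy val ordering: Ordering[Long] = {
      orderingsBuilt += 1
      Ordering.Long
    }

    // Equivalence per the ordering, i.e. compare(candidate, value) == 0
    def matches(candidate: Long): Boolean = ordering.equiv(candidate, value)
  }
}
```

Creating an `InLike` does not build the ordering; the first call to `matches` does, and later calls reuse it.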

checkInputDataTypes Method

checkInputDataTypes(): TypeCheckResult
checkInputDataTypes is part of Expression Contract to…​FIXME.


Evaluating In Expression — eval Method

eval(input: InternalRow): Any

eval is part of the Expression Contract for interpreted (non-code-generated) expression evaluation, i.e. evaluating a Catalyst expression to a JVM object for a given internal binary row.

eval requests the value expression to evaluate a value for the input internal row.

If the evaluated value is null, eval returns null too.

Otherwise, eval takes every expression in the list expressions and requests each one to evaluate a value for the input internal row. If any of the evaluated values is not null and is equivalent to the value (per the ordering), eval returns true.

eval records whether any of the list expressions evaluated to null. If no list expression led to true (per the ordering), eval returns null if any list expression evaluated to null, and false otherwise.
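The steps above amount to SQL's three-valued logic for IN. The following is a simplified model in plain Scala, not Spark's implementation: `None` stands in for null, `Some(v)` for a non-null value, and `InEvalSketch` and `evalIn` are hypothetical names. It checks the whole list instead of stopping at the first match, which gives the same result.

```scala
object InEvalSketch {
  // Returns Some(true), Some(false), or None (the model's null)
  def evalIn[T](value: Option[T], list: Seq[Option[T]])(
      implicit ordering: Ordering[T]): Option[Boolean] =
    value match {
      // A null value makes the whole expression null
      case None => None
      case Some(v) =>
        // Some non-null list value is equivalent to v per the ordering
        if (list.exists(_.exists(e => ordering.equiv(e, v)))) Some(true)
        // No match, but a null was seen in the list
        else if (list.contains(None)) None
        // No match and no nulls
        else Some(false)
    }
}
```

The four cases from the earlier examples map onto this model directly: a null value, a matching list item, a non-matching list with a null, and a non-matching list without nulls.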

Generating Java Source Code (ExprCode) For Code-Generated Expression Evaluation — doGenCode Method

doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode

doGenCode is part of the Expression Contract to generate Java source code (ExprCode) for code-generated expression evaluation.


val in = $"id" isin (1, 2, 3)
val q = spark.range(4).filter(in)
val plan = q.queryExecution.executedPlan

import org.apache.spark.sql.execution.FilterExec
val filterExec = plan.collectFirst { case f: FilterExec => f }.get

import org.apache.spark.sql.catalyst.expressions.In
val inExpr = filterExec.expressions.head.asInstanceOf[In]

import org.apache.spark.sql.execution.WholeStageCodegenExec
val wsce = plan.asInstanceOf[WholeStageCodegenExec]
val (ctx, code) = wsce.doCodeGen

import org.apache.spark.sql.catalyst.expressions.codegen.CodeFormatter
scala> println(CodeFormatter.format(code))
...code omitted

// FIXME Make it work
// I thought I'd reuse ctx to have expression: id#14L evaluated