Commit 960bc24

Caught ArrayIndexOutOfBounds exception in Univocity parser and rethrew it with a more descriptive message
vladanvasi-db committed Aug 28, 2024
1 parent 1bdbe14 commit 960bc24
Showing 2 changed files with 56 additions and 7 deletions.
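
For context, a minimal sketch (not part of the commit) of the user-facing scenario this change improves: reading CSV input that has more columns than the Univocity tokenizer's maxColumns limit. The session name, path, and sample record below are assumptions for illustration only.

  // Hypothetical reproduction, assuming a SparkSession named `spark` and a CSV file
  // at /tmp/example.csv containing the line "1,string,3.14,5,7".
  val df = spark.read
    .schema("i INT, a STRING")
    .option("maxColumns", "2")   // the same kind of limit the new test configures
    .csv("/tmp/example.csv")

  // Before this commit the read failed with Univocity's TextParsingException whose cause
  // was an ArrayIndexOutOfBoundsException; after it, the failure is reported as
  // MALFORMED_CSV_RECORD (a SparkRuntimeException) naming the offending record.
  df.show()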
UnivocityParser.scala
@@ -21,9 +21,10 @@ import java.io.InputStream

import scala.util.control.NonFatal

import com.univocity.parsers.common.TextParsingException
import com.univocity.parsers.csv.CsvParser

import org.apache.spark.SparkUpgradeException
import org.apache.spark.{SparkRuntimeException, SparkUpgradeException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
@@ -294,6 +295,19 @@ class UnivocityParser(
}
}

  private def parseLine(line: String): Array[String] = {
    try {
      tokenizer.parseLine(line)
    }
    catch {
      case e: TextParsingException if e.getCause.isInstanceOf[ArrayIndexOutOfBoundsException] =>
        throw new SparkRuntimeException(
          errorClass = "MALFORMED_CSV_RECORD",
          messageParameters = Map("badRecord" -> line)
        )
    }
  }

/**
 * Parses a single CSV string and turns it into either one resulting row or no row (if the
 * record is malformed).
@@ -306,7 +320,7 @@ class UnivocityParser(
(_: String) => Some(InternalRow.empty)
} else {
// parse if the columnPruning is disabled or requiredSchema is nonEmpty
(input: String) => convert(tokenizer.parseLine(input))
(input: String) => convert(parseLine(input))
}
}

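The new guard in parseLine is deliberately narrow: only TextParsingException instances whose cause is an ArrayIndexOutOfBoundsException are translated, so other Univocity parsing failures keep their original shape. A rough, standalone Univocity sketch of the underlying failure mode (the settings and values are assumptions, not taken from the commit):

  import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

  val settings = new CsvParserSettings()
  settings.setMaxColumns(2)                // same kind of limit the new test configures
  val univocityParser = new CsvParser(settings)

  // Feeding a record with more values than maxColumns makes Univocity fail internally;
  // in Spark's usage that surfaced as a TextParsingException caused by an
  // ArrayIndexOutOfBoundsException, which the new parseLine now rewraps.
  univocityParser.parseLine("1,string,3.14,5,7")
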
UnivocityParserSuite.scala
@@ -23,12 +23,12 @@ import java.util.{Locale, TimeZone}

import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException}
import org.apache.spark.{SparkFunSuite, SparkIllegalArgumentException, SparkRuntimeException}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util.{BadRecordException, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.sources.{EqualTo, Filter, StringStartsWith}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -98,7 +98,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
test("Throws exception for empty string with non null type") {
val options = new CSVOptions(Map.empty[String, String], false, "UTC")
val parser = new UnivocityParser(StructType(Seq.empty), options)
val exception = intercept[RuntimeException]{
val exception = intercept[RuntimeException] {
parser.makeConverter("_1", IntegerType, nullable = false).apply("")
}
assert(exception.getMessage.contains("null value found but field _1 is not nullable."))
@@ -125,7 +125,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
timestampsOptions.locale)
val expectedTime = format.parse(customTimestamp).getTime
val castedTimestamp = parser.makeConverter("_1", TimestampType, nullable = true)
.apply(customTimestamp)
.apply(customTimestamp)
assert(castedTimestamp == expectedTime * 1000L)

val customDate = "31/01/2015"
@@ -137,7 +137,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
dateOptions.locale)
val expectedDate = DateTimeUtils.millisToMicros(format.parse(customDate).getTime)
val castedDate = parser.makeConverter("_1", DateType, nullable = true)
.apply(customDate)
.apply(customDate)
assert(castedDate == DateTimeUtils.microsToDays(expectedDate, UTC))

val timestamp = "2015-01-01 00:00:00"
@@ -323,6 +323,41 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
parameters = Map("fieldName" -> "`i`", "fields" -> ""))
}

  test("Bad records test in permissive mode") {
    def checkBadRecord(
        input: String = "1,a",
        dataSchema: StructType = StructType.fromDDL("i INTEGER, s STRING, d DOUBLE"),
        requiredSchema: StructType = StructType.fromDDL("i INTEGER, s STRING"),
        options: Map[String, String] = Map("mode" -> "PERMISSIVE")): BadRecordException = {
      val csvOptions = new CSVOptions(options, false, "UTC")
      val parser = new UnivocityParser(dataSchema, requiredSchema, csvOptions, Seq())
      intercept[BadRecordException] {
        parser.parse(input)
      }
    }

    // Bad record exception caused by conversion error
    checkBadRecord(input = "1.5,a,10.3")

    // Bad record exception caused by insufficient number of columns
    checkBadRecord(input = "2")
  }

  test("Array index out of bounds when parsing CSV with more columns than expected") {
    val input = "1,string,3.14,5,7"
    val dataSchema: StructType = StructType.fromDDL("i INTEGER, a STRING")
    val requiredSchema: StructType = StructType.fromDDL("i INTEGER, a STRING")
    val options = new CSVOptions(Map("maxColumns" -> "2"), false, "UTC")
    val filters = Seq()
    val parser = new UnivocityParser(dataSchema, requiredSchema, options, filters)
    checkError(
      exception = intercept[SparkRuntimeException] {
        parser.parse(input)
      },
      errorClass = "MALFORMED_CSV_RECORD",
      parameters = Map("badRecord" -> "1,string,3.14,5,7"))
  }

test("SPARK-30960: parse date/timestamp string with legacy format") {
def check(parser: UnivocityParser): Unit = {
// The legacy format allows 1 or 2 chars for some fields.
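
A hedged sketch of how calling code observes the new behaviour, mirroring the added test; the assertion on the message is an assumption about the MALFORMED_CSV_RECORD template, not taken from the commit.

  import org.apache.spark.SparkRuntimeException
  import org.apache.spark.sql.catalyst.csv.{CSVOptions, UnivocityParser}
  import org.apache.spark.sql.types.StructType

  val schema = StructType.fromDDL("i INTEGER, a STRING")
  val options = new CSVOptions(Map("maxColumns" -> "2"), false, "UTC")
  val parser = new UnivocityParser(schema, schema, options, Seq.empty)

  try {
    parser.parse("1,string,3.14,5,7")    // more columns than maxColumns allows
  } catch {
    case e: SparkRuntimeException =>
      // The rewrapped error now names the record that could not be parsed.
      assert(e.getMessage.contains("1,string,3.14,5,7"))
  }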
