Added e2e test for parsing error
vladanvasi-db committed Sep 9, 2024
1 parent cde3e38 commit 5cf7caa
Showing 3 changed files with 102 additions and 61 deletions.
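The new end-to-end test drives the CSV reader past a deliberately small maxColumns limit using the one-line fixture added below. As a rough sketch (not part of this commit), the same failure path could be exercised from a standalone Spark application; it assumes a local SparkSession and a hypothetical copy of the fixture row at /tmp/more-columns.csv:

// Reproduction sketch only; not part of the commit. Assumes the hypothetical path
// /tmp/more-columns.csv holds the fixture row "1,3.14,string,5,7" that this commit
// adds under sql/core/src/test/resources/test-data/more-columns.csv.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DecimalType, IntegerType, StructType}

object MaxColumnsRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("MaxColumnsRepro")
      .getOrCreate()

    // Same two-column schema the new test uses.
    val schema = new StructType()
      .add("intColumn", IntegerType, nullable = true)
      .add("decimalColumn", DecimalType(10, 2), nullable = true)

    try {
      // With maxColumns = 2, the five-column row exceeds univocity's column limit,
      // so the read is expected to fail (per the new test) with a
      // FAILED_READ_FILE.NO_HINT error caused by MALFORMED_CSV_RECORD.
      spark.read
        .schema(schema)
        .option("header", "false")
        .option("maxColumns", "2")
        .csv("/tmp/more-columns.csv")
        .collect()
    } finally {
      spark.stop()
    }
  }
}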
@@ -98,7 +98,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
test("Throws exception for empty string with non null type") {
val options = new CSVOptions(Map.empty[String, String], false, "UTC")
val parser = new UnivocityParser(StructType(Seq.empty), options)
val exception = intercept[RuntimeException] {
val exception = intercept[RuntimeException]{
parser.makeConverter("_1", IntegerType, nullable = false).apply("")
}
assert(exception.getMessage.contains("null value found but field _1 is not nullable."))
@@ -125,7 +125,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
timestampsOptions.locale)
val expectedTime = format.parse(customTimestamp).getTime
val castedTimestamp = parser.makeConverter("_1", TimestampType, nullable = true)
.apply(customTimestamp)
.apply(customTimestamp)
assert(castedTimestamp == expectedTime * 1000L)

val customDate = "31/01/2015"
@@ -137,7 +137,7 @@ class UnivocityParserSuite extends SparkFunSuite with SQLHelper {
dateOptions.locale)
val expectedDate = DateTimeUtils.millisToMicros(format.parse(customDate).getTime)
val castedDate = parser.makeConverter("_1", DateType, nullable = true)
.apply(customDate)
.apply(customDate)
assert(castedDate == DateTimeUtils.microsToDays(expectedDate, UTC))

val timestamp = "2015-01-01 00:00:00"
1 change: 1 addition & 0 deletions sql/core/src/test/resources/test-data/more-columns.csv
@@ -0,0 +1 @@
1,3.14,string,5,7
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -85,16 +85,17 @@ abstract class CSVSuite
private val badAfterGoodFile = "test-data/bad_after_good.csv"
private val malformedRowFile = "test-data/malformedRow.csv"
private val charFile = "test-data/char.csv"
private val moreColumnsFile = "test-data/more-columns.csv"

/** Verifies data and schema. */
private def verifyCars(
df: DataFrame,
withHeader: Boolean,
numCars: Int = 3,
numFields: Int = 5,
checkHeader: Boolean = true,
checkValues: Boolean = true,
checkTypes: Boolean = false): Unit = {
df: DataFrame,
withHeader: Boolean,
numCars: Int = 3,
numFields: Int = 5,
checkHeader: Boolean = true,
checkValues: Boolean = true,
checkTypes: Boolean = false): Unit = {

val numColumns = numFields
val numRows = if (withHeader) numCars else numCars + 1
@@ -208,24 +209,24 @@ abstract class CSVSuite

test("test with tab delimiter and double quote") {
val cars = spark.read
.options(Map("quote" -> "\"", "delimiter" -> """\t""", "header" -> "true"))
.csv(testFile(carsTsvFile))
.options(Map("quote" -> "\"", "delimiter" -> """\t""", "header" -> "true"))
.csv(testFile(carsTsvFile))

verifyCars(cars, numFields = 6, withHeader = true, checkHeader = false)
}

test("SPARK-24540: test with multiple character delimiter (comma space)") {
val cars = spark.read
.options(Map("quote" -> "\'", "delimiter" -> ", ", "header" -> "true"))
.csv(testFile(carsMultiCharDelimitedFile))
.options(Map("quote" -> "\'", "delimiter" -> ", ", "header" -> "true"))
.csv(testFile(carsMultiCharDelimitedFile))

verifyCars(cars, withHeader = true)
}

test("SPARK-24540: test with multiple (crazy) character delimiter") {
val cars = spark.read
.options(Map("quote" -> "\'", "delimiter" -> """_/-\\_""", "header" -> "true"))
.csv(testFile(carsMultiCharCrazyDelimitedFile))
.options(Map("quote" -> "\'", "delimiter" -> """_/-\\_""", "header" -> "true"))
.csv(testFile(carsMultiCharCrazyDelimitedFile))

verifyCars(cars, withHeader = true)

@@ -280,9 +281,9 @@ abstract class CSVSuite
// scalastyle:off
spark.sql(
s"""
|CREATE TEMPORARY VIEW carsTable USING csv
|OPTIONS (path "${testFile(carsFile8859)}", header "true",
|charset "iso-8859-1", delimiter "þ")
|CREATE TEMPORARY VIEW carsTable USING csv
|OPTIONS (path "${testFile(carsFile8859)}", header "true",
|charset "iso-8859-1", delimiter "þ")
""".stripMargin.replaceAll("\n", " "))
// scalastyle:on
verifyCars(spark.table("carsTable"), withHeader = true)
@@ -318,8 +319,8 @@ abstract class CSVSuite
withView("carsTable") {
spark.sql(
s"""
|CREATE TEMPORARY VIEW carsTable USING csv
|OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t")
|CREATE TEMPORARY VIEW carsTable USING csv
|OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t")
""".stripMargin.replaceAll("\n", " "))

verifyCars(spark.table("carsTable"), numFields = 6, withHeader = true, checkHeader = false)
@@ -330,11 +331,11 @@ abstract class CSVSuite
withView("carsTable") {
spark.sql(
s"""
|CREATE TEMPORARY VIEW carsTable
|(yearMade double, makeName string, modelName string, priceTag decimal,
| comments string, grp string)
|USING csv
|OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t")
|CREATE TEMPORARY VIEW carsTable
|(yearMade double, makeName string, modelName string, priceTag decimal,
| comments string, grp string)
|USING csv
|OPTIONS (path "${testFile(carsTsvFile)}", header "true", delimiter "\t")
""".stripMargin.replaceAll("\n", " "))

assert(
@@ -436,10 +437,10 @@ abstract class CSVSuite
withView("carsTable") {
spark.sql(
s"""
|CREATE TEMPORARY VIEW carsTable
|(yearMade double, makeName string, modelName string, comments string, grp string)
|USING csv
|OPTIONS (path "${testFile(emptyFile)}", header "false")
|CREATE TEMPORARY VIEW carsTable
|(yearMade double, makeName string, modelName string, comments string, grp string)
|USING csv
|OPTIONS (path "${testFile(emptyFile)}", header "false")
""".stripMargin.replaceAll("\n", " "))

assert(spark.sql("SELECT count(*) FROM carsTable").collect().head(0) === 0)
@@ -450,10 +451,10 @@ abstract class CSVSuite
withView("carsTable") {
spark.sql(
s"""
|CREATE TEMPORARY VIEW carsTable
|(yearMade double, makeName string, modelName string, comments string, blank string)
|USING csv
|OPTIONS (path "${testFile(carsFile)}", header "true")
|CREATE TEMPORARY VIEW carsTable
|(yearMade double, makeName string, modelName string, comments string, blank string)
|USING csv
|OPTIONS (path "${testFile(carsFile)}", header "true")
""".stripMargin.replaceAll("\n", " "))

val cars = spark.table("carsTable")
@@ -687,8 +688,8 @@ abstract class CSVSuite

val expected =
Seq(Seq(1, 2, 3, 4, 5.01D, Timestamp.valueOf("2015-08-20 15:57:00")),
Seq(6, 7, 8, 9, 0, Timestamp.valueOf("2015-08-21 16:58:01")),
Seq(1, 2, 3, 4, 5, Timestamp.valueOf("2015-08-23 18:00:42")))
Seq(6, 7, 8, 9, 0, Timestamp.valueOf("2015-08-21 16:58:01")),
Seq(1, 2, 3, 4, 5, Timestamp.valueOf("2015-08-23 18:00:42")))

assert(results.toSeq.map(_.toSeq) === expected)
}
@@ -1118,7 +1119,8 @@ abstract class CSVSuite

test("SPARK-37326: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") {
withTempPath { path =>
val exp = spark.sql("""
val exp = spark.sql(
"""
select
timestamp_ntz'2020-12-12 12:12:12' as col1,
timestamp_ltz'2020-12-12 12:12:12' as col2
@@ -1173,7 +1175,8 @@ abstract class CSVSuite
} else {
checkAnswer(
res,
spark.sql("""
spark.sql(
"""
select timestamp_ltz'2020-12-12 12:12:12' as col0 union all
select timestamp_ltz'2020-12-12 12:12:12' as col0
""")
@@ -1209,7 +1212,8 @@ abstract class CSVSuite
val exp = spark.read.format("csv").option("header", "true").load(path.getAbsolutePath)
checkAnswer(res, exp)
} else {
val exp = spark.sql("""
val exp = spark.sql(
"""
select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
@@ -1274,8 +1278,8 @@ abstract class CSVSuite
val msg = ex.getCause.getMessage
assert(
msg.contains("Unsupported field: OffsetSeconds") ||
msg.contains("Unable to extract value") ||
msg.contains("Unable to extract ZoneId"))
msg.contains("Unable to extract value") ||
msg.contains("Unable to extract ZoneId"))
}
}
}
@@ -1441,7 +1445,7 @@ abstract class CSVSuite
Seq("1").toDF().write.text(path.getAbsolutePath)
val schema = StructType(
StructField("a", IntegerType, true) ::
StructField("b", IntegerType, true) :: Nil)
StructField("b", IntegerType, true) :: Nil)
val df = spark.read
.schema(schema)
.option("header", "false")
@@ -1463,8 +1467,8 @@ abstract class CSVSuite
.csv(testFile(valueMalformedFile))
checkAnswer(df1,
Row(0, null) ::
Row(1, java.sql.Date.valueOf("1983-08-04")) ::
Nil)
Row(1, java.sql.Date.valueOf("1983-08-04")) ::
Nil)

// If `schema` has `columnNameOfCorruptRecord`, it should handle corrupt records
val columnNameOfCorruptRecord = "_unparsed"
@@ -1478,8 +1482,8 @@ abstract class CSVSuite
.csv(testFile(valueMalformedFile))
checkAnswer(df2,
Row(0, null, "0,2013-111_11 12:13:14") ::
Row(1, java.sql.Date.valueOf("1983-08-04"), null) ::
Nil)
Row(1, java.sql.Date.valueOf("1983-08-04"), null) ::
Nil)

// We put a `columnNameOfCorruptRecord` field in the middle of a schema
val schemaWithCorrField2 = new StructType()
@@ -1495,8 +1499,8 @@ abstract class CSVSuite
.csv(testFile(valueMalformedFile))
checkAnswer(df3,
Row(0, "0,2013-111_11 12:13:14", null) ::
Row(1, null, java.sql.Date.valueOf("1983-08-04")) ::
Nil)
Row(1, null, java.sql.Date.valueOf("1983-08-04")) ::
Nil)

checkError(
exception = intercept[AnalysisException] {
@@ -1611,7 +1615,7 @@ abstract class CSVSuite
"92233720368547758070",
"\n\n1.7976931348623157E308",
"true",
null)
null)
checkAnswer(df, expected)
}
}
@@ -1742,14 +1746,14 @@ abstract class CSVSuite
checkAnswer(
readback,
Row(0, null, "0,2013-111_11 12:13:14") ::
Row(1, Date.valueOf("1983-08-04"), null) :: Nil)
Row(1, Date.valueOf("1983-08-04"), null) :: Nil)
checkAnswer(
readback.filter($"corrRec".isNotNull),
Row(0, null, "0,2013-111_11 12:13:14"))
checkAnswer(
readback.select($"corrRec", $"b"),
Row("0,2013-111_11 12:13:14", null) ::
Row(null, Date.valueOf("1983-08-04")) :: Nil)
Row(null, Date.valueOf("1983-08-04")) :: Nil)
checkAnswer(
readback.filter($"corrRec".isNull && $"a" === 1),
Row(1, Date.valueOf("1983-08-04"), null) :: Nil)
@@ -2090,10 +2094,10 @@ abstract class CSVSuite
odf.write.option("header", false).csv(path.getCanonicalPath)
val ischema = new StructType().add("f2", DoubleType).add("f1", DoubleType)
val idf = spark.read
.schema(ischema)
.option("header", false)
.option("enforceSchema", false)
.csv(path.getCanonicalPath)
.schema(ischema)
.option("header", false)
.option("enforceSchema", false)
.csv(path.getCanonicalPath)

checkAnswer(idf, odf)
}
@@ -2284,6 +2288,7 @@ abstract class CSVSuite

assert(df.count() == expected)
}

def checkCount(expected: Long): Unit = {
val validRec = "1"
val inputs = Seq(
@@ -2538,8 +2543,8 @@ abstract class CSVSuite
test("Do not reuse last good value for bad input field") {
val schema = StructType(
StructField("col1", StringType) ::
StructField("col2", DateType) ::
Nil
StructField("col2", DateType) ::
Nil
)
val rows = spark.read
.schema(schema)
@@ -2718,6 +2723,7 @@ abstract class CSVSuite
s"1,$validTs,999").toDF("data")
.repartition(1)
.write.text(path.getAbsolutePath)

def checkReadback(condition: Column, expected: Seq[Row]): Unit = {
val readback = spark.read
.option("mode", "PERMISSIVE")
@@ -2918,7 +2924,7 @@ abstract class CSVSuite

Seq("DROPMALFORMED", "FAILFAST", "PERMISSIVE").foreach { mode =>
val csv = spark.createDataset(
spark.sparkContext.parallelize(inputCSVString:: Nil))(Encoders.STRING)
spark.sparkContext.parallelize(inputCSVString :: Nil))(Encoders.STRING)
val df = spark.read
.option("mode", mode)
.schema(schema)
@@ -2932,7 +2938,7 @@ abstract class CSVSuite
spark.read.schema(
StructType(
StructField("f1", StringType, nullable = false) ::
StructField("f2", StringType, nullable = false) :: Nil)
StructField("f2", StringType, nullable = false) :: Nil)
).option("mode", "DROPMALFORMED").csv(Seq("a,", "a,b").toDS()),
Row("a", "b"))
}
@@ -3188,8 +3194,8 @@ abstract class CSVSuite

for (fallbackEnabled <- Seq(true, false)) {
withSQLConf(
SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED",
SQLConf.LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") {
SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "CORRECTED",
SQLConf.LEGACY_CSV_ENABLE_DATE_TIME_PARSING_FALLBACK.key -> s"$fallbackEnabled") {
val df = spark.read
.schema("date date, ts timestamp")
.option("dateFormat", "invalid")
@@ -3439,6 +3445,40 @@ abstract class CSVSuite
expected)
}
}

test("SPARK-49444: CSV parsing failure with more than max columns") {
val schema = new StructType()
.add("intColumn", IntegerType, nullable = true)
.add("decimalColumn", DecimalType(10, 2), nullable = true)

val fileReadException = intercept[SparkException] {
spark
.read
.schema(schema)
.option("header", "false")
.option("maxColumns", "2")
.csv(testFile(moreColumnsFile))
.collect()
}

checkErrorMatchPVals(
exception = fileReadException,
errorClass = "FAILED_READ_FILE.NO_HINT",
parameters = Map("path" -> s".*$moreColumnsFile"))

val malformedCSVException = fileReadException.getCause.asInstanceOf[SparkRuntimeException]

checkError(
exception = malformedCSVException,
errorClass = "MALFORMED_CSV_RECORD",
parameters = Map("badRecord" -> "1,3.14,string,5,7"),
sqlState = "KD000",
)

assert(malformedCSVException.getCause.isInstanceOf[TextParsingException])
val textParsingException = malformedCSVException.getCause.asInstanceOf[TextParsingException]
assert(textParsingException.getCause.isInstanceOf[ArrayIndexOutOfBoundsException])
}
}

class CSVv1Suite extends CSVSuite {