From 9ddecc147911055246b536258e4de52d7019cf34 Mon Sep 17 00:00:00 2001 From: Adrian Bridgett Date: Tue, 9 Feb 2016 17:05:08 +0000 Subject: [PATCH 1/8] report the error as well as the line that caused it --- .../databricks/spark/csv/CsvRelation.scala | 26 ++++++++++++------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index c2cd955..504cf4a 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -105,7 +105,7 @@ case class CsvRelation protected[spark] ( tokenRdd(schemaFields.map(_.name)).flatMap { tokens => if (dropMalformed && schemaFields.length != tokens.size) { - logger.warn(s"Dropping malformed line: ${tokens.mkString(",")}") + logger.warn(s"Dropping malformed line (wrong length): ${tokens.mkString(",")}") None } else if (failFast && schemaFields.length != tokens.size) { throw new RuntimeException(s"Malformed line in FAILFAST mode: ${tokens.mkString(",")}") @@ -125,13 +125,16 @@ case class CsvRelation protected[spark] ( case aiob: ArrayIndexOutOfBoundsException if permissive => (index until schemaFields.length).foreach(ind => rowArray(ind) = null) Some(Row.fromSeq(rowArray)) - case _: java.lang.NumberFormatException | - _: IllegalArgumentException if dropMalformed => - logger.warn("Number format exception. " + + case e: java.lang.NumberFormatException if dropMalformed => + logger.warn("Number format exception (" + e + ")." + + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") + None + case e: IllegalArgumentException if dropMalformed => + logger.warn("IllegalArgument exception (" + e + ")." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None case pe: java.text.ParseException if dropMalformed => - logger.warn("Parse exception. " + + logger.warn("Parse exception. (" + pe + ")." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None } @@ -170,7 +173,7 @@ case class CsvRelation protected[spark] ( val requiredSize = requiredFields.length tokenRdd(schemaFields.map(_.name)).flatMap { tokens => if (dropMalformed && schemaFields.length != tokens.size) { - logger.warn(s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") + logger.warn(s"Dropping malformed line (wrong length): ${tokens.mkString(delimiter.toString)}") None } else if (failFast && schemaFields.length != tokens.size) { throw new RuntimeException(s"Malformed line in FAILFAST mode: " + @@ -198,13 +201,16 @@ case class CsvRelation protected[spark] ( } Some(Row.fromSeq(rowArray.take(requiredSize))) } catch { - case _: java.lang.NumberFormatException | - _: IllegalArgumentException if dropMalformed => - logger.warn("Number format exception. " + + case e: java.lang.NumberFormatException if dropMalformed => + logger.warn("Number format exception (" + e + ")." + + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") + None + case e: IllegalArgumentException if dropMalformed => + logger.warn("IllegalArgument exception (" + e + ")." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None case pe: java.text.ParseException if dropMalformed => - logger.warn("Parse exception. " + + logger.warn("Parse exception (" + pe + ")." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None } From 09a3b1a8aa42a296361adad4ef29a99ff97850c3 Mon Sep 17 00:00:00 2001 From: Adrian Bridgett Date: Thu, 11 Feb 2016 08:25:12 +0000 Subject: [PATCH 2/8] wrap long line --- src/main/scala/com/databricks/spark/csv/CsvRelation.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index 504cf4a..325b871 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -173,7 +173,8 @@ case class CsvRelation protected[spark] ( val requiredSize = requiredFields.length tokenRdd(schemaFields.map(_.name)).flatMap { tokens => if (dropMalformed && schemaFields.length != tokens.size) { - logger.warn(s"Dropping malformed line (wrong length): ${tokens.mkString(delimiter.toString)}") + logger.warn(s"Dropping malformed line (wrong length): " + + s"${tokens.mkString(delimiter.toString)}") None } else if (failFast && schemaFields.length != tokens.size) { throw new RuntimeException(s"Malformed line in FAILFAST mode: " + From ebfced992f59b24a5cfa2fea66512c9cadfad6a5 Mon Sep 17 00:00:00 2001 From: Adrian Bridgett Date: Mon, 29 Feb 2016 08:26:53 +0000 Subject: [PATCH 3/8] improve error message formatting (thanks to Hyukjin Kwon) --- src/main/scala/com/databricks/spark/csv/CsvRelation.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index 325b871..7e9e813 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -126,15 +126,15 @@ case class CsvRelation protected[spark] ( (index until schemaFields.length).foreach(ind => rowArray(ind) = null) Some(Row.fromSeq(rowArray)) case e: java.lang.NumberFormatException if dropMalformed => - logger.warn("Number format exception (" + e + ")." + + logger.warn("Number format exception (${e.getMessage})." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None case e: IllegalArgumentException if dropMalformed => - logger.warn("IllegalArgument exception (" + e + ")." + + logger.warn("IllegalArgument exception (${e.getMessage})." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None case pe: java.text.ParseException if dropMalformed => - logger.warn("Parse exception. (" + pe + ")." + + logger.warn("Parse exception. (${pe.getMessage})." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None } From 6ddab09c286bd53fb968c90d9177f086a7799f86 Mon Sep 17 00:00:00 2001 From: Adrian Bridgett Date: Fri, 11 Mar 2016 13:58:58 +0000 Subject: [PATCH 4/8] missed the all important "s" before the string quote --- src/main/scala/com/databricks/spark/csv/CsvRelation.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index 7e9e813..e5e4d02 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -126,15 +126,15 @@ case class CsvRelation protected[spark] ( (index until schemaFields.length).foreach(ind => rowArray(ind) = null) Some(Row.fromSeq(rowArray)) case e: java.lang.NumberFormatException if dropMalformed => - logger.warn("Number format exception (${e.getMessage})." + + logger.warn(s"Number format exception (${e.getMessage})." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None case e: IllegalArgumentException if dropMalformed => - logger.warn("IllegalArgument exception (${e.getMessage})." + + logger.warn(s"IllegalArgument exception (${e.getMessage})." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None case pe: java.text.ParseException if dropMalformed => - logger.warn("Parse exception. (${pe.getMessage})." + + logger.warn(s"Parse exception. (${pe.getMessage})." + s"Dropping malformed line: ${tokens.mkString(delimiter.toString)}") None } From eadadd0ded2cc74ac57e435e5468064189bb090b Mon Sep 17 00:00:00 2001 From: Adrian Bridgett Date: Fri, 11 Mar 2016 14:34:47 +0000 Subject: [PATCH 5/8] add wrong length information --- .../com/databricks/spark/csv/CsvRelation.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index e5e4d02..5d051dd 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -105,10 +105,14 @@ case class CsvRelation protected[spark] ( tokenRdd(schemaFields.map(_.name)).flatMap { tokens => if (dropMalformed && schemaFields.length != tokens.size) { - logger.warn(s"Dropping malformed line (wrong length): ${tokens.mkString(",")}") + logger.warn(s"Dropping malformed line " + + s"(expected ${schemaFields.length} tokens but received " + + s"${tokens.size} tokens): ${tokens.mkString(",")}") None } else if (failFast && schemaFields.length != tokens.size) { - throw new RuntimeException(s"Malformed line in FAILFAST mode: ${tokens.mkString(",")}") + throw new RuntimeException(s"Malformed line " + + s"(expected ${schemaFields.length} tokens but received " + + s"${tokens.size} tokens) in FAILFAST mode: ${tokens.mkString(",")}") } else { var index: Int = 0 val rowArray = new Array[Any](schemaFields.length) @@ -173,12 +177,14 @@ case class CsvRelation protected[spark] ( val requiredSize = requiredFields.length tokenRdd(schemaFields.map(_.name)).flatMap { tokens => if (dropMalformed && schemaFields.length != tokens.size) { - logger.warn(s"Dropping malformed line (wrong length): " + - s"${tokens.mkString(delimiter.toString)}") + logger.warn(s"Dropping malformed line " + + s"(expected ${schemaFields.length} tokens but received " + + s"${tokens.size} tokens): ${tokens.mkString(delimiter.toString)}") None } else if (failFast && schemaFields.length != tokens.size) { - throw new RuntimeException(s"Malformed line in FAILFAST mode: " + - s"${tokens.mkString(delimiter.toString)}") + throw new RuntimeException(s"Malformed line in FAILFAST mode " + + s"(expected ${schemaFields.length} tokens but received " + + s"${tokens.size} tokens): ${tokens.mkString(delimiter.toString)}") } else { val indexSafeTokens = if (permissive && schemaFields.length != tokens.size) { tokens ++ new Array[String](schemaFields.length - tokens.size) From 72b5dec228ccf71f6e49013530144927e4171e55 Mon Sep 17 00:00:00 2001 From: Adrian Bridgett Date: Fri, 11 Mar 2016 14:51:50 +0000 Subject: [PATCH 6/8] removed trailing space --- src/main/scala/com/databricks/spark/csv/CsvRelation.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala index 5d051dd..0f70ec0 100755 --- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala +++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala @@ -183,7 +183,7 @@ case class CsvRelation protected[spark] ( None } else if (failFast && schemaFields.length != tokens.size) { throw new RuntimeException(s"Malformed line in FAILFAST mode " + - s"(expected ${schemaFields.length} tokens but received " + + s"(expected ${schemaFields.length} tokens but received " + s"${tokens.size} tokens): ${tokens.mkString(delimiter.toString)}") } else { val indexSafeTokens = if (permissive && schemaFields.length != tokens.size) { From 75e5fd3ed8b1b21f53e16a6d1ef0f9e6cfab3bdb Mon Sep 17 00:00:00 2001 From: Adrian Bridgett Date: Fri, 11 Mar 2016 16:15:22 +0000 Subject: [PATCH 7/8] corrected test for improved parse messages --- src/test/scala/com/databricks/spark/csv/CsvSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index 00eb846..8eb605f 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -205,7 +205,7 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { .collect() } - assert(exception.getMessage.contains("Malformed line in FAILFAST mode: 2015,Chevy,Volt")) + assert(exception.getMessage.contains("Malformed line in FAILFAST mode (expected 5 tokens but received 3 tokens): 2015,Chevy,Volt")) } test("DSL test roundtrip nulls") { From 382a1e198b4ac9f0e30806396a61bb631d80eb5e Mon Sep 17 00:00:00 2001 From: Adrian Bridgett Date: Mon, 21 Mar 2016 11:19:04 +0000 Subject: [PATCH 8/8] fix long line --- src/test/scala/com/databricks/spark/csv/CsvSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala index 8eb605f..7c055e4 100755 --- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala +++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala @@ -205,7 +205,8 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll { .collect() } - assert(exception.getMessage.contains("Malformed line in FAILFAST mode (expected 5 tokens but received 3 tokens): 2015,Chevy,Volt")) + assert(exception.getMessage.contains("Malformed line in FAILFAST mode " + + "(expected 5 tokens but received 3 tokens): 2015,Chevy,Volt")) } test("DSL test roundtrip nulls") {