[SPARK-13792][SQL] Limit logging of bad records in CSV data source

## What changes were proposed in this pull request?
This pull request adds a new CSV reader option, `maxMalformedLogPerPartition`, that limits the number of warning messages Spark logs per partition for malformed records.

The error log looks something like
```
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: Dropping malformed line: adsf,1,4
16/06/20 18:50:14 WARN CSVRelation: More than 10 malformed records have been found on this partition. Malformed records from now on will not be logged.
```
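For reference, a minimal usage sketch (assuming a `SparkSession` named `spark`, e.g. in `spark-shell`; the input path is a placeholder). Only `maxMalformedLogPerPartition` is new in this patch; `header` and `mode` are existing CSV options:

```scala
// Drop malformed rows, but log at most 10 of them per partition (the default cap).
val df = spark.read
  .option("header", "true")
  .option("mode", "DROPMALFORMED")
  .option("maxMalformedLogPerPartition", "10")
  .csv("/path/to/input.csv")
```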

Closes #12173

## How was this patch tested?
Manually tested.

Author: Reynold Xin <rxin@databricks.com>

Closes #13795 from rxin/SPARK-13792.
Committed by Reynold Xin on 2016-06-20 21:46:12 -07:00
commit c775bf09e0 (parent 217db56ba1)
5 changed files with 44 additions and 15 deletions

@@ -392,6 +392,10 @@ class DataFrameReader(ReaderUtils):
        :param maxCharsPerColumn: defines the maximum number of characters allowed for any given
                                  value being read. If None is set, it uses the default value,
                                  ``1000000``.
+       :param maxMalformedLogPerPartition: sets the maximum number of malformed rows Spark will
+                                           log for each partition. Malformed records beyond this
+                                           number will be ignored. If None is set, it
+                                           uses the default value, ``10``.
        :param mode: allows a mode for dealing with corrupt records during parsing. If None is
                     set, it uses the default value, ``PERMISSIVE``.

@@ -382,6 +382,8 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
   * a record can have.</li>
   * <li>`maxCharsPerColumn` (default `1000000`): defines the maximum number of characters allowed
   * for any given value being read.</li>
+  * <li>`maxMalformedLogPerPartition` (default `10`): sets the maximum number of malformed rows
+  * Spark will log for each partition. Malformed records beyond this number will be ignored.</li>
   * <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
   * during parsing.</li>
   * <ul>

@@ -120,7 +120,14 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
      val tokenizedIterator = new BulkCsvReader(lineIterator, csvOptions, headers)
      val parser = CSVRelation.csvParser(dataSchema, requiredSchema.fieldNames, csvOptions)
-     tokenizedIterator.flatMap(parser(_).toSeq)
+     var numMalformedRecords = 0
+     tokenizedIterator.flatMap { recordTokens =>
+       val row = parser(recordTokens, numMalformedRecords)
+       if (row.isEmpty) {
+         numMalformedRecords += 1
+       }
+       row
+     }
    }
  }
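Outside the diff, the counting pattern above can be summarized in a self-contained sketch (hypothetical names, not the Spark code): the mutable counter is captured by the closure built for each partition's iterator, so the logging cap applies independently per partition.

```scala
// Sketch of the per-partition counting pattern (hypothetical names, not Spark code).
// `parse` returns None when it drops a malformed record; the running count is passed
// back in so the parser can decide whether it should still log a warning.
def parsePartition[T](
    records: Iterator[Array[String]],
    parse: (Array[String], Int) => Option[T]): Iterator[T] = {
  var numMalformedRecords = 0                 // reset for every partition
  records.flatMap { tokens =>
    val row = parse(tokens, numMalformedRecords)
    if (row.isEmpty) numMalformedRecords += 1 // count only the dropped records
    row                                       // Option flattens to zero or one row
  }
}
```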

@@ -113,6 +113,8 @@ private[sql] class CSVOptions(@transient private val parameters: Map[String, Str
  val escapeQuotes = getBool("escapeQuotes", true)

+ val maxMalformedLogPerPartition = getInt("maxMalformedLogPerPartition", 10)
+
  val inputBufferSize = 128

  val isCommentSet = this.comment != '\u0000'
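`getInt` is an internal CSVOptions helper; as a rough sketch of the lookup-with-default behaviour it provides (hypothetical implementation, not the actual Spark helper):

```scala
// Hypothetical lookup-with-default: read an integer option from the user-supplied
// parameters, falling back to the default when the key is absent.
def getInt(parameters: Map[String, String], name: String, default: Int): Int =
  parameters.get(name) match {
    case Some(value) => value.toInt   // a non-numeric value would fail here
    case None        => default
  }

// getInt(Map("maxMalformedLogPerPartition" -> "5"), "maxMalformedLogPerPartition", 10) == 5
// getInt(Map.empty, "maxMalformedLogPerPartition", 10) == 10
```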

@@ -50,10 +50,19 @@ object CSVRelation extends Logging {
    }
  }

+  /**
+   * Returns a function that parses a single CSV record (in the form of an array of strings in
+   * which each element represents a column) and turns it into either one resulting row or no row
+   * (if the record is malformed).
+   *
+   * The second argument of the returned function represents the total number of malformed rows
+   * observed so far.
+   */
+  // This is pretty convoluted and we should probably rewrite the entire CSV parsing soon.
  def csvParser(
      schema: StructType,
      requiredColumns: Array[String],
-     params: CSVOptions): Array[String] => Option[InternalRow] = {
+     params: CSVOptions): (Array[String], Int) => Option[InternalRow] = {
    val schemaFields = schema.fields
    val requiredFields = StructType(requiredColumns.map(schema(_))).fields
    val safeRequiredFields = if (params.dropMalformed) {
@@ -72,9 +81,16 @@ object CSVRelation extends Logging {
    val requiredSize = requiredFields.length
    val row = new GenericMutableRow(requiredSize)

-   (tokens: Array[String]) => {
+   (tokens: Array[String], numMalformedRows) => {
      if (params.dropMalformed && schemaFields.length != tokens.length) {
-       logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+       if (numMalformedRows < params.maxMalformedLogPerPartition) {
+         logWarning(s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+       }
+       if (numMalformedRows == params.maxMalformedLogPerPartition - 1) {
+         logWarning(
+           s"More than ${params.maxMalformedLogPerPartition} malformed records have been " +
+             "found on this partition. Malformed records from now on will not be logged.")
+       }
        None
      } else if (params.failFast && schemaFields.length != tokens.length) {
        throw new RuntimeException(s"Malformed line in FAILFAST mode: " +
@@ -109,23 +125,21 @@ object CSVRelation extends Logging {
          Some(row)
        } catch {
          case NonFatal(e) if params.dropMalformed =>
-           logWarning("Parse exception. " +
-             s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+           if (numMalformedRows < params.maxMalformedLogPerPartition) {
+             logWarning("Parse exception. " +
+               s"Dropping malformed line: ${tokens.mkString(params.delimiter.toString)}")
+           }
+           if (numMalformedRows == params.maxMalformedLogPerPartition - 1) {
+             logWarning(
+               s"More than ${params.maxMalformedLogPerPartition} malformed records have been " +
+                 "found on this partition. Malformed records from now on will not be logged.")
+           }
            None
        }
      }
    }
  }

-  def parseCsv(
-      tokenizedRDD: RDD[Array[String]],
-      schema: StructType,
-      requiredColumns: Array[String],
-      options: CSVOptions): RDD[InternalRow] = {
-    val parser = csvParser(schema, requiredColumns, options)
-    tokenizedRDD.flatMap(parser(_).toSeq)
-  }
-
  // Skips the header line of each file if the `header` option is set to true.
  def dropHeaderLine(
      file: PartitionedFile, lines: Iterator[String], csvOptions: CSVOptions): Unit = {