[SPARK-22818][SQL] csv escape of quote escape

## What changes were proposed in this pull request?

Escaping of the quote-escape character itself should be supported when using the uniVocity CSV encoding/decoding library.

Ref: https://github.com/uniVocity/univocity-parsers#escaping-quote-escape-characters

One option is added for reading and writing CSV: `charToEscapeQuoteEscaping`
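
As a hedged illustration (not part of the patch itself; the path and data are hypothetical), the new option is set like any other CSV option:

```scala
// Read a CSV file where '\' escapes quotes and '#' escapes a literal '\'
// occurring before a quote inside a quoted value.
val df = spark.read
  .option("quote", "\"")
  .option("escape", "\\")
  .option("charToEscapeQuoteEscaping", "#")
  .csv("/tmp/people.csv")
```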

## How was this patch tested?

Unit test added.

Author: soonmok-kwon <soonmok.kwon@navercorp.com>

Closes #20004 from ep1804/SPARK-22818.
Commit ffe6fd77a4 (parent cfcd746689)
Authored by soonmok-kwon on 2017-12-29 07:30:06 +08:00; committed by gatorsmile
7 changed files with 94 additions and 28 deletions

python/pyspark/sql/readwriter.py

@@ -333,7 +333,7 @@ class DataFrameReader(OptionUtils):
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None):
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None):
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -344,17 +344,17 @@ class DataFrameReader(OptionUtils):
or RDD of Strings storing CSV rows.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
:param sep: sets the single character as a separator for each field and value.
:param sep: sets a single character as a separator for each field and value.
If None is set, it uses the default value, ``,``.
:param encoding: decodes the CSV files by the given encoding type. If None is set,
it uses the default value, ``UTF-8``.
:param quote: sets the single character used for escaping quoted values where the
:param quote: sets a single character used for escaping quoted values where the
separator can be part of the value. If None is set, it uses the default
value, ``"``. If you would like to turn off quotations, you need to set an
empty string.
:param escape: sets the single character used for escaping quotes inside an already
:param escape: sets a single character used for escaping quotes inside an already
quoted value. If None is set, it uses the default value, ``\``.
:param comment: sets the single character used for skipping lines beginning with this
:param comment: sets a single character used for skipping lines beginning with this
character. By default (None), it is disabled.
:param header: uses the first line as names of columns. If None is set, it uses the
default value, ``false``.
@@ -410,6 +410,10 @@ class DataFrameReader(OptionUtils):
``spark.sql.columnNameOfCorruptRecord``.
:param multiLine: parse records, which may span multiple lines. If None is
set, it uses the default value, ``false``.
:param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for
the quote character. If None is set, the default value is
the escape character when escape and quote characters are
different, ``\0`` otherwise.
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
>>> df.dtypes
@@ -427,7 +431,8 @@
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
@@ -814,7 +819,8 @@ class DataFrameWriter(OptionUtils):
@since(2.0)
def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None):
timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
charToEscapeQuoteEscaping=None):
"""Saves the content of the :class:`DataFrame` in CSV format at the specified path.
:param path: the path in any Hadoop supported file system
@@ -829,12 +835,12 @@
:param compression: compression codec to use when saving to file. This can be one of the
known case-insensitive shorten names (none, bzip2, gzip, lz4,
snappy and deflate).
:param sep: sets the single character as a separator for each field and value. If None is
:param sep: sets a single character as a separator for each field and value. If None is
set, it uses the default value, ``,``.
:param quote: sets the single character used for escaping quoted values where the
:param quote: sets a single character used for escaping quoted values where the
separator can be part of the value. If None is set, it uses the default
value, ``"``. If an empty string is set, it uses ``u0000`` (null character).
:param escape: sets the single character used for escaping quotes inside an already
:param escape: sets a single character used for escaping quotes inside an already
quoted value. If None is set, it uses the default value, ``\``
:param escapeQuotes: a flag indicating whether values containing quotes should always
be enclosed in quotes. If None is set, it uses the default value
@@ -860,6 +866,10 @@
:param ignoreTrailingWhiteSpace: a flag indicating whether or not trailing whitespaces from
values being written should be skipped. If None is set, it
uses the default value, ``true``.
:param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for
the quote character. If None is set, the default value is
the escape character when escape and quote characters are
different, ``\0`` otherwise.
>>> df.write.csv(os.path.join(tempfile.mkdtemp(), 'data'))
"""
@@ -868,7 +878,8 @@
nullValue=nullValue, escapeQuotes=escapeQuotes, quoteAll=quoteAll,
dateFormat=dateFormat, timestampFormat=timestampFormat,
ignoreLeadingWhiteSpace=ignoreLeadingWhiteSpace,
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace)
ignoreTrailingWhiteSpace=ignoreTrailingWhiteSpace,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping)
self._jwrite.csv(path)
@since(1.5)

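The write path accepts the same option. A minimal Scala sketch mirroring the writer docstring above (output path hypothetical; `df` is any DataFrame):

```scala
// Write CSV where '#' escapes the quote-escape character '\'.
df.write
  .option("quote", "\"")
  .option("escape", "\\")
  .option("charToEscapeQuoteEscaping", "#")
  .csv("/tmp/csv-out")
```
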
python/pyspark/sql/streaming.py

@@ -560,7 +560,7 @@ class DataStreamReader(OptionUtils):
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
columnNameOfCorruptRecord=None, multiLine=None):
columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None):
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
This function will go through the input once to determine the input schema if
@@ -572,17 +572,17 @@
:param path: string, or list of strings, for input path(s).
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema
or a DDL-formatted string (For example ``col0 INT, col1 DOUBLE``).
:param sep: sets the single character as a separator for each field and value.
:param sep: sets a single character as a separator for each field and value.
If None is set, it uses the default value, ``,``.
:param encoding: decodes the CSV files by the given encoding type. If None is set,
it uses the default value, ``UTF-8``.
:param quote: sets the single character used for escaping quoted values where the
:param quote: sets a single character used for escaping quoted values where the
separator can be part of the value. If None is set, it uses the default
value, ``"``. If you would like to turn off quotations, you need to set an
empty string.
:param escape: sets the single character used for escaping quotes inside an already
:param escape: sets a single character used for escaping quotes inside an already
quoted value. If None is set, it uses the default value, ``\``.
:param comment: sets the single character used for skipping lines beginning with this
:param comment: sets a single character used for skipping lines beginning with this
character. By default (None), it is disabled.
:param header: uses the first line as names of columns. If None is set, it uses the
default value, ``false``.
@@ -638,6 +638,10 @@ class DataStreamReader(OptionUtils):
``spark.sql.columnNameOfCorruptRecord``.
:param multiLine: parse one record, which may span multiple lines. If None is
set, it uses the default value, ``false``.
:param charToEscapeQuoteEscaping: sets a single character used for escaping the escape for
the quote character. If None is set, the default value is
the escape character when escape and quote characters are
different, ``\0`` otherwise.
>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
>>> csv_sdf.isStreaming
@@ -653,7 +657,8 @@
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
maxCharsPerColumn=maxCharsPerColumn,
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine,
charToEscapeQuoteEscaping=charToEscapeQuoteEscaping)
if isinstance(path, basestring):
return self._df(self._jreader.csv(path))
else:

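The same option is honored by the streaming reader. A hedged Scala sketch (schema and input directory hypothetical):

```scala
import org.apache.spark.sql.types.{StringType, StructType}

// Streaming CSV source configured with the new quote-escape-escaping character.
val streamDf = spark.readStream
  .schema(new StructType().add("value", StringType))
  .option("charToEscapeQuoteEscaping", "#")
  .csv("/tmp/csv-input")
```
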
sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

@@ -517,17 +517,20 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*
* You can set the following CSV-specific options to deal with CSV files:
* <ul>
* <li>`sep` (default `,`): sets the single character as a separator for each
* <li>`sep` (default `,`): sets a single character as a separator for each
* field and value.</li>
* <li>`encoding` (default `UTF-8`): decodes the CSV files by the given encoding
* type.</li>
* <li>`quote` (default `"`): sets the single character used for escaping quoted values where
* <li>`quote` (default `"`): sets a single character used for escaping quoted values where
* the separator can be part of the value. If you would like to turn off quotations, you need to
* set an empty string, not `null`. This behaviour is different from
* `com.databricks.spark.csv`.</li>
* <li>`escape` (default `\`): sets the single character used for escaping quotes inside
* <li>`escape` (default `\`): sets a single character used for escaping quotes inside
* an already quoted value.</li>
* <li>`comment` (default empty string): sets the single character used for skipping lines
* <li>`charToEscapeQuoteEscaping` (default `escape` or `\0`): sets a single character used for
* escaping the escape for the quote character. The default value is the escape character when escape
* and quote characters are different, `\0` otherwise.</li>
* <li>`comment` (default empty string): sets a single character used for skipping lines
* beginning with this character. By default, it is disabled.</li>
* <li>`header` (default `false`): uses the first line as names of columns.</li>
* <li>`inferSchema` (default `false`): infers the input schema automatically from data. It

sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala

@@ -594,13 +594,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
*
* You can set the following CSV-specific option(s) for writing CSV files:
* <ul>
* <li>`sep` (default `,`): sets the single character as a separator for each
* <li>`sep` (default `,`): sets a single character as a separator for each
* field and value.</li>
* <li>`quote` (default `"`): sets the single character used for escaping quoted values where
* <li>`quote` (default `"`): sets a single character used for escaping quoted values where
* the separator can be part of the value. If an empty string is set, it uses `u0000`
* (null character).</li>
* <li>`escape` (default `\`): sets the single character used for escaping quotes inside
* <li>`escape` (default `\`): sets a single character used for escaping quotes inside
* an already quoted value.</li>
* <li>`charToEscapeQuoteEscaping` (default `escape` or `\0`): sets a single character used for
* escaping the escape for the quote character. The default value is the escape character when escape
* and quote characters are different, `\0` otherwise.</li>
* <li>`escapeQuotes` (default `true`): a flag indicating whether values containing
* quotes should always be enclosed in quotes. Default is to escape all values containing
* a quote character.</li>

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala

@@ -89,6 +89,14 @@ class CSVOptions(
val quote = getChar("quote", '\"')
val escape = getChar("escape", '\\')
val charToEscapeQuoteEscaping = parameters.get("charToEscapeQuoteEscaping") match {
case None => None
case Some(null) => None
case Some(value) if value.length == 0 => None
case Some(value) if value.length == 1 => Some(value.charAt(0))
case _ =>
throw new RuntimeException("charToEscapeQuoteEscaping cannot be more than one character")
}
val comment = getChar("comment", '\u0000')
val headerFlag = getBool("header")
@@ -148,6 +156,7 @@
format.setDelimiter(delimiter)
format.setQuote(quote)
format.setQuoteEscape(escape)
charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
format.setComment(comment)
writerSettings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceFlagInWrite)
writerSettings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceFlagInWrite)
@@ -165,6 +174,7 @@
format.setDelimiter(delimiter)
format.setQuote(quote)
format.setQuoteEscape(escape)
charToEscapeQuoteEscaping.foreach(format.setCharToEscapeQuoteEscaping)
format.setComment(comment)
settings.setIgnoreLeadingWhitespaces(ignoreLeadingWhiteSpaceInRead)
settings.setIgnoreTrailingWhitespaces(ignoreTrailingWhiteSpaceInRead)
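
To make the uniVocity hook above concrete, here is a minimal standalone sketch of what `setCharToEscapeQuoteEscaping` changes at parse time (the input line and expected value are illustrative, not taken from this patch):

```scala
import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

// quote = '"', escape = '\', char to escape quote escaping = '#'
val settings = new CsvParserSettings()
settings.getFormat.setQuote('"')
settings.getFormat.setQuoteEscape('\\')
settings.getFormat.setCharToEscapeQuoteEscaping('#')

// Inside the quoted field "7#\\" tall":
//   #\  decodes to a literal backslash (the escaped escape)
//   \"  decodes to a literal quote (the escaped quote)
val row = new CsvParser(settings).parseLine(""""7#\\" tall"""")
assert(row(0) == """7\" tall""")
```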

sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala

@@ -262,17 +262,20 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Logging {
* <ul>
* <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
* considered in every trigger.</li>
* <li>`sep` (default `,`): sets the single character as a separator for each
* <li>`sep` (default `,`): sets a single character as a separator for each
* field and value.</li>
* <li>`encoding` (default `UTF-8`): decodes the CSV files by the given encoding
* type.</li>
* <li>`quote` (default `"`): sets the single character used for escaping quoted values where
* <li>`quote` (default `"`): sets a single character used for escaping quoted values where
* the separator can be part of the value. If you would like to turn off quotations, you need to
* set an empty string, not `null`. This behaviour is different from
* `com.databricks.spark.csv`.</li>
* <li>`escape` (default `\`): sets the single character used for escaping quotes inside
* <li>`escape` (default `\`): sets a single character used for escaping quotes inside
* an already quoted value.</li>
* <li>`comment` (default empty string): sets the single character used for skipping lines
* <li>`charToEscapeQuoteEscaping` (default `escape` or `\0`): sets a single character used for
* escaping the escape for the quote character. The default value is the escape character when escape
* and quote characters are different, `\0` otherwise.</li>
* <li>`comment` (default empty string): sets a single character used for skipping lines
* beginning with this character. By default, it is disabled.</li>
* <li>`header` (default `false`): uses the first line as names of columns.</li>
* <li>`inferSchema` (default `false`): infers the input schema automatically from data. It

sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala

@@ -482,6 +482,37 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
}
}
test("save csv with quote escaping, using charToEscapeQuoteEscaping option") {
withTempPath { path =>
// original text
val df1 = Seq(
"""You are "beautiful"""",
"""Yes, \"in the inside"\"""
).toDF()
// text written in CSV with following options:
// quote character: "
// escape character: \
// character to escape quote escaping: #
val df2 = Seq(
""""You are \"beautiful\""""",
""""Yes, #\\"in the inside\"#\""""
).toDF()
df2.coalesce(1).write.text(path.getAbsolutePath)
val df3 = spark.read
.format("csv")
.option("quote", "\"")
.option("escape", "\\")
.option("charToEscapeQuoteEscaping", "#")
.load(path.getAbsolutePath)
checkAnswer(df1, df3)
}
}
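
// Editor's note (illustrative, not part of the original test): with quote = '"',
// escape = '\' and charToEscapeQuoteEscaping = '#', the raw CSV text of df2's
// second row,
//   "Yes, #\\"in the inside\"#\"
// decodes as  #\ -> \  (escaped escape)  and  \" -> "  (escaped quote),
// yielding df1's second row:  Yes, \"in the inside"\
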
test("commented lines in CSV data") {
Seq("false", "true").foreach { multiLine =>