[SPARK-20980][SQL] Rename wholeFile
to multiLine
for both CSV and JSON
### What changes were proposed in this pull request? The current option name `wholeFile` is misleading for CSV users. Currently, it is not representing a record per file. Actually, one file could have multiple records. Thus, we should rename it. Now, the proposal is `multiLine`. ### How was this patch tested? N/A Author: Xiao Li <gatorsmile@gmail.com> Closes #18202 from gatorsmile/renameCVSOption.
This commit is contained in:
parent
fffeb6d7c3
commit
2051428173
|
@ -334,7 +334,7 @@ setMethod("toDF", signature(x = "RDD"),
|
|||
#'
|
||||
#' Loads a JSON file, returning the result as a SparkDataFrame
|
||||
#' By default, (\href{http://jsonlines.org/}{JSON Lines text format or newline-delimited JSON}
|
||||
#' ) is supported. For JSON (one record per file), set a named property \code{wholeFile} to
|
||||
#' ) is supported. For JSON (one record per file), set a named property \code{multiLine} to
|
||||
#' \code{TRUE}.
|
||||
#' It goes through the entire dataset once to determine the schema.
|
||||
#'
|
||||
|
@ -348,7 +348,7 @@ setMethod("toDF", signature(x = "RDD"),
|
|||
#' sparkR.session()
|
||||
#' path <- "path/to/file.json"
|
||||
#' df <- read.json(path)
|
||||
#' df <- read.json(path, wholeFile = TRUE)
|
||||
#' df <- read.json(path, multiLine = TRUE)
|
||||
#' df <- jsonFile(path)
|
||||
#' }
|
||||
#' @name read.json
|
||||
|
@ -598,7 +598,7 @@ tableToDF <- function(tableName) {
|
|||
#' df1 <- read.df("path/to/file.json", source = "json")
|
||||
#' schema <- structType(structField("name", "string"),
|
||||
#' structField("info", "map<string,double>"))
|
||||
#' df2 <- read.df(mapTypeJsonPath, "json", schema, wholeFile = TRUE)
|
||||
#' df2 <- read.df(mapTypeJsonPath, "json", schema, multiLine = TRUE)
|
||||
#' df3 <- loadDF("data/test_table", "parquet", mergeSchema = "true")
|
||||
#' }
|
||||
#' @name read.df
|
||||
|
|
|
@ -174,12 +174,12 @@ class DataFrameReader(OptionUtils):
|
|||
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
|
||||
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
|
||||
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
|
||||
wholeFile=None):
|
||||
multiLine=None):
|
||||
"""
|
||||
Loads JSON files and returns the results as a :class:`DataFrame`.
|
||||
|
||||
`JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
|
||||
For JSON (one record per file), set the ``wholeFile`` parameter to ``true``.
|
||||
For JSON (one record per file), set the ``multiLine`` parameter to ``true``.
|
||||
|
||||
If the ``schema`` parameter is not specified, this function goes
|
||||
through the input once to determine the input schema.
|
||||
|
@ -230,7 +230,7 @@ class DataFrameReader(OptionUtils):
|
|||
formats follow the formats at ``java.text.SimpleDateFormat``.
|
||||
This applies to timestamp type. If None is set, it uses the
|
||||
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
|
||||
:param wholeFile: parse one record, which may span multiple lines, per file. If None is
|
||||
:param multiLine: parse one record, which may span multiple lines, per file. If None is
|
||||
set, it uses the default value, ``false``.
|
||||
|
||||
>>> df1 = spark.read.json('python/test_support/sql/people.json')
|
||||
|
@ -248,7 +248,7 @@ class DataFrameReader(OptionUtils):
|
|||
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
|
||||
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
|
||||
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
|
||||
timestampFormat=timestampFormat, wholeFile=wholeFile)
|
||||
timestampFormat=timestampFormat, multiLine=multiLine)
|
||||
if isinstance(path, basestring):
|
||||
path = [path]
|
||||
if type(path) == list:
|
||||
|
@ -322,7 +322,7 @@ class DataFrameReader(OptionUtils):
|
|||
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
|
||||
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
|
||||
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
|
||||
columnNameOfCorruptRecord=None, wholeFile=None):
|
||||
columnNameOfCorruptRecord=None, multiLine=None):
|
||||
"""Loads a CSV file and returns the result as a :class:`DataFrame`.
|
||||
|
||||
This function will go through the input once to determine the input schema if
|
||||
|
@ -396,7 +396,7 @@ class DataFrameReader(OptionUtils):
|
|||
``spark.sql.columnNameOfCorruptRecord``. If None is set,
|
||||
it uses the value specified in
|
||||
``spark.sql.columnNameOfCorruptRecord``.
|
||||
:param wholeFile: parse records, which may span multiple lines. If None is
|
||||
:param multiLine: parse records, which may span multiple lines. If None is
|
||||
set, it uses the default value, ``false``.
|
||||
|
||||
>>> df = spark.read.csv('python/test_support/sql/ages.csv')
|
||||
|
@ -411,7 +411,7 @@ class DataFrameReader(OptionUtils):
|
|||
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
|
||||
maxCharsPerColumn=maxCharsPerColumn,
|
||||
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
|
||||
columnNameOfCorruptRecord=columnNameOfCorruptRecord, wholeFile=wholeFile)
|
||||
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
|
||||
if isinstance(path, basestring):
|
||||
path = [path]
|
||||
return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
|
||||
|
|
|
@ -401,12 +401,12 @@ class DataStreamReader(OptionUtils):
|
|||
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
|
||||
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
|
||||
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
|
||||
wholeFile=None):
|
||||
multiLine=None):
|
||||
"""
|
||||
Loads a JSON file stream and returns the results as a :class:`DataFrame`.
|
||||
|
||||
`JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is supported by default.
|
||||
For JSON (one record per file), set the ``wholeFile`` parameter to ``true``.
|
||||
For JSON (one record per file), set the ``multiLine`` parameter to ``true``.
|
||||
|
||||
If the ``schema`` parameter is not specified, this function goes
|
||||
through the input once to determine the input schema.
|
||||
|
@ -458,7 +458,7 @@ class DataStreamReader(OptionUtils):
|
|||
formats follow the formats at ``java.text.SimpleDateFormat``.
|
||||
This applies to timestamp type. If None is set, it uses the
|
||||
default value, ``yyyy-MM-dd'T'HH:mm:ss.SSSXXX``.
|
||||
:param wholeFile: parse one record, which may span multiple lines, per file. If None is
|
||||
:param multiLine: parse one record, which may span multiple lines, per file. If None is
|
||||
set, it uses the default value, ``false``.
|
||||
|
||||
>>> json_sdf = spark.readStream.json(tempfile.mkdtemp(), schema = sdf_schema)
|
||||
|
@ -473,7 +473,7 @@ class DataStreamReader(OptionUtils):
|
|||
allowSingleQuotes=allowSingleQuotes, allowNumericLeadingZero=allowNumericLeadingZero,
|
||||
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
|
||||
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
|
||||
timestampFormat=timestampFormat, wholeFile=wholeFile)
|
||||
timestampFormat=timestampFormat, multiLine=multiLine)
|
||||
if isinstance(path, basestring):
|
||||
return self._df(self._jreader.json(path))
|
||||
else:
|
||||
|
@ -532,7 +532,7 @@ class DataStreamReader(OptionUtils):
|
|||
ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
|
||||
negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
|
||||
maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
|
||||
columnNameOfCorruptRecord=None, wholeFile=None):
|
||||
columnNameOfCorruptRecord=None, multiLine=None):
|
||||
"""Loads a CSV file stream and returns the result as a :class:`DataFrame`.
|
||||
|
||||
This function will go through the input once to determine the input schema if
|
||||
|
@ -607,7 +607,7 @@ class DataStreamReader(OptionUtils):
|
|||
``spark.sql.columnNameOfCorruptRecord``. If None is set,
|
||||
it uses the value specified in
|
||||
``spark.sql.columnNameOfCorruptRecord``.
|
||||
:param wholeFile: parse one record, which may span multiple lines. If None is
|
||||
:param multiLine: parse one record, which may span multiple lines. If None is
|
||||
set, it uses the default value, ``false``.
|
||||
|
||||
>>> csv_sdf = spark.readStream.csv(tempfile.mkdtemp(), schema = sdf_schema)
|
||||
|
@ -624,7 +624,7 @@ class DataStreamReader(OptionUtils):
|
|||
dateFormat=dateFormat, timestampFormat=timestampFormat, maxColumns=maxColumns,
|
||||
maxCharsPerColumn=maxCharsPerColumn,
|
||||
maxMalformedLogPerPartition=maxMalformedLogPerPartition, mode=mode,
|
||||
columnNameOfCorruptRecord=columnNameOfCorruptRecord, wholeFile=wholeFile)
|
||||
columnNameOfCorruptRecord=columnNameOfCorruptRecord, multiLine=multiLine)
|
||||
if isinstance(path, basestring):
|
||||
return self._df(self._jreader.csv(path))
|
||||
else:
|
||||
|
|
|
@ -457,15 +457,15 @@ class SQLTests(ReusedPySparkTestCase):
|
|||
df.select(add_three("id").alias("plus_three")).collect()
|
||||
)
|
||||
|
||||
def test_wholefile_json(self):
|
||||
def test_multiLine_json(self):
|
||||
people1 = self.spark.read.json("python/test_support/sql/people.json")
|
||||
people_array = self.spark.read.json("python/test_support/sql/people_array.json",
|
||||
wholeFile=True)
|
||||
multiLine=True)
|
||||
self.assertEqual(people1.collect(), people_array.collect())
|
||||
|
||||
def test_wholefile_csv(self):
|
||||
def test_multiline_csv(self):
|
||||
ages_newlines = self.spark.read.csv(
|
||||
"python/test_support/sql/ages_newlines.csv", wholeFile=True)
|
||||
"python/test_support/sql/ages_newlines.csv", multiLine=True)
|
||||
expected = [Row(_c0=u'Joe', _c1=u'20', _c2=u'Hi,\nI am Jeo'),
|
||||
Row(_c0=u'Tom', _c1=u'30', _c2=u'My name is Tom'),
|
||||
Row(_c0=u'Hyukjin', _c1=u'25', _c2=u'I am Hyukjin\n\nI love Spark!')]
|
||||
|
|
|
@ -81,7 +81,7 @@ private[sql] class JSONOptions(
|
|||
FastDateFormat.getInstance(
|
||||
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)
|
||||
|
||||
val wholeFile = parameters.get("wholeFile").map(_.toBoolean).getOrElse(false)
|
||||
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
|
||||
|
||||
/** Sets config options on a Jackson [[JsonFactory]]. */
|
||||
def setJacksonOptions(factory: JsonFactory): Unit = {
|
||||
|
|
|
@ -295,7 +295,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
|
|||
* Loads JSON files and returns the results as a `DataFrame`.
|
||||
*
|
||||
* <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
|
||||
* default. For JSON (one record per file), set the `wholeFile` option to true.
|
||||
* default. For JSON (one record per file), set the `multiLine` option to true.
|
||||
*
|
||||
* This function goes through the input once to determine the input schema. If you know the
|
||||
* schema in advance, use the version that specifies the schema to avoid the extra scan.
|
||||
|
@ -335,7 +335,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
|
|||
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the string that
|
||||
* indicates a timestamp format. Custom date formats follow the formats at
|
||||
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
|
||||
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines,
|
||||
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
|
||||
* per file</li>
|
||||
* </ul>
|
||||
*
|
||||
|
@ -537,7 +537,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
|
|||
* <li>`columnNameOfCorruptRecord` (default is the value specified in
|
||||
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
|
||||
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
|
||||
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines.</li>
|
||||
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li>
|
||||
* </ul>
|
||||
* @since 2.0.0
|
||||
*/
|
||||
|
|
|
@ -111,8 +111,8 @@ abstract class CSVDataSource extends Serializable {
|
|||
|
||||
object CSVDataSource {
|
||||
def apply(options: CSVOptions): CSVDataSource = {
|
||||
if (options.wholeFile) {
|
||||
WholeFileCSVDataSource
|
||||
if (options.multiLine) {
|
||||
MultiLineCSVDataSource
|
||||
} else {
|
||||
TextInputCSVDataSource
|
||||
}
|
||||
|
@ -197,7 +197,7 @@ object TextInputCSVDataSource extends CSVDataSource {
|
|||
}
|
||||
}
|
||||
|
||||
object WholeFileCSVDataSource extends CSVDataSource {
|
||||
object MultiLineCSVDataSource extends CSVDataSource {
|
||||
override val isSplitable: Boolean = false
|
||||
|
||||
override def readFile(
|
||||
|
|
|
@ -128,7 +128,7 @@ class CSVOptions(
|
|||
FastDateFormat.getInstance(
|
||||
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, Locale.US)
|
||||
|
||||
val wholeFile = parameters.get("wholeFile").map(_.toBoolean).getOrElse(false)
|
||||
val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
|
||||
|
||||
val maxColumns = getInt("maxColumns", 20480)
|
||||
|
||||
|
|
|
@ -86,8 +86,8 @@ abstract class JsonDataSource extends Serializable {
|
|||
|
||||
object JsonDataSource {
|
||||
def apply(options: JSONOptions): JsonDataSource = {
|
||||
if (options.wholeFile) {
|
||||
WholeFileJsonDataSource
|
||||
if (options.multiLine) {
|
||||
MultiLineJsonDataSource
|
||||
} else {
|
||||
TextInputJsonDataSource
|
||||
}
|
||||
|
@ -147,7 +147,7 @@ object TextInputJsonDataSource extends JsonDataSource {
|
|||
}
|
||||
}
|
||||
|
||||
object WholeFileJsonDataSource extends JsonDataSource {
|
||||
object MultiLineJsonDataSource extends JsonDataSource {
|
||||
override val isSplitable: Boolean = {
|
||||
false
|
||||
}
|
||||
|
|
|
@ -163,7 +163,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
|
|||
* Loads a JSON file stream and returns the results as a `DataFrame`.
|
||||
*
|
||||
* <a href="http://jsonlines.org/">JSON Lines</a> (newline-delimited JSON) is supported by
|
||||
* default. For JSON (one record per file), set the `wholeFile` option to true.
|
||||
* default. For JSON (one record per file), set the `multiLine` option to true.
|
||||
*
|
||||
* This function goes through the input once to determine the input schema. If you know the
|
||||
* schema in advance, use the version that specifies the schema to avoid the extra scan.
|
||||
|
@ -205,7 +205,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
|
|||
* <li>`timestampFormat` (default `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`): sets the string that
|
||||
* indicates a timestamp format. Custom date formats follow the formats at
|
||||
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
|
||||
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines,
|
||||
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
|
||||
* per file</li>
|
||||
* </ul>
|
||||
*
|
||||
|
@ -276,7 +276,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
|
|||
* <li>`columnNameOfCorruptRecord` (default is the value specified in
|
||||
* `spark.sql.columnNameOfCorruptRecord`): allows renaming the new field having malformed string
|
||||
* created by `PERMISSIVE` mode. This overrides `spark.sql.columnNameOfCorruptRecord`.</li>
|
||||
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines.</li>
|
||||
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines.</li>
|
||||
* </ul>
|
||||
*
|
||||
* @since 2.0.0
|
||||
|
|
|
@ -261,10 +261,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
|
|||
}
|
||||
|
||||
test("test for DROPMALFORMED parsing mode") {
|
||||
Seq(false, true).foreach { wholeFile =>
|
||||
Seq(false, true).foreach { multiLine =>
|
||||
val cars = spark.read
|
||||
.format("csv")
|
||||
.option("wholeFile", wholeFile)
|
||||
.option("multiLine", multiLine)
|
||||
.options(Map("header" -> "true", "mode" -> "dropmalformed"))
|
||||
.load(testFile(carsFile))
|
||||
|
||||
|
@ -284,11 +284,11 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
|
|||
}
|
||||
|
||||
test("test for FAILFAST parsing mode") {
|
||||
Seq(false, true).foreach { wholeFile =>
|
||||
Seq(false, true).foreach { multiLine =>
|
||||
val exception = intercept[SparkException] {
|
||||
spark.read
|
||||
.format("csv")
|
||||
.option("wholeFile", wholeFile)
|
||||
.option("multiLine", multiLine)
|
||||
.options(Map("header" -> "true", "mode" -> "failfast"))
|
||||
.load(testFile(carsFile)).collect()
|
||||
}
|
||||
|
@ -990,13 +990,13 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
|
|||
}
|
||||
|
||||
test("SPARK-18699 put malformed records in a `columnNameOfCorruptRecord` field") {
|
||||
Seq(false, true).foreach { wholeFile =>
|
||||
Seq(false, true).foreach { multiLine =>
|
||||
val schema = new StructType().add("a", IntegerType).add("b", TimestampType)
|
||||
// We use `PERMISSIVE` mode by default if invalid string is given.
|
||||
val df1 = spark
|
||||
.read
|
||||
.option("mode", "abcd")
|
||||
.option("wholeFile", wholeFile)
|
||||
.option("multiLine", multiLine)
|
||||
.schema(schema)
|
||||
.csv(testFile(valueMalformedFile))
|
||||
checkAnswer(df1,
|
||||
|
@ -1011,7 +1011,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
|
|||
.read
|
||||
.option("mode", "Permissive")
|
||||
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
|
||||
.option("wholeFile", wholeFile)
|
||||
.option("multiLine", multiLine)
|
||||
.schema(schemaWithCorrField1)
|
||||
.csv(testFile(valueMalformedFile))
|
||||
checkAnswer(df2,
|
||||
|
@ -1028,7 +1028,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
|
|||
.read
|
||||
.option("mode", "permissive")
|
||||
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
|
||||
.option("wholeFile", wholeFile)
|
||||
.option("multiLine", multiLine)
|
||||
.schema(schemaWithCorrField2)
|
||||
.csv(testFile(valueMalformedFile))
|
||||
checkAnswer(df3,
|
||||
|
@ -1041,7 +1041,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
|
|||
.read
|
||||
.option("mode", "PERMISSIVE")
|
||||
.option("columnNameOfCorruptRecord", columnNameOfCorruptRecord)
|
||||
.option("wholeFile", wholeFile)
|
||||
.option("multiLine", multiLine)
|
||||
.schema(schema.add(columnNameOfCorruptRecord, IntegerType))
|
||||
.csv(testFile(valueMalformedFile))
|
||||
.collect
|
||||
|
@ -1073,7 +1073,7 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
|
|||
|
||||
val df = spark.read
|
||||
.option("header", true)
|
||||
.option("wholeFile", true)
|
||||
.option("multiLine", true)
|
||||
.csv(path.getAbsolutePath)
|
||||
|
||||
// Check if headers have new lines in the names.
|
||||
|
@ -1096,10 +1096,10 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
|
|||
}
|
||||
|
||||
test("Empty file produces empty dataframe with empty schema") {
|
||||
Seq(false, true).foreach { wholeFile =>
|
||||
Seq(false, true).foreach { multiLine =>
|
||||
val df = spark.read.format("csv")
|
||||
.option("header", true)
|
||||
.option("wholeFile", wholeFile)
|
||||
.option("multiLine", multiLine)
|
||||
.load(testFile(emptyFile))
|
||||
|
||||
assert(df.schema === spark.emptyDataFrame.schema)
|
||||
|
|
|
@ -1814,7 +1814,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
|||
|
||||
assert(new File(path).listFiles().exists(_.getName.endsWith(".gz")))
|
||||
|
||||
val jsonDF = spark.read.option("wholeFile", true).json(path)
|
||||
val jsonDF = spark.read.option("multiLine", true).json(path)
|
||||
val jsonDir = new File(dir, "json").getCanonicalPath
|
||||
jsonDF.coalesce(1).write
|
||||
.option("compression", "gZiP")
|
||||
|
@ -1836,7 +1836,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
|||
.write
|
||||
.text(path)
|
||||
|
||||
val jsonDF = spark.read.option("wholeFile", true).json(path)
|
||||
val jsonDF = spark.read.option("multiLine", true).json(path)
|
||||
val jsonDir = new File(dir, "json").getCanonicalPath
|
||||
jsonDF.coalesce(1).write.json(jsonDir)
|
||||
|
||||
|
@ -1865,7 +1865,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
|||
.write
|
||||
.text(path)
|
||||
|
||||
val jsonDF = spark.read.option("wholeFile", true).json(path)
|
||||
val jsonDF = spark.read.option("multiLine", true).json(path)
|
||||
// no corrupt record column should be created
|
||||
assert(jsonDF.schema === StructType(Seq()))
|
||||
// only the first object should be read
|
||||
|
@ -1886,7 +1886,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
|||
.write
|
||||
.text(path)
|
||||
|
||||
val jsonDF = spark.read.option("wholeFile", true).option("mode", "PERMISSIVE").json(path)
|
||||
val jsonDF = spark.read.option("multiLine", true).option("mode", "PERMISSIVE").json(path)
|
||||
assert(jsonDF.count() === corruptRecordCount)
|
||||
assert(jsonDF.schema === new StructType()
|
||||
.add("_corrupt_record", StringType)
|
||||
|
@ -1917,7 +1917,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
|||
.write
|
||||
.text(path)
|
||||
|
||||
val jsonDF = spark.read.option("wholeFile", true).option("mode", "DROPMALFORMED").json(path)
|
||||
val jsonDF = spark.read.option("multiLine", true).option("mode", "DROPMALFORMED").json(path)
|
||||
checkAnswer(jsonDF, Seq(Row("test")))
|
||||
}
|
||||
}
|
||||
|
@ -1940,7 +1940,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
|||
// `FAILFAST` mode should throw an exception for corrupt records.
|
||||
val exceptionOne = intercept[SparkException] {
|
||||
spark.read
|
||||
.option("wholeFile", true)
|
||||
.option("multiLine", true)
|
||||
.option("mode", "FAILFAST")
|
||||
.json(path)
|
||||
}
|
||||
|
@ -1949,7 +1949,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
|
|||
|
||||
val exceptionTwo = intercept[SparkException] {
|
||||
spark.read
|
||||
.option("wholeFile", true)
|
||||
.option("multiLine", true)
|
||||
.option("mode", "FAILFAST")
|
||||
.schema(schema)
|
||||
.json(path)
|
||||
|
|
Loading…
Reference in a new issue