[SPARK-15615][SQL] Add an API to load DataFrame from Dataset[String] storing JSON

## What changes were proposed in this pull request?

SPARK-15615 proposes replacing `sqlContext.read.json(rdd)` with a Dataset-based equivalent.
SPARK-15463 adds a CSV API for reading from `Dataset[String]`, so this keeps the APIs consistent.
I am deprecating the existing RDD-based APIs.
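A minimal migration sketch (assuming a local `SparkSession`; the sample JSON record is illustrative): the old RDD-based call still compiles but now emits a deprecation warning and delegates to the new overload, which accepts a `Dataset[String]` directly.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

val spark = SparkSession.builder().master("local").appName("json-migration").getOrCreate()
import spark.implicits._

// Old API (deprecated in 2.2.0): JSON records held in an RDD[String].
val rdd = spark.sparkContext.parallelize(Seq("""{"name":"a","age":1}"""))
val dfOld: DataFrame = spark.read.json(rdd) // deprecation warning; delegates to the new API

// New API (since 2.2.0): JSON records held in a Dataset[String].
val ds: Dataset[String] = Seq("""{"name":"a","age":1}""").toDS()
val dfNew: DataFrame = spark.read.json(ds)
```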

## How was this patch tested?

There are existing tests that cover this change. I left most tests using the existing RDD-based APIs, since those now delegate to the new `json` API.


Author: pj.fanning <pj.fanning@workday.com>
Author: PJ Fanning <pjfanning@users.noreply.github.com>

Closes #16895 from pjfanning/SPARK-15615.
Authored by pj.fanning on 2017-02-22 18:03:25 -08:00; committed by Wenchen Fan.
Commit d3147502e7 (parent dc005ed53c)


```diff
@@ -323,6 +323,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @param jsonRDD input RDD with one JSON object per record
    * @since 1.4.0
    */
+  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
   def json(jsonRDD: JavaRDD[String]): DataFrame = json(jsonRDD.rdd)

   /**
@@ -335,7 +336,22 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
    * @param jsonRDD input RDD with one JSON object per record
    * @since 1.4.0
    */
+  @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
   def json(jsonRDD: RDD[String]): DataFrame = {
+    json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
+  }
+
+  /**
+   * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
+   * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
+   *
+   * Unless the schema is specified using `schema` function, this function goes through the
+   * input once to determine the input schema.
+   *
+   * @param jsonDataset input Dataset with one JSON object per record
+   * @since 2.2.0
+   */
+  def json(jsonDataset: Dataset[String]): DataFrame = {
     val parsedOptions = new JSONOptions(
       extraOptions.toMap,
       sparkSession.sessionState.conf.sessionLocalTimeZone,
@@ -344,12 +360,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     val schema = userSpecifiedSchema.getOrElse {
       JsonInferSchema.infer(
-        jsonRDD,
+        jsonDataset.rdd,
         parsedOptions,
         createParser)
     }
-    val parsed = jsonRDD.mapPartitions { iter =>
+    val parsed = jsonDataset.rdd.mapPartitions { iter =>
       val parser = new JacksonParser(schema, parsedOptions)
       iter.flatMap(parser.parse(_, createParser, UTF8String.fromString))
     }
```
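As the new scaladoc notes, the reader makes an extra pass over the input to infer the schema unless one is supplied via `schema`. A short sketch (field names and types are illustrative) of providing an explicit schema to skip that inference pass:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder().master("local").appName("json-schema").getOrCreate()
import spark.implicits._

val ds = Seq("""{"name":"a","age":1}""").toDS()

// With an explicit schema, json() does not need to scan the data to infer one.
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))
val df = spark.read.schema(schema).json(ds)
```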