[SPARK-15615][SQL][BUILD][FOLLOW-UP] Replace deprecated usage of json(RDD[String]) API

## What changes were proposed in this pull request?

This PR proposes to replace the deprecated `json(RDD[String])` usage with the `json(Dataset[String])` API.

The deprecated calls currently produce a large number of deprecation warnings during the build.
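
For reference, a minimal before/after sketch of the migration in Scala (the `SparkSession` setup below is an illustrative assumption; the sample JSON string is taken from the examples touched by this PR):

```scala
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}

// Minimal sketch, assuming a local SparkSession; not code from the changed files.
val spark = SparkSession.builder().master("local[*]").appName("json-migration").getOrCreate()
import spark.implicits._

val jsonData = Seq("""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""")

// Deprecated since SPARK-15615 (Spark 2.2) and the source of the build warnings:
//   spark.read.json(spark.sparkContext.parallelize(jsonData))

// Preferred: build a Dataset[String] first, either via the implicits...
val viaImplicits: Dataset[String] = jsonData.toDS()
// ...or explicitly with an encoder, which is the pattern used in the Java suites.
val viaEncoder: Dataset[String] = spark.createDataset(jsonData)(Encoders.STRING)

spark.read.json(viaImplicits).show()
```

`json(Dataset[String])` itself was added by SPARK-15615; this follow-up only migrates the remaining callers in examples and tests.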

## How was this patch tested?

Fixed tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17071 from HyukjinKwon/SPARK-15615-followup.
hyukjinkwon authored 2017-02-27 14:33:02 -08:00; committed by Wenchen Fan
commit 8a5a58506c (parent 4ba9c6c453)
21 changed files with 189 additions and 201 deletions


@ -25,8 +25,6 @@ import java.util.List;
import java.util.Properties;
// $example on:basic_parquet_example$
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
// $example on:schema_merging$
@ -217,12 +215,11 @@ public class JavaSQLDataSourceExample {
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
// a Dataset[String] storing one JSON object per string.
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD =
new JavaSparkContext(spark.sparkContext()).parallelize(jsonData);
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleRDD);
Dataset<String> anotherPeopleDataset = spark.createDataset(jsonData, Encoders.STRING());
Dataset<Row> anotherPeople = spark.read().json(anotherPeopleDataset);
anotherPeople.show();
// +---------------+----+
// | address|name|


@ -111,6 +111,10 @@ object SQLDataSourceExample {
private def runJsonDatasetExample(spark: SparkSession): Unit = {
// $example on:json_dataset$
// Primitive types (Int, String, etc) and Product types (case classes) encoders are
// supported by importing this when creating a Dataset.
import spark.implicits._
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
@ -135,10 +139,10 @@ object SQLDataSourceExample {
// +------+
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string
val otherPeopleRDD = spark.sparkContext.makeRDD(
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleRDD)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|


@ -31,6 +31,7 @@ import org.junit.Test;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
@ -146,13 +147,13 @@ public class JavaApplySchemaSuite implements Serializable {
@Test
public void applySchemaToJSON() {
JavaRDD<String> jsonRDD = jsc.parallelize(Arrays.asList(
Dataset<String> jsonDS = spark.createDataset(Arrays.asList(
"{\"string\":\"this is a simple string.\", \"integer\":10, \"long\":21474836470, " +
"\"bigInteger\":92233720368547758070, \"double\":1.7976931348623157E308, " +
"\"boolean\":true, \"null\":null}",
"{\"string\":\"this is another simple string.\", \"integer\":11, \"long\":21474836469, " +
"\"bigInteger\":92233720368547758069, \"double\":1.7976931348623157E305, " +
"\"boolean\":false, \"null\":null}"));
"\"boolean\":false, \"null\":null}"), Encoders.STRING());
List<StructField> fields = new ArrayList<>(7);
fields.add(DataTypes.createStructField("bigInteger", DataTypes.createDecimalType(20, 0),
true));
@ -183,14 +184,14 @@ public class JavaApplySchemaSuite implements Serializable {
null,
"this is another simple string."));
Dataset<Row> df1 = spark.read().json(jsonRDD);
Dataset<Row> df1 = spark.read().json(jsonDS);
StructType actualSchema1 = df1.schema();
Assert.assertEquals(expectedSchema, actualSchema1);
df1.createOrReplaceTempView("jsonTable1");
List<Row> actual1 = spark.sql("select * from jsonTable1").collectAsList();
Assert.assertEquals(expectedResult, actual1);
Dataset<Row> df2 = spark.read().schema(expectedSchema).json(jsonRDD);
Dataset<Row> df2 = spark.read().schema(expectedSchema).json(jsonDS);
StructType actualSchema2 = df2.schema();
Assert.assertEquals(expectedSchema, actualSchema2);
df2.createOrReplaceTempView("jsonTable2");


@ -414,4 +414,13 @@ public class JavaDataFrameSuite {
Assert.assertEquals(df.schema().length(), 0);
Assert.assertEquals(df.collectAsList().size(), 1);
}
@Test
public void testJsonRDDToDataFrame() {
// This is a test for the deprecated API in SPARK-15615.
JavaRDD<String> rdd = jsc.parallelize(Arrays.asList("{\"a\": 2}"));
Dataset<Row> df = spark.read().json(rdd);
Assert.assertEquals(1L, df.count());
Assert.assertEquals(2L, df.collectAsList().get(0).getLong(0));
}
}


@ -29,8 +29,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
@ -40,7 +38,6 @@ import org.apache.spark.util.Utils;
public class JavaSaveLoadSuite {
private transient SparkSession spark;
private transient JavaSparkContext jsc;
File path;
Dataset<Row> df;
@ -58,7 +55,6 @@ public class JavaSaveLoadSuite {
.master("local[*]")
.appName("testing")
.getOrCreate();
jsc = new JavaSparkContext(spark.sparkContext());
path =
Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
@ -70,8 +66,8 @@ public class JavaSaveLoadSuite {
for (int i = 0; i < 10; i++) {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
JavaRDD<String> rdd = jsc.parallelize(jsonObjects);
df = spark.read().json(rdd);
Dataset<String> ds = spark.createDataset(jsonObjects, Encoders.STRING());
df = spark.read().json(ds);
df.createOrReplaceTempView("jsonTable");
}


@ -914,15 +914,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-7551: support backticks for DataFrame attribute resolution") {
val df = spark.read.json(sparkContext.makeRDD(
"""{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
val df = spark.read.json(Seq("""{"a.b": {"c": {"d..e": {"f": 1}}}}""").toDS())
checkAnswer(
df.select(df("`a.b`.c.`d..e`.`f`")),
Row(1)
)
val df2 = spark.read.json(sparkContext.makeRDD(
"""{"a b": {"c": {"d e": {"f": 1}}}}""" :: Nil))
val df2 = spark.read.json(Seq("""{"a b": {"c": {"d e": {"f": 1}}}}""").toDS())
checkAnswer(
df2.select(df2("`a b`.c.d e.f")),
Row(1)
@ -1110,8 +1108,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-9323: DataFrame.orderBy should support nested column name") {
val df = spark.read.json(sparkContext.makeRDD(
"""{"a": {"b": 1}}""" :: Nil))
val df = spark.read.json(Seq("""{"a": {"b": 1}}""").toDS())
checkAnswer(df.orderBy("a.b"), Row(Row(1)))
}
@ -1164,8 +1161,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}
test("SPARK-10316: respect non-deterministic expressions in PhysicalOperation") {
val input = spark.read.json(spark.sparkContext.makeRDD(
(1 to 10).map(i => s"""{"id": $i}""")))
val input = spark.read.json((1 to 10).map(i => s"""{"id": $i}""").toDS())
val df = input.select($"id", rand(0).as('r))
df.as("a").join(df.filter($"r" < 0.5).as("b"), $"a.id" === $"b.id").collect().foreach { row =>


@ -211,8 +211,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("grouping on nested fields") {
spark.read.json(sparkContext.parallelize(
"""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
spark.read
.json(Seq("""{"nested": {"attribute": 1}, "value": 2}""").toDS())
.createOrReplaceTempView("rows")
checkAnswer(
@ -229,9 +229,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-6201 IN type conversion") {
spark.read.json(
sparkContext.parallelize(
Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}")))
spark.read
.json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}").toDS())
.createOrReplaceTempView("d")
checkAnswer(
@ -240,9 +239,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-11226 Skip empty line in json file") {
spark.read.json(
sparkContext.parallelize(
Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "")))
spark.read
.json(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}", "").toDS())
.createOrReplaceTempView("d")
checkAnswer(
@ -1214,8 +1212,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-3483 Special chars in column names") {
val data = sparkContext.parallelize(
Seq("""{"key?number1": "value1", "key.number2": "value2"}"""))
val data = Seq("""{"key?number1": "value1", "key.number2": "value2"}""").toDS()
spark.read.json(data).createOrReplaceTempView("records")
sql("SELECT `key?number1`, `key.number2` FROM records")
}
@ -1257,13 +1254,13 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-4322 Grouping field with struct field as sub expression") {
spark.read.json(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil))
spark.read.json(Seq("""{"a": {"b": [{"c": 1}]}}""").toDS())
.createOrReplaceTempView("data")
checkAnswer(sql("SELECT a.b[0].c FROM data GROUP BY a.b[0].c"), Row(1))
spark.catalog.dropTempView("data")
spark.read.json(
sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).createOrReplaceTempView("data")
spark.read.json(Seq("""{"a": {"b": 1}}""").toDS())
.createOrReplaceTempView("data")
checkAnswer(sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1"), Row(2))
spark.catalog.dropTempView("data")
}
@ -1311,8 +1308,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-6145: ORDER BY test for nested fields") {
spark.read.json(sparkContext.makeRDD(
"""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
spark.read
.json(Seq("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""").toDS())
.createOrReplaceTempView("nestedOrder")
checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.b"), Row(1))
@ -1325,7 +1322,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-6145: special cases") {
spark.read
.json(sparkContext.makeRDD("""{"a": {"b": [1]}, "b": [{"a": 1}], "_c0": {"a": 1}}""" :: Nil))
.json(Seq("""{"a": {"b": [1]}, "b": [{"a": 1}], "_c0": {"a": 1}}""").toDS())
.createOrReplaceTempView("t")
checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY _c0.a"), Row(1))
@ -1333,8 +1330,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-6898: complete support for special chars in column names") {
spark.read.json(sparkContext.makeRDD(
"""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
spark.read
.json(Seq("""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""").toDS())
.createOrReplaceTempView("t")
checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
@ -1437,8 +1434,9 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
test("SPARK-7067: order by queries for complex ExtractValue chain") {
withTempView("t") {
spark.read.json(sparkContext.makeRDD(
"""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""" :: Nil)).createOrReplaceTempView("t")
spark.read
.json(Seq("""{"a": {"b": [{"c": 1}]}, "b": [{"d": 1}]}""").toDS())
.createOrReplaceTempView("t")
checkAnswer(sql("SELECT a.b FROM t ORDER BY b[0].d"), Row(Seq(Row(1))))
}
}
@ -2109,8 +2107,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
|"a": [{"count": 3}], "b": [{"e": "test", "count": 1}]}}}'
|
""".stripMargin
val rdd = sparkContext.parallelize(Array(json))
spark.read.json(rdd).write.mode("overwrite").parquet(dir.toString)
spark.read.json(Seq(json).toDS()).write.mode("overwrite").parquet(dir.toString)
spark.read.parquet(dir.toString).collect()
}
}


@ -221,8 +221,7 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT
StructField("vec", new UDT.MyDenseVectorUDT, false)
))
val stringRDD = sparkContext.parallelize(data)
val jsonRDD = spark.read.schema(schema).json(stringRDD)
val jsonRDD = spark.read.schema(schema).json(data.toDS())
checkAnswer(
jsonRDD,
Row(1, new UDT.MyDenseVector(Array(1.1, 2.2, 3.3, 4.4))) ::
@ -242,8 +241,7 @@ class UserDefinedTypeSuite extends QueryTest with SharedSQLContext with ParquetT
StructField("vec", new UDT.MyDenseVectorUDT, false)
))
val stringRDD = sparkContext.parallelize(data)
val jsonDataset = spark.read.schema(schema).json(stringRDD)
val jsonDataset = spark.read.schema(schema).json(data.toDS())
.as[(Int, UDT.MyDenseVector)]
checkDataset(
jsonDataset,


@ -140,7 +140,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
}
datum += "}"
datum = s"""{"a": {"b": {"c": $datum, "d": $datum}, "e": $datum}}"""
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$width wide x $numRows rows", "a.b.c.value_1")
}
@ -157,7 +157,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
datum = "{\"value\": " + datum + "}"
selector = selector + ".value"
}
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$depth deep x $numRows rows", selector)
}
@ -180,7 +180,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
}
// TODO(ekl) seems like the json parsing is actually the majority of the time, perhaps
// we should benchmark that too separately.
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$numNodes x $depth deep x $numRows rows", selector)
}
@ -200,7 +200,7 @@ class WideSchemaBenchmark extends SparkFunSuite with BeforeAndAfterEach {
}
}
datum += "]}"
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum)).cache()
df.count() // force caching
addCases(benchmark, df, s"$width wide x $numRows rows", "value[0]")
}


@ -25,19 +25,18 @@ import org.apache.spark.sql.test.SharedSQLContext
* Test cases for various [[JSONOptions]].
*/
class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
import testImplicits._
test("allowComments off") {
val str = """{'name': /* hello */ 'Reynold Xin'}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.json(rdd)
val df = spark.read.json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
test("allowComments on") {
val str = """{'name': /* hello */ 'Reynold Xin'}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.option("allowComments", "true").json(rdd)
val df = spark.read.option("allowComments", "true").json(Seq(str).toDS())
assert(df.schema.head.name == "name")
assert(df.first().getString(0) == "Reynold Xin")
@ -45,16 +44,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
test("allowSingleQuotes off") {
val str = """{'name': 'Reynold Xin'}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.option("allowSingleQuotes", "false").json(rdd)
val df = spark.read.option("allowSingleQuotes", "false").json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
test("allowSingleQuotes on") {
val str = """{'name': 'Reynold Xin'}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.json(rdd)
val df = spark.read.json(Seq(str).toDS())
assert(df.schema.head.name == "name")
assert(df.first().getString(0) == "Reynold Xin")
@ -62,16 +59,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
test("allowUnquotedFieldNames off") {
val str = """{name: 'Reynold Xin'}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.json(rdd)
val df = spark.read.json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
test("allowUnquotedFieldNames on") {
val str = """{name: 'Reynold Xin'}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.option("allowUnquotedFieldNames", "true").json(rdd)
val df = spark.read.option("allowUnquotedFieldNames", "true").json(Seq(str).toDS())
assert(df.schema.head.name == "name")
assert(df.first().getString(0) == "Reynold Xin")
@ -79,16 +74,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
test("allowNumericLeadingZeros off") {
val str = """{"age": 0018}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.json(rdd)
val df = spark.read.json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
test("allowNumericLeadingZeros on") {
val str = """{"age": 0018}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.option("allowNumericLeadingZeros", "true").json(rdd)
val df = spark.read.option("allowNumericLeadingZeros", "true").json(Seq(str).toDS())
assert(df.schema.head.name == "age")
assert(df.first().getLong(0) == 18)
@ -98,16 +91,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
// JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS.
ignore("allowNonNumericNumbers off") {
val str = """{"age": NaN}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.json(rdd)
val df = spark.read.json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
ignore("allowNonNumericNumbers on") {
val str = """{"age": NaN}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.option("allowNonNumericNumbers", "true").json(rdd)
val df = spark.read.option("allowNonNumericNumbers", "true").json(Seq(str).toDS())
assert(df.schema.head.name == "age")
assert(df.first().getDouble(0).isNaN)
@ -115,16 +106,14 @@ class JsonParsingOptionsSuite extends QueryTest with SharedSQLContext {
test("allowBackslashEscapingAnyCharacter off") {
val str = """{"name": "Cazen Lee", "price": "\$10"}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.option("allowBackslashEscapingAnyCharacter", "false").json(rdd)
val df = spark.read.option("allowBackslashEscapingAnyCharacter", "false").json(Seq(str).toDS())
assert(df.schema.head.name == "_corrupt_record")
}
test("allowBackslashEscapingAnyCharacter on") {
val str = """{"name": "Cazen Lee", "price": "\$10"}"""
val rdd = spark.sparkContext.parallelize(Seq(str))
val df = spark.read.option("allowBackslashEscapingAnyCharacter", "true").json(rdd)
val df = spark.read.option("allowBackslashEscapingAnyCharacter", "true").json(Seq(str).toDS())
assert(df.schema.head.name == "name")
assert(df.schema.last.name == "price")


@ -590,7 +590,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.getCanonicalPath
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
val jsonDF = spark.read.json(path)
val expectedSchema = StructType(
@ -622,7 +622,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.getCanonicalPath
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
val jsonDF = spark.read.option("primitivesAsString", "true").json(path)
val expectedSchema = StructType(
@ -777,9 +777,9 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
test("Find compatible types even if inferred DecimalType is not capable of other IntegralType") {
val mixedIntegerAndDoubleRecords = sparkContext.parallelize(
"""{"a": 3, "b": 1.1}""" ::
s"""{"a": 3.1, "b": 0.${"0" * 38}1}""" :: Nil)
val mixedIntegerAndDoubleRecords = Seq(
"""{"a": 3, "b": 1.1}""",
s"""{"a": 3.1, "b": 0.${"0" * 38}1}""").toDS()
val jsonDF = spark.read
.option("prefersDecimal", "true")
.json(mixedIntegerAndDoubleRecords)
@ -828,7 +828,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val mergedJsonDF = spark.read
.option("prefersDecimal", "true")
.json(floatingValueRecords ++ bigIntegerRecords)
.json(floatingValueRecords.union(bigIntegerRecords))
val expectedMergedSchema = StructType(
StructField("a", DoubleType, true) ::
@ -846,7 +846,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.toURI.toString
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
sql(
s"""
@ -873,7 +873,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.getCanonicalPath
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
val schema = StructType(
StructField("bigInteger", DecimalType.SYSTEM_DEFAULT, true) ::
@ -1263,7 +1263,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
val jsonDF = spark.read.json(primitiveFieldAndType)
val primTable = spark.read.json(jsonDF.toJSON.rdd)
val primTable = spark.read.json(jsonDF.toJSON)
primTable.createOrReplaceTempView("primitiveTable")
checkAnswer(
sql("select * from primitiveTable"),
@ -1276,7 +1276,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
)
val complexJsonDF = spark.read.json(complexFieldAndType1)
val compTable = spark.read.json(complexJsonDF.toJSON.rdd)
val compTable = spark.read.json(complexJsonDF.toJSON)
compTable.createOrReplaceTempView("complexTable")
// Access elements of a primitive array.
checkAnswer(
@ -1364,10 +1364,10 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
})
}
test("SPARK-6245 JsonRDD.inferSchema on empty RDD") {
test("SPARK-6245 JsonInferSchema.infer on empty RDD") {
// This is really a test that it doesn't throw an exception
val emptySchema = JsonInferSchema.infer(
empty,
empty.rdd,
new JSONOptions(Map.empty[String, String], "GMT"),
CreateJacksonParser.string)
assert(StructType(Seq()) === emptySchema)
@ -1394,7 +1394,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
test("SPARK-8093 Erase empty structs") {
val emptySchema = JsonInferSchema.infer(
emptyRecords,
emptyRecords.rdd,
new JSONOptions(Map.empty[String, String], "GMT"),
CreateJacksonParser.string)
assert(StructType(Seq()) === emptySchema)
@ -1592,7 +1592,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.getCanonicalPath
arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
arrayAndStructRecords.map(record => record.replaceAll("\n", " ")).write.text(path)
val schema =
StructType(
@ -1609,7 +1609,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val dir = Utils.createTempDir()
dir.delete()
val path = dir.getCanonicalPath
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
val jsonDF = spark.read.json(path)
val jsonDir = new File(dir, "json").getCanonicalPath
@ -1645,7 +1645,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
dir.delete()
val path = dir.getCanonicalPath
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).write.text(path)
val jsonDF = spark.read.json(path)
val jsonDir = new File(dir, "json").getCanonicalPath
@ -1693,8 +1693,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
val json = s"""
|{"a": [{$nested}], "b": [{$nested}]}
""".stripMargin
val rdd = spark.sparkContext.makeRDD(Seq(json))
val df = spark.read.json(rdd)
val df = spark.read.json(Seq(json).toDS())
assert(df.schema.size === 2)
df.collect()
}
@ -1794,8 +1793,7 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
}
test("SPARK-18433: Improve DataSource option keys to be more case-insensitive") {
val records = sparkContext
.parallelize("""{"a": 3, "b": 1.1}""" :: """{"a": 3.1, "b": 0.000001}""" :: Nil)
val records = Seq("""{"a": 3, "b": 1.1}""", """{"a": 3.1, "b": 0.000001}""").toDS()
val schema = StructType(
StructField("a", DecimalType(21, 1), true) ::


@ -17,14 +17,13 @@
package org.apache.spark.sql.execution.datasources.json
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
private[json] trait TestJsonData {
protected def spark: SparkSession
def primitiveFieldAndType: RDD[String] =
spark.sparkContext.parallelize(
def primitiveFieldAndType: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"string":"this is a simple string.",
"integer":10,
"long":21474836470,
@ -32,10 +31,10 @@ private[json] trait TestJsonData {
"double":1.7976931348623157E308,
"boolean":true,
"null":null
}""" :: Nil)
}""" :: Nil))(Encoders.STRING)
def primitiveFieldValueTypeConflict: RDD[String] =
spark.sparkContext.parallelize(
def primitiveFieldValueTypeConflict: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
"num_bool":true, "num_str":13.1, "str_bool":"str1"}""" ::
"""{"num_num_1":null, "num_num_2":21474836470.9, "num_num_3": null,
@ -44,16 +43,17 @@ private[json] trait TestJsonData {
"num_bool":false, "num_str":"str1", "str_bool":false}""" ::
"""{"num_num_1":21474836570, "num_num_2":1.1, "num_num_3": 21474836470,
"num_bool":null, "num_str":92233720368547758070, "str_bool":null}""" :: Nil)
)(Encoders.STRING)
def jsonNullStruct: RDD[String] =
spark.sparkContext.parallelize(
def jsonNullStruct: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"nullstr":"","ip":"27.31.100.29","headers":{"Host":"1.abc.com","Charset":"UTF-8"}}""" ::
"""{"nullstr":"","ip":"27.31.100.29","headers":{}}""" ::
"""{"nullstr":"","ip":"27.31.100.29","headers":""}""" ::
"""{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil)
"""{"nullstr":null,"ip":"27.31.100.29","headers":null}""" :: Nil))(Encoders.STRING)
def complexFieldValueTypeConflict: RDD[String] =
spark.sparkContext.parallelize(
def complexFieldValueTypeConflict: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"num_struct":11, "str_array":[1, 2, 3],
"array":[], "struct_array":[], "struct": {}}""" ::
"""{"num_struct":{"field":false}, "str_array":null,
@ -62,24 +62,25 @@ private[json] trait TestJsonData {
"array":[4, 5, 6], "struct_array":[7, 8, 9], "struct": {"field":null}}""" ::
"""{"num_struct":{}, "str_array":["str1", "str2", 33],
"array":[7], "struct_array":{"field": true}, "struct": {"field": "str"}}""" :: Nil)
)(Encoders.STRING)
def arrayElementTypeConflict: RDD[String] =
spark.sparkContext.parallelize(
def arrayElementTypeConflict: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"array1": [1, 1.1, true, null, [], {}, [2,3,4], {"field":"str"}],
"array2": [{"field":214748364700}, {"field":1}]}""" ::
"""{"array3": [{"field":"str"}, {"field":1}]}""" ::
"""{"array3": [1, 2, 3]}""" :: Nil)
"""{"array3": [1, 2, 3]}""" :: Nil))(Encoders.STRING)
def missingFields: RDD[String] =
spark.sparkContext.parallelize(
def missingFields: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"a":true}""" ::
"""{"b":21474836470}""" ::
"""{"c":[33, 44]}""" ::
"""{"d":{"field":true}}""" ::
"""{"e":"str"}""" :: Nil)
"""{"e":"str"}""" :: Nil))(Encoders.STRING)
def complexFieldAndType1: RDD[String] =
spark.sparkContext.parallelize(
def complexFieldAndType1: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"struct":{"field1": true, "field2": 92233720368547758070},
"structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
"arrayOfString":["str1", "str2"],
@ -92,10 +93,10 @@ private[json] trait TestJsonData {
"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
"arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
"arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
}""" :: Nil)
}""" :: Nil))(Encoders.STRING)
def complexFieldAndType2: RDD[String] =
spark.sparkContext.parallelize(
def complexFieldAndType2: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
"complexArrayOfStruct": [
{
@ -146,89 +147,90 @@ private[json] trait TestJsonData {
{"inner3": [[{"inner4": 2}]]}
]
]]
}""" :: Nil)
}""" :: Nil))(Encoders.STRING)
def mapType1: RDD[String] =
spark.sparkContext.parallelize(
def mapType1: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"map": {"a": 1}}""" ::
"""{"map": {"b": 2}}""" ::
"""{"map": {"c": 3}}""" ::
"""{"map": {"c": 1, "d": 4}}""" ::
"""{"map": {"e": null}}""" :: Nil)
"""{"map": {"e": null}}""" :: Nil))(Encoders.STRING)
def mapType2: RDD[String] =
spark.sparkContext.parallelize(
def mapType2: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"map": {"a": {"field1": [1, 2, 3, null]}}}""" ::
"""{"map": {"b": {"field2": 2}}}""" ::
"""{"map": {"c": {"field1": [], "field2": 4}}}""" ::
"""{"map": {"c": {"field2": 3}, "d": {"field1": [null]}}}""" ::
"""{"map": {"e": null}}""" ::
"""{"map": {"f": {"field1": null}}}""" :: Nil)
"""{"map": {"f": {"field1": null}}}""" :: Nil))(Encoders.STRING)
def nullsInArrays: RDD[String] =
spark.sparkContext.parallelize(
def nullsInArrays: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"field1":[[null], [[["Test"]]]]}""" ::
"""{"field2":[null, [{"Test":1}]]}""" ::
"""{"field3":[[null], [{"Test":"2"}]]}""" ::
"""{"field4":[[null, [1,2,3]]]}""" :: Nil)
"""{"field4":[[null, [1,2,3]]]}""" :: Nil))(Encoders.STRING)
def jsonArray: RDD[String] =
spark.sparkContext.parallelize(
def jsonArray: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""[{"a":"str_a_1"}]""" ::
"""[{"a":"str_a_2"}, {"b":"str_b_3"}]""" ::
"""{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
"""[]""" :: Nil)
"""[]""" :: Nil))(Encoders.STRING)
def corruptRecords: RDD[String] =
spark.sparkContext.parallelize(
def corruptRecords: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{""" ::
"""""" ::
"""{"a":1, b:2}""" ::
"""{"a":{, b:3}""" ::
"""{"b":"str_b_4", "a":"str_a_4", "c":"str_c_4"}""" ::
"""]""" :: Nil)
"""]""" :: Nil))(Encoders.STRING)
def additionalCorruptRecords: RDD[String] =
spark.sparkContext.parallelize(
def additionalCorruptRecords: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"dummy":"test"}""" ::
"""[1,2,3]""" ::
"""":"test", "a":1}""" ::
"""42""" ::
""" ","ian":"test"}""" :: Nil)
""" ","ian":"test"}""" :: Nil))(Encoders.STRING)
def emptyRecords: RDD[String] =
spark.sparkContext.parallelize(
def emptyRecords: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{""" ::
"""""" ::
"""{"a": {}}""" ::
"""{"a": {"b": {}}}""" ::
"""{"b": [{"c": {}}]}""" ::
"""]""" :: Nil)
"""]""" :: Nil))(Encoders.STRING)
def timestampAsLong: RDD[String] =
spark.sparkContext.parallelize(
"""{"ts":1451732645}""" :: Nil)
def timestampAsLong: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"ts":1451732645}""" :: Nil))(Encoders.STRING)
def arrayAndStructRecords: RDD[String] =
spark.sparkContext.parallelize(
def arrayAndStructRecords: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"a": {"b": 1}}""" ::
"""{"a": []}""" :: Nil)
"""{"a": []}""" :: Nil))(Encoders.STRING)
def floatingValueRecords: RDD[String] =
spark.sparkContext.parallelize(
s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil)
def floatingValueRecords: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
s"""{"a": 0.${"0" * 38}1, "b": 0.01}""" :: Nil))(Encoders.STRING)
def bigIntegerRecords: RDD[String] =
spark.sparkContext.parallelize(
s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil)
def bigIntegerRecords: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
s"""{"a": 1${"0" * 38}, "b": 92233720368547758070}""" :: Nil))(Encoders.STRING)
def datesRecords: RDD[String] =
spark.sparkContext.parallelize(
def datesRecords: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize(
"""{"date": "26/08/2015 18:00"}""" ::
"""{"date": "27/10/2014 18:30"}""" ::
"""{"date": "28/01/2016 20:00"}""" :: Nil)
"""{"date": "28/01/2016 20:00"}""" :: Nil))(Encoders.STRING)
lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil)
lazy val singleRow: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize("""{"a":123}""" :: Nil))(Encoders.STRING)
def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]())
def empty: Dataset[String] = spark.emptyDataset(Encoders.STRING)
}


@ -33,14 +33,15 @@ class CreateTableAsSelectSuite
extends DataSourceTest
with SharedSQLContext
with BeforeAndAfterEach {
import testImplicits._
protected override lazy val sql = spark.sql _
private var path: File = null
override def beforeAll(): Unit = {
super.beforeAll()
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
spark.read.json(rdd).createOrReplaceTempView("jt")
val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""").toDS()
spark.read.json(ds).createOrReplaceTempView("jt")
}
override def afterAll(): Unit = {


@ -24,14 +24,16 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
class InsertSuite extends DataSourceTest with SharedSQLContext {
import testImplicits._
protected override lazy val sql = spark.sql _
private var path: File = null
override def beforeAll(): Unit = {
super.beforeAll()
path = Utils.createTempDir()
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
spark.read.json(rdd).createOrReplaceTempView("jt")
val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
spark.read.json(ds).createOrReplaceTempView("jt")
sql(
s"""
|CREATE TEMPORARY VIEW jsonTable (a int, b string)
@ -129,7 +131,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
// Writing the table to less part files.
val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 5)
spark.read.json(rdd1).createOrReplaceTempView("jt1")
spark.read.json(rdd1.toDS()).createOrReplaceTempView("jt1")
sql(
s"""
|INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt1
@ -141,7 +143,7 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
// Writing the table to more part files.
val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""), 10)
spark.read.json(rdd2).createOrReplaceTempView("jt2")
spark.read.json(rdd2.toDS()).createOrReplaceTempView("jt2")
sql(
s"""
|INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt2


@ -28,6 +28,8 @@ import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndAfter {
import testImplicits._
protected override lazy val sql = spark.sql _
private var originalDefaultSource: String = null
private var path: File = null
@ -40,8 +42,8 @@ class SaveLoadSuite extends DataSourceTest with SharedSQLContext with BeforeAndA
path = Utils.createTempDir()
path.delete()
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
df = spark.read.json(rdd)
val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""").toDS()
df = spark.read.json(ds)
df.createOrReplaceTempView("jsonTable")
}


@ -26,7 +26,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
@ -35,7 +34,6 @@ import org.apache.spark.sql.hive.test.TestHive$;
import org.apache.spark.sql.hive.aggregate.MyDoubleSum;
public class JavaDataFrameSuite {
private transient JavaSparkContext sc;
private transient SQLContext hc;
Dataset<Row> df;
@ -50,13 +48,11 @@ public class JavaDataFrameSuite {
@Before
public void setUp() throws IOException {
hc = TestHive$.MODULE$;
sc = new JavaSparkContext(hc.sparkContext());
List<String> jsonObjects = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
jsonObjects.add("{\"key\":" + i + ", \"value\":\"str" + i + "\"}");
}
df = hc.read().json(sc.parallelize(jsonObjects));
df = hc.read().json(hc.createDataset(jsonObjects, Encoders.STRING()));
df.createOrReplaceTempView("window_table");
}


@ -31,9 +31,9 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.QueryTest$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
@ -81,8 +81,8 @@ public class JavaMetastoreDataSourcesSuite {
for (int i = 0; i < 10; i++) {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
JavaRDD<String> rdd = sc.parallelize(jsonObjects);
df = sqlContext.read().json(rdd);
Dataset<String> ds = sqlContext.createDataset(jsonObjects, Encoders.STRING());
df = sqlContext.read().json(ds);
df.createOrReplaceTempView("jsonTable");
}


@ -511,9 +511,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
test("create external table") {
withTempPath { tempPath =>
withTable("savedJsonTable", "createdJsonTable") {
val df = read.json(sparkContext.parallelize((1 to 10).map { i =>
val df = read.json((1 to 10).map { i =>
s"""{ "a": $i, "b": "str$i" }"""
}))
}.toDS())
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "not a source name") {
df.write


@ -26,6 +26,7 @@ import org.apache.spark.sql.test.SQLTestUtils
* A set of tests that validates support for Hive Explain command.
*/
class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
import testImplicits._
test("show cost in explain command") {
// Only has sizeInBytes before ANALYZE command
@ -92,8 +93,8 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
test("SPARK-17409: The EXPLAIN output of CTAS only shows the analyzed plan") {
withTempView("jt") {
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
spark.read.json(rdd).createOrReplaceTempView("jt")
val ds = (1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""").toDS()
spark.read.json(ds).createOrReplaceTempView("jt")
val outputs = sql(
s"""
|EXPLAIN EXTENDED


@ -31,15 +31,15 @@ case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested])
class HiveResolutionSuite extends HiveComparisonTest {
test("SPARK-3698: case insensitive test for nested data") {
read.json(sparkContext.makeRDD(
"""{"a": [{"a": {"a": 1}}]}""" :: Nil)).createOrReplaceTempView("nested")
read.json(Seq("""{"a": [{"a": {"a": 1}}]}""").toDS())
.createOrReplaceTempView("nested")
// This should be successfully analyzed
sql("SELECT a[0].A.A from nested").queryExecution.analyzed
}
test("SPARK-5278: check ambiguous reference to fields") {
read.json(sparkContext.makeRDD(
"""{"a": [{"b": 1, "B": 2}]}""" :: Nil)).createOrReplaceTempView("nested")
read.json(Seq("""{"a": [{"b": 1, "B": 2}]}""").toDS())
.createOrReplaceTempView("nested")
// there are 2 fields matching field name "b"; we should report an ambiguous reference error
val exception = intercept[AnalysisException] {


@ -973,30 +973,30 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("SPARK-4296 Grouping field with Hive UDF as sub expression") {
val rdd = sparkContext.makeRDD( """{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""" :: Nil)
read.json(rdd).createOrReplaceTempView("data")
val ds = Seq("""{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""").toDS()
read.json(ds).createOrReplaceTempView("data")
checkAnswer(
sql("SELECT concat(a, '-', b), year(c) FROM data GROUP BY concat(a, '-', b), year(c)"),
Row("str-1", 1970))
dropTempTable("data")
read.json(rdd).createOrReplaceTempView("data")
read.json(ds).createOrReplaceTempView("data")
checkAnswer(sql("SELECT year(c) + 1 FROM data GROUP BY year(c) + 1"), Row(1971))
dropTempTable("data")
}
test("resolve udtf in projection #1") {
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
read.json(rdd).createOrReplaceTempView("data")
val ds = (1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
read.json(ds).createOrReplaceTempView("data")
val df = sql("SELECT explode(a) AS val FROM data")
val col = df("val")
}
test("resolve udtf in projection #2") {
val rdd = sparkContext.makeRDD((1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}"""))
read.json(rdd).createOrReplaceTempView("data")
val ds = (1 to 2).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
read.json(ds).createOrReplaceTempView("data")
checkAnswer(sql("SELECT explode(map(1, 1)) FROM data LIMIT 1"), Row(1, 1) :: Nil)
checkAnswer(sql("SELECT explode(map(1, 1)) as (k1, k2) FROM data LIMIT 1"), Row(1, 1) :: Nil)
intercept[AnalysisException] {
@ -1010,8 +1010,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// TGF with non-TGF in project is allowed in Spark SQL, but not in Hive
test("TGF with non-TGF in projection") {
val rdd = sparkContext.makeRDD( """{"a": "1", "b":"1"}""" :: Nil)
read.json(rdd).createOrReplaceTempView("data")
val ds = Seq("""{"a": "1", "b":"1"}""").toDS()
read.json(ds).createOrReplaceTempView("data")
checkAnswer(
sql("SELECT explode(map(a, b)) as (k1, k2), a, b FROM data"),
Row("1", "1", "1", "1") :: Nil)
@ -1024,8 +1024,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// is not in a valid state (cannot be executed). Because of this bug, the analysis rule of
// PreInsertionCasts will actually start to work before ImplicitGenerate and then
// generates an invalid query plan.
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
read.json(rdd).createOrReplaceTempView("data")
val ds = (1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""").toDS()
read.json(ds).createOrReplaceTempView("data")
withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
sql("CREATE TABLE explodeTest (key bigInt)")
@ -1262,9 +1262,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("SPARK-9371: fix the support for special chars in column names for hive context") {
read.json(sparkContext.makeRDD(
"""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
.createOrReplaceTempView("t")
val ds = Seq("""{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""").toDS()
read.json(ds).createOrReplaceTempView("t")
checkAnswer(sql("SELECT a.`c.b`, `b.$q`[0].`a@!.q`, `q.w`.`w.i&`[0] FROM t"), Row(1, 1, 1))
}