[SPARK-7684] [SQL] Refactoring MetastoreDataSourcesSuite to workaround SPARK-7684

As stated in SPARK-7684, `TestHive.reset` currently has an execution-order-specific bug that makes running individual test suites locally quite frustrating. This PR refactors `MetastoreDataSourcesSuite` (which relies heavily on `TestHive.reset`) to use the various `withXxx` utility methods in `SQLTestUtils`, so that each test case cleans up its own tables and temporary paths and we no longer need to call `TestHive.reset`.
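
For context, a minimal sketch of the cleanup pattern this change relies on; the helper is simplified from the `SQLTestUtils` version in the diff below, and the test body is illustrative:

  // Simplified form of the SQLTestUtils helper: every table named here is
  // dropped after the body runs, even if the body throws, so no global
  // TestHive.reset call is needed afterwards.
  protected def withTable(tableNames: String*)(f: => Unit): Unit = {
    try f finally {
      tableNames.foreach { name =>
        sqlContext.sql(s"DROP TABLE IF EXISTS $name")
      }
    }
  }

  // Example test case using the helper (table name and query are illustrative).
  test("persistent JSON table") {
    withTable("jsonTable") {
      sql(s"CREATE TABLE jsonTable USING org.apache.spark.sql.json OPTIONS (path '$jsonFilePath')")
      checkAnswer(sql("SELECT * FROM jsonTable"), read.json(jsonFilePath).collect().toSeq)
    }
  }

Because the drop happens in a `finally` block, a failing test no longer leaves state behind for later test cases, which is what previously forced the suite to call `TestHive.reset`.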

Author: Cheng Lian <lian@databricks.com>
Author: Yin Huai <yhuai@databricks.com>

Closes #6353 from liancheng/workaround-spark-7684 and squashes the following commits:

26939aa [Yin Huai] Move the initialization of jsonFilePath to beforeAll.
a423d48 [Cheng Lian] Fixes Scala style issue
dfe45d0 [Cheng Lian] Refactors MetastoreDataSourcesSuite to workaround SPARK-7684
92a116d [Cheng Lian] Fixes minor styling issues

(cherry picked from commit b97ddff000)
Signed-off-by: Yin Huai <yhuai@databricks.com>
Authored by Cheng Lian on 2015-05-27 13:09:33 -07:00; committed by Yin Huai
parent d33142fd8c
commit 89fe93fc3b
3 changed files with 658 additions and 602 deletions

@@ -67,6 +67,10 @@ class QueryTest extends PlanTest {
checkAnswer(df, Seq(expectedAnswer))
}
protected def checkAnswer(df: DataFrame, expectedAnswer: DataFrame): Unit = {
checkAnswer(df, expectedAnswer.collect())
}
def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext) {
test(sqlString) {
checkAnswer(sqlContext.sql(sqlString), expectedAnswer)

@@ -75,14 +75,18 @@ trait SQLTestUtils {
/**
* Drops temporary table `tableName` after calling `f`.
*/
protected def withTempTable(tableName: String)(f: => Unit): Unit = {
try f finally sqlContext.dropTempTable(tableName)
protected def withTempTable(tableNames: String*)(f: => Unit): Unit = {
try f finally tableNames.foreach(sqlContext.dropTempTable)
}
/**
* Drops table `tableName` after calling `f`.
*/
protected def withTable(tableName: String)(f: => Unit): Unit = {
try f finally sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
protected def withTable(tableNames: String*)(f: => Unit): Unit = {
try f finally {
tableNames.foreach { name =>
sqlContext.sql(s"DROP TABLE IF EXISTS $name")
}
}
}
}

@@ -21,124 +21,134 @@ import java.io.File
import scala.collection.mutable.ArrayBuffer
import org.scalatest.BeforeAndAfterAll
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.InvalidInputException
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.sql._
import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.LogicalRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
/**
* Tests for persisting tables created though the data sources API into the metastore.
*/
class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with BeforeAndAfterAll {
override val sqlContext = TestHive
override def afterEach(): Unit = {
reset()
Utils.deleteRecursively(tempPath)
var jsonFilePath: String = _
override def beforeAll(): Unit = {
jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
}
val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
var tempPath: File = Utils.createTempDir()
tempPath.delete()
test ("persistent JSON table") {
test("persistent JSON table") {
withTable("jsonTable") {
sql(
s"""
|CREATE TABLE jsonTable
s"""CREATE TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${filePath}'
| path '$jsonFilePath'
|)
""".stripMargin)
checkAnswer(
sql("SELECT * FROM jsonTable"),
read.json(filePath).collect().toSeq)
read.json(jsonFilePath).collect().toSeq)
}
}
test ("persistent JSON table with a user specified schema") {
test("persistent JSON table with a user specified schema") {
withTable("jsonTable") {
sql(
s"""
|CREATE TABLE jsonTable (
s"""CREATE TABLE jsonTable (
|a string,
|b String,
|`c_!@(3)` int,
|`<d>` Struct<`d!`:array<int>, `=`:array<struct<Dd2: boolean>>>)
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${filePath}'
| path '$jsonFilePath'
|)
""".stripMargin)
read.json(filePath).registerTempTable("expectedJsonTable")
withTempTable("expectedJsonTable") {
read.json(jsonFilePath).registerTempTable("expectedJsonTable")
checkAnswer(
sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM jsonTable"),
sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM expectedJsonTable").collect().toSeq)
sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM expectedJsonTable"))
}
}
}
test ("persistent JSON table with a user specified schema with a subset of fields") {
test("persistent JSON table with a user specified schema with a subset of fields") {
withTable("jsonTable") {
// This works because JSON objects are self-describing and JSONRelation can get needed
// field values based on field names.
sql(
s"""
|CREATE TABLE jsonTable (`<d>` Struct<`=`:array<struct<Dd2: boolean>>>, b String)
s"""CREATE TABLE jsonTable (`<d>` Struct<`=`:array<struct<Dd2: boolean>>>, b String)
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${filePath}'
| path '$jsonFilePath'
|)
""".stripMargin)
val innerStruct = StructType(
StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, true) :: Nil))) :: Nil)
val expectedSchema = StructType(
StructField("<d>", innerStruct, true) ::
StructField("b", StringType, true) :: Nil)
val innerStruct = StructType(Seq(
StructField("=", ArrayType(StructType(StructField("Dd2", BooleanType, true) :: Nil)))))
val expectedSchema = StructType(Seq(
StructField("<d>", innerStruct, true),
StructField("b", StringType, true)))
assert(expectedSchema === table("jsonTable").schema)
read.json(filePath).registerTempTable("expectedJsonTable")
withTempTable("expectedJsonTable") {
read.json(jsonFilePath).registerTempTable("expectedJsonTable")
checkAnswer(
sql("SELECT b, `<d>`.`=` FROM jsonTable"),
sql("SELECT b, `<d>`.`=` FROM expectedJsonTable").collect().toSeq)
sql("SELECT b, `<d>`.`=` FROM expectedJsonTable"))
}
}
}
test("resolve shortened provider names") {
withTable("jsonTable") {
sql(
s"""
|CREATE TABLE jsonTable
|USING org.apache.spark.sql.json
|OPTIONS (
| path '${filePath}'
| path '$jsonFilePath'
|)
""".stripMargin)
checkAnswer(
sql("SELECT * FROM jsonTable"),
read.json(filePath).collect().toSeq)
read.json(jsonFilePath).collect().toSeq)
}
}
test("drop table") {
withTable("jsonTable") {
sql(
s"""
|CREATE TABLE jsonTable
|USING org.apache.spark.sql.json
|OPTIONS (
| path '${filePath}'
| path '$jsonFilePath'
|)
""".stripMargin)
checkAnswer(
sql("SELECT * FROM jsonTable"),
read.json(filePath).collect().toSeq)
read.json(jsonFilePath))
sql("DROP TABLE jsonTable")
@@ -147,20 +157,19 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
}
assert(
(new File(filePath)).exists(),
new File(jsonFilePath).exists(),
"The table with specified path is considered as an external table, " +
"its data should not deleted after DROP TABLE.")
}
}
test("check change without refresh") {
val tempDir = File.createTempFile("sparksql", "json", Utils.createTempDir())
tempDir.delete()
sparkContext.parallelize(("a", "b") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
withTempPath { tempDir =>
withTable("jsonTable") {
(("a", "b") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath)
sql(
s"""
|CREATE TABLE jsonTable
s"""CREATE TABLE jsonTable
|USING org.apache.spark.sql.json
|OPTIONS (
| path '${tempDir.getCanonicalPath}'
@@ -172,8 +181,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
Row("a", "b"))
Utils.deleteRecursively(tempDir)
sparkContext.parallelize(("a1", "b1", "c1") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
(("a1", "b1", "c1") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath)
// Schema is cached so the new column does not show. The updated values in existing columns
// will show.
@@ -187,18 +195,17 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
checkAnswer(
sql("SELECT * FROM jsonTable"),
Row("a1", "b1", "c1"))
Utils.deleteRecursively(tempDir)
}
}
}
test("drop, change, recreate") {
val tempDir = File.createTempFile("sparksql", "json", Utils.createTempDir())
tempDir.delete()
sparkContext.parallelize(("a", "b") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
withTempPath { tempDir =>
(("a", "b") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath)
withTable("jsonTable") {
sql(
s"""
|CREATE TABLE jsonTable
s"""CREATE TABLE jsonTable
|USING org.apache.spark.sql.json
|OPTIONS (
| path '${tempDir.getCanonicalPath}'
@@ -210,14 +217,12 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
Row("a", "b"))
Utils.deleteRecursively(tempDir)
sparkContext.parallelize(("a", "b", "c") :: Nil).toDF()
.toJSON.saveAsTextFile(tempDir.getCanonicalPath)
(("a", "b", "c") :: Nil).toDF().toJSON.saveAsTextFile(tempDir.getCanonicalPath)
sql("DROP TABLE jsonTable")
sql(
s"""
|CREATE TABLE jsonTable
s"""CREATE TABLE jsonTable
|USING org.apache.spark.sql.json
|OPTIONS (
| path '${tempDir.getCanonicalPath}'
@@ -228,20 +233,22 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
checkAnswer(
sql("SELECT * FROM jsonTable"),
Row("a", "b", "c"))
Utils.deleteRecursively(tempDir)
}
}
}
test("invalidate cache and reload") {
withTable("jsonTable") {
sql(
s"""
|CREATE TABLE jsonTable (`c_!@(3)` int)
s"""CREATE TABLE jsonTable (`c_!@(3)` int)
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${filePath}'
| path '$jsonFilePath'
|)
""".stripMargin)
read.json(filePath).registerTempTable("expectedJsonTable")
withTempTable("expectedJsonTable") {
read.json(jsonFilePath).registerTempTable("expectedJsonTable")
checkAnswer(
sql("SELECT * FROM jsonTable"),
@@ -259,23 +266,25 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
assert(expectedSchema === table("jsonTable").schema)
}
}
}
test("CTAS") {
withTempPath { tempPath =>
withTable("jsonTable", "ctasJsonTable") {
sql(
s"""
|CREATE TABLE jsonTable
s"""CREATE TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${filePath}'
| path '$jsonFilePath'
|)
""".stripMargin)
sql(
s"""
|CREATE TABLE ctasJsonTable
s"""CREATE TABLE ctasJsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${tempPath}'
| path '$tempPath'
|) AS
|SELECT * FROM jsonTable
""".stripMargin)
@@ -286,23 +295,27 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT * FROM ctasJsonTable"),
sql("SELECT * FROM jsonTable").collect())
}
}
}
test("CTAS with IF NOT EXISTS") {
withTempPath { path =>
val tempPath = path.getCanonicalPath
withTable("jsonTable", "ctasJsonTable") {
sql(
s"""
|CREATE TABLE jsonTable
s"""CREATE TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${filePath}'
| path '$jsonFilePath'
|)
""".stripMargin)
sql(
s"""
|CREATE TABLE ctasJsonTable
s"""CREATE TABLE ctasJsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${tempPath}'
| path '$tempPath'
|) AS
|SELECT * FROM jsonTable
""".stripMargin)
@@ -310,27 +323,27 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// Create the table again should trigger a AnalysisException.
val message = intercept[AnalysisException] {
sql(
s"""
|CREATE TABLE ctasJsonTable
s"""CREATE TABLE ctasJsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${tempPath}'
| path '$tempPath'
|) AS
|SELECT * FROM jsonTable
""".stripMargin)
}.getMessage
assert(message.contains("Table ctasJsonTable already exists."),
assert(
message.contains("Table ctasJsonTable already exists."),
"We should complain that ctasJsonTable already exists")
// The following statement should be fine if it has IF NOT EXISTS.
// It tries to create a table ctasJsonTable with a new schema.
// The actual table's schema and data should not be changed.
sql(
s"""
|CREATE TABLE IF NOT EXISTS ctasJsonTable
s"""CREATE TABLE IF NOT EXISTS ctasJsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${tempPath}'
| path '$tempPath'
|) AS
|SELECT a FROM jsonTable
""".stripMargin)
@@ -345,14 +358,16 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT * FROM ctasJsonTable"),
sql("SELECT * FROM jsonTable").collect())
}
}
}
test("CTAS a managed table") {
withTable("jsonTable", "ctasJsonTable", "loadedTable") {
sql(
s"""
|CREATE TABLE jsonTable
s"""CREATE TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${filePath}'
| path '$jsonFilePath'
|)
""".stripMargin)
@@ -363,8 +378,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// It is a managed table when we do not specify the location.
sql(
s"""
|CREATE TABLE ctasJsonTable
s"""CREATE TABLE ctasJsonTable
|USING org.apache.spark.sql.json.DefaultSource
|AS
|SELECT * FROM jsonTable
@@ -373,11 +387,10 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.")
sql(
s"""
|CREATE TABLE loadedTable
s"""CREATE TABLE loadedTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path '${expectedPath}'
| path '$expectedPath'
|)
""".stripMargin)
@@ -385,17 +398,17 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
checkAnswer(
sql("SELECT * FROM ctasJsonTable"),
sql("SELECT * FROM loadedTable").collect()
)
sql("SELECT * FROM loadedTable"))
sql("DROP TABLE ctasJsonTable")
assert(!fs.exists(filesystemPath), s"$expectedPath should not exist after we drop the table.")
}
}
test("SPARK-5286 Fail to drop an invalid table when using the data source API") {
withTable("jsonTable") {
sql(
s"""
|CREATE TABLE jsonTable
s"""CREATE TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|OPTIONS (
| path 'it is not a path at all!'
@@ -404,54 +417,50 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("DROP TABLE jsonTable").collect().foreach(println)
}
}
test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of data source tables.") {
val originalDefaultSource = conf.defaultDataSourceName
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
val df = read.json(rdd)
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
// Save the df as a managed table (by not specifiying the path).
df.write.saveAsTable("savedJsonTable")
withTable("savedJsonTable") {
// Save the df as a managed table (by not specifying the path).
(1 to 10)
.map(i => i -> s"str$i")
.toDF("a", "b")
.write
.format("json")
.saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
(1 to 4).map(i => Row(i, s"str${i}")))
(1 to 4).map(i => Row(i, s"str$i")))
checkAnswer(
sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
(6 to 10).map(i => Row(i, s"str${i}")))
(6 to 10).map(i => Row(i, s"str$i")))
invalidateTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
(1 to 4).map(i => Row(i, s"str${i}")))
(1 to 4).map(i => Row(i, s"str$i")))
checkAnswer(
sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
(6 to 10).map(i => Row(i, s"str${i}")))
// Drop table will also delete the data.
sql("DROP TABLE savedJsonTable")
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
(6 to 10).map(i => Row(i, s"str$i")))
}
}
test("save table") {
val originalDefaultSource = conf.defaultDataSourceName
withTempPath { path =>
val tempPath = path.getCanonicalPath
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
val df = read.json(rdd)
withTable("savedJsonTable") {
val df = (1 to 10).map(i => i -> s"str$i").toDF("a", "b")
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
// Save the df as a managed table (by not specifiying the path).
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "json") {
// Save the df as a managed table (by not specifying the path).
df.write.saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
df.collect())
checkAnswer(sql("SELECT * FROM savedJsonTable"), df)
// Right now, we cannot append to an existing JSON table.
intercept[RuntimeException] {
@@ -460,159 +469,174 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// We can overwrite it.
df.write.mode(SaveMode.Overwrite).saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
df.collect())
checkAnswer(sql("SELECT * FROM savedJsonTable"), df)
// When the save mode is Ignore, we will do nothing when the table already exists.
df.select("b").write.mode(SaveMode.Ignore).saveAsTable("savedJsonTable")
assert(df.schema === table("savedJsonTable").schema)
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
df.collect())
checkAnswer(sql("SELECT * FROM savedJsonTable"), df)
// Drop table will also delete the data.
sql("DROP TABLE savedJsonTable")
intercept[InvalidInputException] {
read.json(catalog.hiveDefaultTableFilePath("savedJsonTable"))
}
}
// Create an external table by specifying the path.
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") {
df.write
.format("org.apache.spark.sql.json")
.mode(SaveMode.Append)
.option("path", tempPath.toString)
.saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
df.collect())
checkAnswer(sql("SELECT * FROM savedJsonTable"), df)
}
// Data should not be deleted after we drop the table.
sql("DROP TABLE savedJsonTable")
checkAnswer(
read.json(tempPath.toString),
df.collect())
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
checkAnswer(read.json(tempPath.toString), df)
}
}
}
test("create external table") {
val originalDefaultSource = conf.defaultDataSourceName
withTempPath { tempPath =>
withTable("savedJsonTable", "createdJsonTable") {
val df = read.json(sparkContext.parallelize((1 to 10).map { i =>
s"""{ "a": $i, "b": "str$i" }"""
}))
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
val df = read.json(rdd)
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
df.write.format("org.apache.spark.sql.json")
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") {
df.write
.format("json")
.mode(SaveMode.Append)
.option("path", tempPath.toString)
.saveAsTable("savedJsonTable")
}
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "json") {
createExternalTable("createdJsonTable", tempPath.toString)
assert(table("createdJsonTable").schema === df.schema)
checkAnswer(
sql("SELECT * FROM createdJsonTable"),
df.collect())
checkAnswer(sql("SELECT * FROM createdJsonTable"), df)
var message = intercept[AnalysisException] {
createExternalTable("createdJsonTable", filePath.toString)
}.getMessage
assert(message.contains("Table createdJsonTable already exists."),
"We should complain that ctasJsonTable already exists")
assert(
intercept[AnalysisException] {
createExternalTable("createdJsonTable", jsonFilePath.toString)
}.getMessage.contains("Table createdJsonTable already exists."),
"We should complain that createdJsonTable already exists")
}
// Data should not be deleted.
sql("DROP TABLE createdJsonTable")
checkAnswer(
read.json(tempPath.toString),
df.collect())
checkAnswer(read.json(tempPath.toString), df)
// Try to specify the schema.
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
withSQLConf(SQLConf.DEFAULT_DATA_SOURCE_NAME -> "not a source name") {
val schema = StructType(StructField("b", StringType, true) :: Nil)
createExternalTable(
"createdJsonTable",
"org.apache.spark.sql.json",
schema,
Map("path" -> tempPath.toString))
checkAnswer(
sql("SELECT * FROM createdJsonTable"),
sql("SELECT b FROM savedJsonTable").collect())
sql("SELECT b FROM savedJsonTable"))
sql("DROP TABLE createdJsonTable")
message = intercept[RuntimeException] {
assert(
intercept[RuntimeException] {
createExternalTable(
"createdJsonTable",
"org.apache.spark.sql.json",
schema,
Map.empty[String, String])
}.getMessage
assert(
message.contains("'path' must be specified for json data."),
}.getMessage.contains("'path' must be specified for json data."),
"We should complain that path is not specified.")
sql("DROP TABLE savedJsonTable")
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
}
}
}
}
if (HiveShim.version == "0.13.1") {
test("scan a parquet table created through a CTAS statement") {
val originalConvertMetastore = getConf("spark.sql.hive.convertMetastoreParquet", "true")
val originalUseDataSource = getConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
setConf("spark.sql.hive.convertMetastoreParquet", "true")
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
withSQLConf(
"spark.sql.hive.convertMetastoreParquet" -> "true",
SQLConf.PARQUET_USE_DATA_SOURCE_API -> "true") {
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
read.json(rdd).registerTempTable("jt")
withTempTable("jt") {
(1 to 10).map(i => i -> s"str$i").toDF("a", "b").registerTempTable("jt")
withTable("test_parquet_ctas") {
sql(
"""
|create table test_parquet_ctas STORED AS parquET
|AS select tmp.a from jt tmp where tmp.a < 5
"""CREATE TABLE test_parquet_ctas STORED AS PARQUET
|AS SELECT tmp.a FROM jt tmp WHERE tmp.a < 5
""".stripMargin)
checkAnswer(
sql(s"SELECT a FROM test_parquet_ctas WHERE a > 2 "),
Row(3) :: Row(4) :: Nil
)
Row(3) :: Row(4) :: Nil)
table("test_parquet_ctas").queryExecution.optimizedPlan match {
case LogicalRelation(p: ParquetRelation2) => // OK
case _ =>
fail(
"test_parquet_ctas should be converted to " +
s"${classOf[ParquetRelation2].getCanonicalName}")
fail(s"test_parquet_ctas should have be converted to ${classOf[ParquetRelation2]}")
}
}
}
}
// Clenup and reset confs.
sql("DROP TABLE IF EXISTS jt")
sql("DROP TABLE IF EXISTS test_parquet_ctas")
setConf("spark.sql.hive.convertMetastoreParquet", originalConvertMetastore)
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalUseDataSource)
}
}
test("Pre insert nullability check (ArrayType)") {
val df1 =
createDataFrame(Tuple1(Seq(Int.box(1), null.asInstanceOf[Integer])) :: Nil).toDF("a")
val expectedSchema1 =
withTable("arrayInParquet") {
{
val df = (Tuple1(Seq(Int.box(1), null: Integer)) :: Nil).toDF("a")
val expectedSchema =
StructType(
StructField("a", ArrayType(IntegerType, containsNull = true), nullable = true) :: Nil)
assert(df1.schema === expectedSchema1)
df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("arrayInParquet")
StructField(
"a",
ArrayType(IntegerType, containsNull = true),
nullable = true) :: Nil)
val df2 =
createDataFrame(Tuple1(Seq(2, 3)) :: Nil).toDF("a")
val expectedSchema2 =
assert(df.schema === expectedSchema)
df.write
.format("parquet")
.mode(SaveMode.Overwrite)
.saveAsTable("arrayInParquet")
}
{
val df = (Tuple1(Seq(2, 3)) :: Nil).toDF("a")
val expectedSchema =
StructType(
StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
df2.write.mode(SaveMode.Append).insertInto("arrayInParquet")
createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
StructField(
"a",
ArrayType(IntegerType, containsNull = false),
nullable = true) :: Nil)
assert(df.schema === expectedSchema)
df.write
.format("parquet")
.mode(SaveMode.Append)
.insertInto("arrayInParquet")
}
(Tuple1(Seq(4, 5)) :: Nil).toDF("a")
.write
.mode(SaveMode.Append)
.saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write
.mode(SaveMode.Append).saveAsTable("arrayInParquet")
(Tuple1(Seq(Int.box(6), null: Integer)) :: Nil).toDF("a")
.write
.mode(SaveMode.Append)
.saveAsTable("arrayInParquet")
refreshTable("arrayInParquet")
checkAnswer(
@@ -621,32 +645,57 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
Row(ArrayBuffer(2, 3)) ::
Row(ArrayBuffer(4, 5)) ::
Row(ArrayBuffer(6, null)) :: Nil)
sql("DROP TABLE arrayInParquet")
}
}
test("Pre insert nullability check (MapType)") {
val df1 =
createDataFrame(Tuple1(Map(1 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = true)
val expectedSchema1 =
withTable("mapInParquet") {
{
val df = (Tuple1(Map(1 -> (null: Integer))) :: Nil).toDF("a")
val expectedSchema =
StructType(
StructField("a", mapType1, nullable = true) :: Nil)
assert(df1.schema === expectedSchema1)
df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("mapInParquet")
StructField(
"a",
MapType(IntegerType, IntegerType, valueContainsNull = true),
nullable = true) :: Nil)
val df2 =
createDataFrame(Tuple1(Map(2 -> 3)) :: Nil).toDF("a")
val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = false)
val expectedSchema2 =
assert(df.schema === expectedSchema)
df.write
.format("parquet")
.mode(SaveMode.Overwrite)
.saveAsTable("mapInParquet")
}
{
val df = (Tuple1(Map(2 -> 3)) :: Nil).toDF("a")
val expectedSchema =
StructType(
StructField("a", mapType2, nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
df2.write.mode(SaveMode.Append).insertInto("mapInParquet")
createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
StructField(
"a",
MapType(IntegerType, IntegerType, valueContainsNull = false),
nullable = true) :: Nil)
assert(df.schema === expectedSchema)
df.write
.format("parquet")
.mode(SaveMode.Append)
.insertInto("mapInParquet")
}
(Tuple1(Map(4 -> 5)) :: Nil).toDF("a")
.write
.format("parquet")
.mode(SaveMode.Append)
.saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write
.format("parquet").mode(SaveMode.Append).saveAsTable("mapInParquet")
(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
.write
.format("parquet")
.mode(SaveMode.Append)
.saveAsTable("mapInParquet")
refreshTable("mapInParquet")
checkAnswer(
@@ -655,17 +704,15 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
Row(Map(2 -> 3)) ::
Row(Map(4 -> 5)) ::
Row(Map(6 -> null)) :: Nil)
sql("DROP TABLE mapInParquet")
}
}
test("SPARK-6024 wide schema support") {
withSQLConf(SQLConf.SCHEMA_STRING_LENGTH_THRESHOLD -> "4000") {
withTable("wide_schema") {
// We will need 80 splits for this schema if the threshold is 4000.
val schema = StructType((1 to 5000).map(i => StructField(s"c_${i}", StringType, true)))
assert(
schema.json.size > conf.schemaStringLengthThreshold,
"To correctly test the fix of SPARK-6024, the value of " +
s"spark.sql.sources.schemaStringLengthThreshold needs to be less than ${schema.json.size}")
val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
// Manually create a metastore data source table.
catalog.createDataSourceTable(
tableName = "wide_schema",
@@ -680,11 +727,13 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
val actualSchema = table("wide_schema").schema
assert(schema === actualSchema)
}
}
}
test("SPARK-6655 still support a schema stored in spark.sql.sources.schema") {
val tableName = "spark6655"
withTable(tableName) {
val schema = StructType(StructField("int", IntegerType, true) :: Nil)
val hiveTable = HiveTable(
specifiedDatabase = Some("default"),
name = tableName,
@@ -703,21 +752,18 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
invalidateTable(tableName)
val actualSchema = table(tableName).schema
assert(schema === actualSchema)
sql(s"drop table $tableName")
}
}
test("Saving partition columns information") {
val df =
sparkContext.parallelize(1 to 10, 4).map { i =>
Tuple4(i, i + 1, s"str$i", s"str${i + 1}")
}.toDF("a", "b", "c", "d")
val df = (1 to 10).map(i => (i, i + 1, s"str$i", s"str${i + 1}")).toDF("a", "b", "c", "d")
val tableName = s"partitionInfo_${System.currentTimeMillis()}"
withTable(tableName) {
df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
invalidateTable(tableName)
val metastoreTable = catalog.client.getTable("default", tableName)
val expectedPartitionColumns =
StructType(df.schema("d") :: df.schema("b") :: Nil)
val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
val actualPartitionColumns =
StructType(
metastoreTable.partitionColumns.map(c =>
@@ -730,16 +776,17 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// Check the content of the saved table.
checkAnswer(
table(tableName).selectExpr("c", "b", "d", "a"),
df.selectExpr("c", "b", "d", "a").collect())
sql(s"drop table $tableName")
table(tableName).select("c", "b", "d", "a"),
df.select("c", "b", "d", "a"))
}
}
test("insert into a table") {
def createDF(from: Int, to: Int): DataFrame =
createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")
def createDF(from: Int, to: Int): DataFrame = {
(from to to).map(i => i -> s"str$i").toDF("c1", "c2")
}
withTable("insertParquet") {
createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
@@ -787,4 +834,5 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT p.c1, c2 FROM insertParquet p"),
(70 to 79).map(i => Row(i, s"str$i")))
}
}
}