[SPARK-25465][TEST] Refactor Parquet test suites in project Hive

## What changes were proposed in this pull request?

Currently, the file [parquetSuites.scala](f29c2b5287/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala) is not easily recognizable: its name gives no hint of which suites it contains.
When I tried to find the test suites for the built-in Parquet conversion of Hive serde tables, I could only find [HiveParquetSuite](f29c2b5287/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala) in the first few minutes.

This PR is to:
1. Rename `ParquetMetastoreSuite` to `HiveParquetMetastoreSuite`, and create a single file for it.
2. Rename `ParquetSourceSuite` to `HiveParquetSourceSuite`, and create a single file for it.
3. Create a single file for `ParquetPartitioningTest`.
4. Delete `parquetSuites.scala` (the resulting layout is sketched below).
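
For reference, a sketch of the refactored layout. The class names and inheritance come from the diff below; the per-file split is assumed to follow the usual one-public-class-per-file convention:

```scala
// Shared fixture: writes the partitioned Parquet test data and holds the common tests.
abstract class ParquetPartitioningTest
  extends QueryTest with SQLTestUtils with TestHiveSingleton { /* ... */ }

// Formerly ParquetMetastoreSuite: automatic conversion of Hive metastore Parquet
// tables to Spark's built-in Parquet support.
class HiveParquetMetastoreSuite extends ParquetPartitioningTest { /* ... */ }

// Formerly ParquetSourceSuite: Parquet support through the data sources API.
class HiveParquetSourceSuite extends ParquetPartitioningTest { /* ... */ }
```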

## How was this patch tested?

Existing unit tests.
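
For illustration, a minimal sketch (with a hypothetical suite name) of how a new test could build on the shared `ParquetPartitioningTest` fixture after this refactoring. The fixture writes partitioned Parquet data (directories `p=1` through `p=10`, ten rows each) under `partitionedTableDir` in `beforeAll`:

```scala
package org.apache.spark.sql.hive

import java.io.File

import org.apache.spark.sql.Row

// Hypothetical suite, shown only to illustrate reuse of the shared fixture.
class ParquetFixtureExampleSuite extends ParquetPartitioningTest {

  test("read a single generated partition directory") {
    // partitionedTableDir/p=1 holds ten rows whose stringField is "part-1".
    val df = spark.read.parquet(new File(partitionedTableDir, "p=1").getCanonicalPath)
    checkAnswer(df.select("stringField"), Seq.fill(10)(Row("part-1")))
  }
}
```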

Closes #22467 from gengliangwang/refactor_parquet_suites.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
parent 40edab209b
commit 6ca87eb2e0
3 changed files with 555 additions and 500 deletions


@@ -19,44 +19,18 @@ package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution.HiveTableScanExec
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
// The data that also includes the partitioning key
case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
case class StructContainer(intStructField: Int, stringStructField: String)
case class ParquetDataWithComplexTypes(
intField: Int,
stringField: String,
structField: StructContainer,
arrayField: Seq[Int])
case class ParquetDataWithKeyAndComplexTypes(
p: Int,
intField: Int,
stringField: String,
structField: StructContainer,
arrayField: Seq[Int])
/**
* A suite to test the automatic conversion of metastore tables with parquet data to use the
* built in parquet support.
*/
class ParquetMetastoreSuite extends ParquetPartitioningTest {
class HiveParquetMetastoreSuite extends ParquetPartitioningTest {
import hiveContext._
import spark.implicits._
@@ -70,78 +44,83 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
"jt",
"jt_array",
"test_parquet")
sql(s"""
create external table partitioned_parquet
(
intField INT,
stringField STRING
)
PARTITIONED BY (p int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
location '${partitionedTableDir.toURI}'
""")
sql(
s"""
|create external table partitioned_parquet
|(
| intField INT,
| stringField STRING
|)
|PARTITIONED BY (p int)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
| STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
|location '${partitionedTableDir.toURI}'
""".stripMargin)
sql(s"""
create external table partitioned_parquet_with_key
(
intField INT,
stringField STRING
)
PARTITIONED BY (p int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
location '${partitionedTableDirWithKey.toURI}'
""")
sql(
s"""
|create external table partitioned_parquet_with_key
|(
| intField INT,
| stringField STRING
|)
|PARTITIONED BY (p int)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
| STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
|location '${partitionedTableDirWithKey.toURI}'
""".stripMargin)
sql(s"""
create external table normal_parquet
(
intField INT,
stringField STRING
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
location '${new File(normalTableDir, "normal").toURI}'
""")
sql(
s"""
|create external table normal_parquet
|(
| intField INT,
| stringField STRING
|)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
| STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
|location '${new File(normalTableDir, "normal").toURI}'
""".stripMargin)
sql(s"""
CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
(
intField INT,
stringField STRING,
structField STRUCT<intStructField: INT, stringStructField: STRING>,
arrayField ARRAY<INT>
)
PARTITIONED BY (p int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '${partitionedTableDirWithComplexTypes.toURI}'
""")
sql(
s"""
|CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes
|(
| intField INT,
| stringField STRING,
| structField STRUCT<intStructField: INT, stringStructField: STRING>,
| arrayField ARRAY<INT>
|)
|PARTITIONED BY (p int)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
| STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
|LOCATION '${partitionedTableDirWithComplexTypes.toURI}'
""".stripMargin)
sql(s"""
CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
(
intField INT,
stringField STRING,
structField STRUCT<intStructField: INT, stringStructField: STRING>,
arrayField ARRAY<INT>
)
PARTITIONED BY (p int)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
""")
sql(
s"""
|CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes
|(
| intField INT,
| stringField STRING,
| structField STRUCT<intStructField: INT, stringStructField: STRING>,
| arrayField ARRAY<INT>
|)
|PARTITIONED BY (p int)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
| STORED AS
| INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
| OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
|LOCATION '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
""".stripMargin)
sql(
"""
@@ -678,404 +657,3 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
sql("SELECT * FROM normal_parquet x CROSS JOIN normal_parquet y"))
}
}
/**
* A suite of tests for the Parquet support through the data sources API.
*/
class ParquetSourceSuite extends ParquetPartitioningTest {
import testImplicits._
import spark._
override def beforeAll(): Unit = {
super.beforeAll()
dropTables("partitioned_parquet",
"partitioned_parquet_with_key",
"partitioned_parquet_with_complextypes",
"partitioned_parquet_with_key_and_complextypes",
"normal_parquet")
sql( s"""
CREATE TEMPORARY VIEW partitioned_parquet
USING org.apache.spark.sql.parquet
OPTIONS (
path '${partitionedTableDir.toURI}'
)
""")
sql( s"""
CREATE TEMPORARY VIEW partitioned_parquet_with_key
USING org.apache.spark.sql.parquet
OPTIONS (
path '${partitionedTableDirWithKey.toURI}'
)
""")
sql( s"""
CREATE TEMPORARY VIEW normal_parquet
USING org.apache.spark.sql.parquet
OPTIONS (
path '${new File(partitionedTableDir, "p=1").toURI}'
)
""")
sql( s"""
CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes
USING org.apache.spark.sql.parquet
OPTIONS (
path '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
)
""")
sql( s"""
CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes
USING org.apache.spark.sql.parquet
OPTIONS (
path '${partitionedTableDirWithComplexTypes.toURI}'
)
""")
}
test("SPARK-6016 make sure to use the latest footers") {
sql("drop table if exists spark_6016_fix")
// Create a DataFrame with two partitions. So, the created table will have two parquet files.
val df1 = (1 to 10).map(Tuple1(_)).toDF("a").coalesce(2)
df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
checkAnswer(
sql("select * from spark_6016_fix"),
(1 to 10).map(i => Row(i))
)
// Create a DataFrame with four partitions. So, the created table will have four parquet files.
val df2 = (1 to 10).map(Tuple1(_)).toDF("b").coalesce(4)
df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
// For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
// since the new table has four parquet files, we are trying to read new footers from two files
// and then merge metadata in footers of these four (two outdated ones and two latest one),
// which will cause an error.
checkAnswer(
sql("select * from spark_6016_fix"),
(1 to 10).map(i => Row(i))
)
sql("drop table spark_6016_fix")
}
test("SPARK-8811: compatibility with array of struct in Hive") {
withTempPath { dir =>
withTable("array_of_struct") {
val conf = Seq(
HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
SQLConf.PARQUET_BINARY_AS_STRING.key -> "true",
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false")
withSQLConf(conf: _*) {
sql(
s"""CREATE TABLE array_of_struct
|STORED AS PARQUET LOCATION '${dir.toURI}'
|AS SELECT
| '1st' AS a,
| '2nd' AS b,
| ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b')) AS c
""".stripMargin)
checkAnswer(
spark.read.parquet(dir.getCanonicalPath),
Row("1st", "2nd", Seq(Row("val_a", "val_b"))))
}
}
}
}
test("Verify the PARQUET conversion parameter: CONVERT_METASTORE_PARQUET") {
withTempView("single") {
val singleRowDF = Seq((0, "foo")).toDF("key", "value")
singleRowDF.createOrReplaceTempView("single")
Seq("true", "false").foreach { parquetConversion =>
withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) {
val tableName = "test_parquet_ctas"
withTable(tableName) {
sql(
s"""
|CREATE TABLE $tableName STORED AS PARQUET
|AS SELECT tmp.key, tmp.value FROM single tmp
""".stripMargin)
val df = spark.sql(s"SELECT * FROM $tableName WHERE key=0")
checkAnswer(df, singleRowDF)
val queryExecution = df.queryExecution
if (parquetConversion == "true") {
queryExecution.analyzed.collectFirst {
case _: LogicalRelation =>
}.getOrElse {
fail(s"Expecting the query plan to convert parquet to data sources, " +
s"but got:\n$queryExecution")
}
} else {
queryExecution.analyzed.collectFirst {
case _: HiveTableRelation =>
}.getOrElse {
fail(s"Expecting no conversion from parquet to data sources, " +
s"but got:\n$queryExecution")
}
}
}
}
}
}
}
test("values in arrays and maps stored in parquet are always nullable") {
val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a")
val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = false)
val arrayType1 = ArrayType(IntegerType, containsNull = false)
val expectedSchema1 =
StructType(
StructField("m", mapType1, nullable = true) ::
StructField("a", arrayType1, nullable = true) :: Nil)
assert(df.schema === expectedSchema1)
withTable("alwaysNullable") {
df.write.format("parquet").saveAsTable("alwaysNullable")
val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true)
val arrayType2 = ArrayType(IntegerType, containsNull = true)
val expectedSchema2 =
StructType(
StructField("m", mapType2, nullable = true) ::
StructField("a", arrayType2, nullable = true) :: Nil)
assert(table("alwaysNullable").schema === expectedSchema2)
checkAnswer(
sql("SELECT m, a FROM alwaysNullable"),
Row(Map(2 -> 3), Seq(4, 5, 6)))
}
}
test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
val tempDir = Utils.createTempDir()
val filePath = new File(tempDir, "testParquet").getCanonicalPath
val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
intercept[Throwable](df2.write.parquet(filePath))
val df3 = df2.toDF("str", "max_int")
df3.write.parquet(filePath2)
val df4 = read.parquet(filePath2)
checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil)
assert(df4.columns === Array("str", "max_int"))
}
}
/**
* A collection of tests for parquet data with various forms of partitioning.
*/
abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with TestHiveSingleton {
import testImplicits._
var partitionedTableDir: File = null
var normalTableDir: File = null
var partitionedTableDirWithKey: File = null
var partitionedTableDirWithComplexTypes: File = null
var partitionedTableDirWithKeyAndComplexTypes: File = null
override def beforeAll(): Unit = {
super.beforeAll()
partitionedTableDir = Utils.createTempDir()
normalTableDir = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDir, s"p=$p")
sparkContext.makeRDD(1 to 10)
.map(i => ParquetData(i, s"part-$p"))
.toDF()
.write.parquet(partDir.getCanonicalPath)
}
sparkContext
.makeRDD(1 to 10)
.map(i => ParquetData(i, s"part-1"))
.toDF()
.write.parquet(new File(normalTableDir, "normal").getCanonicalPath)
partitionedTableDirWithKey = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDirWithKey, s"p=$p")
sparkContext.makeRDD(1 to 10)
.map(i => ParquetDataWithKey(p, i, s"part-$p"))
.toDF()
.write.parquet(partDir.getCanonicalPath)
}
partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
sparkContext.makeRDD(1 to 10).map { i =>
ParquetDataWithKeyAndComplexTypes(
p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
}.toDF().write.parquet(partDir.getCanonicalPath)
}
partitionedTableDirWithComplexTypes = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
sparkContext.makeRDD(1 to 10).map { i =>
ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
}.toDF().write.parquet(partDir.getCanonicalPath)
}
}
override protected def afterAll(): Unit = {
try {
partitionedTableDir.delete()
normalTableDir.delete()
partitionedTableDirWithKey.delete()
partitionedTableDirWithComplexTypes.delete()
partitionedTableDirWithKeyAndComplexTypes.delete()
} finally {
super.afterAll()
}
}
/**
* Drop named tables if they exist
*
* @param tableNames tables to drop
*/
def dropTables(tableNames: String*): Unit = {
tableNames.foreach { name =>
sql(s"DROP TABLE IF EXISTS $name")
}
}
Seq(
"partitioned_parquet",
"partitioned_parquet_with_key",
"partitioned_parquet_with_complextypes",
"partitioned_parquet_with_key_and_complextypes").foreach { table =>
test(s"ordering of the partitioning columns $table") {
checkAnswer(
sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
Seq.fill(10)(Row(1, "part-1"))
)
checkAnswer(
sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
Seq.fill(10)(Row("part-1", 1))
)
}
test(s"project the partitioning column $table") {
checkAnswer(
sql(s"SELECT p, count(*) FROM $table group by p"),
Row(1, 10) ::
Row(2, 10) ::
Row(3, 10) ::
Row(4, 10) ::
Row(5, 10) ::
Row(6, 10) ::
Row(7, 10) ::
Row(8, 10) ::
Row(9, 10) ::
Row(10, 10) :: Nil
)
}
test(s"project partitioning and non-partitioning columns $table") {
checkAnswer(
sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
Row("part-1", 1, 10) ::
Row("part-2", 2, 10) ::
Row("part-3", 3, 10) ::
Row("part-4", 4, 10) ::
Row("part-5", 5, 10) ::
Row("part-6", 6, 10) ::
Row("part-7", 7, 10) ::
Row("part-8", 8, 10) ::
Row("part-9", 9, 10) ::
Row("part-10", 10, 10) :: Nil
)
}
test(s"simple count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table"),
Row(100))
}
test(s"pruned count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
Row(10))
}
test(s"non-existent partition $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
Row(0))
}
test(s"multi-partition pruned count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
Row(30))
}
test(s"non-partition predicates $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
Row(30))
}
test(s"sum $table") {
checkAnswer(
sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
Row(1 + 2 + 3))
}
test(s"hive udfs $table") {
checkAnswer(
sql(s"SELECT concat(stringField, stringField) FROM $table"),
sql(s"SELECT stringField FROM $table").rdd.map {
case Row(s: String) => Row(s + s)
}.collect().toSeq)
}
}
Seq(
"partitioned_parquet_with_key_and_complextypes",
"partitioned_parquet_with_complextypes").foreach { table =>
test(s"SPARK-5775 read struct from $table") {
checkAnswer(
sql(
s"""
|SELECT p, structField.intStructField, structField.stringStructField
|FROM $table WHERE p = 1
""".stripMargin),
(1 to 10).map(i => Row(1, i, f"${i}_string")))
}
test(s"SPARK-5775 read array from $table") {
checkAnswer(
sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
(1 to 10).map(i => Row((1 to i).toArray, 1)))
}
}
test("non-part select(*)") {
checkAnswer(
sql("SELECT COUNT(*) FROM normal_parquet"),
Row(10))
}
}


@@ -0,0 +1,225 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
/**
* A suite of tests for the Parquet support through the data sources API.
*/
class HiveParquetSourceSuite extends ParquetPartitioningTest {
import testImplicits._
import spark._
override def beforeAll(): Unit = {
super.beforeAll()
dropTables("partitioned_parquet",
"partitioned_parquet_with_key",
"partitioned_parquet_with_complextypes",
"partitioned_parquet_with_key_and_complextypes",
"normal_parquet")
sql(
s"""
|CREATE TEMPORARY VIEW partitioned_parquet
|USING org.apache.spark.sql.parquet
|OPTIONS (
| path '${partitionedTableDir.toURI}'
|)
""".stripMargin)
sql(
s"""
|CREATE TEMPORARY VIEW partitioned_parquet_with_key
|USING org.apache.spark.sql.parquet
|OPTIONS (
| path '${partitionedTableDirWithKey.toURI}'
|)
""".stripMargin)
sql(
s"""
|CREATE TEMPORARY VIEW normal_parquet
|USING org.apache.spark.sql.parquet
|OPTIONS (
| path '${new File(partitionedTableDir, "p=1").toURI}'
|)
""".stripMargin)
sql(
s"""
|CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes
|USING org.apache.spark.sql.parquet
|OPTIONS (
| path '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
|)
""".stripMargin)
sql(
s"""
|CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes
|USING org.apache.spark.sql.parquet
|OPTIONS (
| path '${partitionedTableDirWithComplexTypes.toURI}'
|)
""".stripMargin)
}
test("SPARK-6016 make sure to use the latest footers") {
val tableName = "spark_6016_fix"
withTable(tableName) {
// Create a DataFrame with two partitions. So, the created table will have two parquet files.
val df1 = (1 to 10).map(Tuple1(_)).toDF("a").coalesce(2)
df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable(tableName)
checkAnswer(
sql(s"select * from $tableName"),
(1 to 10).map(i => Row(i))
)
// Create a DataFrame with four partitions. So the created table will have four parquet files.
val df2 = (1 to 10).map(Tuple1(_)).toDF("b").coalesce(4)
df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable(tableName)
// For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
// since the new table has four parquet files, we are trying to read new footers from two
// files and then merge metadata in footers of these four
// (two outdated ones and two latest one), which will cause an error.
checkAnswer(
sql(s"select * from $tableName"),
(1 to 10).map(i => Row(i))
)
}
}
test("SPARK-8811: compatibility with array of struct in Hive") {
withTempPath { dir =>
withTable("array_of_struct") {
val conf = Seq(
HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
SQLConf.PARQUET_BINARY_AS_STRING.key -> "true",
SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false")
withSQLConf(conf: _*) {
sql(
s"""CREATE TABLE array_of_struct
|STORED AS PARQUET LOCATION '${dir.toURI}'
|AS SELECT
| '1st' AS a,
| '2nd' AS b,
| ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b')) AS c
""".stripMargin)
checkAnswer(
spark.read.parquet(dir.getCanonicalPath),
Row("1st", "2nd", Seq(Row("val_a", "val_b"))))
}
}
}
}
test("Verify the PARQUET conversion parameter: CONVERT_METASTORE_PARQUET") {
withTempView("single") {
val singleRowDF = Seq((0, "foo")).toDF("key", "value")
singleRowDF.createOrReplaceTempView("single")
Seq("true", "false").foreach { parquetConversion =>
withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) {
val tableName = "test_parquet_ctas"
withTable(tableName) {
sql(
s"""
|CREATE TABLE $tableName STORED AS PARQUET
|AS SELECT tmp.key, tmp.value FROM single tmp
""".stripMargin)
val df = spark.sql(s"SELECT * FROM $tableName WHERE key=0")
checkAnswer(df, singleRowDF)
val queryExecution = df.queryExecution
if (parquetConversion == "true") {
queryExecution.analyzed.collectFirst {
case _: LogicalRelation =>
}.getOrElse {
fail(s"Expecting the query plan to convert parquet to data sources, " +
s"but got:\n$queryExecution")
}
} else {
queryExecution.analyzed.collectFirst {
case _: HiveTableRelation =>
}.getOrElse {
fail(s"Expecting no conversion from parquet to data sources, " +
s"but got:\n$queryExecution")
}
}
}
}
}
}
}
test("values in arrays and maps stored in parquet are always nullable") {
val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a")
val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = false)
val arrayType1 = ArrayType(IntegerType, containsNull = false)
val expectedSchema1 =
StructType(
StructField("m", mapType1, nullable = true) ::
StructField("a", arrayType1, nullable = true) :: Nil)
assert(df.schema === expectedSchema1)
withTable("alwaysNullable") {
df.write.format("parquet").saveAsTable("alwaysNullable")
val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true)
val arrayType2 = ArrayType(IntegerType, containsNull = true)
val expectedSchema2 =
StructType(
StructField("m", mapType2, nullable = true) ::
StructField("a", arrayType2, nullable = true) :: Nil)
assert(table("alwaysNullable").schema === expectedSchema2)
checkAnswer(
sql("SELECT m, a FROM alwaysNullable"),
Row(Map(2 -> 3), Seq(4, 5, 6)))
}
}
test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") {
withTempDir { tempDir =>
val filePath = new File(tempDir, "testParquet").getCanonicalPath
val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath
val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str")
val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int")
intercept[Throwable](df2.write.parquet(filePath))
val df3 = df2.toDF("str", "max_int")
df3.write.parquet(filePath2)
val df4 = read.parquet(filePath2)
checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil)
assert(df4.columns === Array("str", "max_int"))
}
}
}


@@ -0,0 +1,252 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive
import java.io.File
import org.apache.spark.sql._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.util.Utils
// The data where the partitioning key exists only in the directory structure.
case class ParquetData(intField: Int, stringField: String)
// The data that also includes the partitioning key
case class ParquetDataWithKey(p: Int, intField: Int, stringField: String)
case class StructContainer(intStructField: Int, stringStructField: String)
case class ParquetDataWithComplexTypes(
intField: Int,
stringField: String,
structField: StructContainer,
arrayField: Seq[Int])
case class ParquetDataWithKeyAndComplexTypes(
p: Int,
intField: Int,
stringField: String,
structField: StructContainer,
arrayField: Seq[Int])
/**
* A collection of tests for parquet data with various forms of partitioning.
*/
abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with TestHiveSingleton {
import testImplicits._
var partitionedTableDir: File = null
var normalTableDir: File = null
var partitionedTableDirWithKey: File = null
var partitionedTableDirWithComplexTypes: File = null
var partitionedTableDirWithKeyAndComplexTypes: File = null
override def beforeAll(): Unit = {
super.beforeAll()
partitionedTableDir = Utils.createTempDir()
normalTableDir = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDir, s"p=$p")
sparkContext.makeRDD(1 to 10)
.map(i => ParquetData(i, s"part-$p"))
.toDF()
.write.parquet(partDir.getCanonicalPath)
}
sparkContext
.makeRDD(1 to 10)
.map(i => ParquetData(i, s"part-1"))
.toDF()
.write.parquet(new File(normalTableDir, "normal").getCanonicalPath)
partitionedTableDirWithKey = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDirWithKey, s"p=$p")
sparkContext.makeRDD(1 to 10)
.map(i => ParquetDataWithKey(p, i, s"part-$p"))
.toDF()
.write.parquet(partDir.getCanonicalPath)
}
partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p")
sparkContext.makeRDD(1 to 10).map { i =>
ParquetDataWithKeyAndComplexTypes(
p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
}.toDF().write.parquet(partDir.getCanonicalPath)
}
partitionedTableDirWithComplexTypes = Utils.createTempDir()
(1 to 10).foreach { p =>
val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p")
sparkContext.makeRDD(1 to 10).map { i =>
ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i)
}.toDF().write.parquet(partDir.getCanonicalPath)
}
}
override protected def afterAll(): Unit = {
try {
partitionedTableDir.delete()
normalTableDir.delete()
partitionedTableDirWithKey.delete()
partitionedTableDirWithComplexTypes.delete()
partitionedTableDirWithKeyAndComplexTypes.delete()
} finally {
super.afterAll()
}
}
/**
* Drop named tables if they exist
*
* @param tableNames tables to drop
*/
def dropTables(tableNames: String*): Unit = {
tableNames.foreach { name =>
sql(s"DROP TABLE IF EXISTS $name")
}
}
Seq(
"partitioned_parquet",
"partitioned_parquet_with_key",
"partitioned_parquet_with_complextypes",
"partitioned_parquet_with_key_and_complextypes").foreach { table =>
test(s"ordering of the partitioning columns $table") {
checkAnswer(
sql(s"SELECT p, stringField FROM $table WHERE p = 1"),
Seq.fill(10)(Row(1, "part-1"))
)
checkAnswer(
sql(s"SELECT stringField, p FROM $table WHERE p = 1"),
Seq.fill(10)(Row("part-1", 1))
)
}
test(s"project the partitioning column $table") {
checkAnswer(
sql(s"SELECT p, count(*) FROM $table group by p"),
Row(1, 10) ::
Row(2, 10) ::
Row(3, 10) ::
Row(4, 10) ::
Row(5, 10) ::
Row(6, 10) ::
Row(7, 10) ::
Row(8, 10) ::
Row(9, 10) ::
Row(10, 10) :: Nil
)
}
test(s"project partitioning and non-partitioning columns $table") {
checkAnswer(
sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"),
Row("part-1", 1, 10) ::
Row("part-2", 2, 10) ::
Row("part-3", 3, 10) ::
Row("part-4", 4, 10) ::
Row("part-5", 5, 10) ::
Row("part-6", 6, 10) ::
Row("part-7", 7, 10) ::
Row("part-8", 8, 10) ::
Row("part-9", 9, 10) ::
Row("part-10", 10, 10) :: Nil
)
}
test(s"simple count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table"),
Row(100))
}
test(s"pruned count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"),
Row(10))
}
test(s"non-existent partition $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"),
Row(0))
}
test(s"multi-partition pruned count $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"),
Row(30))
}
test(s"non-partition predicates $table") {
checkAnswer(
sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"),
Row(30))
}
test(s"sum $table") {
checkAnswer(
sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"),
Row(1 + 2 + 3))
}
test(s"hive udfs $table") {
checkAnswer(
sql(s"SELECT concat(stringField, stringField) FROM $table"),
sql(s"SELECT stringField FROM $table").rdd.map {
case Row(s: String) => Row(s + s)
}.collect().toSeq)
}
}
Seq(
"partitioned_parquet_with_key_and_complextypes",
"partitioned_parquet_with_complextypes").foreach { table =>
test(s"SPARK-5775 read struct from $table") {
checkAnswer(
sql(
s"""
|SELECT p, structField.intStructField, structField.stringStructField
|FROM $table WHERE p = 1
""".stripMargin),
(1 to 10).map(i => Row(1, i, f"${i}_string")))
}
test(s"SPARK-5775 read array from $table") {
checkAnswer(
sql(s"SELECT arrayField, p FROM $table WHERE p = 1"),
(1 to 10).map(i => Row((1 to i).toArray, 1)))
}
}
test("non-part select(*)") {
checkAnswer(
sql("SELECT COUNT(*) FROM normal_parquet"),
Row(10))
}
}