diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala similarity index 55% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala index e82d457eee..0d4f040156 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala @@ -19,44 +19,18 @@ package org.apache.spark.sql.hive import java.io.File -import org.apache.spark.sql._ +import org.apache.spark.sql.{DataFrame, Row} import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.catalog.HiveTableRelation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.DataSourceScanExec -import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation} import org.apache.spark.sql.hive.execution.HiveTableScanExec -import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types._ -import org.apache.spark.util.Utils - -// The data where the partitioning key exists only in the directory structure. -case class ParquetData(intField: Int, stringField: String) -// The data that also includes the partitioning key -case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) - -case class StructContainer(intStructField: Int, stringStructField: String) - -case class ParquetDataWithComplexTypes( - intField: Int, - stringField: String, - structField: StructContainer, - arrayField: Seq[Int]) - -case class ParquetDataWithKeyAndComplexTypes( - p: Int, - intField: Int, - stringField: String, - structField: StructContainer, - arrayField: Seq[Int]) /** * A suite to test the automatic conversion of metastore tables with parquet data to use the * built in parquet support. 
*/ -class ParquetMetastoreSuite extends ParquetPartitioningTest { +class HiveParquetMetastoreSuite extends ParquetPartitioningTest { import hiveContext._ import spark.implicits._ @@ -70,78 +44,83 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { "jt", "jt_array", "test_parquet") - sql(s""" - create external table partitioned_parquet - ( - intField INT, - stringField STRING - ) - PARTITIONED BY (p int) - ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - STORED AS - INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - location '${partitionedTableDir.toURI}' - """) + sql( + s""" + |create external table partitioned_parquet + |( + | intField INT, + | stringField STRING + |) + |PARTITIONED BY (p int) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + | STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + |location '${partitionedTableDir.toURI}' + """.stripMargin) - sql(s""" - create external table partitioned_parquet_with_key - ( - intField INT, - stringField STRING - ) - PARTITIONED BY (p int) - ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - STORED AS - INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - location '${partitionedTableDirWithKey.toURI}' - """) + sql( + s""" + |create external table partitioned_parquet_with_key + |( + | intField INT, + | stringField STRING + |) + |PARTITIONED BY (p int) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + | STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + |location '${partitionedTableDirWithKey.toURI}' + """.stripMargin) - sql(s""" - create external table normal_parquet - ( - intField INT, - stringField STRING - ) - ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - STORED AS - INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - location '${new File(normalTableDir, "normal").toURI}' - """) + sql( + s""" + |create external table normal_parquet + |( + | intField INT, + | stringField STRING + |) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + | STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + |location '${new File(normalTableDir, "normal").toURI}' + """.stripMargin) - sql(s""" - CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes - ( - intField INT, - stringField STRING, - structField STRUCT<intStructField: INT, stringStructField: STRING>, - arrayField ARRAY<int> - ) - PARTITIONED BY (p int) - ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - STORED AS - INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - LOCATION '${partitionedTableDirWithComplexTypes.toURI}' - """) + sql( + s""" + |CREATE EXTERNAL TABLE partitioned_parquet_with_complextypes + |( + | intField INT, + |
stringField STRING, + | structField STRUCT<intStructField: INT, stringStructField: STRING>, + | arrayField ARRAY<int> + |) + |PARTITIONED BY (p int) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + | STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + |LOCATION '${partitionedTableDirWithComplexTypes.toURI}' + """.stripMargin) - sql(s""" - CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes - ( - intField INT, - stringField STRING, - structField STRUCT<intStructField: INT, stringStructField: STRING>, - arrayField ARRAY<int> - ) - PARTITIONED BY (p int) - ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' - STORED AS - INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' - OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' - LOCATION '${partitionedTableDirWithKeyAndComplexTypes.toURI}' - """) + sql( + s""" + |CREATE EXTERNAL TABLE partitioned_parquet_with_key_and_complextypes + |( + | intField INT, + | stringField STRING, + | structField STRUCT<intStructField: INT, stringStructField: STRING>, + | arrayField ARRAY<int> + |) + |PARTITIONED BY (p int) + |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' + | STORED AS + | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' + | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' + |LOCATION '${partitionedTableDirWithKeyAndComplexTypes.toURI}' + """.stripMargin) sql( """ @@ -291,7 +270,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { case LogicalRelation(_: HadoopFsRelation, _, _, _) => // OK case _ => fail( "test_parquet_ctas should be converted to " + - s"${classOf[HadoopFsRelation ].getCanonicalName }") + s"${classOf[HadoopFsRelation ].getCanonicalName }") } } } @@ -430,7 +409,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { } test("SPARK-15968: nonempty partitioned metastore Parquet table lookup should use cached " + - "relation") { + "relation") { withTable("partitioned") { sql( """ @@ -678,404 +657,3 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest { sql("SELECT * FROM normal_parquet x CROSS JOIN normal_parquet y")) } } - -/** - * A suite of tests for the Parquet support through the data sources API.
- */ -class ParquetSourceSuite extends ParquetPartitioningTest { - import testImplicits._ - import spark._ - - override def beforeAll(): Unit = { - super.beforeAll() - dropTables("partitioned_parquet", - "partitioned_parquet_with_key", - "partitioned_parquet_with_complextypes", - "partitioned_parquet_with_key_and_complextypes", - "normal_parquet") - - sql( s""" - CREATE TEMPORARY VIEW partitioned_parquet - USING org.apache.spark.sql.parquet - OPTIONS ( - path '${partitionedTableDir.toURI}' - ) - """) - - sql( s""" - CREATE TEMPORARY VIEW partitioned_parquet_with_key - USING org.apache.spark.sql.parquet - OPTIONS ( - path '${partitionedTableDirWithKey.toURI}' - ) - """) - - sql( s""" - CREATE TEMPORARY VIEW normal_parquet - USING org.apache.spark.sql.parquet - OPTIONS ( - path '${new File(partitionedTableDir, "p=1").toURI}' - ) - """) - - sql( s""" - CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes - USING org.apache.spark.sql.parquet - OPTIONS ( - path '${partitionedTableDirWithKeyAndComplexTypes.toURI}' - ) - """) - - sql( s""" - CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes - USING org.apache.spark.sql.parquet - OPTIONS ( - path '${partitionedTableDirWithComplexTypes.toURI}' - ) - """) - } - - test("SPARK-6016 make sure to use the latest footers") { - sql("drop table if exists spark_6016_fix") - - // Create a DataFrame with two partitions. So, the created table will have two parquet files. - val df1 = (1 to 10).map(Tuple1(_)).toDF("a").coalesce(2) - df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix") - checkAnswer( - sql("select * from spark_6016_fix"), - (1 to 10).map(i => Row(i)) - ) - - // Create a DataFrame with four partitions. So, the created table will have four parquet files. - val df2 = (1 to 10).map(Tuple1(_)).toDF("b").coalesce(4) - df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix") - // For the bug of SPARK-6016, we are caching two outdated footers for df1. Then, - // since the new table has four parquet files, we are trying to read new footers from two files - // and then merge metadata in footers of these four (two outdated ones and two latest one), - // which will cause an error. 
- checkAnswer( - sql("select * from spark_6016_fix"), - (1 to 10).map(i => Row(i)) - ) - - sql("drop table spark_6016_fix") - } - - test("SPARK-8811: compatibility with array of struct in Hive") { - withTempPath { dir => - withTable("array_of_struct") { - val conf = Seq( - HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false", - SQLConf.PARQUET_BINARY_AS_STRING.key -> "true", - SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") - - withSQLConf(conf: _*) { - sql( - s"""CREATE TABLE array_of_struct - |STORED AS PARQUET LOCATION '${dir.toURI}' - |AS SELECT - | '1st' AS a, - | '2nd' AS b, - | ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b')) AS c - """.stripMargin) - - checkAnswer( - spark.read.parquet(dir.getCanonicalPath), - Row("1st", "2nd", Seq(Row("val_a", "val_b")))) - } - } - } - } - - test("Verify the PARQUET conversion parameter: CONVERT_METASTORE_PARQUET") { - withTempView("single") { - val singleRowDF = Seq((0, "foo")).toDF("key", "value") - singleRowDF.createOrReplaceTempView("single") - - Seq("true", "false").foreach { parquetConversion => - withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) { - val tableName = "test_parquet_ctas" - withTable(tableName) { - sql( - s""" - |CREATE TABLE $tableName STORED AS PARQUET - |AS SELECT tmp.key, tmp.value FROM single tmp - """.stripMargin) - - val df = spark.sql(s"SELECT * FROM $tableName WHERE key=0") - checkAnswer(df, singleRowDF) - - val queryExecution = df.queryExecution - if (parquetConversion == "true") { - queryExecution.analyzed.collectFirst { - case _: LogicalRelation => - }.getOrElse { - fail(s"Expecting the query plan to convert parquet to data sources, " + - s"but got:\n$queryExecution") - } - } else { - queryExecution.analyzed.collectFirst { - case _: HiveTableRelation => - }.getOrElse { - fail(s"Expecting no conversion from parquet to data sources, " + - s"but got:\n$queryExecution") - } - } - } - } - } - } - } - - test("values in arrays and maps stored in parquet are always nullable") { - val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a") - val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = false) - val arrayType1 = ArrayType(IntegerType, containsNull = false) - val expectedSchema1 = - StructType( - StructField("m", mapType1, nullable = true) :: - StructField("a", arrayType1, nullable = true) :: Nil) - assert(df.schema === expectedSchema1) - - withTable("alwaysNullable") { - df.write.format("parquet").saveAsTable("alwaysNullable") - - val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true) - val arrayType2 = ArrayType(IntegerType, containsNull = true) - val expectedSchema2 = - StructType( - StructField("m", mapType2, nullable = true) :: - StructField("a", arrayType2, nullable = true) :: Nil) - - assert(table("alwaysNullable").schema === expectedSchema2) - - checkAnswer( - sql("SELECT m, a FROM alwaysNullable"), - Row(Map(2 -> 3), Seq(4, 5, 6))) - } - } - - test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") { - val tempDir = Utils.createTempDir() - val filePath = new File(tempDir, "testParquet").getCanonicalPath - val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath - - val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str") - val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int") - intercept[Throwable](df2.write.parquet(filePath)) - - val df3 = df2.toDF("str", "max_int") - df3.write.parquet(filePath2) - val df4 = read.parquet(filePath2) - 
checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) :: Nil) - assert(df4.columns === Array("str", "max_int")) - } -} - -/** - * A collection of tests for parquet data with various forms of partitioning. - */ -abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with TestHiveSingleton { - import testImplicits._ - - var partitionedTableDir: File = null - var normalTableDir: File = null - var partitionedTableDirWithKey: File = null - var partitionedTableDirWithComplexTypes: File = null - var partitionedTableDirWithKeyAndComplexTypes: File = null - - override def beforeAll(): Unit = { - super.beforeAll() - partitionedTableDir = Utils.createTempDir() - normalTableDir = Utils.createTempDir() - - (1 to 10).foreach { p => - val partDir = new File(partitionedTableDir, s"p=$p") - sparkContext.makeRDD(1 to 10) - .map(i => ParquetData(i, s"part-$p")) - .toDF() - .write.parquet(partDir.getCanonicalPath) - } - - sparkContext - .makeRDD(1 to 10) - .map(i => ParquetData(i, s"part-1")) - .toDF() - .write.parquet(new File(normalTableDir, "normal").getCanonicalPath) - - partitionedTableDirWithKey = Utils.createTempDir() - - (1 to 10).foreach { p => - val partDir = new File(partitionedTableDirWithKey, s"p=$p") - sparkContext.makeRDD(1 to 10) - .map(i => ParquetDataWithKey(p, i, s"part-$p")) - .toDF() - .write.parquet(partDir.getCanonicalPath) - } - - partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir() - - (1 to 10).foreach { p => - val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p") - sparkContext.makeRDD(1 to 10).map { i => - ParquetDataWithKeyAndComplexTypes( - p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) - }.toDF().write.parquet(partDir.getCanonicalPath) - } - - partitionedTableDirWithComplexTypes = Utils.createTempDir() - - (1 to 10).foreach { p => - val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p") - sparkContext.makeRDD(1 to 10).map { i => - ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) - }.toDF().write.parquet(partDir.getCanonicalPath) - } - } - - override protected def afterAll(): Unit = { - try { - partitionedTableDir.delete() - normalTableDir.delete() - partitionedTableDirWithKey.delete() - partitionedTableDirWithComplexTypes.delete() - partitionedTableDirWithKeyAndComplexTypes.delete() - } finally { - super.afterAll() - } - } - - /** - * Drop named tables if they exist - * - * @param tableNames tables to drop - */ - def dropTables(tableNames: String*): Unit = { - tableNames.foreach { name => - sql(s"DROP TABLE IF EXISTS $name") - } - } - - Seq( - "partitioned_parquet", - "partitioned_parquet_with_key", - "partitioned_parquet_with_complextypes", - "partitioned_parquet_with_key_and_complextypes").foreach { table => - - test(s"ordering of the partitioning columns $table") { - checkAnswer( - sql(s"SELECT p, stringField FROM $table WHERE p = 1"), - Seq.fill(10)(Row(1, "part-1")) - ) - - checkAnswer( - sql(s"SELECT stringField, p FROM $table WHERE p = 1"), - Seq.fill(10)(Row("part-1", 1)) - ) - } - - test(s"project the partitioning column $table") { - checkAnswer( - sql(s"SELECT p, count(*) FROM $table group by p"), - Row(1, 10) :: - Row(2, 10) :: - Row(3, 10) :: - Row(4, 10) :: - Row(5, 10) :: - Row(6, 10) :: - Row(7, 10) :: - Row(8, 10) :: - Row(9, 10) :: - Row(10, 10) :: Nil - ) - } - - test(s"project partitioning and non-partitioning columns $table") { - checkAnswer( - sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"), - 
Row("part-1", 1, 10) :: - Row("part-2", 2, 10) :: - Row("part-3", 3, 10) :: - Row("part-4", 4, 10) :: - Row("part-5", 5, 10) :: - Row("part-6", 6, 10) :: - Row("part-7", 7, 10) :: - Row("part-8", 8, 10) :: - Row("part-9", 9, 10) :: - Row("part-10", 10, 10) :: Nil - ) - } - - test(s"simple count $table") { - checkAnswer( - sql(s"SELECT COUNT(*) FROM $table"), - Row(100)) - } - - test(s"pruned count $table") { - checkAnswer( - sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"), - Row(10)) - } - - test(s"non-existent partition $table") { - checkAnswer( - sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"), - Row(0)) - } - - test(s"multi-partition pruned count $table") { - checkAnswer( - sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"), - Row(30)) - } - - test(s"non-partition predicates $table") { - checkAnswer( - sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"), - Row(30)) - } - - test(s"sum $table") { - checkAnswer( - sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"), - Row(1 + 2 + 3)) - } - - test(s"hive udfs $table") { - checkAnswer( - sql(s"SELECT concat(stringField, stringField) FROM $table"), - sql(s"SELECT stringField FROM $table").rdd.map { - case Row(s: String) => Row(s + s) - }.collect().toSeq) - } - } - - Seq( - "partitioned_parquet_with_key_and_complextypes", - "partitioned_parquet_with_complextypes").foreach { table => - - test(s"SPARK-5775 read struct from $table") { - checkAnswer( - sql( - s""" - |SELECT p, structField.intStructField, structField.stringStructField - |FROM $table WHERE p = 1 - """.stripMargin), - (1 to 10).map(i => Row(1, i, f"${i}_string"))) - } - - test(s"SPARK-5775 read array from $table") { - checkAnswer( - sql(s"SELECT arrayField, p FROM $table WHERE p = 1"), - (1 to 10).map(i => Row((1 to i).toArray, 1))) - } - } - - - test("non-part select(*)") { - checkAnswer( - sql("SELECT COUNT(*) FROM normal_parquet"), - Row(10)) - } -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala new file mode 100644 index 0000000000..de588768cf --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSourceSuite.scala @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File + +import org.apache.spark.sql.{Row, SaveMode} +import org.apache.spark.sql.catalyst.catalog.HiveTableRelation +import org.apache.spark.sql.execution.datasources.LogicalRelation +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils + +/** + * A suite of tests for the Parquet support through the data sources API. 
+ */ +class HiveParquetSourceSuite extends ParquetPartitioningTest { + import testImplicits._ + import spark._ + + override def beforeAll(): Unit = { + super.beforeAll() + dropTables("partitioned_parquet", + "partitioned_parquet_with_key", + "partitioned_parquet_with_complextypes", + "partitioned_parquet_with_key_and_complextypes", + "normal_parquet") + + sql( + s""" + |CREATE TEMPORARY VIEW partitioned_parquet + |USING org.apache.spark.sql.parquet + |OPTIONS ( + | path '${partitionedTableDir.toURI}' + |) + """.stripMargin) + + sql( + s""" + |CREATE TEMPORARY VIEW partitioned_parquet_with_key + |USING org.apache.spark.sql.parquet + |OPTIONS ( + | path '${partitionedTableDirWithKey.toURI}' + |) + """.stripMargin) + + sql( + s""" + |CREATE TEMPORARY VIEW normal_parquet + |USING org.apache.spark.sql.parquet + |OPTIONS ( + | path '${new File(partitionedTableDir, "p=1").toURI}' + |) + """.stripMargin) + + sql( + s""" + |CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes + |USING org.apache.spark.sql.parquet + |OPTIONS ( + | path '${partitionedTableDirWithKeyAndComplexTypes.toURI}' + |) + """.stripMargin) + + sql( + s""" + |CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes + |USING org.apache.spark.sql.parquet + |OPTIONS ( + | path '${partitionedTableDirWithComplexTypes.toURI}' + |) + """.stripMargin) + } + + test("SPARK-6016 make sure to use the latest footers") { + val tableName = "spark_6016_fix" + withTable(tableName) { + // Create a DataFrame with two partitions. So, the created table will have two parquet files. + val df1 = (1 to 10).map(Tuple1(_)).toDF("a").coalesce(2) + df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable(tableName) + checkAnswer( + sql(s"select * from $tableName"), + (1 to 10).map(i => Row(i)) + ) + + // Create a DataFrame with four partitions. So the created table will have four parquet files. + val df2 = (1 to 10).map(Tuple1(_)).toDF("b").coalesce(4) + df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable(tableName) + // For the bug of SPARK-6016, we are caching two outdated footers for df1. Then, + // since the new table has four parquet files, we are trying to read new footers from two + // files and then merge metadata in footers of these four + // (two outdated ones and two latest one), which will cause an error. 
+ checkAnswer( + sql(s"select * from $tableName"), + (1 to 10).map(i => Row(i)) + ) + } + } + + test("SPARK-8811: compatibility with array of struct in Hive") { + withTempPath { dir => + withTable("array_of_struct") { + val conf = Seq( + HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false", + SQLConf.PARQUET_BINARY_AS_STRING.key -> "true", + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> "false") + + withSQLConf(conf: _*) { + sql( + s"""CREATE TABLE array_of_struct + |STORED AS PARQUET LOCATION '${dir.toURI}' + |AS SELECT + | '1st' AS a, + | '2nd' AS b, + | ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b')) AS c + """.stripMargin) + + checkAnswer( + spark.read.parquet(dir.getCanonicalPath), + Row("1st", "2nd", Seq(Row("val_a", "val_b")))) + } + } + } + } + + test("Verify the PARQUET conversion parameter: CONVERT_METASTORE_PARQUET") { + withTempView("single") { + val singleRowDF = Seq((0, "foo")).toDF("key", "value") + singleRowDF.createOrReplaceTempView("single") + + Seq("true", "false").foreach { parquetConversion => + withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> parquetConversion) { + val tableName = "test_parquet_ctas" + withTable(tableName) { + sql( + s""" + |CREATE TABLE $tableName STORED AS PARQUET + |AS SELECT tmp.key, tmp.value FROM single tmp + """.stripMargin) + + val df = spark.sql(s"SELECT * FROM $tableName WHERE key=0") + checkAnswer(df, singleRowDF) + + val queryExecution = df.queryExecution + if (parquetConversion == "true") { + queryExecution.analyzed.collectFirst { + case _: LogicalRelation => + }.getOrElse { + fail(s"Expecting the query plan to convert parquet to data sources, " + + s"but got:\n$queryExecution") + } + } else { + queryExecution.analyzed.collectFirst { + case _: HiveTableRelation => + }.getOrElse { + fail(s"Expecting no conversion from parquet to data sources, " + + s"but got:\n$queryExecution") + } + } + } + } + } + } + } + + test("values in arrays and maps stored in parquet are always nullable") { + val df = createDataFrame(Tuple2(Map(2 -> 3), Seq(4, 5, 6)) :: Nil).toDF("m", "a") + val mapType1 = MapType(IntegerType, IntegerType, valueContainsNull = false) + val arrayType1 = ArrayType(IntegerType, containsNull = false) + val expectedSchema1 = + StructType( + StructField("m", mapType1, nullable = true) :: + StructField("a", arrayType1, nullable = true) :: Nil) + assert(df.schema === expectedSchema1) + + withTable("alwaysNullable") { + df.write.format("parquet").saveAsTable("alwaysNullable") + + val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true) + val arrayType2 = ArrayType(IntegerType, containsNull = true) + val expectedSchema2 = + StructType( + StructField("m", mapType2, nullable = true) :: + StructField("a", arrayType2, nullable = true) :: Nil) + + assert(table("alwaysNullable").schema === expectedSchema2) + + checkAnswer( + sql("SELECT m, a FROM alwaysNullable"), + Row(Map(2 -> 3), Seq(4, 5, 6))) + } + } + + test("Aggregation attribute names can't contain special chars \" ,;{}()\\n\\t=\"") { + withTempDir { tempDir => + val filePath = new File(tempDir, "testParquet").getCanonicalPath + val filePath2 = new File(tempDir, "testParquet2").getCanonicalPath + + val df = Seq(1, 2, 3).map(i => (i, i.toString)).toDF("int", "str") + val df2 = df.as('x).join(df.as('y), $"x.str" === $"y.str").groupBy("y.str").max("y.int") + intercept[Throwable](df2.write.parquet(filePath)) + + val df3 = df2.toDF("str", "max_int") + df3.write.parquet(filePath2) + val df4 = read.parquet(filePath2) + checkAnswer(df4, Row("1", 1) :: Row("2", 2) :: Row("3", 3) 
:: Nil) + assert(df4.columns === Array("str", "max_int")) + } + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetPartitioningTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetPartitioningTest.scala new file mode 100644 index 0000000000..2ae3cf4b38 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetPartitioningTest.scala @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive + +import java.io.File + +import org.apache.spark.sql._ +import org.apache.spark.sql.hive.test.TestHiveSingleton +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.util.Utils + +// The data where the partitioning key exists only in the directory structure. +case class ParquetData(intField: Int, stringField: String) +// The data that also includes the partitioning key +case class ParquetDataWithKey(p: Int, intField: Int, stringField: String) + +case class StructContainer(intStructField: Int, stringStructField: String) + +case class ParquetDataWithComplexTypes( + intField: Int, + stringField: String, + structField: StructContainer, + arrayField: Seq[Int]) + +case class ParquetDataWithKeyAndComplexTypes( + p: Int, + intField: Int, + stringField: String, + structField: StructContainer, + arrayField: Seq[Int]) + +/** + * A collection of tests for parquet data with various forms of partitioning. 
+ */ +abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with TestHiveSingleton { + import testImplicits._ + + var partitionedTableDir: File = null + var normalTableDir: File = null + var partitionedTableDirWithKey: File = null + var partitionedTableDirWithComplexTypes: File = null + var partitionedTableDirWithKeyAndComplexTypes: File = null + + override def beforeAll(): Unit = { + super.beforeAll() + partitionedTableDir = Utils.createTempDir() + normalTableDir = Utils.createTempDir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDir, s"p=$p") + sparkContext.makeRDD(1 to 10) + .map(i => ParquetData(i, s"part-$p")) + .toDF() + .write.parquet(partDir.getCanonicalPath) + } + + sparkContext + .makeRDD(1 to 10) + .map(i => ParquetData(i, s"part-1")) + .toDF() + .write.parquet(new File(normalTableDir, "normal").getCanonicalPath) + + partitionedTableDirWithKey = Utils.createTempDir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithKey, s"p=$p") + sparkContext.makeRDD(1 to 10) + .map(i => ParquetDataWithKey(p, i, s"part-$p")) + .toDF() + .write.parquet(partDir.getCanonicalPath) + } + + partitionedTableDirWithKeyAndComplexTypes = Utils.createTempDir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithKeyAndComplexTypes, s"p=$p") + sparkContext.makeRDD(1 to 10).map { i => + ParquetDataWithKeyAndComplexTypes( + p, i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) + }.toDF().write.parquet(partDir.getCanonicalPath) + } + + partitionedTableDirWithComplexTypes = Utils.createTempDir() + + (1 to 10).foreach { p => + val partDir = new File(partitionedTableDirWithComplexTypes, s"p=$p") + sparkContext.makeRDD(1 to 10).map { i => + ParquetDataWithComplexTypes(i, s"part-$p", StructContainer(i, f"${i}_string"), 1 to i) + }.toDF().write.parquet(partDir.getCanonicalPath) + } + } + + override protected def afterAll(): Unit = { + try { + partitionedTableDir.delete() + normalTableDir.delete() + partitionedTableDirWithKey.delete() + partitionedTableDirWithComplexTypes.delete() + partitionedTableDirWithKeyAndComplexTypes.delete() + } finally { + super.afterAll() + } + } + + /** + * Drop named tables if they exist + * + * @param tableNames tables to drop + */ + def dropTables(tableNames: String*): Unit = { + tableNames.foreach { name => + sql(s"DROP TABLE IF EXISTS $name") + } + } + + Seq( + "partitioned_parquet", + "partitioned_parquet_with_key", + "partitioned_parquet_with_complextypes", + "partitioned_parquet_with_key_and_complextypes").foreach { table => + + test(s"ordering of the partitioning columns $table") { + checkAnswer( + sql(s"SELECT p, stringField FROM $table WHERE p = 1"), + Seq.fill(10)(Row(1, "part-1")) + ) + + checkAnswer( + sql(s"SELECT stringField, p FROM $table WHERE p = 1"), + Seq.fill(10)(Row("part-1", 1)) + ) + } + + test(s"project the partitioning column $table") { + checkAnswer( + sql(s"SELECT p, count(*) FROM $table group by p"), + Row(1, 10) :: + Row(2, 10) :: + Row(3, 10) :: + Row(4, 10) :: + Row(5, 10) :: + Row(6, 10) :: + Row(7, 10) :: + Row(8, 10) :: + Row(9, 10) :: + Row(10, 10) :: Nil + ) + } + + test(s"project partitioning and non-partitioning columns $table") { + checkAnswer( + sql(s"SELECT stringField, p, count(intField) FROM $table GROUP BY p, stringField"), + Row("part-1", 1, 10) :: + Row("part-2", 2, 10) :: + Row("part-3", 3, 10) :: + Row("part-4", 4, 10) :: + Row("part-5", 5, 10) :: + Row("part-6", 6, 10) :: + Row("part-7", 7, 10) :: + Row("part-8", 8, 10) :: + 
Row("part-9", 9, 10) :: + Row("part-10", 10, 10) :: Nil + ) + } + + test(s"simple count $table") { + checkAnswer( + sql(s"SELECT COUNT(*) FROM $table"), + Row(100)) + } + + test(s"pruned count $table") { + checkAnswer( + sql(s"SELECT COUNT(*) FROM $table WHERE p = 1"), + Row(10)) + } + + test(s"non-existent partition $table") { + checkAnswer( + sql(s"SELECT COUNT(*) FROM $table WHERE p = 1000"), + Row(0)) + } + + test(s"multi-partition pruned count $table") { + checkAnswer( + sql(s"SELECT COUNT(*) FROM $table WHERE p IN (1,2,3)"), + Row(30)) + } + + test(s"non-partition predicates $table") { + checkAnswer( + sql(s"SELECT COUNT(*) FROM $table WHERE intField IN (1,2,3)"), + Row(30)) + } + + test(s"sum $table") { + checkAnswer( + sql(s"SELECT SUM(intField) FROM $table WHERE intField IN (1,2,3) AND p = 1"), + Row(1 + 2 + 3)) + } + + test(s"hive udfs $table") { + checkAnswer( + sql(s"SELECT concat(stringField, stringField) FROM $table"), + sql(s"SELECT stringField FROM $table").rdd.map { + case Row(s: String) => Row(s + s) + }.collect().toSeq) + } + } + + Seq( + "partitioned_parquet_with_key_and_complextypes", + "partitioned_parquet_with_complextypes").foreach { table => + + test(s"SPARK-5775 read struct from $table") { + checkAnswer( + sql( + s""" + |SELECT p, structField.intStructField, structField.stringStructField + |FROM $table WHERE p = 1 + """.stripMargin), + (1 to 10).map(i => Row(1, i, f"${i}_string"))) + } + + test(s"SPARK-5775 read array from $table") { + checkAnswer( + sql(s"SELECT arrayField, p FROM $table WHERE p = 1"), + (1 to 10).map(i => Row((1 to i).toArray, 1))) + } + } + + test("non-part select(*)") { + checkAnswer( + sql("SELECT COUNT(*) FROM normal_parquet"), + Row(10)) + } +}