[SPARK-6023][SQL] ParquetConversions fails to replace the destination MetastoreRelation of an InsertIntoTable node with ParquetRelation2
JIRA: https://issues.apache.org/jira/browse/SPARK-6023

Author: Yin Huai <yhuai@databricks.com>

Closes #4782 from yhuai/parquetInsertInto and squashes the following commits:

ae7e806 [Yin Huai] Convert MetastoreRelation in InsertIntoTable and InsertIntoHiveTable.
ba543cd [Yin Huai] More tests.
50b6d0f [Yin Huai] Update error messages.
346780c [Yin Huai] Failed test.
parent 51a6f9097b
commit f02394d064
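The change below extends the ParquetConversions rule in HiveMetastoreCatalog so that a MetastoreRelation appearing as the *destination* of an insert is converted to a ParquetRelation2, not only relations on the read path. As orientation before the diff, here is a minimal, self-contained Scala sketch of that rewrite shape; the case classes are stand-ins invented for illustration, not Spark's Catalyst types.

// Stand-in plan nodes, NOT Spark's Catalyst classes.
sealed trait Plan
case class MetastoreRelation(name: String) extends Plan
case class ParquetRelation2(name: String) extends Plan
case class Scan(relation: Plan) extends Plan
case class InsertIntoTable(table: Plan, child: Plan) extends Plan

def convert(plan: Plan, relationMap: Map[Plan, Plan]): Plan = plan match {
  // Write path: replace the destination relation, then recurse into the query.
  case InsertIntoTable(r: MetastoreRelation, child) if relationMap.contains(r) =>
    InsertIntoTable(relationMap(r), convert(child, relationMap))
  // Read path: replace relations that are scanned.
  case Scan(r: MetastoreRelation) if relationMap.contains(r) =>
    Scan(relationMap(r))
  case other => other
}

// Before this fix, only the read-path case existed, so an INSERT kept its
// MetastoreRelation destination even when its source relations were converted:
val m = MetastoreRelation("test_insert_parquet")
val converted =
  convert(InsertIntoTable(m, Scan(m)), Map(m -> ParquetRelation2("test_insert_parquet")))
// converted == InsertIntoTable(ParquetRelation2(...), Scan(ParquetRelation2(...)))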
@@ -440,6 +440,17 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val attributedRewrites = relation.output.zip(parquetRelation.output)
         (relation, parquetRelation, attributedRewrites)
 
+      // Write path
+      case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
+          // Inserting into partitioned table is not supported in Parquet data source (yet).
+          if !relation.hiveQlTable.isPartitioned &&
+            hive.convertMetastoreParquet &&
+            hive.conf.parquetUseDataSourceApi &&
+            relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
+        val parquetRelation = convertToParquetRelation(relation)
+        val attributedRewrites = relation.output.zip(parquetRelation.output)
+        (relation, parquetRelation, attributedRewrites)
+
       // Read path
       case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
           if hive.convertMetastoreParquet &&
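The guard on the new write-path case combines four requirements. A hypothetical helper (names invented for illustration) restates them at a glance: the write path is converted only for unpartitioned tables backed by a Parquet SerDe, and only when both conversion flags are enabled.

def shouldConvertOnWrite(
    isPartitioned: Boolean,
    convertMetastoreParquet: Boolean,
    parquetUseDataSourceApi: Boolean,
    serdeClassName: String): Boolean = {
  !isPartitioned &&
    convertMetastoreParquet &&
    parquetUseDataSourceApi &&
    serdeClassName.toLowerCase.contains("parquet")
}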
@@ -464,6 +475,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
           withAlias
         }
+        case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
+          if relationMap.contains(r) => {
+          val parquetRelation = relationMap(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite)
+        }
+        case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
+          if relationMap.contains(r) => {
+          val parquetRelation = relationMap(r)
+          InsertIntoTable(parquetRelation, partition, child, overwrite)
+        }
         case other => other.transformExpressions {
           case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
         }
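After a relation is swapped, the rule also applies the attributedRewrites map built earlier, so resolved attributes that pointed at the old relation's output refer to the converted relation's output instead. A toy illustration with a stand-in Attribute type (Spark's real attributes carry expression IDs, which is what makes the remapping necessary):

// Stand-in for Catalyst's AttributeReference; equality hinges on exprId.
case class Attribute(name: String, exprId: Long)

// Redirect attributes found in the rewrite map, leave others untouched,
// mirroring `attributedRewrites.getOrElse(a, a)` in the rule above.
def rewriteAttributes(
    attrs: Seq[Attribute],
    attributedRewrites: Map[Attribute, Attribute]): Seq[Attribute] =
  attrs.map(a => attributedRewrites.getOrElse(a, a))

val oldOutput = Seq(Attribute("intField", 1L))
val newOutput = Seq(Attribute("intField", 2L))
val rewrites  = oldOutput.zip(newOutput).toMap
assert(rewriteAttributes(oldOutput, rewrites) == newOutput)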
@@ -24,11 +24,11 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.spark.sql.{SQLConf, QueryTest}
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.PhysicalRDD
-import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
+import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
-import org.apache.spark.sql.sources.LogicalRelation
+import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -93,6 +93,11 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
       sql(s"ALTER TABLE partitioned_parquet_with_key ADD PARTITION (p=$p)")
     }
 
+    val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd1).registerTempTable("jt")
+    val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
+    jsonRDD(rdd2).registerTempTable("jt_array")
+
     setConf("spark.sql.hive.convertMetastoreParquet", "true")
   }
 
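The two temp tables registered above back the new tests: jt holds flat records and jt_array holds array-valued records. For reference, the generated JSON looks like this (plain Scala, no Spark required):

val jtRows      = (1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")
val jtArrayRows = (1 to 10).map(i => s"""{"a":[$i, null]}""")

println(jtRows.head)      // {"a":1, "b":"str1"}
println(jtArrayRows.head) // {"a":[1, null]}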
@@ -100,6 +105,8 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
     sql("DROP TABLE partitioned_parquet")
     sql("DROP TABLE partitioned_parquet_with_key")
     sql("DROP TABLE normal_parquet")
+    sql("DROP TABLE IF EXISTS jt")
+    sql("DROP TABLE IF EXISTS jt_array")
     setConf("spark.sql.hive.convertMetastoreParquet", "false")
   }
 
@@ -122,9 +129,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
   override def beforeAll(): Unit = {
     super.beforeAll()
 
-    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    jsonRDD(rdd).registerTempTable("jt")
-
     sql(
       """
         |create table test_parquet
@@ -143,7 +147,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
   override def afterAll(): Unit = {
     super.afterAll()
-    sql("DROP TABLE IF EXISTS jt")
     sql("DROP TABLE IF EXISTS test_parquet")
 
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
@@ -238,6 +241,70 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     sql("DROP TABLE IF EXISTS test_parquet_ctas")
   }
+
+  test("MetastoreRelation in InsertIntoTable will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
+        s"However, found a ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  test("MetastoreRelation in InsertIntoHiveTable will be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
+    df.queryExecution.executedPlan match {
+      case ExecutedCommand(
+        InsertIntoDataSource(
+          LogicalRelation(r: ParquetRelation2), query, overwrite)) => // OK
+      case o => fail("test_insert_parquet should be converted to a " +
+        s"${classOf[ParquetRelation2].getCanonicalName} and " +
+        s"${classOf[InsertIntoDataSource].getCanonicalName} is expected as the SparkPlan. " +
+        s"However, found a ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
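The plan-shape check these tests perform can also be done by hand in a session where the same tables exist; the calls below are the ones used in the diff (sql, queryExecution.executedPlan), and the table names assume the test setup above:

val plan = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
  .queryExecution.executedPlan
println(plan)
// conversion on:  ExecutedCommand(InsertIntoDataSource(...))
// conversion off: InsertIntoHiveTable(...)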
@@ -252,6 +319,63 @@ class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
     super.afterAll()
     setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("MetastoreRelation in InsertIntoTable will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  intField INT
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
+    df.queryExecution.executedPlan match {
+      case insert: InsertIntoHiveTable => // OK
+      case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT intField FROM test_insert_parquet WHERE test_insert_parquet.intField > 5"),
+      sql("SELECT a FROM jt WHERE jt.a > 5").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
+
+  // TODO: enable it after the fix of SPARK-5950.
+  ignore("MetastoreRelation in InsertIntoHiveTable will not be converted") {
+    sql(
+      """
+        |create table test_insert_parquet
+        |(
+        |  int_array array<int>
+        |)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+        |STORED AS
+        |  INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+        |  OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+      """.stripMargin)
+
+    val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
+    df.queryExecution.executedPlan match {
+      case insert: InsertIntoHiveTable => // OK
+      case o => fail(s"The SparkPlan should be ${classOf[InsertIntoHiveTable].getCanonicalName}. " +
+        s"However, found ${o.toString}.")
+    }
+
+    checkAnswer(
+      sql("SELECT int_array FROM test_insert_parquet"),
+      sql("SELECT a FROM jt_array").collect()
+    )
+
+    sql("DROP TABLE IF EXISTS test_insert_parquet")
+  }
 }
 
 /**