[SPARK-7270] [SQL] Consider dynamic partition when inserting into hive table

JIRA: https://issues.apache.org/jira/browse/SPARK-7270

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #5864 from viirya/dyn_partition_insert and squashes the following commits:

b5627df [Liang-Chi Hsieh] For comments.
3b21e4b [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into dyn_partition_insert
8a4352d [Liang-Chi Hsieh] Consider dynamic partition when inserting into hive table.

(cherry picked from commit 126d7235de)
Signed-off-by: Michael Armbrust <michael@databricks.com>
This commit is contained in:
Liang-Chi Hsieh 2015-05-22 15:39:58 -07:00 committed by Michael Armbrust
parent e18d623d93
commit d6cb044630
2 changed files with 33 additions and 5 deletions

View file

@ -516,17 +516,19 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan)
: LogicalPlan = { : LogicalPlan = {
val childOutputDataTypes = child.output.map(_.dataType) val childOutputDataTypes = child.output.map(_.dataType)
val numDynamicPartitions = p.partition.values.count(_.isEmpty)
val tableOutputDataTypes = val tableOutputDataTypes =
(table.attributes ++ table.partitionKeys).take(child.output.length).map(_.dataType) (table.attributes ++ table.partitionKeys.takeRight(numDynamicPartitions))
.take(child.output.length).map(_.dataType)
if (childOutputDataTypes == tableOutputDataTypes) { if (childOutputDataTypes == tableOutputDataTypes) {
p InsertIntoHiveTable(table, p.partition, p.child, p.overwrite, p.ifNotExists)
} else if (childOutputDataTypes.size == tableOutputDataTypes.size && } else if (childOutputDataTypes.size == tableOutputDataTypes.size &&
childOutputDataTypes.zip(tableOutputDataTypes) childOutputDataTypes.zip(tableOutputDataTypes)
.forall { case (left, right) => left.sameType(right) }) { .forall { case (left, right) => left.sameType(right) }) {
// If both types ignoring nullability of ArrayType, MapType, StructType are the same, // If both types ignoring nullability of ArrayType, MapType, StructType are the same,
// use InsertIntoHiveTable instead of InsertIntoTable. // use InsertIntoHiveTable instead of InsertIntoTable.
InsertIntoHiveTable(p.table, p.partition, p.child, p.overwrite, p.ifNotExists) InsertIntoHiveTable(table, p.partition, p.child, p.overwrite, p.ifNotExists)
} else { } else {
// Only do the casting when child output data types differ from table output data types. // Only do the casting when child output data types differ from table output data types.
val castedChildOutput = child.output.zip(table.output).map { val castedChildOutput = child.output.zip(table.output).map {
@ -561,7 +563,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
* because Hive table doesn't have nullability for ARRAY, MAP, STRUCT types. * because Hive table doesn't have nullability for ARRAY, MAP, STRUCT types.
*/ */
private[hive] case class InsertIntoHiveTable( private[hive] case class InsertIntoHiveTable(
table: LogicalPlan, table: MetastoreRelation,
partition: Map[String, Option[String]], partition: Map[String, Option[String]],
child: LogicalPlan, child: LogicalPlan,
overwrite: Boolean, overwrite: Boolean,
@ -571,7 +573,13 @@ private[hive] case class InsertIntoHiveTable(
override def children: Seq[LogicalPlan] = child :: Nil override def children: Seq[LogicalPlan] = child :: Nil
override def output: Seq[Attribute] = child.output override def output: Seq[Attribute] = child.output
override lazy val resolved: Boolean = childrenResolved && child.output.zip(table.output).forall { val numDynamicPartitions = partition.values.count(_.isEmpty)
// This is the expected schema of the table prepared to be inserted into,
// including dynamic partition columns.
val tableOutput = table.attributes ++ table.partitionKeys.takeRight(numDynamicPartitions)
override lazy val resolved: Boolean = childrenResolved && child.output.zip(tableOutput).forall {
case (childAttr, tableAttr) => childAttr.dataType.sameType(tableAttr.dataType) case (childAttr, tableAttr) => childAttr.dataType.sameType(tableAttr.dataType)
} }
} }

View file

@ -28,6 +28,7 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.spark.{SparkFiles, SparkException} import org.apache.spark.{SparkFiles, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row} import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.plans.logical.Project import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive
@ -415,6 +416,25 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
|SELECT * FROM createdtable; |SELECT * FROM createdtable;
""".stripMargin) """.stripMargin)
test("SPARK-7270: consider dynamic partition when comparing table output") {
sql(s"CREATE TABLE test_partition (a STRING) PARTITIONED BY (b BIGINT, c STRING)")
sql(s"CREATE TABLE ptest (a STRING, b BIGINT, c STRING)")
val analyzedPlan = sql(
"""
|INSERT OVERWRITE table test_partition PARTITION (b=1, c)
|SELECT 'a', 'c' from ptest
""".stripMargin).queryExecution.analyzed
assertResult(false, "Incorrect cast detected\n" + analyzedPlan) {
var hasCast = false
analyzedPlan.collect {
case p: Project => p.transformExpressionsUp { case c: Cast => hasCast = true; c }
}
hasCast
}
}
createQueryTest("transform", createQueryTest("transform",
"SELECT TRANSFORM (key) USING 'cat' AS (tKey) FROM src") "SELECT TRANSFORM (key) USING 'cat' AS (tKey) FROM src")