[SPARK-25271][SQL] Hive ctas commands should use data source if it is convertible

## What changes were proposed in this pull request?

In Spark 2.3.0 and earlier, a Hive CTAS command is converted to write through a data source when the target table is convertible. This behavior is controlled by configs such as HiveUtils.CONVERT_METASTORE_ORC and HiveUtils.CONVERT_METASTORE_PARQUET.

In 2.3.1, this optimization was dropped by mistake in PR [SPARK-22977](https://github.com/apache/spark/pull/20521/files#r217254430). Since then, Hive CTAS commands only use the Hive SerDe to write data.

This patch adds the optimization back to the Hive CTAS command by introducing OptimizedCreateHiveTableAsSelectCommand, which uses a data source to write data.
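
As a rough illustration (not part of the patch itself), the sketch below assumes a Hive-enabled `SparkSession` named `spark` and mirrors the new test: with `spark.sql.hive.convertMetastoreParquet` and the new `spark.sql.hive.convertMetastoreCtas` flag enabled (both default to true), the CTAS is planned as `OptimizedCreateHiveTableAsSelectCommand` and the data is written by the built-in Parquet data source.

```scala
import org.apache.spark.sql.hive.execution.OptimizedCreateHiveTableAsSelectCommand

// Both flags default to true; they are set explicitly here only to make the
// preconditions of the conversion visible.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
spark.conf.set("spark.sql.hive.convertMetastoreCtas", "true")

spark.range(3).toDF("id").createOrReplaceTempView("p")
val df = spark.sql("CREATE TABLE targetTable STORED AS PARQUET AS SELECT id FROM p")

// When the conversion applies, the analyzed plan contains the optimized CTAS
// command, i.e. the data was written by the data source writer, not the Hive SerDe.
val usedDataSourceWriter = df.queryExecution.analyzed.collectFirst {
  case _: OptimizedCreateHiveTableAsSelectCommand => true
}.isDefined
```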

## How was this patch tested?

Added tests.

Closes #22514 from viirya/SPARK-25271-2.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Liang-Chi Hsieh 2018-12-20 10:47:24 +08:00 committed by Wenchen Fan
parent 61c443acd2
commit 5ad03607d1
8 changed files with 230 additions and 91 deletions

@@ -820,6 +820,14 @@ object DDLUtils {
table.provider.isDefined && table.provider.get.toLowerCase(Locale.ROOT) != HIVE_PROVIDER
}
def readHiveTable(table: CatalogTable): HiveTableRelation = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}
/**
* Throws a standard error for actions that require partitionProvider = hive.
*/

@@ -244,27 +244,19 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
})
}
private def readHiveTable(table: CatalogTable): LogicalPlan = {
HiveTableRelation(
table,
// Hive table columns are always nullable.
table.dataSchema.asNullable.toAttributes,
table.partitionSchema.asNullable.toAttributes)
}
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _)
if DDLUtils.isDatasourceTable(tableMeta) =>
i.copy(table = readDataSourceTable(tableMeta))
case i @ InsertIntoTable(UnresolvedCatalogRelation(tableMeta), _, _, _, _) =>
i.copy(table = readHiveTable(tableMeta))
i.copy(table = DDLUtils.readHiveTable(tableMeta))
case UnresolvedCatalogRelation(tableMeta) if DDLUtils.isDatasourceTable(tableMeta) =>
readDataSourceTable(tableMeta)
case UnresolvedCatalogRelation(tableMeta) =>
readHiveTable(tableMeta)
DDLUtils.readHiveTable(tableMeta)
}
}

@@ -17,6 +17,8 @@
package org.apache.spark.sql.hive
import java.util.Locale
import scala.util.control.NonFatal
import com.google.common.util.concurrent.Striped
@@ -29,6 +31,8 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._
import org.apache.spark.sql.types._
@@ -113,7 +117,44 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
}
}
def convertToLogicalRelation(
// Return true for Apache ORC and Hive ORC-related configuration names.
// Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
private def isOrcProperty(key: String) =
key.startsWith("orc.") || key.contains(".orc.")
private def isParquetProperty(key: String) =
key.startsWith("parquet.") || key.contains(".parquet.")
def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
// Consider table and storage properties. For properties existing in both sides, storage
// properties will supersede table properties.
if (serde.contains("parquet")) {
val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
relation.tableMeta.storage.properties
if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
}
private def convertToLogicalRelation(
relation: HiveTableRelation,
options: Map[String, String],
fileFormatClass: Class[_ <: FileFormat],

@@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTab
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -181,49 +180,17 @@ case class RelationConversions(
conf: SQLConf,
sessionCatalog: HiveSessionCatalog) extends Rule[LogicalPlan] {
private def isConvertible(relation: HiveTableRelation): Boolean = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && conf.getConf(HiveUtils.CONVERT_METASTORE_ORC)
isConvertible(relation.tableMeta)
}
// Return true for Apache ORC and Hive ORC-related configuration names.
// Note that Spark doesn't support configurations like `hive.merge.orcfile.stripe.level`.
private def isOrcProperty(key: String) =
key.startsWith("orc.") || key.contains(".orc.")
private def isParquetProperty(key: String) =
key.startsWith("parquet.") || key.contains(".parquet.")
private def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
// Consider table and storage properties. For properties existing in both sides, storage
// properties will supersede table properties.
if (serde.contains("parquet")) {
val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++
relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++
relation.tableMeta.storage.properties
if (conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.execution.datasources.orc.OrcFileFormat],
"orc")
} else {
sessionCatalog.metastoreCatalog.convertToLogicalRelation(
relation,
options,
classOf[org.apache.spark.sql.hive.orc.OrcFileFormat],
"orc")
}
}
private def isConvertible(tableMeta: CatalogTable): Boolean = {
val serde = tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
serde.contains("parquet") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET) ||
serde.contains("orc") && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_ORC)
}
private val metastoreCatalog = sessionCatalog.metastoreCatalog
override def apply(plan: LogicalPlan): LogicalPlan = {
plan resolveOperators {
// Write path
@@ -231,12 +198,21 @@
// Inserting into partitioned table is not supported in Parquet/Orc data source (yet).
if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
!r.isPartitioned && isConvertible(r) =>
InsertIntoTable(convert(r), partition, query, overwrite, ifPartitionNotExists)
InsertIntoTable(metastoreCatalog.convert(r), partition,
query, overwrite, ifPartitionNotExists)
// Read path
case relation: HiveTableRelation
if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
convert(relation)
metastoreCatalog.convert(relation)
// CTAS
case CreateTable(tableDesc, mode, Some(query))
if DDLUtils.isHiveTable(tableDesc) && tableDesc.partitionColumnNames.isEmpty &&
isConvertible(tableDesc) && SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
DDLUtils.checkDataColNames(tableDesc)
OptimizedCreateHiveTableAsSelectCommand(
tableDesc, query, query.output.map(_.name), mode)
}
}
}

@@ -110,6 +110,14 @@ private[spark] object HiveUtils extends Logging {
.booleanConf
.createWithDefault(true)
val CONVERT_METASTORE_CTAS = buildConf("spark.sql.hive.convertMetastoreCtas")
.doc("When set to true, Spark will try to use built-in data source writer " +
"instead of Hive serde in CTAS. This flag is effective only if " +
"`spark.sql.hive.convertMetastoreParquet` or `spark.sql.hive.convertMetastoreOrc` is " +
"enabled respectively for Parquet and ORC formats")
.booleanConf
.createWithDefault(true)
val HIVE_METASTORE_SHARED_PREFIXES = buildConf("spark.sql.hive.metastore.sharedPrefixes")
.doc("A comma separated list of class prefixes that should be loaded using the classloader " +
"that is shared between Spark SQL and a specific version of Hive. An example of classes " +

@@ -20,32 +20,26 @@ package org.apache.spark.sql.hive.execution
import scala.util.control.NonFatal
import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.HiveSessionCatalog
trait CreateHiveTableAsSelectBase extends DataWritingCommand {
val tableDesc: CatalogTable
val query: LogicalPlan
val outputColumnNames: Seq[String]
val mode: SaveMode
/**
* Create table and insert the query result into it.
*
* @param tableDesc the Table Describe, which may contain serde, storage handler etc.
* @param query the query whose result will be insert into the new relation
* @param mode SaveMode
*/
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends DataWritingCommand {
private val tableIdentifier = tableDesc.identifier
protected val tableIdentifier = tableDesc.identifier
override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
val catalog = sparkSession.sessionState.catalog
if (catalog.tableExists(tableIdentifier)) {
val tableExists = catalog.tableExists(tableIdentifier)
if (tableExists) {
assert(mode != SaveMode.Overwrite,
s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
@@ -57,15 +51,8 @@ case class CreateHiveTableAsSelectCommand(
return Seq.empty
}
// For CTAS, there is no static partition values to insert.
val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
InsertIntoHiveTable(
tableDesc,
partition,
query,
overwrite = false,
ifPartitionNotExists = false,
outputColumnNames = outputColumnNames).run(sparkSession, child)
val command = getWritingCommand(catalog, tableDesc, tableExists = true)
command.run(sparkSession, child)
} else {
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
@@ -77,15 +64,8 @@ case class CreateHiveTableAsSelectCommand(
try {
// Read back the metadata of the table which was created just now.
val createdTableMeta = catalog.getTableMetadata(tableDesc.identifier)
// For CTAS, there is no static partition values to insert.
val partition = createdTableMeta.partitionColumnNames.map(_ -> None).toMap
InsertIntoHiveTable(
createdTableMeta,
partition,
query,
overwrite = true,
ifPartitionNotExists = false,
outputColumnNames = outputColumnNames).run(sparkSession, child)
val command = getWritingCommand(catalog, createdTableMeta, tableExists = false)
command.run(sparkSession, child)
} catch {
case NonFatal(e) =>
// drop the created table.
@@ -97,9 +77,89 @@ case class CreateHiveTableAsSelectCommand(
Seq.empty[Row]
}
// Returns `DataWritingCommand` which actually writes data into the table.
def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand
override def argString: String = {
s"[Database:${tableDesc.database}, " +
s"TableName: ${tableDesc.identifier.table}, " +
s"InsertIntoHiveTable]"
}
}
/**
* Create table and insert the query result into it.
*
* @param tableDesc the table description, which may contain serde, storage handler etc.
* @param query the query whose result will be insert into the new relation
* @param mode SaveMode
*/
case class CreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends CreateHiveTableAsSelectBase {
override def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand = {
// For CTAS, there is no static partition values to insert.
val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
InsertIntoHiveTable(
tableDesc,
partition,
query,
overwrite = if (tableExists) false else true,
ifPartitionNotExists = false,
outputColumnNames = outputColumnNames)
}
}
/**
* Create table and insert the query result into it. This creates Hive table but inserts
* the query result into it by using data source.
*
* @param tableDesc the table description, which may contain serde, storage handler etc.
* @param query the query whose result will be insert into the new relation
* @param mode SaveMode
*/
case class OptimizedCreateHiveTableAsSelectCommand(
tableDesc: CatalogTable,
query: LogicalPlan,
outputColumnNames: Seq[String],
mode: SaveMode)
extends CreateHiveTableAsSelectBase {
override def getWritingCommand(
catalog: SessionCatalog,
tableDesc: CatalogTable,
tableExists: Boolean): DataWritingCommand = {
val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
val hiveTable = DDLUtils.readHiveTable(tableDesc)
val hadoopRelation = metastoreCatalog.convert(hiveTable) match {
case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
case _ => throw new AnalysisException(s"$tableIdentifier should be converted to " +
"HadoopFsRelation.")
}
InsertIntoHadoopFsRelationCommand(
hadoopRelation.location.rootPaths.head,
Map.empty, // We don't support to convert partitioned table.
false,
Seq.empty, // We don't support to convert partitioned table.
hadoopRelation.bucketSpec,
hadoopRelation.fileFormat,
hadoopRelation.options,
query,
if (tableExists) mode else SaveMode.Overwrite,
Some(tableDesc),
Some(hadoopRelation.location),
query.output.map(_.name))
}
}

@@ -92,4 +92,18 @@ class HiveParquetSuite extends QueryTest with ParquetTest with TestHiveSingleton
}
}
}
test("SPARK-25271: write empty map into hive parquet table") {
import testImplicits._
Seq(Map(1 -> "a"), Map.empty[Int, String]).toDF("m").createOrReplaceTempView("p")
withTempView("p") {
val targetTable = "targetTable"
withTable(targetTable) {
sql(s"CREATE TABLE $targetTable STORED AS PARQUET AS SELECT m FROM p")
checkAnswer(sql(s"SELECT m FROM $targetTable"),
Row(Map(1 -> "a")) :: Row(Map.empty[Int, String]) :: Nil)
}
}
}
}

@@ -2276,6 +2276,46 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
}
test("SPARK-25271: Hive ctas commands should use data source if it is convertible") {
withTempView("p") {
Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")
Seq("orc", "parquet").foreach { format =>
Seq(true, false).foreach { isConverted =>
withSQLConf(
HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
Seq(true, false).foreach { isConvertedCtas =>
withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> s"$isConvertedCtas") {
val targetTable = "targetTable"
withTable(targetTable) {
val df = sql(s"CREATE TABLE $targetTable STORED AS $format AS SELECT id FROM p")
checkAnswer(sql(s"SELECT id FROM $targetTable"),
Row(1) :: Row(2) :: Row(3) :: Nil)
val ctasDSCommand = df.queryExecution.analyzed.collect {
case _: OptimizedCreateHiveTableAsSelectCommand => true
}.headOption
val ctasCommand = df.queryExecution.analyzed.collect {
case _: CreateHiveTableAsSelectCommand => true
}.headOption
if (isConverted && isConvertedCtas) {
assert(ctasDSCommand.nonEmpty)
assert(ctasCommand.isEmpty)
} else {
assert(ctasDSCommand.isEmpty)
assert(ctasCommand.nonEmpty)
}
}
}
}
}
}
}
}
}
test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") {
withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {