[SPARK-5862][SQL] Only transformUp the given plan once in HiveMetastoreCatalog

The current `ParquetConversions` in `HiveMetastoreCatalog` calls `transformUp` on the given plan once for each Metastore Parquet table to be replaced, so the plan is traversed multiple times when there are many such tables. Since `transformUp` is a recursive operation, it is better to perform it only once.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #4651 from viirya/parquet_atonce and squashes the following commits:

c1ed29d [Liang-Chi Hsieh] Fix bug.
e0f919b [Liang-Chi Hsieh] Only transformUp the given plan once.
This commit is contained in:
Liang-Chi Hsieh 2015-02-17 12:23:18 -08:00 committed by Michael Armbrust
parent 31efb39c1d
commit 4611de1cef

View file

@ -430,33 +430,36 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
hive.convertMetastoreParquet && hive.convertMetastoreParquet &&
hive.conf.parquetUseDataSourceApi && hive.conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
relation val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)
// Read path // Read path
case p @ PhysicalOperation(_, _, relation: MetastoreRelation) case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
if hive.convertMetastoreParquet && if hive.convertMetastoreParquet &&
hive.conf.parquetUseDataSourceApi && hive.conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
relation val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)
} }
val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))
// Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes // Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
// attribute IDs referenced in other nodes. // attribute IDs referenced in other nodes.
toBeReplaced.distinct.foldLeft(plan) { (lastPlan, relation) => plan.transformUp {
val parquetRelation = convertToParquetRelation(relation) case r: MetastoreRelation if relationMap.contains(r) => {
val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output)) val parquetRelation = relationMap(r)
val withAlias =
r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
Subquery(r.tableName, parquetRelation))
lastPlan.transformUp { withAlias
case r: MetastoreRelation if r == relation => { }
val withAlias = case other => other.transformExpressions {
r.alias.map(a => Subquery(a, parquetRelation)).getOrElse( case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
Subquery(r.tableName, parquetRelation))
withAlias
}
case other => other.transformExpressions {
case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
}
} }
} }
} }