Revert SPARK-6910 and SPARK-9027

Revert #7216 and #7386. These patches seem to be causing quite a few test failures:

```
Caused by: java.lang.reflect.InvocationTargetException
	at sun.reflect.GeneratedMethodAccessor322.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:351)
	at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getPartitionsByFilter$1.apply(ClientWrapper.scala:320)
	at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getPartitionsByFilter$1.apply(ClientWrapper.scala:318)
	at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:180)
	at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:135)
	at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:172)
	at org.apache.spark.sql.hive.client.ClientWrapper.getPartitionsByFilter(ClientWrapper.scala:318)
	at org.apache.spark.sql.hive.client.HiveTable.getPartitions(ClientInterface.scala:78)
	at org.apache.spark.sql.hive.MetastoreRelation.getHiveQlPartitions(HiveMetastoreCatalog.scala:670)
	at org.apache.spark.sql.hive.execution.HiveTableScan.doExecute(HiveTableScan.scala:137)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:90)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:90)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:89)
	at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:164)
	at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:151)
	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48)
	... 85 more
Caused by: MetaException(message:Filtering is supported only on partition keys of type string)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree$FilterBuilder.setError(ExpressionTree.java:185)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.getJdoFilterPushdownParam(ExpressionTree.java:452)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilterOverPartitions(ExpressionTree.java:357)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilter(ExpressionTree.java:279)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree$TreeNode.generateJDOFilter(ExpressionTree.java:243)
	at org.apache.hadoop.hive.metastore.parser.ExpressionTree.generateJDOFilterFragment(ExpressionTree.java:590)
	at org.apache.hadoop.hive.metastore.ObjectStore.makeQueryFilterString(ObjectStore.java:2417)
	at org.apache.hadoop.hive.metastore.ObjectStore.getPartitionsViaOrmFilter(ObjectStore.java:2029)
	at org.apache.hadoop.hive.metastore.ObjectStore.access$500(ObjectStore.java:146)
	at org.apache.hadoop.hive.metastore.ObjectStore$4.getJdoResult(ObjectStore.java:2332)
```
https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Test/job/Spark-Master-Maven-with-YARN/2945/HADOOP_PROFILE=hadoop-2.4,label=centos/testReport/junit/org.apache.spark.sql.hive.execution/SortMergeCompatibilitySuite/auto_sortmerge_join_16/
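
For context, the reverted patches pushed partition-pruning predicates down to the metastore as Hive filter strings via `Shim_v0_13.convertFilters` (removed in the diff below). The sketch that follows is a minimal, self-contained illustration of that conversion; the `Expr`/`Attr`/`Lit`/`EqualTo` types are hypothetical stand-ins for Catalyst's expression classes, not the actual Spark code.

```scala
// Minimal sketch of the reverted filter-string pushdown, using hypothetical
// stand-in types rather than Catalyst's Expression/AttributeReference/Literal.
object MetastoreFilterSketch extends App {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Lit(value: Any) extends Expr
  case class EqualTo(left: Expr, right: Expr) extends Expr

  // Supported predicates become metastore filter strings such as
  // `intcol = 1` or `strcol = "a"`; unsupported predicates are skipped.
  def convertFilters(filters: Seq[Expr]): String =
    filters.collect {
      case EqualTo(Attr(name), Lit(v: Int))    => s"$name = $v"
      case EqualTo(Attr(name), Lit(v: String)) => s"""$name = "$v""""
    }.mkString(" and ")

  // A pruning predicate on an integer partition key produces the filter `key = 1`.
  // In the stack trace above, the metastore evaluates such a filter through
  // ObjectStore.getPartitionsViaOrmFilter, which supports filtering only on
  // string-typed partition keys and fails with the MetaException shown.
  println(convertFilters(Seq(EqualTo(Attr("key"), Lit(1)))))  // prints: key = 1
}
```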

Author: Michael Armbrust <michael@databricks.com>

Closes #7409 from marmbrus/revertMetastorePushdown and squashes the following commits:

92fabd3 [Michael Armbrust] Revert SPARK-6910 and SPARK-9027
5d3bdf2 [Michael Armbrust] Revert "[SPARK-9027] [SQL] Generalize metastore predicate pushdown"
Michael Armbrust authored on 2015-07-14 22:57:39 -07:00; committed by Reynold Xin
parent f23a721c10
commit c6b1a9e74e
10 changed files with 44 additions and 218 deletions

@@ -301,9 +301,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
// We're converting the entire table into ParquetRelation, so predicates to Hive metastore
// are empty.
val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
val partitions = metastoreRelation.hiveQlPartitions.map { p =>
val location = p.getLocation
val values = InternalRow.fromSeq(p.getValues.zip(partitionColumnDataTypes).map {
case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
@@ -646,28 +644,7 @@ private[hive] case class MetastoreRelation
new Table(tTable)
}
@transient override lazy val statistics: Statistics = Statistics(
sizeInBytes = {
val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
// relatively cheap if parameters for the table are populated into the metastore. An
// alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
// of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
// `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
BigInt(
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
// if the size is still less than zero, we use default size
Option(totalSize).map(_.toLong).filter(_ > 0)
.getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
.getOrElse(sqlContext.conf.defaultSizeInBytes)))
}
)
def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
table.getPartitions(predicates).map { p =>
@transient val hiveQlPartitions: Seq[Partition] = table.getAllPartitions.map { p =>
val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
tPartition.setDbName(databaseName)
tPartition.setTableName(tableName)
@@ -692,7 +669,26 @@ private[hive] case class MetastoreRelation
new Partition(hiveQlTable, tPartition)
}
@transient override lazy val statistics: Statistics = Statistics(
sizeInBytes = {
val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)
// TODO: check if this estimate is valid for tables after partition pruning.
// NOTE: getting `totalSize` directly from params is kind of hacky, but this should be
// relatively cheap if parameters for the table are populated into the metastore. An
// alternative would be going through Hadoop's FileSystem API, which can be expensive if a lot
// of RPCs are involved. Besides `totalSize`, there are also `numFiles`, `numRows`,
// `rawDataSize` keys (see StatsSetupConst in Hive) that we can look at in the future.
BigInt(
// When table is external,`totalSize` is always zero, which will influence join strategy
// so when `totalSize` is zero, use `rawDataSize` instead
// if the size is still less than zero, we use default size
Option(totalSize).map(_.toLong).filter(_ > 0)
.getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
.getOrElse(sqlContext.conf.defaultSizeInBytes)))
}
)
/** Only compare database and tablename, not alias. */
override def sameResult(plan: LogicalPlan): Boolean = {

@@ -27,7 +27,6 @@ import scala.reflect.ClassTag
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}

@@ -125,7 +125,7 @@ private[hive] trait HiveStrategies {
InterpretedPredicate.create(castedPredicate)
}
val partitions = relation.getHiveQlPartitions(pruningPredicates).filter { part =>
val partitions = relation.hiveQlPartitions.filter { part =>
val partitionValues = part.getValues
var i = 0
while (i < partitionValues.size()) {
@@ -213,7 +213,7 @@ private[hive] trait HiveStrategies {
projectList,
otherPredicates,
identity[Seq[Expression]],
HiveTableScan(_, relation, pruningPredicates)(hiveContext)) :: Nil
HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil
case _ =>
Nil
}

@@ -21,7 +21,6 @@ import java.io.PrintStream
import java.util.{Map => JMap}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
import org.apache.spark.sql.catalyst.expressions.Expression
private[hive] case class HiveDatabase(
name: String,
@@ -72,12 +71,7 @@ private[hive] case class HiveTable(
def isPartitioned: Boolean = partitionColumns.nonEmpty
def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = {
predicates match {
case Nil => client.getAllPartitions(this)
case _ => client.getPartitionsByFilter(this, predicates)
}
}
def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
// Hive does not support backticks when passing names to the client.
def qualifiedName: String = s"$database.$name"
@@ -138,9 +132,6 @@ private[hive] trait ClientInterface {
/** Returns all partitions for the given table. */
def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
/** Returns partitions filtered by predicates for the given table. */
def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
/** Loads a static partition into an existing table. */
def loadPartition(
loadPath: String,

@@ -17,21 +17,25 @@
package org.apache.spark.sql.hive.client
import java.io.{File, PrintStream}
import java.util.{Map => JMap}
import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
import java.net.URI
import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConversions._
import scala.language.reflectiveCalls
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.metastore.api.Database
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
import org.apache.hadoop.hive.metastore.{TableType => HTableType}
import org.apache.hadoop.hive.metastore.api
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.{Driver, metadata}
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.Driver
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -312,13 +316,6 @@ private[hive] class ClientWrapper(
shim.getAllPartitions(client, qlTable).map(toHivePartition)
}
override def getPartitionsByFilter(
hTable: HiveTable,
predicates: Seq[Expression]): Seq[HivePartition] = withHiveState {
val qlTable = toQlTable(hTable)
shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition)
}
override def listTables(dbName: String): Seq[String] = withHiveState {
client.getAllTables(dbName)
}

@@ -31,11 +31,6 @@ import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{StringType, IntegralType}
/**
* A shim that defines the interface between ClientWrapper and the underlying Hive library used to
@@ -66,8 +61,6 @@ private[client] sealed abstract class Shim {
def getAllPartitions(hive: Hive, table: Table): Seq[Partition]
def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition]
def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor
def getDriverResults(driver: Driver): Seq[String]
@@ -116,7 +109,7 @@ private[client] sealed abstract class Shim {
}
private[client] class Shim_v0_12 extends Shim with Logging {
private[client] class Shim_v0_12 extends Shim {
private lazy val startMethod =
findStaticMethod(
@@ -203,17 +196,6 @@ private[client] class Shim_v0_12 extends Shim with Logging {
override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq
override def getPartitionsByFilter(
hive: Hive,
table: Table,
predicates: Seq[Expression]): Seq[Partition] = {
// getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12.
// See HIVE-4888.
logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
"Please use Hive 0.13 or higher.")
getAllPartitions(hive, table)
}
override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]
@@ -285,12 +267,6 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
classOf[Hive],
"getAllPartitionsOf",
classOf[Table])
private lazy val getPartitionsByFilterMethod =
findMethod(
classOf[Hive],
"getPartitionsByFilter",
classOf[Table],
classOf[String])
private lazy val getCommandProcessorMethod =
findStaticMethod(
classOf[CommandProcessorFactory],
@@ -312,52 +288,6 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq
/**
* Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
* a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
*
* Unsupported predicates are skipped.
*/
def convertFilters(table: Table, filters: Seq[Expression]): String = {
// hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
val varcharKeys = table.getPartitionKeys
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME))
.map(col => col.getName).toSet
filters.collect {
case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
s"${a.name} ${op.symbol} $v"
case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
s"$v ${op.symbol} ${a.name}"
case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
if !varcharKeys.contains(a.name) =>
s"""${a.name} ${op.symbol} "$v""""
case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
if !varcharKeys.contains(a.name) =>
s""""$v" ${op.symbol} ${a.name}"""
}.mkString(" and ")
}
override def getPartitionsByFilter(
hive: Hive,
table: Table,
predicates: Seq[Expression]): Seq[Partition] = {
// Hive getPartitionsByFilter() takes a string that represents partition
// predicates like "str_key=\"value\" and int_key=1 ..."
val filter = convertFilters(table, predicates)
val partitions =
if (filter.isEmpty) {
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
} else {
logDebug(s"Hive metastore filter is '$filter'.")
getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
}
partitions.toSeq
}
override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]

@@ -44,7 +44,7 @@ private[hive]
case class HiveTableScan(
requestedAttributes: Seq[Attribute],
relation: MetastoreRelation,
partitionPruningPred: Seq[Expression])(
partitionPruningPred: Option[Expression])(
@transient val context: HiveContext)
extends LeafNode {
@@ -56,7 +56,7 @@ case class HiveTableScan(
// Bind all partition key attribute references in the partition pruning predicate for later
// evaluation.
private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
private[this] val boundPruningPred = partitionPruningPred.map { pred =>
require(
pred.dataType == BooleanType,
s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
@@ -133,8 +133,7 @@ case class HiveTableScan(
protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
hadoopReader.makeRDDForTable(relation.hiveQlTable)
} else {
hadoopReader.makeRDDForPartitionedTable(
prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
}
override def output: Seq[Attribute] = attributes

@@ -1,78 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.client
import scala.collection.JavaConversions._
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types._
/**
* A set of tests for the filter conversion logic used when pushing partition pruning into the
* metastore
*/
class FiltersSuite extends SparkFunSuite with Logging {
private val shim = new Shim_v0_13
private val testTable = new org.apache.hadoop.hive.ql.metadata.Table("default", "test")
private val varCharCol = new FieldSchema()
varCharCol.setName("varchar")
varCharCol.setType(serdeConstants.VARCHAR_TYPE_NAME)
testTable.setPartCols(varCharCol :: Nil)
filterTest("string filter",
(a("stringcol", StringType) > Literal("test")) :: Nil,
"stringcol > \"test\"")
filterTest("string filter backwards",
(Literal("test") > a("stringcol", StringType)) :: Nil,
"\"test\" > stringcol")
filterTest("int filter",
(a("intcol", IntegerType) === Literal(1)) :: Nil,
"intcol = 1")
filterTest("int filter backwards",
(Literal(1) === a("intcol", IntegerType)) :: Nil,
"1 = intcol")
filterTest("int and string filter",
(Literal(1) === a("intcol", IntegerType)) :: (Literal("a") === a("strcol", IntegerType)) :: Nil,
"1 = intcol and \"a\" = strcol")
filterTest("skip varchar",
(Literal("") === a("varchar", StringType)) :: Nil,
"")
private def filterTest(name: String, filters: Seq[Expression], result: String) = {
test(name){
val converted = shim.convertFilters(testTable, filters)
if (converted != result) {
fail(
s"Expected filters ${filters.mkString(",")} to convert to '$result' but got '$converted'")
}
}
}
private def a(name: String, dataType: DataType) = AttributeReference(name, dataType)()
}

@@ -20,9 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.File
import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.util.Utils
/**
@@ -153,12 +151,6 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.getAllPartitions(client.getTable("default", "src_part"))
}
test(s"$version: getPartitionsByFilter") {
client.getPartitionsByFilter(client.getTable("default", "src_part"), Seq(EqualTo(
AttributeReference("key", IntegerType, false)(NamedExpression.newExprId),
Literal(1))))
}
test(s"$version: loadPartition") {
client.loadPartition(
emptyDir,

@@ -151,7 +151,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
case p @ HiveTableScan(columns, relation, _) =>
val columnNames = columns.map(_.name)
val partValues = if (relation.table.isPartitioned) {
p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
p.prunePartitions(relation.hiveQlPartitions).map(_.getValues)
} else {
Seq.empty
}