[SPARK-20331][SQL][FOLLOW-UP] Add a SQLConf for enhanced Hive partition pruning predicate pushdown

## What changes were proposed in this pull request?
This is a follow-up PR of https://github.com/apache/spark/pull/17633.

This PR is to add a conf `spark.sql.hive.advancedPartitionPredicatePushdown.enabled`, which can be used to turn the enhancement off.

## How was this patch tested?
Add a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19547 from gatorsmile/Spark20331FollowUp.
This commit is contained in:
gatorsmile 2017-10-21 10:05:45 -07:00
parent d9f286d261
commit d8cada8d1d
3 changed files with 64 additions and 5 deletions

View file

@ -173,6 +173,13 @@ object SQLConf {
.intConf
.createWithDefault(4)
val ADVANCED_PARTITION_PREDICATE_PUSHDOWN =
buildConf("spark.sql.hive.advancedPartitionPredicatePushdown.enabled")
.internal()
.doc("When true, advanced partition predicate pushdown into Hive metastore is enabled.")
.booleanConf
.createWithDefault(true)
val ENABLE_FALL_BACK_TO_HDFS_FOR_STATS =
buildConf("spark.sql.statistics.fallBackToHdfs")
.doc("If the table statistics are not available from table metadata enable fall back to hdfs." +
@ -1092,6 +1099,9 @@ class SQLConf extends Serializable with Logging {
def limitScaleUpFactor: Int = getConf(LIMIT_SCALE_UP_FACTOR)
def advancedPartitionPredicatePushdownEnabled: Boolean =
getConf(ADVANCED_PARTITION_PREDICATE_PUSHDOWN)
def fallBackToHdfsForStatsEnabled: Boolean = getConf(ENABLE_FALL_BACK_TO_HDFS_FOR_STATS)
def preferSortMergeJoin: Boolean = getConf(PREFER_SORTMERGEJOIN)

View file

@ -585,6 +585,35 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
* Unsupported predicates are skipped.
*/
def convertFilters(table: Table, filters: Seq[Expression]): String = {
if (SQLConf.get.advancedPartitionPredicatePushdownEnabled) {
convertComplexFilters(table, filters)
} else {
convertBasicFilters(table, filters)
}
}
private def convertBasicFilters(table: Table, filters: Seq[Expression]): String = {
// hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
lazy val varcharKeys = table.getPartitionKeys.asScala
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||
col.getType.startsWith(serdeConstants.CHAR_TYPE_NAME))
.map(col => col.getName).toSet
filters.collect {
case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
s"${a.name} ${op.symbol} $v"
case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
s"$v ${op.symbol} ${a.name}"
case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
if !varcharKeys.contains(a.name) =>
s"""${a.name} ${op.symbol} ${quoteStringLiteral(v.toString)}"""
case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
if !varcharKeys.contains(a.name) =>
s"""${quoteStringLiteral(v.toString)} ${op.symbol} ${a.name}"""
}.mkString(" and ")
}
private def convertComplexFilters(table: Table, filters: Seq[Expression]): String = {
// hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
lazy val varcharKeys = table.getPartitionKeys.asScala
.filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME) ||

View file

@ -26,13 +26,15 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
/**
* A set of tests for the filter conversion logic used when pushing partition pruning into the
* metastore
*/
class FiltersSuite extends SparkFunSuite with Logging {
class FiltersSuite extends SparkFunSuite with Logging with PlanTest {
private val shim = new Shim_v0_13
private val testTable = new org.apache.hadoop.hive.ql.metadata.Table("default", "test")
@ -72,10 +74,28 @@ class FiltersSuite extends SparkFunSuite with Logging {
private def filterTest(name: String, filters: Seq[Expression], result: String) = {
test(name) {
withSQLConf(SQLConf.ADVANCED_PARTITION_PREDICATE_PUSHDOWN.key -> "true") {
val converted = shim.convertFilters(testTable, filters)
if (converted != result) {
fail(
s"Expected filters ${filters.mkString(",")} to convert to '$result' but got '$converted'")
fail(s"Expected ${filters.mkString(",")} to convert to '$result' but got '$converted'")
}
}
}
}
test("turn on/off ADVANCED_PARTITION_PREDICATE_PUSHDOWN") {
import org.apache.spark.sql.catalyst.dsl.expressions._
Seq(true, false).foreach { enabled =>
withSQLConf(SQLConf.ADVANCED_PARTITION_PREDICATE_PUSHDOWN.key -> enabled.toString) {
val filters =
(Literal(1) === a("intcol", IntegerType) ||
Literal(2) === a("intcol", IntegerType)) :: Nil
val converted = shim.convertFilters(testTable, filters)
if (enabled) {
assert(converted == "(1 = intcol or 2 = intcol)")
} else {
assert(converted.isEmpty)
}
}
}
}