[SPARK-25559][FOLLOW-UP] Add comments for partial pushdown of conjuncts in Parquet

## What changes were proposed in this pull request?
This is a follow up of https://github.com/apache/spark/pull/22574. Renamed the parameter and added comments.

## How was this patch tested?
N/A

Closes #22679 from gatorsmile/followupSPARK-25559.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
This commit is contained in:
gatorsmile 2018-10-09 21:10:33 +00:00 committed by DB Tsai
parent 3eee9e0246
commit faf73dcd33
No known key found for this signature in database
GPG key ID: E6FD79DA81FE14FD

View file

@ -394,13 +394,22 @@ private[parquet] class ParquetFilters(
*/
def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = {
val nameToParquetField = getFieldMap(schema)
createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = true)
createFilterHelper(nameToParquetField, predicate, canPartialPushDownConjuncts = true)
}
/**
* @param nameToParquetField a map from the field name to its field name and data type.
* This only includes the root fields whose types are primitive types.
* @param predicate the input filter predicates. Not all the predicates can be pushed down.
* @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed
* down safely. Pushing ONLY one side of AND down is safe to
* do at the top level or none of its ancestors is NOT and OR.
* @return the Parquet-native filter predicates that are eligible for pushdown.
*/
private def createFilterHelper(
nameToParquetField: Map[String, ParquetField],
predicate: sources.Filter,
canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = {
canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = {
// Decimal type must make sure that filter value's scale matched the file.
// If doesn't matched, which would cause data corruption.
def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match {
@ -505,24 +514,28 @@ private[parquet] class ParquetFilters(
// Pushing one side of AND down is only safe to do at the top level or in the child
// AND before hitting NOT or OR conditions, and in this case, the unsupported predicate
// can be safely removed.
val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd)
val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd)
val lhsFilterOption =
createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts)
val rhsFilterOption =
createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts)
(lhsFilterOption, rhsFilterOption) match {
case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter))
case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter)
case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter)
case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter)
case (None, Some(rhsFilter)) if canPartialPushDownConjuncts => Some(rhsFilter)
case _ => None
}
case sources.Or(lhs, rhs) =>
for {
lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false)
rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false)
lhsFilter <-
createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts = false)
rhsFilter <-
createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts = false)
} yield FilterApi.or(lhsFilter, rhsFilter)
case sources.Not(pred) =>
createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false)
createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false)
.map(FilterApi.not)
case sources.In(name, values) if canMakeFilterOn(name, values.head)