From faf73dcd33d04365c28c2846d3a1f845785f69df Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 9 Oct 2018 21:10:33 +0000 Subject: [PATCH] [SPARK-25559][FOLLOW-UP] Add comments for partial pushdown of conjuncts in Parquet ## What changes were proposed in this pull request? This is a follow up of https://github.com/apache/spark/pull/22574. Renamed the parameter and added comments. ## How was this patch tested? N/A Closes #22679 from gatorsmile/followupSPARK-25559. Authored-by: gatorsmile Signed-off-by: DB Tsai --- .../datasources/parquet/ParquetFilters.scala | 31 +++++++++++++------ 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 44a0d209e6..21ab9c78e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -394,13 +394,22 @@ private[parquet] class ParquetFilters( */ def createFilter(schema: MessageType, predicate: sources.Filter): Option[FilterPredicate] = { val nameToParquetField = getFieldMap(schema) - createFilterHelper(nameToParquetField, predicate, canRemoveOneSideInAnd = true) + createFilterHelper(nameToParquetField, predicate, canPartialPushDownConjuncts = true) } + /** + * @param nameToParquetField a map from the field name to its field name and data type. + * This only includes the root fields whose types are primitive types. + * @param predicate the input filter predicates. Not all the predicates can be pushed down. + * @param canPartialPushDownConjuncts whether a subset of conjuncts of predicates can be pushed + * down safely. Pushing ONLY one side of AND down is safe to + * do at the top level or none of its ancestors is NOT and OR. + * @return the Parquet-native filter predicates that are eligible for pushdown. + */ private def createFilterHelper( nameToParquetField: Map[String, ParquetField], predicate: sources.Filter, - canRemoveOneSideInAnd: Boolean): Option[FilterPredicate] = { + canPartialPushDownConjuncts: Boolean): Option[FilterPredicate] = { // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { @@ -505,24 +514,28 @@ private[parquet] class ParquetFilters( // Pushing one side of AND down is only safe to do at the top level or in the child // AND before hitting NOT or OR conditions, and in this case, the unsupported predicate // can be safely removed. - val lhsFilterOption = createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd) - val rhsFilterOption = createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd) + val lhsFilterOption = + createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts) + val rhsFilterOption = + createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts) (lhsFilterOption, rhsFilterOption) match { case (Some(lhsFilter), Some(rhsFilter)) => Some(FilterApi.and(lhsFilter, rhsFilter)) - case (Some(lhsFilter), None) if canRemoveOneSideInAnd => Some(lhsFilter) - case (None, Some(rhsFilter)) if canRemoveOneSideInAnd => Some(rhsFilter) + case (Some(lhsFilter), None) if canPartialPushDownConjuncts => Some(lhsFilter) + case (None, Some(rhsFilter)) if canPartialPushDownConjuncts => Some(rhsFilter) case _ => None } case sources.Or(lhs, rhs) => for { - lhsFilter <- createFilterHelper(nameToParquetField, lhs, canRemoveOneSideInAnd = false) - rhsFilter <- createFilterHelper(nameToParquetField, rhs, canRemoveOneSideInAnd = false) + lhsFilter <- + createFilterHelper(nameToParquetField, lhs, canPartialPushDownConjuncts = false) + rhsFilter <- + createFilterHelper(nameToParquetField, rhs, canPartialPushDownConjuncts = false) } yield FilterApi.or(lhsFilter, rhsFilter) case sources.Not(pred) => - createFilterHelper(nameToParquetField, pred, canRemoveOneSideInAnd = false) + createFilterHelper(nameToParquetField, pred, canPartialPushDownConjuncts = false) .map(FilterApi.not) case sources.In(name, values) if canMakeFilterOn(name, values.head)