From fb9316388a7410420ffb4ad1153cf524245fe084 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Sun, 16 May 2021 22:12:52 -0700
Subject: [PATCH] [SPARK-32792][SQL][FOLLOWUP] Fix conflict with SPARK-34661

### What changes were proposed in this pull request?

This fixes the compilation error caused by the logical conflict between https://github.com/apache/spark/pull/31776 and https://github.com/apache/spark/pull/29642.

### Why are the changes needed?

To recover compilation.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Closes #32568 from wangyum/HOT-FIX.

Authored-by: Yuming Wang
Signed-off-by: Dongjoon Hyun
---
 .../datasources/parquet/ParquetFilters.scala | 19 ++++++++++---------
 1 file changed, 10 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index e2cfda6da4..71205f967b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -210,7 +210,7 @@ class ParquetFilters(
         longColumn(n),
         Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull)
     case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
-      if pushDownDecimal =>
+        if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.eq(
         binaryColumn(n),
         Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull)
@@ -261,7 +261,7 @@ class ParquetFilters(
         longColumn(n),
         Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull)
     case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
-      if pushDownDecimal =>
+        if pushDownDecimal =>
       (n: Array[String], v: Any) => FilterApi.notEq(
         binaryColumn(n),
         Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull)
@@ -300,7 +300,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) =>
         FilterApi.lt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
-      if pushDownDecimal =>
+        if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.lt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
@@ -338,7 +338,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) =>
         FilterApi.ltEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
-      if pushDownDecimal =>
+        if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.ltEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
@@ -376,7 +376,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) =>
         FilterApi.gt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
-      if pushDownDecimal =>
+        if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.gt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
@@ -414,7 +414,7 @@ class ParquetFilters(
       (n: Array[String], v: Any) =>
         FilterApi.gtEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal]))
     case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
-      if pushDownDecimal =>
+        if pushDownDecimal =>
       (n: Array[String], v: Any) =>
         FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length))
   }
@@ -486,21 +486,22 @@ class ParquetFilters(
           FilterApi.gtEq(longColumn(n), statistics.genericGetMin().asInstanceOf[JLong]),
           FilterApi.ltEq(longColumn(n), statistics.genericGetMax().asInstanceOf[JLong]))
-    case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal =>
       (n: Array[String], v: Array[Any], statistics: ParquetStatistics[_]) =>
         v.map(_.asInstanceOf[JBigDecimal]).map(decimalToInt32).foreach(statistics.updateStats(_))
         FilterApi.and(
           FilterApi.gtEq(intColumn(n), statistics.genericGetMin().asInstanceOf[Integer]),
           FilterApi.ltEq(intColumn(n), statistics.genericGetMax().asInstanceOf[Integer]))
-    case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal =>
       (n: Array[String], v: Array[Any], statistics: ParquetStatistics[_]) =>
         v.map(_.asInstanceOf[JBigDecimal]).map(decimalToInt64).foreach(statistics.updateStats(_))
         FilterApi.and(
           FilterApi.gtEq(longColumn(n), statistics.genericGetMin().asInstanceOf[JLong]),
           FilterApi.ltEq(longColumn(n), statistics.genericGetMax().asInstanceOf[JLong]))
-    case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal =>
+    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
+        if pushDownDecimal =>
       (path: Array[String], v: Array[Any], statistics: ParquetStatistics[_]) =>
         v.map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length))
           .foreach(statistics.updateStats)
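For reviewers who want the gist of the conflict without opening both PRs: as the last hunk shows, SPARK-34661 (#31776) dropped the `ParquetSchemaType` extractor from four fields to three, replacing the old `OriginalType`-style `DECIMAL` marker with a `DecimalLogicalTypeAnnotation`, while the decimal cases added by SPARK-32792 (#29642, the Parquet In filter pushdown improvement) were still written against the old shape. The sketch below is illustrative only; it uses simplified stand-in types rather than the real Spark and Parquet classes to show why the old patterns stopped compiling and what the rewritten patterns look like.

```scala
// Illustrative sketch only: simplified stand-in types, not the real
// Spark/Parquet definitions, mirroring the shape change from SPARK-34661.

sealed trait LogicalTypeAnnotation
final case class DecimalLogicalTypeAnnotation(precision: Int, scale: Int)
  extends LogicalTypeAnnotation

sealed trait PrimitiveTypeName
case object INT32 extends PrimitiveTypeName
case object INT64 extends PrimitiveTypeName
case object FIXED_LEN_BYTE_ARRAY extends PrimitiveTypeName

// New three-field shape: a logical-type annotation replaces the old
// (OriginalType, DecimalMetadata) pair, so one positional field disappears.
final case class ParquetSchemaType(
    logicalTypeAnnotation: LogicalTypeAnnotation,
    primitiveTypeName: PrimitiveTypeName,
    length: Int)

object ConflictSketch {
  private val pushDownDecimal = true

  def describe(t: ParquetSchemaType): String = t match {
    // The pre-SPARK-34661 form, e.g.
    //   case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal =>
    // no longer compiles: the extractor now has three fields and no DECIMAL
    // constant, which is exactly what the hotfix rewrites.
    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal =>
      "decimal stored as INT32"
    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal =>
      "decimal stored as INT64"
    case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length)
        if pushDownDecimal =>
      s"decimal stored as FIXED_LEN_BYTE_ARRAY($length)"
    case _ =>
      "no decimal pushdown"
  }

  def main(args: Array[String]): Unit = {
    println(describe(ParquetSchemaType(DecimalLogicalTypeAnnotation(18, 2), INT64, 0)))
  }
}
```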