[SPARK-35480][SQL] Make percentile_approx work with pivot

### What changes were proposed in this pull request?

This PR proposes to avoid wrapping if-else to the constant literals for `percentage` and `accuracy` in `percentile_approx`. They are expected to be literals (or foldable expressions).

Pivot works by two phrase aggregations, and it works with manipulating the input to `null` for non-matched values (pivot column and value).

Note that pivot supports an optimized version without such logic with changing input to `null` for some types (non-nested types basically). So the issue fixed by this PR is only for complex types.

```scala
val df = Seq(
  ("a", -1.0), ("a", 5.5), ("a", 2.5), ("b", 3.0), ("b", 5.2)).toDF("type", "value")
  .groupBy().pivot("type", Seq("a", "b")).agg(
    percentile_approx(col("value"), array(lit(0.5)), lit(10000)))
df.show()
```

**Before:**

```
org.apache.spark.sql.AnalysisException: cannot resolve 'percentile_approx((IF((type <=> CAST('a' AS STRING)), value, CAST(NULL AS DOUBLE))), (IF((type <=> CAST('a' AS STRING)), array(0.5D), NULL)), (IF((type <=> CAST('a' AS STRING)), 10000, CAST(NULL AS INT))))' due to data type mismatch: The accuracy or percentage provided must be a constant literal;
'Aggregate [percentile_approx(if ((type#7 <=> cast(a as string))) value#8 else cast(null as double), if ((type#7 <=> cast(a as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(a as string))) 10000 else cast(null as int), 0, 0) AS a#16, percentile_approx(if ((type#7 <=> cast(b as string))) value#8 else cast(null as double), if ((type#7 <=> cast(b as string))) array(0.5) else cast(null as array<double>), if ((type#7 <=> cast(b as string))) 10000 else cast(null as int), 0, 0) AS b#18]
+- Project [_1#2 AS type#7, _2#3 AS value#8]
   +- LocalRelation [_1#2, _2#3]
```

**After:**

```
+-----+-----+
|    a|    b|
+-----+-----+
|[2.5]|[3.0]|
+-----+-----+
```

### Why are the changes needed?

To make percentile_approx work with pivot as expected

### Does this PR introduce _any_ user-facing change?

Yes. It threw an exception but now it returns a correct result as shown above.

### How was this patch tested?

Manually tested and unit test was added.

Closes #32619 from HyukjinKwon/SPARK-35480.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This commit is contained in:
Hyukjin Kwon 2021-05-23 07:35:43 +09:00
parent fa424ac2b8
commit 1d9f09decb
2 changed files with 12 additions and 0 deletions

View file

@ -770,6 +770,10 @@ class Analyzer(override val catalogManager: CatalogManager)
First(ifExpr(expr), true)
case Last(expr, _) =>
Last(ifExpr(expr), true)
case a: ApproximatePercentile =>
// ApproximatePercentile takes two literals for accuracy and percentage which
// should not be wrapped by if-else.
a.withNewChildren(ifExpr(a.first) :: a.second :: a.third :: Nil)
case a: AggregateFunction =>
a.withNewChildren(a.children.map(ifExpr))
}.transform {

View file

@ -344,4 +344,12 @@ class DataFramePivotSuite extends QueryTest with SharedSparkSession {
val actual = df.groupBy("x").pivot("s").count()
checkAnswer(actual, expected)
}
test("SPARK-35480: percentile_approx should work with pivot") {
val actual = Seq(
("a", -1.0), ("a", 5.5), ("a", 2.5), ("b", 3.0), ("b", 5.2)).toDF("type", "value")
.groupBy().pivot("type", Seq("a", "b")).agg(
percentile_approx(col("value"), array(lit(0.5)), lit(10000)))
checkAnswer(actual, Row(Array(2.5), Array(3.0)))
}
}