Commit graph

4479 commits

Author SHA1 Message Date
Yuanjian Li 86b54f3321 [SPARK-31894][SS] Introduce UnsafeRow format validation for streaming state store
### What changes were proposed in this pull request?
Introduce UnsafeRow format validation for streaming state store.

### Why are the changes needed?
Currently, Structured Streaming directly puts the UnsafeRow into StateStore without any schema validation. It's a dangerous behavior when users reusing the checkpoint file during migration. Any changes or bug fix related to the aggregate function may cause random exceptions, even the wrong answer, e.g SPARK-28067.

### Does this PR introduce _any_ user-facing change?
Yes. If the underlying changes are detected when the checkpoint is reused during migration, the InvalidUnsafeRowException will be thrown.

### How was this patch tested?
UT added. Will also add integrated tests for more scenario in another PR separately.

Closes #28707 from xuanyuanking/SPARK-31894.

Lead-authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Co-authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-19 05:56:50 +00:00
Max Gekk 17a5007fd8 [SPARK-30865][SQL][SS] Refactor DateTimeUtils
### What changes were proposed in this pull request?

1. Move TimeZoneUTC and TimeZoneGMT to DateTimeTestUtils
2. Remove TimeZoneGMT
3. Use ZoneId.systemDefault() instead of defaultTimeZone().toZoneId
4. Alias SQLDate & SQLTimestamp to internal types of DateType and TimestampType
5. Avoid one `*` `DateTimeUtils`.`in fromJulianDay()`
6. Use toTotalMonths in `DateTimeUtils`.`subtractDates()`
7. Remove `julianCommonEraStart`, `timestampToString()`, `microsToEpochDays()`, `epochDaysToMicros()`, `instantToDays()` from `DateTimeUtils`.
8. Make splitDate() private.
9. Remove `def daysToMicros(days: Int): Long` and `def microsToDays(micros: Long): Int`.

### Why are the changes needed?

This simplifies the common code related to date-time operations, and should improve maintainability. In particular:

1. TimeZoneUTC and TimeZoneGMT are moved to DateTimeTestUtils because they are used only in tests
2. TimeZoneGMT can be removed because it is equal to TimeZoneUTC
3. After the PR #27494, Spark expressions and DateTimeUtils functions switched to ZoneId instead of TimeZone completely. `defaultTimeZone()` with `TimeZone` as return type is not needed anymore.
4. SQLDate and SQLTimestamp types can be explicitly aliased to internal types of DateType and and TimestampType instead of declaring this in a comment.
5. Avoid one `*` `DateTimeUtils`.`in fromJulianDay()`.
6. Use toTotalMonths in `DateTimeUtils`.`subtractDates()`.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
By existing test suites

Closes #27617 from MaxGekk/move-time-zone-consts.

Lead-authored-by: Max Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-19 05:41:09 +00:00
Dilip Biswal e4f5036146 [SPARK-32020][SQL] Better error message when SPARK_HOME or spark.test.home is not set
### What changes were proposed in this pull request?
Better error message when SPARK_HOME or spark,test.home is not set.

### Why are the changes needed?
Currently the error message is not easily consumable as it prints  (see below) the real error after printing the current environment which is rather long.

**Old output**
`
 time.name" -> "Java(TM) SE Runtime Environment", "sun.boot.library.path" -> "/Library/Java/JavaVirtualMachines/jdk1.8.0_221.jdk/Contents/Home/jre/lib",
 "java.vm.version" -> "25.221-b11",
 . . .
 . . .
 . . .
) did not contain key "SPARK_HOME" spark.test.home or SPARK_HOME is not set.
	at org.scalatest.Assertions.newAssertionFailedExceptio
`

**New output**
An exception or error caused a run to abort: spark.test.home or SPARK_HOME is not set.
org.scalatest.exceptions.TestFailedException: spark.test.home or SPARK_HOME is not set
### Does this PR introduce any user-facing change?
`
No.

### How was this patch tested?
Ran the tests in intellej  manually to see the new error.

Closes #28825 from dilipbiswal/minor-spark-31950-followup.

Authored-by: Dilip Biswal <dkbiswal@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-18 22:45:55 +09:00
Max Gekk afd8a8b964 [SPARK-31989][SQL] Generate JSON rebasing files w/ 30 minutes step
### What changes were proposed in this pull request?
1. Change the max step from 1 week to 30 minutes in the tests `RebaseDateTimeSuite`.`generate 'gregorian-julian-rebase-micros.json'` and `generate 'julian-gregorian-rebase-micros.json'`.
2. Parallelise JSON files generation in the function `generateRebaseJson` by using `ThreadUtils.parmap`.

### Why are the changes needed?
1. To prevent the bugs that are fixed by https://github.com/apache/spark/pull/28787 and https://github.com/apache/spark/pull/28816.
2. The parallelisation speeds up JSON file generation.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
By generating the JSON file `julian-gregorian-rebase-micros.json`.

Closes #28827 from MaxGekk/rebase-30-min.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-17 12:07:36 +09:00
Takeshi Yamamuro 8d577092ed [SPARK-31705][SQL][FOLLOWUP] Avoid the unnecessary CNF computation for full-outer joins
### What changes were proposed in this pull request?

To avoid the unnecessary CNF computation for full-outer joins, this PR fixes code for filtering out full-outer joins at the entrance of the rule.

### Why are the changes needed?

To mitigate optimizer overhead.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #28810 from maropu/SPARK-31705.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
2020-06-16 09:13:00 -07:00
Max Gekk 6e9ff72195 [SPARK-31984][SQL] Make micros rebasing functions via local timestamps pure
### What changes were proposed in this pull request?
1. Set the given time zone as the first parameter of `RebaseDateTime`.`rebaseJulianToGregorianMicros()` and `rebaseGregorianToJulianMicros()` to Java 7 `GregorianCalendar`.
```scala
    val cal = new Calendar.Builder()
      // `gregory` is a hybrid calendar that supports both the Julian and Gregorian calendar systems
      .setCalendarType("gregory")
    ...
      .setTimeZone(tz)
      .build()
```
This makes the instance of the calendar independent from the default JVM time zone.

2. Change type of the first parameter from `ZoneId` to `TimeZone`. This allows to avoid unnecessary conversion from `TimeZone` to `ZoneId`, for example in
```scala
  def rebaseJulianToGregorianMicros(micros: Long): Long = {
    ...
      if (rebaseRecord == null || micros < rebaseRecord.switches(0)) {
        rebaseJulianToGregorianMicros(timeZone.toZoneId, micros)
```
and back to `TimeZone` inside of `rebaseJulianToGregorianMicros(zoneId: ZoneId, ...)`

3. Modify tests in `RebaseDateTimeSuite`, and set the default JVM time zone only for functions that depend on it.

### Why are the changes needed?
1. Ignoring passed parameter and using a global variable is bad practice.
2. Dependency from the global state doesn't allow to run the functions in parallel otherwise there is non-zero probability that the functions may return wrong result if the default JVM is changed during their execution.
3. This open opportunity for parallelisation of JSON files generation `gregorian-julian-rebase-micros.json` and `julian-gregorian-rebase-micros.json`. Currently, the tests `generate 'gregorian-julian-rebase-micros.json'` and `generate 'julian-gregorian-rebase-micros.json'` generate the JSON files by iterating over all time zones sequentially w/ step of 1 week. Due to the large step, we can miss some spikes in diffs between 2 calendars (Java 8 Gregorian and Java 7 hybrid calendars) as the PR https://github.com/apache/spark/pull/28787 fixed and https://github.com/apache/spark/pull/28816 should fix.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running existing tests from `RebaseDateTimeSuite`.

Closes #28824 from MaxGekk/pure-micros-rebasing.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-16 12:56:27 +00:00
GuoPhilipse f0e6d0ec13 [SPARK-31710][SQL] Fail casting numeric to timestamp by default
## What changes were proposed in this pull request?
we fail casting from numeric to timestamp by default.

## Why are the changes needed?
casting from numeric to timestamp is not a  non-standard,meanwhile it may generate different result between spark and other systems,for example hive

## Does this PR introduce any user-facing change?
Yes,user cannot cast numeric to timestamp directly,user have to use the following function to achieve the same effect:TIMESTAMP_SECONDS/TIMESTAMP_MILLIS/TIMESTAMP_MICROS

## How was this patch tested?
unit test added

Closes #28593 from GuoPhilipse/31710-fix-compatibility.

Lead-authored-by: GuoPhilipse <guofei_ok@126.com>
Co-authored-by: GuoPhilipse <46367746+GuoPhilipse@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-16 08:35:35 +00:00
Max Gekk e9145d41f3 [SPARK-31986][SQL] Fix Julian-Gregorian micros rebasing of overlapping local timestamps
### What changes were proposed in this pull request?
It fixes microseconds rebasing from the hybrid calendar (Julian + Gregorian) to Proleptic Gregorian calendar in the function `RebaseDateTime`.`rebaseJulianToGregorianMicros(zoneId: ZoneId, micros: Long): Long` in the case of local timestamp overlapping.

In the case of overlapping, we look ahead of 1 day to determinate which instant we should take - earlier or later zoned timestamp. If our current standard zone and DST offsets are equal to zone offset of the next date, we choose the later timestamp otherwise the earlier one. For example, the local timestamp **1945-11-18 01:30:00.0** can be mapped to two instants (microseconds since 1970-01-01 00:00:00Z): -761211000000000 or -761207400000000. If the first one is passed to `rebaseJulianToGregorianMicros()`, we take the earlier instant in Proleptic Gregorian calendar while rebasing **1945-11-18T01:30+09:00[Asia/Hong_Kong]** otherwise the later one **1945-11-18T01:30+08:00[Asia/Hong_Kong]**.

Note: The fix assumes that only one transition of standard or DST offsets can occur during a day.

### Why are the changes needed?
Current implementation of `rebaseJulianToGregorianMicros()` handles timestamps overlapping only during daylight saving time but overlapping can happen also during transition from one standard time zone to another one. For example in the case of `Asia/Hong_Kong`, the time zone switched from `Japan Standard Time` (UTC+9) to `Hong Kong Time` (UTC+8) on _Sunday, 18 November, 1945 01:59:59 AM_. The changes allow to handle the special case as well.

### Does this PR introduce _any_ user-facing change?
There is no behaviour change for timestamps of CE after 0001-01-01. The PR might affects timestamps of BCE for which the modified `rebaseJulianToGregorianMicros()` is called directly.

### How was this patch tested?

1. By existing tests in `DateTimeUtilsSuite`, `RebaseDateTimeSuite`, `DateFunctionsSuite`, `DateExpressionsSuite` and `TimestampFormatterSuite`.

2. Added new checks to `RebaseDateTimeSuite`.`SPARK-31959: JST -> HKT at Asia/Hong_Kong in 1945`:
```scala
      assert(rebaseJulianToGregorianMicros(hkZid, rebasedEarlierMicros) === earlierMicros)
      assert(rebaseJulianToGregorianMicros(hkZid, rebasedLaterMicros) === laterMicros)
```

3. Regenerated `julian-gregorian-rebase-micros.json` with the step of 30 minutes, and got the same JSON file. The JSON file isn't affected because previously it was generated with the step of 1 week. And the spike in diffs/switch points during 1 hour of timestamp overlapping wasn't detected.

Closes #28816 from MaxGekk/fix-overlap-julian-2-grep.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-16 06:00:05 +00:00
Takeshi Yamamuro 3698a14204 [SPARK-26905][SQL] Follow the SQL:2016 reserved keywords
### What changes were proposed in this pull request?

This PR intends to move keywords `ANTI`, `SEMI`, and `MINUS` from reserved to non-reserved.

### Why are the changes needed?

To comply with the ANSI/SQL standard.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests.

Closes #28807 from maropu/SPARK-26905-2.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-06-16 00:27:45 +09:00
Max Gekk eae1747b66 [SPARK-31959][SQL][TESTS][FOLLOWUP] Adopt the test "SPARK-31959: JST -> HKT at Asia/Hong_Kong in 1945" to outdated tzdb
### What changes were proposed in this pull request?
Old JDK can have outdated time zone database in which `Asia/Hong_Kong` doesn't have timestamp overlapping in 1946 at all. This PR changes the test "SPARK-31959: JST -> HKT at Asia/Hong_Kong in 1945" in `RebaseDateTimeSuite`, and makes it tolerant to the case.

### Why are the changes needed?
To fix the test failures on old JDK w/ outdated tzdb like on Jenkins machine `research-jenkins-worker-09`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the test on old JDK

Closes #28832 from MaxGekk/HongKong-tz-1945-followup.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-15 08:09:07 -07:00
Liang-Chi Hsieh 8282bbf12d [SPARK-27633][SQL] Remove redundant aliases in NestedColumnAliasing
## What changes were proposed in this pull request?

In NestedColumnAliasing rule, we create aliases for nested field access in project list. We considered that top level parent field and nested fields under it were both accessed. In the case, we don't create the aliases because they are redundant.

There is another case, where a nested parent field and nested fields under it were both accessed, which we don't consider now. We don't need to create aliases in this case too.

## How was this patch tested?

Added test.

Closes #24525 from viirya/SPARK-27633.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-15 11:01:56 +09:00
HyukjinKwon a620a2a7e5 [SPARK-31977][SQL] Returns the plan directly from NestedColumnAliasing
### What changes were proposed in this pull request?

This proposes a minor refactoring to match `NestedColumnAliasing` to `GeneratorNestedColumnAliasing` so it returns the pruned plan directly.

```scala
    case p  NestedColumnAliasing(nestedFieldToAlias, attrToAliases) =>
      NestedColumnAliasing.replaceToAliases(p, nestedFieldToAlias, attrToAliases)
```

vs

```scala
    case GeneratorNestedColumnAliasing(p) => p
```

### Why are the changes needed?

Just for readability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests should cover.

Closes #28812 from HyukjinKwon/SPARK-31977.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-06-13 07:26:37 +09:00
Takeshi Yamamuro 78d08a8c38 [SPARK-31950][SQL][TESTS] Extract SQL keywords from the SqlBase.g4 file
### What changes were proposed in this pull request?

This PR intends to extract SQL reserved/non-reserved keywords from the ANTLR grammar file (`SqlBase.g4`) directly.

This approach is based on the cloud-fan suggestion: https://github.com/apache/spark/pull/28779#issuecomment-642033217

### Why are the changes needed?

It is hard to maintain a full set of the keywords in `TableIdentifierParserSuite`, so it would be nice if we could extract them from the `SqlBase.g4` file directly.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #28802 from maropu/SPARK-31950-2.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-06-13 07:12:27 +09:00
Liang-Chi Hsieh ff89b11143 [SPARK-31736][SQL] Nested column aliasing for RepartitionByExpression/Join
### What changes were proposed in this pull request?

Currently we only push nested column pruning through a few operators such as LIMIT, SAMPLE, etc. This patch extends the feature to other operators including RepartitionByExpression, Join.

### Why are the changes needed?

Currently nested column pruning only applied on a few operators. It limits the benefit of nested column pruning. Extending nested column pruning coverage to make this feature more generally applied through different queries.

### Does this PR introduce _any_ user-facing change?

Yes. More SQL operators are covered by nested column pruning.

### How was this patch tested?

Added unit test, end-to-end tests.

Closes #28556 from viirya/others-column-pruning.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-12 16:54:55 +09:00
Max Gekk c259844df8 [SPARK-31959][SQL][TEST-JAVA11] Fix Gregorian-Julian micros rebasing while switching standard time zone offset
### What changes were proposed in this pull request?
Fix the bug in microseconds rebasing during transitions from one standard time zone offset to another one. In the PR, I propose to change the implementation of `rebaseGregorianToJulianMicros` which performs rebasing via local timestamps. In the case of overlapping:
1. Check that the original instant belongs to earlier or later instant of overlapped local timestamp.
2. If it is an earlier instant, take zone and DST offsets from the previous day otherwise
3. Set time zone offsets to Julian timestamp from the next day.

Note: The fix assumes that transitions cannot happen more often than once per 2 days.

### Why are the changes needed?
Current implementation handles timestamps overlapping only during daylight saving time but overlapping can happen also during transition from one standard time zone to another one. For example in the case of `Asia/Hong_Kong`, the time zone switched from `Japan Standard Time` (UTC+9) to `Hong Kong Time` (UTC+8) on _Sunday, 18 November, 1945 01:59:59 AM_. The changes allow to handle the special case as well.

### Does this PR introduce _any_ user-facing change?
It might affect micros rebasing in before common era when not-optimised version of `rebaseGregorianToJulianMicros()` is used directly.

### How was this patch tested?
1. By existing tests in `DateTimeUtilsSuite`, `RebaseDateTimeSuite`, `DateFunctionsSuite`, `DateExpressionsSuite` and `TimestampFormatterSuite`.
2. Added new test to `RebaseDateTimeSuite`
3. Regenerated `gregorian-julian-rebase-micros.json` with the step of 30 minutes, and got the same JSON file. The JSON file isn't affected because previously it was generated with the step of 1 week. And the spike in diffs/switch points during 1 hour of timestamp overlapping wasn't detected.

Closes #28787 from MaxGekk/HongKong-tz-1945.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-12 06:17:31 +00:00
Yuming Wang 78f9043862 [SPARK-31912][SQL][TESTS] Normalize all binary comparison expressions
### What changes were proposed in this pull request?

This pr normalize all binary comparison expressions when comparing plans.

### Why are the changes needed?

Improve test framework, otherwise this test will fail:
```scala
  test("SPARK-31912 Normalize all binary comparison expressions") {
    val original = testRelation
      .where('a === 'b && Literal(13) >= 'b).as("x")
    val optimized = testRelation
      .where(IsNotNull('a) && IsNotNull('b) && 'a === 'b && 'b <= 13 && 'a <= 13).as("x")
    comparePlans(Optimize.execute(original.analyze), optimized.analyze)
  }
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

Manual test.

Closes #28734 from wangyum/SPARK-31912.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Yuming Wang <wgyumg@gmail.com>
2020-06-11 22:50:36 -07:00
Dilip Biswal b87a342c7d [SPARK-31916][SQL] StringConcat can lead to StringIndexOutOfBoundsException
### What changes were proposed in this pull request?
A minor fix to fix the append method of StringConcat to cap the length at MAX_ROUNDED_ARRAY_LENGTH to make sure it does not overflow and cause StringIndexOutOfBoundsException

Thanks to **Jeffrey Stokes** for reporting the issue and explaining the underlying problem in detail in the JIRA.

### Why are the changes needed?
This fixes StringIndexOutOfBoundsException on an overflow.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Added a test in StringsUtilSuite.

Closes #28750 from dilipbiswal/SPARK-31916.

Authored-by: Dilip Biswal <dkbiswal@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-06-12 09:19:29 +09:00
Takeshi Yamamuro b1adc3deee [SPARK-21117][SQL] Built-in SQL Function Support - WIDTH_BUCKET
### What changes were proposed in this pull request?

This PR intends to add a build-in SQL function - `WIDTH_BUCKET`.
It is the rework of #18323.

Closes #18323

The other RDBMS references for `WIDTH_BUCKET`:
 - Oracle: https://docs.oracle.com/cd/B28359_01/olap.111/b28126/dml_functions_2137.htm#OLADM717
 - PostgreSQL: https://www.postgresql.org/docs/current/functions-math.html
 - Snowflake: https://docs.snowflake.com/en/sql-reference/functions/width_bucket.html
 - Prestodb: https://prestodb.io/docs/current/functions/math.html
 - Teradata: https://docs.teradata.com/reader/kmuOwjp1zEYg98JsB8fu_A/Wa8vw69cGzoRyNULHZeudg
 - DB2: https://www.ibm.com/support/producthub/db2/docs/content/SSEPGG_11.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0061483.html?pos=2

### Why are the changes needed?

For better usability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit tests.

Closes #28764 from maropu/SPARK-21117.

Lead-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Co-authored-by: Yuming Wang <wgyumg@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-06-11 14:15:28 -07:00
Gengliang Wang 11d3a744e2 [SPARK-31705][SQL] Push more possible predicates through Join via CNF conversion
### What changes were proposed in this pull request?

This PR add a new rule to support push predicate through join by rewriting join condition to CNF(conjunctive normal form). The following example is the steps of this rule:

1. Prepare Table:

```sql
CREATE TABLE x(a INT);
CREATE TABLE y(b INT);
...
SELECT * FROM x JOIN y ON ((a < 0 and a > b) or a > 10);
```

2. Convert the join condition to CNF:
```
(a < 0 or a > 10) and (a > b or a > 10)
```

3. Split conjunctive predicates

Predicates
---|
(a < 0 or a > 10)
(a > b or a > 10)

4. Push predicate

Table | Predicate
--- | ---
x | (a < 0 or a > 10)

### Why are the changes needed?
Improve query performance. PostgreSQL, [Impala](https://issues.apache.org/jira/browse/IMPALA-9183) and Hive support this feature.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit test and benchmark test.

SQL | Before this PR | After this PR
--- | --- | ---
TPCDS 5T Q13 | 84s | 21s
TPCDS 5T q85 | 66s | 34s
TPCH 1T q19 | 37s | 32s

Closes #28733 from gengliangwang/cnf.

Lead-authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Co-authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
2020-06-11 10:13:45 -07:00
Wenchen Fan 6fb9c80da1 [SPARK-31958][SQL] normalize special floating numbers in subquery
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/23388 .

https://github.com/apache/spark/pull/23388 has an issue: it doesn't handle subquery expressions and assumes they will be turned into joins. However, this is not true for non-correlated subquery expressions.

This PR fixes this issue. It now doesn't skip `Subquery`, and subquery expressions will be handled by `OptimizeSubqueries`, which runs the optimizer with the subquery.

Note that, correlated subquery expressions will be handled twice: once in `OptimizeSubqueries`, once later when it becomes join. This is OK as `NormalizeFloatingNumbers` is idempotent now.

### Why are the changes needed?

fix a bug

### Does this PR introduce _any_ user-facing change?

yes, see the newly added test.

### How was this patch tested?

new test

Closes #28785 from cloud-fan/normalize.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-11 06:39:14 +00:00
Kent Yao 22dda6e18e [SPARK-31939][SQL][TEST-JAVA11] Fix Parsing day of year when year field pattern is missing
### What changes were proposed in this pull request?

If a datetime pattern contains no year field, the day of year field should not be ignored if exists

e.g.

```
spark-sql> select to_timestamp('31', 'DD');
1970-01-01 00:00:00
spark-sql> select to_timestamp('31 30', 'DD dd');
1970-01-30 00:00:00

spark.sql.legacy.timeParserPolicy legacy
spark-sql> select to_timestamp('31', 'DD');
1970-01-31 00:00:00
spark-sql> select to_timestamp('31 30', 'DD dd');
NULL
```

This PR only fixes some corner cases that use 'D' pattern to parse datetimes and there is w/o 'y'.

### Why are the changes needed?

fix some corner cases

### Does this PR introduce _any_ user-facing change?

yes, the day of year field will not be ignored

### How was this patch tested?

add unit tests.

Closes #28766 from yaooqinn/SPARK-31939.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-11 03:29:12 +00:00
Liang-Chi Hsieh 43063e2db2 [SPARK-27217][SQL] Nested column aliasing for more operators which can prune nested column
### What changes were proposed in this pull request?

Currently we only push nested column pruning from a Project through a few operators such as LIMIT, SAMPLE, etc. There are a few operators like Aggregate, Expand which can prune nested columns by themselves, without a Project on top.

This patch extends the feature to those operators.

### Why are the changes needed?

Currently nested column pruning only applied on a few cases. It limits the benefit of nested column pruning. Extending nested column pruning coverage to make this feature more generally applied through different queries.

### Does this PR introduce _any_ user-facing change?

Yes. More SQL operators are covered by nested column pruning.

### How was this patch tested?

Added unit test, end-to-end tests.

Closes #28560 from viirya/SPARK-27217-2.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-10 18:08:47 +09:00
Takeshi Yamamuro e14029b18d [SPARK-26905][SQL] Add TYPE in the ANSI non-reserved list
### What changes were proposed in this pull request?

This PR intends to add `TYPE` in the ANSI non-reserved list because it is not reserved in the standard. See SPARK-26905 for a full set of the reserved/non-reserved keywords of `SQL:2016`.

Note: The current master behaviour is as follows;
```
scala> sql("SET spark.sql.ansi.enabled=false")
scala> sql("create table t1 (type int)")
res4: org.apache.spark.sql.DataFrame = []

scala> sql("SET spark.sql.ansi.enabled=true")
scala> sql("create table t2 (type int)")
org.apache.spark.sql.catalyst.parser.ParseException:
no viable alternative at input 'type'(line 1, pos 17)

== SQL ==
create table t2 (type int)
-----------------^^^
```

### Why are the changes needed?

To follow the ANSI/SQL standard.

### Does this PR introduce _any_ user-facing change?

Makes users use `TYPE` as identifiers.

### How was this patch tested?

Update the keyword lists in `TableIdentifierParserSuite`.

Closes #28773 from maropu/SPARK-26905.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-06-10 16:29:43 +09:00
Kent Yao 6a424b93e5 [SPARK-31830][SQL] Consistent error handling for datetime formatting and parsing functions
### What changes were proposed in this pull request?
Currently, `date_format` and `from_unixtime`, `unix_timestamp`,`to_unix_timestamp`, `to_timestamp`, `to_date`  have different exception handling behavior for formatting datetime values.

In this PR, we apply the exception handling behavior of `date_format` to `from_unixtime`, `unix_timestamp`,`to_unix_timestamp`, `to_timestamp` and `to_date`.

In the phase of creating the datetime formatted or formating, exceptions will be raised.

e.g.

```java
spark-sql> select date_format(make_timestamp(1, 1 ,1,1,1,1), 'yyyyyyyyyyy-MM-aaa');
20/05/28 15:25:38 ERROR SparkSQLDriver: Failed in [select date_format(make_timestamp(1, 1 ,1,1,1,1), 'yyyyyyyyyyy-MM-aaa')]
org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to recognize 'yyyyyyyyyyy-MM-aaa' pattern in the DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html
```

```java
spark-sql> select date_format(make_timestamp(1, 1 ,1,1,1,1), 'yyyyyyyyyyy-MM-AAA');
20/05/28 15:26:10 ERROR SparkSQLDriver: Failed in [select date_format(make_timestamp(1, 1 ,1,1,1,1), 'yyyyyyyyyyy-MM-AAA')]
java.lang.IllegalArgumentException: Illegal pattern character: A
```

```java
spark-sql> select date_format(make_timestamp(1,1,1,1,1,1), 'yyyyyyyyyyy-MM-dd');
20/05/28 15:23:23 ERROR SparkSQLDriver: Failed in [select date_format(make_timestamp(1,1,1,1,1,1), 'yyyyyyyyyyy-MM-dd')]
java.lang.ArrayIndexOutOfBoundsException: 11
	at java.time.format.DateTimeFormatterBuilder$NumberPrinterParser.format(DateTimeFormatterBuilder.java:2568)
```
In the phase of parsing, `DateTimeParseException | DateTimeException | ParseException` will be suppressed, but `SparkUpgradeException` will still be raised

e.g.

```java
spark-sql> set spark.sql.legacy.timeParserPolicy=exception;
spark.sql.legacy.timeParserPolicy	exception
spark-sql> select to_timestamp("2020-01-27T20:06:11.847-0800", "yyyy-MM-dd'T'HH:mm:ss.SSSz");
20/05/28 15:31:15 ERROR SparkSQLDriver: Failed in [select to_timestamp("2020-01-27T20:06:11.847-0800", "yyyy-MM-dd'T'HH:mm:ss.SSSz")]
org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading of Spark 3.0: Fail to parse '2020-01-27T20:06:11.847-0800' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.
```

```java
spark-sql> set spark.sql.legacy.timeParserPolicy=corrected;
spark.sql.legacy.timeParserPolicy	corrected
spark-sql> select to_timestamp("2020-01-27T20:06:11.847-0800", "yyyy-MM-dd'T'HH:mm:ss.SSSz");
NULL
spark-sql> set spark.sql.legacy.timeParserPolicy=legacy;
spark.sql.legacy.timeParserPolicy	legacy
spark-sql> select to_timestamp("2020-01-27T20:06:11.847-0800", "yyyy-MM-dd'T'HH:mm:ss.SSSz");
2020-01-28 12:06:11.847
```

### Why are the changes needed?
Consistency

### Does this PR introduce _any_ user-facing change?

Yes, invalid datetime patterns will fail `from_unixtime`, `unix_timestamp`,`to_unix_timestamp`, `to_timestamp` and `to_date` instead of resulting `NULL`

### How was this patch tested?

add more tests

Closes #28650 from yaooqinn/SPARK-31830.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-09 16:56:45 +00:00
Yuming Wang 1d1eacde9d [SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled
### What changes were proposed in this pull request?
This PR makes `repartition`/`DISTRIBUTE BY` obeys [initialPartitionNum](af4248b2d6/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (L446-L455)) when adaptive execution enabled.

### Why are the changes needed?
To make `DISTRIBUTE BY`/`GROUP BY` partitioned by same partition number.
How to reproduce:
```scala
spark.sql("CREATE TABLE spark_31220(id int)")
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")
```

Before this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 200), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```
After this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
   +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
      +- HashAggregate(keys=[id#5], functions=[])
         +- FileScan parquet default.spark_31220[id#5]

scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 1000), false, [id=#179]
   +- FileScan parquet default.spark_31220[id#5]
```

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Unit test.

Closes #27986 from wangyum/SPARK-31220.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-09 16:07:22 +00:00
Max Gekk de91915a24 [SPARK-31940][SQL][DOCS] Document the default JVM time zone in to/fromJavaDate and legacy date formatters
### What changes were proposed in this pull request?
Update comments for `DateTimeUtils`.`toJavaDate` and `fromJavaDate`, and for the legacy date formatters `LegacySimpleDateFormatter` and `LegacyFastDateFormatter` regarding to the default JVM time zone. The comments say that the default JVM time zone is used intentionally for backward compatibility with Spark 2.4 and earlier versions.

Closes #28709

### Why are the changes needed?
To document current behaviour of related methods in `DateTimeUtils` and the legacy date formatters. For example, correctness of `HiveResult.hiveResultString` and `toHiveString` is directly related to the same time zone used by `toJavaDate` and `LegacyFastDateFormatter`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the Scala style checker `./dev/scalastyle`

Closes #28767 from MaxGekk/doc-legacy-formatters.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-09 15:20:13 +00:00
lipzhu ca2cfd4185 [SPARK-31906][SQL][DOCS] Enhance comments in NamedExpression.qualifier
### Why are the changes needed?
The qualifier name should contains catalog name.

### Does this PR introduce _any_ user-facing change?
NO.

### How was this patch tested?
UT.

Closes #28726 from lipzhu/SPARK-31906.

Authored-by: lipzhu <lipzhu@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-09 13:59:00 +00:00
Kent Yao fc6af9d900 [SPARK-31867][SQL][FOLLOWUP] Check result differences for datetime formatting
### What changes were proposed in this pull request?

In this PR, we throw `SparkUpgradeException` when getting `DateTimeException` for datetime formatting in the `EXCEPTION` legacy Time Parser Policy.

### Why are the changes needed?
`DateTimeException` is also declared by `java.time.format.DateTimeFormatter#format`, but in Spark, it can barely occur. We have suspected one that due to a JDK bug so far. see https://bugs.openjdk.java.net/browse/JDK-8079628.

For `from_unixtime` function, we will suppress the DateTimeException caused by `DD` and result `NULL`. It is a silent date change that should be avoided in Java 8.

### Does this PR introduce _any_ user-facing change?

Yes,  when running on Java8 and using `from_unixtime` function with pattern `DD` to format datetimes, if dayofyear>=100, `SparkUpgradeException` will alert users instead of silently resulting null. For `date_format`, `SparkUpgradeException` take the palace of  `DateTimeException`.

### How was this patch tested?

add unit tests.

Closes #28736 from yaooqinn/SPARK-31867-F.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-05 16:44:16 +00:00
Kent Yao 9d5b5d0a58 [SPARK-31879][SQL][TEST-JAVA11] Make week-based pattern invalid for formatting too
# What changes were proposed in this pull request?

After all these attempts https://github.com/apache/spark/pull/28692 and https://github.com/apache/spark/pull/28719 an https://github.com/apache/spark/pull/28727.
they all have limitations as mentioned in their discussions.

Maybe the only way is to forbid them all

### Why are the changes needed?

These week-based fields need Locale to express their semantics, the first day of the week varies from country to country.

From the Java doc of WeekFields
```java
    /**
     * Gets the first day-of-week.
     * <p>
     * The first day-of-week varies by culture.
     * For example, the US uses Sunday, while France and the ISO-8601 standard use Monday.
     * This method returns the first day using the standard {code DayOfWeek} enum.
     *
     * return the first day-of-week, not null
     */
    public DayOfWeek getFirstDayOfWeek() {
        return firstDayOfWeek;
    }
```

But for the SimpleDateFormat, the day-of-week is not localized

```
u	Day number of week (1 = Monday, ..., 7 = Sunday)	Number	1
```

Currently, the default locale we use is the US, so the result moved a day or a year or a week backward.

e.g.

For the date `2019-12-29(Sunday)`, in the Sunday Start system(e.g. en-US), it belongs to 2020 of week-based-year, in the Monday Start system(en-GB), it goes to 2019. the week-of-week-based-year(w) will be affected too

```sql
spark-sql> SELECT to_csv(named_struct('time', to_timestamp('2019-12-29', 'yyyy-MM-dd')), map('timestampFormat', 'YYYY', 'locale', 'en-US'));
2020
spark-sql> SELECT to_csv(named_struct('time', to_timestamp('2019-12-29', 'yyyy-MM-dd')), map('timestampFormat', 'YYYY', 'locale', 'en-GB'));
2019

spark-sql> SELECT to_csv(named_struct('time', to_timestamp('2019-12-29', 'yyyy-MM-dd')), map('timestampFormat', 'YYYY-ww-uu', 'locale', 'en-US'));
2020-01-01
spark-sql> SELECT to_csv(named_struct('time', to_timestamp('2019-12-29', 'yyyy-MM-dd')), map('timestampFormat', 'YYYY-ww-uu', 'locale', 'en-GB'));
2019-52-07

spark-sql> SELECT to_csv(named_struct('time', to_timestamp('2020-01-05', 'yyyy-MM-dd')), map('timestampFormat', 'YYYY-ww-uu', 'locale', 'en-US'));
2020-02-01
spark-sql> SELECT to_csv(named_struct('time', to_timestamp('2020-01-05', 'yyyy-MM-dd')), map('timestampFormat', 'YYYY-ww-uu', 'locale', 'en-GB'));
2020-01-07
```

For other countries, please refer to [First Day of the Week in Different Countries](http://chartsbin.com/view/41671)

### Does this PR introduce _any_ user-facing change?
With this change, user can not use 'YwuW',  but 'e' for 'u' instead. This can at least turn this not to be a silent data change.

### How was this patch tested?

add unit tests

Closes #28728 from yaooqinn/SPARK-31879-NEW2.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-05 08:14:01 +00:00
Wenchen Fan dc0709fa0c [SPARK-29947][SQL][FOLLOWUP] ResolveRelations should return relations with fresh attribute IDs
### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/26589, which caches the table relations to speed up the table lookup. However, it brings some side effects: the rule `ResolveRelations` may return exactly the same relations, while before it always returns relations with fresh attribute IDs.

This PR is to eliminate this side effect.

### Why are the changes needed?

There is no bug report yet, but this side effect may impact things like self-join. It's better to restore the 2.4 behavior and always return refresh relations.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

N/A

Closes #28717 from cloud-fan/fix.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-03 19:08:36 +00:00
Wenchen Fan e61d0de11f Revert "[SPARK-31879][SQL] Using GB as default Locale for datetime formatters"
This reverts commit c59f51bcc2.
2020-06-04 01:54:22 +08:00
Kent Yao afcc14c6d2 [SPARK-31896][SQL] Handle am-pm timestamp parsing when hour is missing
### What changes were proposed in this pull request?

This PR set the hour to 12/0 when the AMPM_OF_DAY field exists

### Why are the changes needed?

When the hour is absent but the am-pm is present, the time is incorrect for pm

### Does this PR introduce _any_ user-facing change?
yes, the change is user-facing but to change back to 2.4 to keep backward compatibility

e.g.
```sql
spark-sql> select to_timestamp('33:33 PM', 'mm:ss a');
1970-01-01 12:33:33
spark-sql> select to_timestamp('33:33 AM', 'mm:ss a');
1970-01-01 00:33:33

```

otherwise, the results are all `1970-01-01 00:33:33`

### How was this patch tested?

add unit tests

Closes #28713 from yaooqinn/SPARK-31896.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-03 13:30:22 +00:00
Kent Yao afe95bd9ad [SPARK-31892][SQL] Disable week-based date filed for parsing
### What changes were proposed in this pull request?

This PR disables week-based date filed for parsing

closes #28674
### Why are the changes needed?

1. It's an un-fixable behavior change to fill the gap between SimpleDateFormat and DateTimeFormater and backward-compatibility for different JDKs.A lot of effort has been made to prove it at https://github.com/apache/spark/pull/28674

2. The existing behavior itself in 2.4 is confusing, e.g.

```sql
spark-sql> select to_timestamp('1', 'w');
1969-12-28 00:00:00
spark-sql> select to_timestamp('1', 'u');
1970-01-05 00:00:00
```
  The 'u' here seems not to go to the Monday of the first week in week-based form or the first day of the year in non-week-based form but go to the Monday of the second week in week-based form.

And, e.g.
```sql
spark-sql> select to_timestamp('2020 2020', 'YYYY yyyy');
2020-01-01 00:00:00
spark-sql> select to_timestamp('2020 2020', 'yyyy YYYY');
2019-12-29 00:00:00
spark-sql> select to_timestamp('2020 2020 1', 'YYYY yyyy w');
NULL
spark-sql> select to_timestamp('2020 2020 1', 'yyyy YYYY w');
2019-12-29 00:00:00
```

  I think we don't need to introduce all the weird behavior from Java

3. The current test coverage for week-based date fields is almost 0%, which indicates that we've never imagined using it.

4. Avoiding JDK bugs

https://issues.apache.org/jira/browse/SPARK-31880

### Does this PR introduce _any_ user-facing change?

Yes, the 'Y/W/w/u/F/E' pattern cannot be used datetime parsing functions.

### How was this patch tested?

more tests added

Closes #28706 from yaooqinn/SPARK-31892.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-03 06:49:58 +00:00
Kent Yao c59f51bcc2 [SPARK-31879][SQL] Using GB as default Locale for datetime formatters
# What changes were proposed in this pull request?

This PR switches the default Locale from the `US` to `GB` to change the behavior of the first day of the week from Sunday-started to Monday-started as same as v2.4

### Why are the changes needed?

#### cases
```sql
spark-sql> select to_timestamp('2020-1-1', 'YYYY-w-u');
2019-12-29 00:00:00
spark-sql> set spark.sql.legacy.timeParserPolicy=legacy;
spark.sql.legacy.timeParserPolicy	legacy
spark-sql> select to_timestamp('2020-1-1', 'YYYY-w-u');
2019-12-30 00:00:00
```

#### reasons

These week-based fields need Locale to express their semantics, the first day of the week varies from country to country.

From the Java doc of WeekFields
```java
    /**
     * Gets the first day-of-week.
     * <p>
     * The first day-of-week varies by culture.
     * For example, the US uses Sunday, while France and the ISO-8601 standard use Monday.
     * This method returns the first day using the standard {code DayOfWeek} enum.
     *
     * return the first day-of-week, not null
     */
    public DayOfWeek getFirstDayOfWeek() {
        return firstDayOfWeek;
    }
```

But for the SimpleDateFormat, the day-of-week is not localized

```
u	Day number of week (1 = Monday, ..., 7 = Sunday)	Number	1
```

Currently, the default locale we use is the US, so the result moved a day backward.

For other countries, please refer to [First Day of the Week in Different Countries](http://chartsbin.com/view/41671)

With this change, it restores the first day of week calculating for functions when using the default locale.

### Does this PR introduce _any_ user-facing change?

Yes, but the behavior change is used to restore the old one of v2.4

### How was this patch tested?

add unit tests

Closes #28692 from yaooqinn/SPARK-31879.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-03 06:07:53 +00:00
lipzhu d79a8a88b1 [SPARK-31834][SQL] Improve error message for incompatible data types
### What changes were proposed in this pull request?
We should use dataType.catalogString to unified the data type mismatch message.
Before:
```sql
spark-sql> create table SPARK_31834(a int) using parquet;
spark-sql> insert into SPARK_31834 select '1';
Error in query: Cannot write incompatible data to table '`default`.`spark_31834`':
- Cannot safely cast 'a': StringType to IntegerType;
```

After:
```sql
spark-sql> create table SPARK_31834(a int) using parquet;
spark-sql> insert into SPARK_31834 select '1';
Error in query: Cannot write incompatible data to table '`default`.`spark_31834`':
- Cannot safely cast 'a': string to int;
```

### How was this patch tested?
UT.

Closes #28654 from lipzhu/SPARK-31834.

Authored-by: lipzhu <lipzhu@ebay.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-02 21:07:10 +09:00
Sunitha Kambhampati 4161c62429 [SPARK-28067][SQL] Fix incorrect results for decimal aggregate sum by returning null on decimal overflow
### What changes were proposed in this pull request?

JIRA SPARK-28067:  Wrong results are returned for aggregate sum with decimals with whole stage codegen enabled

**Repro:**
WholeStage enabled enabled ->  Wrong results
WholeStage disabled -> Returns exception Decimal precision 39 exceeds max precision 38

**Issues:**
1. Wrong results are returned which is bad
2. Inconsistency between whole stage enabled and disabled.

**Cause:**
Sum does not take care of possibility of overflow for the intermediate steps.  ie the updateExpressions and mergeExpressions.

This PR makes the following changes:
- Add changes to check if overflow occurs for decimal in aggregate Sum and if there is an overflow,  it will return null for the Sum operation when spark.sql.ansi.enabled is false.
- When spark.sql.ansi.enabled is true, then the sum operation will return an exception if an overflow occurs for the decimal operation in Sum.
- This is keeping it consistent with the behavior defined in spark.sql.ansi.enabled property

**Before the fix:  Scenario 1:** - WRONG RESULTS
```
scala> val df = Seq(
     |  (BigDecimal("10000000000000000000"), 1),
     |  (BigDecimal("10000000000000000000"), 1),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]

scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum"))
df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]

scala> df2.show(40,false)
+---------------------------------------+
|sum(decNum)                            |
+---------------------------------------+
|20000000000000000000.000000000000000000|
+---------------------------------------+
```

--
**Before fix: Scenario2:  Setting spark.sql.ansi.enabled to true** - WRONG RESULTS
```
scala> spark.conf.set("spark.sql.ansi.enabled", "true")

scala> val df = Seq(
     |  (BigDecimal("10000000000000000000"), 1),
     |  (BigDecimal("10000000000000000000"), 1),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]

scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum"))
df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]

scala> df2.show(40,false)
+---------------------------------------+
|sum(decNum)                            |
+---------------------------------------+
|20000000000000000000.000000000000000000|
+---------------------------------------+

```

**After the fix: Scenario1:**
```
scala> val df = Seq(
     |  (BigDecimal("10000000000000000000"), 1),
     |  (BigDecimal("10000000000000000000"), 1),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]

scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum"))
df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]

scala>  df2.show(40,false)
+-----------+
|sum(decNum)|
+-----------+
|null       |
+-----------+

```

**After fix:  Scenario2:  Setting the spark.sql.ansi.enabled to true:**
```
scala> spark.conf.set("spark.sql.ansi.enabled", "true")

scala> val df = Seq(
     |  (BigDecimal("10000000000000000000"), 1),
     |  (BigDecimal("10000000000000000000"), 1),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2),
     |  (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum")
df: org.apache.spark.sql.DataFrame = [decNum: decimal(38,18), intNum: int]

scala> val df2 = df.withColumnRenamed("decNum", "decNum2").join(df, "intNum").agg(sum("decNum"))
df2: org.apache.spark.sql.DataFrame = [sum(decNum): decimal(38,18)]

scala>  df2.show(40,false)
20/02/18 10:57:43 ERROR Executor: Exception in task 5.0 in stage 4.0 (TID 30)
java.lang.ArithmeticException: Decimal(expanded,100000000000000000000.000000000000000000,39,18}) cannot be represented as Decimal(38, 18).

```

### Why are the changes needed?
The changes are needed in order to fix the wrong results that are returned for decimal aggregate sum.

### Does this PR introduce any user-facing change?
User would see wrong results on aggregate sum that involved decimal overflow prior to this change, but now the user will see null.  But if user enables the spark.sql.ansi.enabled flag to true, then the user will see an exception and not incorrect results.

### How was this patch tested?
New test has been added and existing tests for sql, catalyst and hive suites were run ok.

Closes #27627 from skambha/decaggfixwrongresults.

Lead-authored-by: Sunitha Kambhampati <skambha@us.ibm.com>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-06-02 11:30:30 +00:00
HyukjinKwon e69466056f [SPARK-31849][PYTHON][SQL] Make PySpark SQL exceptions more Pythonic
### What changes were proposed in this pull request?

This PR proposes to make PySpark exception more Pythonic by hiding JVM stacktrace by default. It can be enabled by turning on `spark.sql.pyspark.jvmStacktrace.enabled` configuration.

```
Traceback (most recent call last):
  ...
pyspark.sql.utils.PythonException:
  An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  ...
```

If this `spark.sql.pyspark.jvmStacktrace.enabled` is enabled, it appends:

```
JVM stacktrace:
org.apache.spark.Exception: ...
  ...
```

For example, the codes below:

```python
from pyspark.sql.functions import udf
udf
def divide_by_zero(v):
    raise v / 0

spark.range(1).select(divide_by_zero("id")).show()
```

will show an error messages that looks like Python exception thrown from the local.

<details>
<summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is off (default)</summary>

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
  An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero
```

</details>

<details>
<summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is on</summary>

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 137, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.PythonException:
  An exception was thrown from Python worker in the executor. The below is the Python worker stacktrace.
Traceback (most recent call last):
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero

JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 192.168.35.193, executor 0): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
```

</details>

<details>
<summary>Python exception message without this change</summary>

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/dataframe.py", line 427, in show
    print(self._jdf.showString(n, 20, vertical))
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 98, in deco
    return f(*a, **kw)
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o160.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 5.0 failed 4 times, most recent failure: Lost task 10.3 in stage 5.0 (TID 37, 192.168.35.193, executor 3): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2117)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2066)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2065)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2065)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1021)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1021)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1021)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2297)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2246)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2235)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:823)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2108)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2129)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2148)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3653)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3644)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3642)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main
    process()
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 597, in process
    serializer.dump_stream(out_iter, outfile)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 223, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 141, in dump_stream
    for obj in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/serializers.py", line 212, in _batched
    for item in iterator:
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in mapper
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 450, in <genexpr>
    result = tuple(f(*[a[o] for o in arg_offsets]) for (arg_offsets, f) in udfs)
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 90, in <lambda>
    return lambda *a: f(*a)
  File "/.../spark/python/lib/pyspark.zip/pyspark/util.py", line 107, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 3, in divide_by_zero
ZeroDivisionError: division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:516)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:469)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:753)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:340)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:469)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:472)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
```

</details>

<br/>

Another example with Python 3.7:

```python
sql("a")
```

<details>
<summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is off (default)</summary>

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 646, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
a
^^^
```

</details>

<details>
<summary>Python exception message when <code>spark.sql.pyspark.jvmStacktrace.enabled</code> is on</summary>

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 646, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 131, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
a
^^^

JVM stacktrace:
org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
a
^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:81)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:604)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:604)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
```

</details>

<details>
<summary>Python exception message without this change</summary>

```
Traceback (most recent call last):
  File "/.../spark/python/pyspark/sql/utils.py", line 98, in deco
    return f(*a, **kw)
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o26.sql.
: org.apache.spark.sql.catalyst.parser.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
a
^^^

	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:266)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:133)
	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:81)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:604)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:604)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:601)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/session.py", line 646, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
  File "/.../spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/.../spark/python/pyspark/sql/utils.py", line 102, in deco
    raise converted
pyspark.sql.utils.ParseException:
mismatched input 'a' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
a
^^^
```

</details>

### Why are the changes needed?

Currently, PySpark exceptions are very unfriendly to Python users with causing a bunch of JVM stacktrace. See "Python exception message without this change" above.

### Does this PR introduce _any_ user-facing change?

Yes, it will change the exception message. See the examples above.

### How was this patch tested?

Manually tested by

```bash
./bin/pyspark --conf spark.sql.pyspark.jvmStacktrace.enabled=true
```

and running the examples above.

Closes #28661 from HyukjinKwon/python-debug.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-06-01 09:45:21 +09:00
Max Gekk 47dc332258 [SPARK-31874][SQL] Use FastDateFormat as the legacy fractional formatter
### What changes were proposed in this pull request?
1. Replace `SimpleDateFormat` by `FastDateFormat` as the legacy formatter of `FractionTimestampFormatter`.
2. Optimise `LegacyFastTimestampFormatter` for `java.sql.Timestamp` w/o fractional part.

### Why are the changes needed?
1. By default `HiveResult`.`hiveResultString` retrieves timestamps values as instances of `java.sql.Timestamp`, and uses the legacy parser `SimpleDateFormat` to convert the timestamps to strings. After the fix https://github.com/apache/spark/pull/28024, the fractional formatter and its companion - legacy formatter `SimpleDateFormat` are created per every value. By switching from `LegacySimpleTimestampFormatter` to `LegacyFastTimestampFormatter`, we can utilize the internal cache of `FastDateFormat`, and avoid parsing the default pattern `yyyy-MM-dd HH:mm:ss`.
2. The second change in the method `def format(ts: Timestamp): String` of `LegacyFastTimestampFormatter` is needed to optimize the formatter for patterns without the fractional part and avoid conversions to microseconds.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By existing tests in `TimestampFormatter`.

Closes #28678 from MaxGekk/fastdateformat-as-legacy-frac-formatter.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-05-31 13:05:00 +00:00
Kent Yao 547c5bf552 [SPARK-31867][SQL] Disable year type datetime patterns which are longer than 10
### What changes were proposed in this pull request?

As mentioned in https://github.com/apache/spark/pull/28673 and suggested via cloud-fan at https://github.com/apache/spark/pull/28673#discussion_r432817075

In this PR, we disable datetime pattern in the form of `y..y` and `Y..Y` whose lengths are greater than 10 to avoid sort of JDK bug as described below

he new datetime formatter introduces silent data change like,

```sql
spark-sql> select from_unixtime(1, 'yyyyyyyyyyy-MM-dd');
NULL
spark-sql> set spark.sql.legacy.timeParserPolicy=legacy;
spark.sql.legacy.timeParserPolicy	legacy
spark-sql> select from_unixtime(1, 'yyyyyyyyyyy-MM-dd');
00000001970-01-01
spark-sql>
```

For patterns that support `SignStyle.EXCEEDS_PAD`, e.g. `y..y`(len >=4), when using the `NumberPrinterParser` to format it

```java
switch (signStyle) {
  case EXCEEDS_PAD:
    if (minWidth < 19 && value >= EXCEED_POINTS[minWidth]) {
      buf.append(decimalStyle.getPositiveSign());
    }
    break;

           ....
```
the `minWidth` == `len(y..y)`
the `EXCEED_POINTS` is

```java
/**
         * Array of 10 to the power of n.
         */
        static final long[] EXCEED_POINTS = new long[] {
            0L,
            10L,
            100L,
            1000L,
            10000L,
            100000L,
            1000000L,
            10000000L,
            100000000L,
            1000000000L,
            10000000000L,
        };
```

So when the `len(y..y)` is greater than 10, ` ArrayIndexOutOfBoundsException` will be raised.

 And at the caller side, for `from_unixtime`, the exception will be suppressed and silent data change occurs. for `date_format`, the `ArrayIndexOutOfBoundsException` will continue.

### Why are the changes needed?
fix silent data change

### Does this PR introduce _any_ user-facing change?

Yes, SparkUpgradeException will take place of `null` result when the pattern contains 10 or more continuous  'y' or 'Y'

### How was this patch tested?

new tests

Closes #28684 from yaooqinn/SPARK-31867-2.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-05-31 12:34:39 +00:00
Maryann Xue b9737c3c22 [SPARK-31864][SQL] Adjust AQE skew join trigger condition
### What changes were proposed in this pull request?

This PR makes a minor change in deciding whether a partition is skewed by comparing the partition size to the median size of coalesced partitions instead of median size of raw partitions before coalescing.

### Why are the changes needed?

This change is line with target size criteria of splitting skew join partitions and can also cope with situations of extra empty partitions caused by over-partitioning. This PR has also improved skew join tests in AQE tests.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated UTs.

Closes #28669 from maryannxue/spark-31864.

Authored-by: Maryann Xue <maryann.xue@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-05-30 07:47:29 +00:00
Jungtaek Lim (HeartSaVioR) fe1d1e24bc [SPARK-31214][BUILD] Upgrade Janino to 3.1.2
### What changes were proposed in this pull request?

This PR proposes to upgrade Janino to 3.1.2 which is released recently.

Major changes were done for refactoring, as well as there're lots of "no commit message". Belows are the pairs of (commit title, commit) which seem to deal with some bugs or specific improvements (not purposed to refactor) after 3.0.15.

* Issue #119: Guarantee executing popOperand() in popUninitializedVariableOperand() via moving popOperand() out of "assert"
* Issue #116: replace operand to final target type if boxing conversion and widening reference conversion happen together
* Merged pull request `#114` "Grow the code for relocatables, and do fixup, and relocate".
  * 367c58e73e
* issue `#107`: Janino requires "org.codehaus.commons.compiler.io", but commons-compiler does not export this package
  * f7d99596d4
* Throw an NYI CompileException when a static interface method is invoked.
  * efd3884983
* Fixed the promotion of the array access index expression (see JLS7 15.13 Array Access Expressions)
  * 32fdb5f5f1
* Issue `#104`: ClassLoaderIClassLoader 's ClassNotFoundException handle mechanism enhancement
  * 6e8a97d609

You can see the changelog from the link: http://janino-compiler.github.io/janino/changelog.html

### Why are the changes needed?

We got some report on failure on user's query which Janino throws error on compiling generated code. The issue is here: https://github.com/janino-compiler/janino/issues/113 It contains the information of generated code, symptom (error), and analysis of the bug, so please refer the link for more details.
Janino 3.1.1 contains the PR https://github.com/janino-compiler/janino/pull/114 which would enable Janino to succeed to compile user's query properly. I've also fixed a couple of more bugs as 3.1.1 made Spark UTs fail - hence we need to upgrade to 3.1.2.

Furthermore, from my testing, https://github.com/janino-compiler/janino/issues/90 (which Josh Rosen filed before) seems to be also resolved in 3.1.2 as well.

Looks like Janino is maintained by one person and there's no even version branches and releases/tags so we can't expect Janino maintainer to release a new bugfix version - hence have to try out new minor version.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing UTs.

Closes #27860 from HeartSaVioR/SPARK-31101.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-05-29 07:42:57 -07:00
Yuming Wang 91148f428b [SPARK-28481][SQL] More expressions should extend NullIntolerant
### What changes were proposed in this pull request?

1. Make more expressions extend `NullIntolerant`.
2. Add a checker(in `ExpressionInfoSuite`) to identify whether the expression is `NullIntolerant`.

### Why are the changes needed?

Avoid skew join if the join column has many null values and can improve query performance. For examples:
```sql
CREATE TABLE t1(c1 string, c2 string) USING parquet;
CREATE TABLE t2(c1 string, c2 string) USING parquet;
EXPLAIN SELECT t1.* FROM t1 JOIN t2 ON upper(t1.c1) = upper(t2.c1);
```

Before and after this PR:
```sql
== Physical Plan ==
*(2) Project [c1#5, c2#6]
+- *(2) BroadcastHashJoin [upper(c1#5)], [upper(c1#7)], Inner, BuildLeft
   :- BroadcastExchange HashedRelationBroadcastMode(List(upper(input[0, string, true]))), [id=#41]
   :  +- *(1) ColumnarToRow
   :     +- FileScan parquet default.t1[c1#5,c2#6]
   +- *(2) ColumnarToRow
      +- FileScan parquet default.t2[c1#7]

== Physical Plan ==
*(2) Project [c1#5, c2#6]
+- *(2) BroadcastHashJoin [upper(c1#5)], [upper(c1#7)], Inner, BuildRight
   :- *(2) Project [c1#5, c2#6]
   :  +- *(2) Filter isnotnull(c1#5)
   :     +- *(2) ColumnarToRow
   :        +- FileScan parquet default.t1[c1#5,c2#6]
   +- BroadcastExchange HashedRelationBroadcastMode(List(upper(input[0, string, true]))), [id=#59]
      +- *(1) Project [c1#7]
         +- *(1) Filter isnotnull(c1#7)
            +- *(1) ColumnarToRow
               +- FileScan parquet default.t2[c1#7]

```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #28626 from wangyum/SPARK-28481.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-05-29 07:28:57 +00:00
GuoPhilipse dfbc5edf20 [SPARK-31839][TESTS] Delete duplicate code in castsuit
### What changes were proposed in this pull request?
Delete duplicate code castsuit

### Why are the changes needed?
keep spark code clean

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
no need

Closes #28655 from GuoPhilipse/delete-duplicate-code-castsuit.

Lead-authored-by: GuoPhilipse <46367746+GuoPhilipse@users.noreply.github.com>
Co-authored-by: GuoPhilipse <guofei_ok@126.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-05-28 09:57:11 +09:00
Wenchen Fan 1528fbced8 [SPARK-31827][SQL] fail datetime parsing/formatting if detect the Java 8 bug of stand-alone form
### What changes were proposed in this pull request?

If `LLL`/`qqq` is used in the datetime pattern string, and the current JDK in use has a bug for the stand-alone form (see https://bugs.openjdk.java.net/browse/JDK-8114833), throw an exception with a clear error message.

### Why are the changes needed?

to keep backward compatibility with Spark 2.4

### Does this PR introduce _any_ user-facing change?

Yes

Spark 2.4
```
scala> sql("select date_format('1990-1-1', 'LLL')").show
+---------------------------------------------+
|date_format(CAST(1990-1-1 AS TIMESTAMP), LLL)|
+---------------------------------------------+
|                                          Jan|
+---------------------------------------------+
```

Spark 3.0 with Java 11
```
scala> sql("select date_format('1990-1-1', 'LLL')").show
+---------------------------------------------+
|date_format(CAST(1990-1-1 AS TIMESTAMP), LLL)|
+---------------------------------------------+
|                                          Jan|
+---------------------------------------------+
```

Spark 3.0 with Java 8
```
// before this PR
+---------------------------------------------+
|date_format(CAST(1990-1-1 AS TIMESTAMP), LLL)|
+---------------------------------------------+
|                                            1|
+---------------------------------------------+
// after this PR
scala> sql("select date_format('1990-1-1', 'LLL')").show
org.apache.spark.SparkUpgradeException
```

### How was this patch tested?

manual test with java 8 and 11

Closes #28646 from cloud-fan/format.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-05-27 18:53:19 +00:00
Max Gekk b5eb0933ac [SPARK-31762][SQL][FOLLOWUP] Avoid double formatting in legacy fractional formatter
### What changes were proposed in this pull request?
Currently, the legacy fractional formatter is based on the implementation from Spark 2.4 which formats the input timestamp twice:
```
    val timestampString = ts.toString
    val formatted = legacyFormatter.format(ts)
```
to strip trailing zeros. This PR proposes to avoid the first formatting by forming the second fraction directly.

### Why are the changes needed?
It makes legacy fractional formatter faster.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By existing test "format fraction of second" in `TimestampFormatterSuite` + added test for timestamps before 1970-01-01 00:00:00Z

Closes #28643 from MaxGekk/optimize-legacy-fract-format.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-05-27 18:44:22 +00:00
Kent Yao 311fe6a880 [SPARK-31835][SQL][TESTS] Add zoneId to codegen related tests in DateExpressionsSuite
### What changes were proposed in this pull request?

This PR modifies some codegen related tests to test escape characters for datetime functions which are time zone aware. If the timezone is absent, the formatter could result in `null` caused by `java.util.NoSuchElementException: None.get` and bypassing the real intention of those test cases.

### Why are the changes needed?

fix tests

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

passing the modified test cases.

Closes #28653 from yaooqinn/SPARK-31835.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-05-27 17:26:07 +00:00
Ali Afroozeh f6f1e51072 [SPARK-31719][SQL] Refactor JoinSelection
### What changes were proposed in this pull request?
This PR extracts the logic for selecting the planned join type out of the `JoinSelection` rule and moves it to `JoinSelectionHelper` in Catalyst.

### Why are the changes needed?
This change both cleans up the code in `JoinSelection` and allows the logic to be in one place and be used from other rules that need to make decision based on the join type before the planning time.

### Does this PR introduce _any_ user-facing change?
`BuildSide`, `BuildLeft`, and `BuildRight` are moved from `org.apache.spark.sql.execution` to Catalyst in `org.apache.spark.sql.catalyst.optimizer`.

### How was this patch tested?
This is a refactoring, passes existing tests.

Closes #28540 from dbaliafroozeh/RefactorJoinSelection.

Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-05-27 15:49:08 +00:00
beliefer 8f2b6f3a0b [SPARK-31393][SQL][FOLLOW-UP] Show the correct alias in schema for expression
### What changes were proposed in this pull request?
Some alias of expression can not display correctly in schema. This PR will fix them.
- `ln`
- `rint`
- `lcase`
- `position`

### Why are the changes needed?
Improve the implement of some expression.

### Does this PR introduce _any_ user-facing change?
 'Yes'. This PR will let user see the correct alias in schema.

### How was this patch tested?
Jenkins test.

Closes #28551 from beliefer/show-correct-alias-in-schema.

Lead-authored-by: beliefer <beliefer@163.com>
Co-authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-05-27 15:05:06 +09:00
HyukjinKwon df2a1fe131
[SPARK-31808][SQL] Makes struct function's output name and class name pretty
### What changes were proposed in this pull request?

This PR proposes to set the alias, and class name in its `ExpressionInfo` for `struct`.
- Class name in `ExpressionInfo`
  - from: `org.apache.spark.sql.catalyst.expressions.NamedStruct`
  - to:`org.apache.spark.sql.catalyst.expressions.CreateNamedStruct`
- Alias name: `named_struct(col1, v, ...)` -> `struct(v, ...)`

This PR takes over https://github.com/apache/spark/pull/28631

### Why are the changes needed?

To show the correct output name and class names to users.

### Does this PR introduce _any_ user-facing change?

Yes.

**Before:**

```scala
scala> sql("DESC FUNCTION struct").show(false)
+------------------------------------------------------------------------------------+
|function_desc                                                                       |
+------------------------------------------------------------------------------------+
|Function: struct                                                                    |
|Class: org.apache.spark.sql.catalyst.expressions.NamedStruct                        |
|Usage: struct(col1, col2, col3, ...) - Creates a struct with the given field values.|
+------------------------------------------------------------------------------------+
```

```scala
scala> sql("SELECT struct(1, 2)").show(false)
+------------------------------+
|named_struct(col1, 1, col2, 2)|
+------------------------------+
|[1, 2]                        |
+------------------------------+
```

**After:**

```scala
scala> sql("DESC FUNCTION struct").show(false)
+------------------------------------------------------------------------------------+
|function_desc                                                                       |
+------------------------------------------------------------------------------------+
|Function: struct                                                                    |
|Class: org.apache.spark.sql.catalyst.expressions.CreateNamedStruct                  |
|Usage: struct(col1, col2, col3, ...) - Creates a struct with the given field values.|
+------------------------------------------------------------------------------------+
```

```scala
scala> sql("SELECT struct(1, 2)").show(false)
+------------+
|struct(1, 2)|
+------------+
|[1, 2]      |
+------------+
```

### How was this patch tested?

Manually tested, and Jenkins tests.

Closes #28633 from HyukjinKwon/SPARK-31808.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-05-25 20:36:00 -07:00
Max Gekk 6c80ebbccb
[SPARK-31818][SQL] Fix pushing down filters with java.time.Instant values in ORC
### What changes were proposed in this pull request?
Convert `java.time.Instant` to `java.sql.Timestamp` in pushed down filters to ORC datasource when Java 8 time API enabled.

### Why are the changes needed?
The changes fix the exception raised while pushing date filters when `spark.sql.datetime.java8API.enabled` is set to `true`:
```
java.lang.IllegalArgumentException: Wrong value class java.time.Instant for TIMESTAMP.EQUALS leaf
 at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.checkLiteralType(SearchArgumentImpl.java:192)
 at org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl.<init>(SearchArgumentImpl.java:75)
```

### Does this PR introduce any user-facing change?
Yes

### How was this patch tested?
Added tests to `OrcFilterSuite`.

Closes #28636 from MaxGekk/orc-timestamp-filter-pushdown.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-05-25 18:36:02 -07:00