Commit graph

7009 commits

Author SHA1 Message Date
angerszhu 643cd876e4 [SPARK-32352][SQL] Partially push down support data filter if it mixed in partition filters
### What changes were proposed in this pull request?
We support partially push partition filters since SPARK-28169. We can also support partially push down data filters if it mixed in partition filters and data filters. For example:
```
spark.sql(
  s"""
     |CREATE TABLE t(i INT, p STRING)
     |USING parquet
     |PARTITIONED BY (p)""".stripMargin)

spark.range(0, 1000).selectExpr("id as col").createOrReplaceTempView("temp")
for (part <- Seq(1, 2, 3, 4)) {
  sql(s"""
         |INSERT OVERWRITE TABLE t PARTITION (p='$part')
         |SELECT col FROM temp""".stripMargin)
}

spark.sql("SELECT * FROM t WHERE  WHERE (p = '1' AND i = 1) OR (p = '2' and i = 2)").explain()
```

We can also push down ```i = 1 or i = 2 ```

### Why are the changes needed?
Extract more data filter to FileSourceScanExec

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
Added UT

Closes #29406 from AngersZhuuuu/SPARK-32352.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-12 12:18:33 +00:00
Tin Hang To a418548dad [SPARK-31703][SQL] Parquet RLE float/double are read incorrectly on big endian platforms
### What changes were proposed in this pull request?
This PR fixes the issue introduced during SPARK-26985.

SPARK-26985 changes the `putDoubles()` and `putFloats()` methods to respect the platform's endian-ness.  However, that causes the RLE paths in VectorizedRleValuesReader.java to read the RLE entries in parquet as BIG_ENDIAN on big endian platforms (i.e., as is), even though parquet data is always in little endian format.

The comments in `WriteableColumnVector.java` say those methods are used for "ieee formatted doubles in platform native endian" (or floats), but since the data in parquet is always in little endian format, use of those methods appears to be inappropriate.

To demonstrate the problem with spark-shell:

```scala
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

var data = Seq(
  (1.0, 0.1),
  (2.0, 0.2),
  (0.3, 3.0),
  (4.0, 4.0),
  (5.0, 5.0))

var df = spark.createDataFrame(data).write.mode(SaveMode.Overwrite).parquet("/tmp/data.parquet2")
var df2 = spark.read.parquet("/tmp/data.parquet2")
df2.show()
```

result:

```scala
+--------------------+--------------------+
|                  _1|                  _2|
+--------------------+--------------------+
|           3.16E-322|-1.54234871366845...|
|         2.0553E-320|         2.0553E-320|
|          2.561E-320|          2.561E-320|
|4.66726145843124E-62|         1.0435E-320|
|        3.03865E-319|-1.54234871366757...|
+--------------------+--------------------+
```

Also tests in ParquetIOSuite that involve float/double data would fail, e.g.,

- basic data types (without binary)
- read raw Parquet file

/examples/src/main/python/mllib/isotonic_regression_example.py would fail as well.

Purposed code change is to add `putDoublesLittleEndian()` and `putFloatsLittleEndian()` methods for parquet to invoke, just like the existing `putIntsLittleEndian()` and `putLongsLittleEndian()`.  On little endian platforms they would call `putDoubles()` and `putFloats()`, on big endian they would read the entries as little endian like pre-SPARK-26985.

No new unit-test is introduced as the existing ones are actually sufficient.

### Why are the changes needed?
RLE float/double data in parquet files will not be read back correctly on big endian platforms.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
All unit tests (mvn test) were ran and OK.

Closes #29383 from tinhto-000/SPARK-31703.

Authored-by: Tin Hang To <tinto@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-12 06:39:10 +00:00
angerszhu 4cf8c1d07d [SPARK-32400][SQL] Improve test coverage of HiveScriptTransformationExec
### What changes were proposed in this pull request?

1. Extract common test case (no serde) to BasicScriptTransformationExecSuite
2. Add more test case for no serde mode about supported data type and behavior in `BasicScriptTransformationExecSuite`
3. Add more test case for hive serde mode about supported type and behavior in `HiveScriptTransformationExecSuite`

### Why are the changes needed?
Improve test coverage of Script Transformation

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
Added UT

Closes #29401 from AngersZhuuuu/SPARK-32400.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-12 06:02:42 +00:00
Max Gekk 0477d23467 [SPARK-32594][SQL] Fix serialization of dates inserted to Hive tables
### What changes were proposed in this pull request?
Fix `DaysWritable` by overriding parent's method `def get(doesTimeMatter: Boolean): Date` from `DateWritable` instead of `Date get()` because the former one uses the first one. The bug occurs because `HiveOutputWriter.write()` call `def get(doesTimeMatter: Boolean): Date` transitively with default implementation from the parent class  `DateWritable` which doesn't respect date rebases and uses not initialized `daysSinceEpoch` (0 which `1970-01-01`).

### Why are the changes needed?
The changes fix the bug:
```sql
spark-sql> CREATE TABLE table1 (d date);
spark-sql> INSERT INTO table1 VALUES (date '2020-08-11');
spark-sql> SELECT * FROM table1;
1970-01-01
```
The expected result of the last SQL statement must be **2020-08-11** but got **1970-01-01**.

### Does this PR introduce _any_ user-facing change?
Yes. After the fix, `INSERT` work correctly:
```sql
spark-sql> SELECT * FROM table1;
2020-08-11
```

### How was this patch tested?
Add new test to `HiveSerDeReadWriteSuite`

Closes #29409 from MaxGekk/insert-date-into-hive-table.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-12 13:32:16 +09:00
xuewei.linxuewei c37357a092 [SPARK-32573][SQL] Anti Join Improvement with EmptyHashedRelation and EmptyHashedRelationWithAllNullKeys
### What changes were proposed in this pull request?
In [SPARK-32290](https://issues.apache.org/jira/browse/SPARK-32290), we introduced several new types of HashedRelation.

* EmptyHashedRelation
* EmptyHashedRelationWithAllNullKeys

They were all limited to used only in NAAJ scenario. These new HashedRelation could be applied to other scenario for performance improvements.

* EmptyHashedRelation could also be used in Normal AntiJoin for fast stop
* While AQE is on and buildSide is EmptyHashedRelationWithAllNullKeys, can convert NAAJ to a Empty LocalRelation to skip meaningless data iteration since in Single-Key NAAJ, if null key exists in BuildSide, will drop all records in streamedSide.

This Patch including two changes.

* using EmptyHashedRelation to do fast stop for common anti join as well
* In AQE, eliminate BroadcastHashJoin(NAAJ) if buildSide is a EmptyHashedRelationWithAllNullKeys

### Why are the changes needed?
LeftAntiJoin could apply `fast stop` when BuildSide is EmptyHashedRelation, While within AQE with EmptyHashedRelationWithAllNullKeys, we can eliminate the NAAJ. This should be a performance improvement in AntiJoin.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

* added case in AdaptiveQueryExecSuite.
* added case in HashedRelationSuite.
* Make sure SubquerySuite JoinSuite SQLQueryTestSuite passed.

Closes #29389 from leanken/leanken-SPARK-32573.

Authored-by: xuewei.linxuewei <xuewei.linxuewei@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-11 06:23:51 +00:00
allisonwang-db 1b7443bd9a [SPARK-32216][SQL] Remove redundant ProjectExec
### What changes were proposed in this pull request?
This PR added a physical rule to remove redundant project nodes. A `ProjectExec` is redundant when
1. It has the same output attributes and order as its child's output when ordering of these attributes is required.
2. It has the same output attributes as its child's output when attribute output ordering is not required.

For example:
After Filter:
```
== Physical Plan ==
*(1) Project [a#14L, b#15L, c#16, key#17]
+- *(1) Filter (isnotnull(a#14L) AND (a#14L > 5))
   +- *(1) ColumnarToRow
      +- FileScan parquet [a#14L,b#15L,c#16,key#17]
```
The `Project a#14L, b#15L, c#16, key#17` is redundant because its output is exactly the same as filter's output.

Before Aggregate:
```
== Physical Plan ==
*(2) HashAggregate(keys=[key#17], functions=[sum(a#14L), last(b#15L, false)], output=[sum_a#39L, key#17, last_b#41L])
+- Exchange hashpartitioning(key#17, 5), true, [id=#77]
   +- *(1) HashAggregate(keys=[key#17], functions=[partial_sum(a#14L), partial_last(b#15L, false)], output=[key#17, sum#49L, last#50L, valueSet#51])
      +- *(1) Project [key#17, a#14L, b#15L]
         +- *(1) Filter (isnotnull(a#14L) AND (a#14L > 100))
            +- *(1) ColumnarToRow
               +- FileScan parquet [a#14L,b#15L,key#17]
```
The `Project key#17, a#14L, b#15L` is redundant because hash aggregate doesn't require child plan's output to be in a specific order.

### Why are the changes needed?

It removes unnecessary query nodes and makes query plan cleaner.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests

Closes #29031 from allisonwang-db/remove-project.

Lead-authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Co-authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-11 03:14:15 +00:00
Wenchen Fan 8659ec554f [SPARK-32469][SQL] ApplyColumnarRulesAndInsertTransitions should be idempotent
### What changes were proposed in this pull request?

This PR makes `ApplyColumnarRulesAndInsertTransitions` idempotent (assuming the custom columnar rules are also idempotent).

### Why are the changes needed?

It's good hygiene to keep rules idempotent

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

new suite

Closes #29273 from cloud-fan/rule.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-10 16:41:13 +00:00
angerszhu d251443a02 [SPARK-32403][SQL] Refactor current ScriptTransformationExec
# What changes were proposed in this pull request?

This PR comes from the comment: #29085 (comment)

- Extract common Script IOSchema `ScriptTransformationIOSchema`
- avoid repeated judgement extract process output row method `createOutputIteratorWithoutSerde` && `createOutputIteratorWithSerde`
- add default no serde IO schemas `ScriptTransformationIOSchema.defaultIOSchema`

### Why are the changes needed?
Refactor code

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
NO

Closes #29199 from AngersZhuuuu/spark-32105-followup.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-10 16:37:31 +00:00
LantaoJin f80a480ee5 [SPARK-32537][SQL][TEST] Add a CTEHintSuite for test coverage
### What changes were proposed in this pull request?
Add a new test suite `CTEHintSuite`

### Why are the changes needed?
This ticket is to address the below comments to help us understand the test coverage of SQL HINT for CTE.
https://github.com/apache/spark/pull/29062#discussion_r463247491
https://github.com/apache/spark/pull/29062#discussion_r463248167

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add a test suite.

Closes #29359 from LantaoJin/SPARK-32537.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-10 14:04:09 +00:00
Yuanjian Li b03761e330 [SPARK-32456][SS] Check the Distinct by assuming it as Aggregate for Structured Streaming
### What changes were proposed in this pull request?
Check the Distinct nodes by assuming it as Aggregate in `UnsupportOperationChecker` for streaming.

### Why are the changes needed?
We want to fix 2 things here:

1. Give better error message for Distinct related operations in append mode that doesn't have a watermark

We use the union streams as the example, distinct in SQL has the same issue. Since the union clause in SQL has the requirement of deduplication, the parser will generate `Distinct(Union)` and the optimizer rule `ReplaceDistinctWithAggregate` will change it to `Aggregate(Union)`. This logic is of both batch and streaming queries. However, in the streaming, the aggregation will be wrapped by state store operations so we need extra checking logic in `UnsupportOperationChecker`.

Before this change, the SS union queries in Append mode will get the following confusing error when the watermark is lacking.
```
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:346)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:561)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:112)
...
```

2. Make `Distinct` in complete mode runnable.

Before this fix, the distinct in complete mode will throw the exception:
```
Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;
```

### Does this PR introduce _any_ user-facing change?
Yes, return a better error message.

### How was this patch tested?
New UT added.

Closes #29256 from xuanyuanking/SPARK-32456.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-10 14:01:31 +09:00
allisonwang-db 924c161544 [SPARK-32337][SQL] Show initial plan in AQE plan tree string
### What changes were proposed in this pull request?
This PR adds initial plan in `AdaptiveSparkPlanExec` and generates tree string for both current plan and initial plan. When the adaptive plan is not final, `Current Plan` will be used to indicate current physical plan, and `Final Plan` will be used when the adaptive plan is final. The difference between `Current Plan` and `Final Plan` here is that current plan indicates an intermediate state. The plan is subject to further transformations, while final plan represents an end state, which means the plan will no longer be changed.

Examples:

Before this change:
```
AdaptiveSparkPlan isFinalPlan=true
+- *(3) BroadcastHashJoin
   :- BroadcastQueryStage 2
       ...
```
`EXPLAIN FORMATTED`
```
== Physical Plan ==
AdaptiveSparkPlan (9)
+- BroadcastHashJoin Inner BuildRight (8)
   :- Project (3)
   :  +- Filter (2)
```

After this change
```
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(3) BroadcastHashJoin
   :- BroadcastQueryStage 2
   :  +- BroadcastExchange
           ...
+- == Initial Plan ==
   SortMergeJoin
   :- Sort
   :  +- Exchange
           ...
```

`EXPLAIN FORMATTED`
```
== Physical Plan ==
AdaptiveSparkPlan (9)
+- == Current Plan ==
   BroadcastHashJoin Inner BuildRight (8)
   :- Project (3)
   :  +- Filter (2)
+- == Initial Plan ==
   BroadcastHashJoin Inner BuildRight (8)
   :- Project (3)
   :  +- Filter (2)
```

### Why are the changes needed?
It provides better visibility into the plan change introduced by AQE.

### Does this PR introduce _any_ user-facing change?
Yes. It changed the AQE plan output string.

### How was this patch tested?
Unit test

Closes #29137 from allisonwang-db/aqe-plan.

Lead-authored-by: allisonwang-db <allison.wang@databricks.com>
Co-authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-10 04:49:37 +00:00
Jungtaek Lim (HeartSaVioR) d08e73d0d3 [SPARK-30866][SS] FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as unread files
### What changes were proposed in this pull request?

This patch caches the fetched list of files in FileStreamSource to avoid re-fetching whenever possible.

This improvement would be effective when the source options are being set to below:

* `maxFilesPerTrigger` is set
* `latestFirst` is set to `false` (default)

as

* if `maxFilesPerTrigger` is unset, Spark will process all the new files within a batch
* if `latestFirst` is set to `true`, it intends to process "latest" files which Spark has to refresh for every batch

Fetched list of files are filtered against SeenFilesMap before caching - unnecessary files are filtered in this phase. Once we cached the file, we don't check the file again for `isNewFile`, as Spark processes the files in timestamp order so cached files should have equal or later timestamp than latestTimestamp in SeenFilesMap.

Cache is only persisted in memory to simplify the logic - if we support restore cache when restarting query, we should deal with the changes of source options.

To avoid tiny set of inputs on the batch due to have tiny unread files (that could be possible when the list operation provides slightly more than the max files), this patch employs the "lower-bar" to determine whether it's helpful to retain unread files. Spark will discard unread files and do listing in the next batch if the number of unread files is lower than the specific (20% for now) ratio of max files.

This patch will have synergy with SPARK-20568 - while this patch helps to avoid redundant cost of listing, SPARK-20568 will get rid of the cost of listing for processed files. Once the query processes all files in initial load, the cost of listing for the files in initial load will be gone.

### Why are the changes needed?

Spark spends huge cost to fetch the list of files from input paths, but restricts the usage of list in a batch. If the streaming query starts from huge input data for various reasons (initial load, reprocessing, etc.) the cost to fetch the files will be applied to all batches as it is unusual to let first microbatch to process all of initial load.

SPARK-20568 will help to reduce the cost to fetch as processed files will be either deleted or moved outside of input paths, but it still won't help in early phase.

### Does this PR introduce any user-facing change?

Yes, the driver process would require more memory than before if maxFilesPerTrigger is set and latestFirst is set to "false" to cache fetched files. Previously Spark only takes some amount from left side of the list and discards remaining - so technically the peak memory would be same, but they can be freed sooner.

It may not hurt much, as peak memory is still be similar, and it would require similar amount of memory in any way when maxFilesPerTrigger is unset.

### How was this patch tested?

New unit tests. Manually tested under the test environment:

* input files
  * 171,839 files distributed evenly into 24 directories
  * each file contains 200 lines
* query: read from the "file stream source" and repartition to 50, and write to the "file stream sink"
  * maxFilesPerTrigger is set to 100

> before applying the patch

![Screen Shot 2020-02-18 at 11 53 12 PM](https://user-images.githubusercontent.com/1317309/74747932-139a8380-52ab-11ea-8920-26a40070ec32.png)

> after applying the patch

![Screen Shot 2020-02-18 at 11 56 01 PM](https://user-images.githubusercontent.com/1317309/74747860-f5cd1e80-52aa-11ea-89dd-da24e9f6ab00.png)

The area of brown color represents "latestOffset" where listing operation is performed for FileStreamSource. After the patch the cost for listing is paid "only once", whereas before the patch
it was for "every batch".

Closes #27620 from HeartSaVioR/SPARK-30866.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-09 20:14:15 -07:00
kujon 0ae94ad32f [SPARK-32576][SQL] Support PostgreSQL bpchar type and array of char type
### What changes were proposed in this pull request?
This PR fixes the support for char(n)[], character(n)[] data types. Prior to this change, a user would get `Unsupported type ARRAY` exception when attempting to interact with the table with such types.

The description is a bit more detailed in the [JIRA](https://issues.apache.org/jira/browse/SPARK-32393) itself, but the crux of the issue is that postgres driver names char and character types as `bpchar`. The relevant driver code can be found [here](https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/main/java/org/postgresql/jdbc/TypeInfoCache.java#L85-L87). `char` is very likely to be still needed, as it seems that pg makes a distinction between `char(1)` and `char(n > 1)` as per [this code](b7fd9f3cef/pgjdbc/src/main/java/org/postgresql/core/Oid.java (L64)).

### Why are the changes needed?
For completeness of the pg dialect support.

### Does this PR introduce _any_ user-facing change?
Yes, successful reads of tables with bpchar array instead of errors after this fix.

### How was this patch tested?
Unit tests

Closes #29192 from kujon/fix_postgres_bpchar_array_support.

Authored-by: kujon <jakub.korzeniowski@vortexa.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-09 19:03:37 -07:00
Takeshi Yamamuro 1df855bef2 [SPARK-32564][SQL][TEST][FOLLOWUP] Re-enable TPCDSQuerySuite with empty tables
### What changes were proposed in this pull request?

This is the follow-up PR of #29384 to address the cloud-fan comment: https://github.com/apache/spark/pull/29384#issuecomment-670595111
This PR re-enables `TPCDSQuerySuite` with empty tables for better test coverages.

### Why are the changes needed?

For better test coverage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29391 from maropu/SPARK-32564-FOLLOWUP.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-08 16:33:25 -07:00
Jungtaek Lim (HeartSaVioR) 8062c1f777 [SPARK-32555][SQL] Add unique ID on query execution
### What changes were proposed in this pull request?

This PR adds unique ID on QueryExecution, so that listeners can leverage the ID to deduplicate redundant calls.

### Why are the changes needed?

I've observed that Spark calls QueryExecutionListener multiple times on same QueryExecution instance (even same funcName for onSuccess). There's no unique ID on QueryExecution, hence it's a bit tricky if the listener would like to deal with same query execution only once.

Note that streaming query has both query ID and run ID which can be leveraged as unique ID.

### Does this PR introduce _any_ user-facing change?

Yes for who uses query execution listener - they'll see `id` field in QueryExecution and leverage it.

### How was this patch tested?

Manually tested. I think the change is obvious hence don't think it warrants a new UT. StreamingQueryListener has been using UUID as `queryId` and `runId` so it should work for the same.

Closes #29372 from HeartSaVioR/SPARK-32555.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-08 15:09:22 -07:00
Takeshi Yamamuro 5b8444af0d [SPARK-32564][SQL][TEST] Inject data statistics to simulate plan generation on actual TPCDS data
### What changes were proposed in this pull request?

`TPCDSQuerySuite` currently computes plans with empty TPCDS tables, then checks if plans can be generated correctly. But, the generated plans can be different from actual ones because the input tables are empty (e.g., the plans always use broadcast-hash joins, but actual ones use sort-merge joins for larger tables). To mitigate the issue, this PR defines data statistics constants extracted from generated TPCDS data in `TPCDSTableStats`, then injects the statistics via `spark.sessionState.catalog.alterTableStats` when defining TPCDS tables in `TPCDSQuerySuite`.

Please see a link below about how to extract the table statistics:
 - https://gist.github.com/maropu/f553d32c323ee803d39e2f7fa0b5a8c3

For example, the generated plans of TPCDS `q2` are different with/without this fix:
```
==== w/ this fix: q2 ====
== Physical Plan ==
* Sort (43)
+- Exchange (42)
   +- * Project (41)
      +- * SortMergeJoin Inner (40)
         :- * Sort (28)
         :  +- Exchange (27)
         :     +- * Project (26)
         :        +- * BroadcastHashJoin Inner BuildRight (25)
         :           :- * HashAggregate (19)
         :           :  +- Exchange (18)
         :           :     +- * HashAggregate (17)
         :           :        +- * Project (16)
         :           :           +- * BroadcastHashJoin Inner BuildRight (15)
         :           :              :- Union (9)
         :           :              :  :- * Project (4)
         :           :              :  :  +- * Filter (3)
         :           :              :  :     +- * ColumnarToRow (2)
         :           :              :  :        +- Scan parquet default.web_sales (1)
         :           :              :  +- * Project (8)
         :           :              :     +- * Filter (7)
         :           :              :        +- * ColumnarToRow (6)
         :           :              :           +- Scan parquet default.catalog_sales (5)
         :           :              +- BroadcastExchange (14)
         :           :                 +- * Project (13)
         :           :                    +- * Filter (12)
         :           :                       +- * ColumnarToRow (11)
         :           :                          +- Scan parquet default.date_dim (10)
         :           +- BroadcastExchange (24)
         :              +- * Project (23)
         :                 +- * Filter (22)
         :                    +- * ColumnarToRow (21)
         :                       +- Scan parquet default.date_dim (20)
         +- * Sort (39)
            +- Exchange (38)
               +- * Project (37)
                  +- * BroadcastHashJoin Inner BuildRight (36)
                     :- * HashAggregate (30)
                     :  +- ReusedExchange (29)
                     +- BroadcastExchange (35)
                        +- * Project (34)
                           +- * Filter (33)
                              +- * ColumnarToRow (32)
                                 +- Scan parquet default.date_dim (31)

==== w/o this fix: q2 ====
== Physical Plan ==
* Sort (40)
+- Exchange (39)
   +- * Project (38)
      +- * BroadcastHashJoin Inner BuildRight (37)
         :- * Project (26)
         :  +- * BroadcastHashJoin Inner BuildRight (25)
         :     :- * HashAggregate (19)
         :     :  +- Exchange (18)
         :     :     +- * HashAggregate (17)
         :     :        +- * Project (16)
         :     :           +- * BroadcastHashJoin Inner BuildRight (15)
         :     :              :- Union (9)
         :     :              :  :- * Project (4)
         :     :              :  :  +- * Filter (3)
         :     :              :  :     +- * ColumnarToRow (2)
         :     :              :  :        +- Scan parquet default.web_sales (1)
         :     :              :  +- * Project (8)
         :     :              :     +- * Filter (7)
         :     :              :        +- * ColumnarToRow (6)
         :     :              :           +- Scan parquet default.catalog_sales (5)
         :     :              +- BroadcastExchange (14)
         :     :                 +- * Project (13)
         :     :                    +- * Filter (12)
         :     :                       +- * ColumnarToRow (11)
         :     :                          +- Scan parquet default.date_dim (10)
         :     +- BroadcastExchange (24)
         :        +- * Project (23)
         :           +- * Filter (22)
         :              +- * ColumnarToRow (21)
         :                 +- Scan parquet default.date_dim (20)
         +- BroadcastExchange (36)
            +- * Project (35)
               +- * BroadcastHashJoin Inner BuildRight (34)
                  :- * HashAggregate (28)
                  :  +- ReusedExchange (27)
                  +- BroadcastExchange (33)
                     +- * Project (32)
                        +- * Filter (31)
                           +- * ColumnarToRow (30)
                              +- Scan parquet default.date_dim (29)
```

This comes from the cloud-fan comment: https://github.com/apache/spark/pull/29270#issuecomment-666098964

### Why are the changes needed?

For better test coverage.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #29384 from maropu/AddTPCDSTableStats.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-07 08:17:45 -07:00
Liang-Chi Hsieh 7b6e1d5cec [SPARK-25557][SQL] Nested column predicate pushdown for ORC
### What changes were proposed in this pull request?

We added nested column predicate pushdown for Parquet in #27728. This patch extends the feature support to ORC.

### Why are the changes needed?

Extending the feature to ORC for feature parity. Better performance for handling nested predicate pushdown.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit tests.

Closes #28761 from viirya/SPARK-25557.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-07 08:07:41 -07:00
wangguangxin.cn 9a35b93c8a [SPARK-32559][SQL] Fix the trim logic in UTF8String.toInt/toLong did't handle non-ASCII characters correctly
### What changes were proposed in this pull request?
The trim logic in Cast expression introduced in https://github.com/apache/spark/pull/26622 trim non-ASCII characters unexpectly.

Before this patch
![image](https://user-images.githubusercontent.com/1312321/89513154-caad9b80-d806-11ea-9ebe-17c9e7d1b5b3.png)

After this patch
![image](https://user-images.githubusercontent.com/1312321/89513196-d731f400-d806-11ea-959c-6a7dc29dcd49.png)

### Why are the changes needed?
The behavior described above doesn't make sense, and also doesn't consistent with the behavior when cast a string to double/float, as well as doesn't consistent with the behavior of Hive

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Added more UT

Closes #29375 from WangGuangxin/cast-bugfix.

Authored-by: wangguangxin.cn <wangguangxin.cn@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-07 05:00:33 +00:00
Gengliang Wang e93b8f02cd [SPARK-32539][INFRA] Disallow FileSystem.get(Configuration conf) in style check by default
### What changes were proposed in this pull request?

Disallow `FileSystem.get(Configuration conf)` in Scala style check by default and suggest developers use `FileSystem.get(URI uri, Configuration conf)` or `Path.getFileSystem()` instead.

### Why are the changes needed?

The method `FileSystem.get(Configuration conf)` will return a default FileSystem instance if the conf `fs.file.impl` is not set. This can cause file not found exception on reading a target path of non-default file system, e.g. S3. It is hard to discover such a mistake via unit tests.
If we disallow it in Scala style check by default and suggest developers use `FileSystem.get(URI uri, Configuration conf)` or `Path.getFileSystem(Configuration conf)`, we can reduce potential regression and PR review effort.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually run scala style check and test.

Closes #29357 from gengliangwang/newStyleRule.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-06 05:56:59 +00:00
Michael Munday 4a0427cbc1 [SPARK-32485][SQL][TEST] Fix endianness issues in tests in RecordBinaryComparatorSuite
### What changes were proposed in this pull request?
PR #26548 means that RecordBinaryComparator now uses big endian
byte order for long comparisons. However, this means that some of
the constants in the regression tests no longer map to the same
values in the comparison that they used to.

For example, one of the tests does a comparison between
Long.MIN_VALUE and 1 in order to trigger an overflow condition that
existed in the past (i.e. Long.MIN_VALUE - 1). These constants
correspond to the values 0x80..00 and 0x00..01. However on a
little-endian machine the bytes in these values are now swapped
before they are compared. This means that we will now be comparing
0x00..80 with 0x01..00. 0x00..80 - 0x01..00 does not overflow
therefore missing the original purpose of the test.

To fix this the constants are now explicitly written out in big
endian byte order to match the byte order used in the comparison.
This also fixes the tests on big endian machines (which would
otherwise get a different comparison result to the little-endian
machines).

### Why are the changes needed?
The regression tests no longer serve their initial purposes and also fail on big-endian systems.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Tests run on big-endian system (s390x).

Closes #29259 from mundaym/fix-endian.

Authored-by: Michael Munday <mike.munday@ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-05 16:11:09 +00:00
Huaxin Gao b14a1e2816 [SPARK-32402][SQL] Implement ALTER TABLE in JDBC Table Catalog
### What changes were proposed in this pull request?
Implement ALTER TABLE in JDBC Table Catalog
The following ALTER TABLE are implemented:
```
ALTER TABLE table_name ADD COLUMNS ( column_name datatype [ , ... ] );
ALTER TABLE table_name RENAME COLUMN old_column_name TO new_column_name;
ALTER TABLE table_name DROP COLUMN column_name;
ALTER TABLE table_name ALTER COLUMN column_name TYPE new_type;
ALTER TABLE table_name ALTER COLUMN column_name SET NOT NULL;
```
I haven't checked ALTER TABLE syntax for all the databases yet. I will check. If there are different syntax, I will have a follow-up to override the dialect.

Seems most of the databases don't support updating comments and column position, so I didn't implement UpdateColumnComment and UpdateColumnPosition.

### Why are the changes needed?
Complete the JDBCTableCatalog implementation

### Does this PR introduce _any_ user-facing change?
Yes
`JDBCTableCatalog.alterTable`

### How was this patch tested?
add new tests

Closes #29324 from huaxingao/alter_table.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-05 11:57:33 +00:00
Max Gekk 7eb6f45688 [SPARK-32499][SQL] Use {} in conversions maps and structs to strings
### What changes were proposed in this pull request?
Change casting of map and struct values to strings by using the `{}` brackets instead of `[]`. The behavior is controlled by the SQL config `spark.sql.legacy.castComplexTypesToString.enabled`. When it is `true`, `CAST` wraps maps and structs by `[]` in casting to strings. Otherwise, if this is `false`, which is the default, maps and structs are wrapped by `{}`.

### Why are the changes needed?
- To distinguish structs/maps from arrays.
- To make `show`'s output consistent with Hive and conversions to Hive strings.
- To display dataframe content in the same form by `spark-sql` and `show`
- To be consistent with the `*.sql` tests

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
By existing test suite `CastSuite`.

Closes #29308 from MaxGekk/show-struct-map.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-04 14:57:09 +00:00
fqaiser94@gmail.com 6d69068057 [SPARK-32521][SQL] Bug-fix: WithFields Expression should not be foldable
### What changes were proposed in this pull request?

Make WithFields Expression not foldable.

### Why are the changes needed?

The following query currently fails on master brach:
```
sql("SELECT named_struct('a', 1, 'b', 2) a")
.select($"a".withField("c", lit(3)).as("a"))
.show(false)
// java.lang.UnsupportedOperationException: Cannot evaluate expression: with_fields(named_struct(a, 1, b, 2), c, 3)
```
This happens because the Catalyst optimizer sees that the WithFields Expression is foldable and tries to statically evaluate the WithFields Expression (via the ConstantFolding rule), however it cannot do so because WithFields Expression is Unevaluable.

### Does this PR introduce _any_ user-facing change?

Yes, queries like the one shared above will now succeed.
That said, this bug was introduced in Spark 3.1.0 which has yet to be released.

### How was this patch tested?

A new unit test was added.

Closes #29338 from fqaiser94/SPARK-32521.

Lead-authored-by: fqaiser94@gmail.com <fqaiser94@gmail.com>
Co-authored-by: fqaiser94 <fqaiser94@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-04 12:11:04 +00:00
Dongjoon Hyun 7fec6e0c16 [SPARK-32524][SQL][TESTS] CachedBatchSerializerSuite should clean up InMemoryRelation.ser
### What changes were proposed in this pull request?

This PR aims to clean up `InMemoryRelation.ser` in `CachedBatchSerializerSuite`.

### Why are the changes needed?

SPARK-32274 makes SQL cache serialization pluggable.
```
[SPARK-32274][SQL] Make SQL cache serialization pluggable
```

This causes UT failures.
```
$ build/sbt "sql/testOnly *.CachedBatchSerializerSuite *.CachedTableSuite"
...
[info]   Cause: java.lang.IllegalStateException: This does not work. This is only for testing
[info]   at org.apache.spark.sql.execution.columnar.TestSingleIntColumnarCachedBatchSerializer.convertInternalRowToCachedBatch(CachedBatchSerializerSuite.scala:49)
...
[info] *** 30 TESTS FAILED ***
[error] Failed: Total 51, Failed 30, Errors 0, Passed 21
[error] Failed tests:
[error] 	org.apache.spark.sql.CachedTableSuite
[error] (sql/test:testOnly) sbt.TestsFailedException: Tests unsuccessful
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.
```
$ build/sbt "sql/testOnly *.CachedBatchSerializerSuite *.CachedTableSuite"
[info] Tests: succeeded 51, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[info] Passed: Total 51, Failed 0, Errors 0, Passed 51
```

Closes #29346 from dongjoon-hyun/SPARK-32524-3.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-04 17:49:52 +09:00
gengjiaan 1597d8fcd4 [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
### What changes were proposed in this pull request?
This PR is related to https://github.com/apache/spark/pull/26656.
https://github.com/apache/spark/pull/26656 only support use FILTER clause on aggregate expression without DISTINCT.
This PR will enhance this feature when one or more DISTINCT aggregate expressions which allows the use of the FILTER clause.
Such as:
```
select sum(distinct id) filter (where sex = 'man') from student;
select class_id, sum(distinct id) filter (where sex = 'man') from student group by class_id;
select count(id) filter (where class_id = 1), sum(distinct id) filter (where sex = 'man') from student;
select class_id, count(id) filter (where class_id = 1), sum(distinct id) filter (where sex = 'man') from student group by class_id;
select sum(distinct id), sum(distinct id) filter (where sex = 'man') from student;
select class_id, sum(distinct id), sum(distinct id) filter (where sex = 'man') from student group by class_id;
select class_id, count(id), count(id) filter (where class_id = 1), sum(distinct id), sum(distinct id) filter (where sex = 'man') from student group by class_id;
```

### Why are the changes needed?
Spark SQL only support use FILTER clause on aggregate expression without DISTINCT.
This PR support Filter expression allows simultaneous use of DISTINCT

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Exists and new UT

Closes #29291 from beliefer/support-distinct-with-filter.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-04 04:41:19 +00:00
Takuya UESHIN 7deb67c28f [SPARK-32160][CORE][PYSPARK][FOLLOWUP] Change the config name to switch allow/disallow SparkContext in executors
### What changes were proposed in this pull request?

This is a follow-up of #29278.
This PR changes the config name to switch allow/disallow `SparkContext` in executors as per the comment https://github.com/apache/spark/pull/29278#pullrequestreview-460256338.

### Why are the changes needed?

The config name `spark.executor.allowSparkContext` is more reasonable.

### Does this PR introduce _any_ user-facing change?

Yes, the config name is changed.

### How was this patch tested?

Updated tests.

Closes #29340 from ueshin/issues/SPARK-32160/change_config_name.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-04 12:45:06 +09:00
Takeshi Yamamuro c6109ba918 [SPARK-32257][SQL] Reports explicit errors for invalid usage of SET/RESET command
### What changes were proposed in this pull request?

This PR modified the parser code to handle invalid usages of a SET/RESET command.
For example;
```
SET spark.sql.ansi.enabled true
```
The above SQL command does not change the configuration value and it just tries to display the value of the configuration
`spark.sql.ansi.enabled true`. This PR disallows using special characters including spaces in the configuration name and reports a user-friendly error instead. In the error message, it tells users a workaround to use quotes or a string literal if they still needs to specify a configuration with them. 

Before this PR:
```
scala> sql("SET spark.sql.ansi.enabled true").show(1, -1)
+---------------------------+-----------+
|key                        |value      |
+---------------------------+-----------+
|spark.sql.ansi.enabled true|<undefined>|
+---------------------------+-----------+
```

After this PR:
```
scala> sql("SET spark.sql.ansi.enabled true")
org.apache.spark.sql.catalyst.parser.ParseException:
Expected format is 'SET', 'SET key', or 'SET key=value'. If you want to include special characters in key, please use quotes, e.g., SET `ke y`=value.(line 1, pos 0)

== SQL ==
SET spark.sql.ansi.enabled true
^^^
```

### Why are the changes needed?

For better user-friendly errors.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests in `SparkSqlParserSuite`.

Closes #29146 from maropu/SPARK-32257.

Lead-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 13:00:07 +00:00
Kent Yao 3deb59d5c2 [SPARK-31709][SQL] Proper base path for database/table location when it is a relative path
### What changes were proposed in this pull request?

Currently, the user home directory is used as the base path for the database and table locations when their locationa are specified with a relative paths, e.g.
```sql
> set spark.sql.warehouse.dir;
spark.sql.warehouse.dir	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200512/spark-warehouse/
spark-sql> create database loctest location 'loctestdbdir';

spark-sql> desc database loctest;
Database Name	loctest
Comment
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200512/loctestdbdir
Owner	kentyao

spark-sql> create table loctest(id int) location 'loctestdbdir';
spark-sql> desc formatted loctest;
id	int	NULL

# Detailed Table Information
Database	default
Table	loctest
Owner	kentyao
Created Time	Thu May 14 16:29:05 CST 2020
Last Access	UNKNOWN
Created By	Spark 3.1.0-SNAPSHOT
Type	EXTERNAL
Provider	parquet
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200512/loctestdbdir
Serde Library	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
```
The user home is not always warehouse-related, unchangeable in runtime, and shared both by database and table as the parent directory. Meanwhile, we use the table path as the parent directory for relative partition locations.

The config `spark.sql.warehouse.dir` represents `the default location for managed databases and tables`.
For databases, the case above seems not to follow its semantics, because it should use ` `spark.sql.warehouse.dir` as the base path instead.

For tables, it seems to be right but here I suggest enriching the meaning that lets it also be the for external tables with relative paths for locations.

With changes in this PR,

The location of a database will be `warehouseDir/dbpath` when `dbpath` is relative.
The location of a table will be `dbpath/tblpath` when `tblpath` is relative.

### Why are the changes needed?

bugfix and improvement

Firstly, the databases with relative locations should be created under the default location specified by `spark.sql.warehouse.dir`.

Secondly, the external tables with relative paths may also follow this behavior for consistency.

At last, the behavior for database, tables and partitions with relative paths to choose base paths should be the same.

### Does this PR introduce _any_ user-facing change?

Yes, this PR changes the `createDatabase`, `alterDatabase`, `createTable` and `alterTable` APIs and related DDLs. If the LOCATION clause is followed by a relative path, the root path will be `spark.sql.warehouse.dir` for databases, and `spark.sql.warehouse.dir` / `dbPath` for tables.

e.g.

#### after
```sql
spark-sql> desc database loctest;
Database Name	loctest
Comment
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/spark-warehouse/loctest
Owner	kentyao
spark-sql> use loctest;
spark-sql> create table loctest(id int) location 'loctest';
20/05/14 18:18:02 WARN InMemoryFileIndex: The directory file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/loctest was not found. Was it deleted very recently?
20/05/14 18:18:02 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
20/05/14 18:18:03 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
20/05/14 18:18:03 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
20/05/14 18:18:03 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
spark-sql> desc formatted loctest;
id	int	NULL

# Detailed Table Information
Database	loctest
Table	loctest
Owner	kentyao
Created Time	Thu May 14 18:18:03 CST 2020
Last Access	UNKNOWN
Created By	Spark 3.1.0-SNAPSHOT
Type	EXTERNAL
Provider	parquet
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/spark-warehouse/loctest/loctest
Serde Library	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
spark-sql> alter table loctest set location 'loctest2'
         > ;
spark-sql> desc formatted loctest;
id	int	NULL

# Detailed Table Information
Database	loctest
Table	loctest
Owner	kentyao
Created Time	Thu May 14 18:18:03 CST 2020
Last Access	UNKNOWN
Created By	Spark 3.1.0-SNAPSHOT
Type	EXTERNAL
Provider	parquet
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/spark-warehouse/loctest/loctest2
Serde Library	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
```
### How was this patch tested?

Add unit tests.

Closes #28527 from yaooqinn/SPARK-31709.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 12:48:22 +00:00
beliefer 42f9ee4c7d [SPARK-24884][SQL] Support regexp function regexp_extract_all
### What changes were proposed in this pull request?
`regexp_extract_all` is a very useful function expanded the capabilities of `regexp_extract`.
There are some description of this function.
```
SELECT regexp_extract('1a 2b 14m', '\d+', 0); -- 1
SELECT regexp_extract_all('1a 2b 14m', '\d+', 0); -- [1, 2, 14]
SELECT regexp_extract('1a 2b 14m', '(\d+)([a-z]+)', 2); -- 'a'
SELECT regexp_extract_all('1a 2b 14m', '(\d+)([a-z]+)', 2); -- ['a', 'b', 'm']
```
There are some mainstream database support the syntax.
**Presto:**
https://prestodb.io/docs/current/functions/regexp.html

**Pig:**
https://pig.apache.org/docs/latest/api/org/apache/pig/builtin/REGEX_EXTRACT_ALL.html

Note: This PR pick up the work of https://github.com/apache/spark/pull/21985
### Why are the changes needed?
`regexp_extract_all` is a very useful function and make work easier.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
New UT

Closes #27507 from beliefer/support-regexp_extract_all.

Lead-authored-by: beliefer <beliefer@163.com>
Co-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 06:03:55 +00:00
Prakhar Jain 7a09e71198 [SPARK-32509][SQL] Ignore unused DPP True Filter in Canonicalization
### What changes were proposed in this pull request?
This PR fixes issues relate to Canonicalization of FileSourceScanExec when it contains unused DPP Filter.

### Why are the changes needed?

As part of PlanDynamicPruningFilter rule, the unused DPP Filter are simply replaced by `DynamicPruningExpression(TrueLiteral)` so that they can be avoided. But these unnecessary`DynamicPruningExpression(TrueLiteral)` partition filter inside the FileSourceScanExec affects the canonicalization of the node and so in many cases, this can prevent ReuseExchange from happening.

This PR fixes this issue by ignoring the unused DPP filter in the `def doCanonicalize` method.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT.

Closes #29318 from prakharjain09/SPARK-32509_df_reuse.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 03:26:03 +00:00
Max Gekk fda397d9c8 [SPARK-32510][SQL] Check duplicate nested columns in read from JDBC datasource
### What changes were proposed in this pull request?
Check that there are not duplicate column names on the same level (top level or nested levels) in reading from JDBC datasource. If such duplicate columns exist, throw the exception:
```
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the customSchema option value:
```
The check takes into account the SQL config `spark.sql.caseSensitive` (`false` by default).

### Why are the changes needed?
To make handling of duplicate nested columns is similar to handling of duplicate top-level columns i. e. output the same error:
```Scala
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the customSchema option value: `camelcase`
```

Checking of top-level duplicates was introduced by https://github.com/apache/spark/pull/17758, and duplicates in nested structures by https://github.com/apache/spark/pull/29234.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
Added new test suite `JdbcNestedDataSourceSuite`.

Closes #29317 from MaxGekk/jdbc-dup-nested-columns.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 03:20:17 +00:00
Robert (Bobby) Evans 713124d5e3 [SPARK-32274][SQL] Make SQL cache serialization pluggable
### What changes were proposed in this pull request?

Add a config to let users change how SQL/Dataframe data is compressed when cached.

This adds a few new classes/APIs for use with this config.

1. `CachedBatch` is a trait used to tag data that is intended to be cached. It has a few APIs that lets us keep the compression/serialization of the data separate from the metrics about it.
2. `CachedBatchSerializer` provides the APIs that must be implemented to cache data.
    * `convertForCache` is an API that runs a cached spark plan and turns its result into an `RDD[CachedBatch]`.  The actual caching is done outside of this API
   * `buildFilter` is an API that takes a set of predicates and builds a filter function that can be used to filter the `RDD[CachedBatch]` returned by `convertForCache`
   * `decompressColumnar` decompresses an `RDD[CachedBatch]` into an `RDD[ColumnarBatch]` This is only used for a limited set of data types.  These data types may expand in the future.  If they do we can add in a new API with a default value that says which data types this serializer supports.
   * `decompressToRows` decompresses an `RDD[CachedBatch]` into an `RDD[InternalRow]` this API, like `decompressColumnar` decompresses the data in `CachedBatch` but turns it into `InternalRow`s, typically using code generation for performance reasons.

There is also an API that lets you reuse the current filtering based on min/max values. `SimpleMetricsCachedBatch` and `SimpleMetricsCachedBatchSerializer`.

### Why are the changes needed?

This lets users explore different types of compression and compression ratios.

### Does this PR introduce _any_ user-facing change?

This adds in a single config, and exposes some developer API classes described above.

### How was this patch tested?

I ran the unit tests around this and I also did some manual performance tests.  I could find any performance difference between the old and new code, and if there is any it is within error.

Closes #29067 from revans2/pluggable_cache_serializer.

Authored-by: Robert (Bobby) Evans <bobby@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 03:15:54 +00:00
Wenchen Fan 1c6dff7b5f [SPARK-32083][SQL] AQE coalesce should at least return one partition
### What changes were proposed in this pull request?

This PR updates the AQE framework to at least return one partition during coalescing.

This PR also updates `ShuffleExchangeExec.canChangeNumPartitions` to not coalesce for `SinglePartition`.

### Why are the changes needed?

It's a bit risky to return 0 partitions, as sometimes it's different from empty data. For example, global aggregate will return one result row even if the input table is empty. If there is 0 partition, no task will be run and no result will be returned. More specifically, the global aggregate requires `AllTuples` and we can't coalesce to 0 partitions.

This is not a real bug for now. The global aggregate will be planned as partial and final physical agg nodes. The partial agg will return at least one row, so that the shuffle still have data. But it's better to fix this issue to avoid potential bugs in the future.

According to https://github.com/apache/spark/pull/28916, this change also fix some perf problems.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated test.

Closes #29307 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-31 14:20:20 +00:00
Kent Yao f4800406a4 [SPARK-32406][SQL][FOLLOWUP] Make RESET fail against static and core configs
### What changes were proposed in this pull request?

This followup addresses comments from https://github.com/apache/spark/pull/29202#discussion_r462054784

1. make RESET static SQL configs/spark core configs fail as same as the SET command. Not that, for core ones, they have to be pre-registered, otherwise, they are still able to be SET/RESET

2. add test cases for configurations w/ optional default values

### Why are the changes needed?

behavior change with suggestions from PMCs

### Does this PR introduce _any_ user-facing change?

Yes, RESET will fail after this PR, before it just does nothing because the static ones are static.

### How was this patch tested?

add more tests.

Closes #29297 from yaooqinn/SPARK-32406-F.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-31 22:11:05 +09:00
Takuya UESHIN 8014b0b5d6 [SPARK-32160][CORE][PYSPARK] Add a config to switch allow/disallow to create SparkContext in executors
### What changes were proposed in this pull request?

This is a follow-up of #28986.
This PR adds a config to switch allow/disallow to create `SparkContext` in executors.

- `spark.driver.allowSparkContextInExecutors`

### Why are the changes needed?

Some users or libraries actually create `SparkContext` in executors.
We shouldn't break their workloads.

### Does this PR introduce _any_ user-facing change?

Yes, users will be able to create `SparkContext` in executors with the config enabled.

### How was this patch tested?

More tests are added.

Closes #29278 from ueshin/issues/SPARK-32160/add_configs.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-31 17:28:35 +09:00
Cheng Su ae82768c13 [SPARK-32421][SQL] Add code-gen for shuffled hash join
### What changes were proposed in this pull request?

Adding codegen for shuffled hash join. Shuffled hash join codegen is very similar to broadcast hash join codegen. So most of code change is to refactor existing codegen in `BroadcastHashJoinExec` to `HashJoin`.

Example codegen for query in [`JoinBenchmark`](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala#L153):

```
  def shuffleHashJoin(): Unit = {
    val N: Long = 4 << 20
    withSQLConf(
      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000",
      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
      codegenBenchmark("shuffle hash join", N) {
        val df1 = spark.range(N).selectExpr(s"id as k1")
        val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2")
        val df = df1.join(df2, col("k1") === col("k2"))
        assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
        df.noop()
      }
    }
  }
```

Shuffled hash join codegen:

```
== Subtree 3 / 3 (maxMethodCodeSize:113; maxConstantPoolSize:126(0.19% used); numInnerClasses:0) ==
*(3) ShuffledHashJoin [k1#2L], [k2#6L], Inner, BuildRight
:- *(1) Project [id#0L AS k1#2L]
:  +- *(1) Range (0, 4194304, step=1, splits=1)
+- *(2) Project [(id#4L * 3) AS k2#6L]
   +- *(2) Range (0, 1398101, step=1, splits=1)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage3(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=3
/* 006 */ final class GeneratedIteratorForCodegenStage3 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator inputadapter_input_0;
/* 010 */   private org.apache.spark.sql.execution.joins.HashedRelation shj_relation_0;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] shj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 012 */
/* 013 */   public GeneratedIteratorForCodegenStage3(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     inputadapter_input_0 = inputs[0];
/* 021 */     shj_relation_0 = ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[0] /* plan */).buildHashedRelation(inputs[1]);
/* 022 */     shj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 023 */
/* 024 */   }
/* 025 */
/* 026 */   private void shj_doConsume_0(InternalRow inputadapter_row_0, long shj_expr_0_0) throws java.io.IOException {
/* 027 */     // generate join key for stream side
/* 028 */
/* 029 */     // find matches from HashRelation
/* 030 */     scala.collection.Iterator shj_matches_0 = false ?
/* 031 */     null : (scala.collection.Iterator)shj_relation_0.get(shj_expr_0_0);
/* 032 */     if (shj_matches_0 != null) {
/* 033 */       while (shj_matches_0.hasNext()) {
/* 034 */         UnsafeRow shj_matched_0 = (UnsafeRow) shj_matches_0.next();
/* 035 */         {
/* 036 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numOutputRows */).add(1);
/* 037 */
/* 038 */           long shj_value_1 = shj_matched_0.getLong(0);
/* 039 */           shj_mutableStateArray_0[0].reset();
/* 040 */
/* 041 */           shj_mutableStateArray_0[0].write(0, shj_expr_0_0);
/* 042 */
/* 043 */           shj_mutableStateArray_0[0].write(1, shj_value_1);
/* 044 */           append((shj_mutableStateArray_0[0].getRow()).copy());
/* 045 */
/* 046 */         }
/* 047 */       }
/* 048 */     }
/* 049 */
/* 050 */   }
/* 051 */
/* 052 */   protected void processNext() throws java.io.IOException {
/* 053 */     while ( inputadapter_input_0.hasNext()) {
/* 054 */       InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
/* 055 */
/* 056 */       long inputadapter_value_0 = inputadapter_row_0.getLong(0);
/* 057 */
/* 058 */       shj_doConsume_0(inputadapter_row_0, inputadapter_value_0);
/* 059 */       if (shouldStop()) return;
/* 060 */     }
/* 061 */   }
/* 062 */
/* 063 */ }
```

Broadcast hash join codegen for the same query (for reference here):

```
== Subtree 2 / 2 (maxMethodCodeSize:280; maxConstantPoolSize:218(0.33% used); numInnerClasses:0) ==
*(2) BroadcastHashJoin [k1#2L], [k2#6L], Inner, BuildRight, false
:- *(2) Project [id#0L AS k1#2L]
:  +- *(2) Range (0, 4194304, step=1, splits=1)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#22]
   +- *(1) Project [(id#4L * 3) AS k2#6L]
      +- *(1) Range (0, 1398101, step=1, splits=1)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskContext range_taskContext_0;
/* 012 */   private InputMetrics range_inputMetrics_0;
/* 013 */   private long range_batchEnd_0;
/* 014 */   private long range_numElementsTodo_0;
/* 015 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation_0;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4];
/* 017 */
/* 018 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */
/* 026 */     range_taskContext_0 = TaskContext.get();
/* 027 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 028 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */
/* 032 */     bhj_relation_0 = ((org.apache.spark.sql.execution.joins.LongHashedRelation) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcast */).value()).asReadOnlyCopy();
/* 033 */     incPeakExecutionMemory(bhj_relation_0.estimatedSize());
/* 034 */
/* 035 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 036 */
/* 037 */   }
/* 038 */
/* 039 */   private void initRange(int idx) {
/* 040 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 041 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
/* 042 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(4194304L);
/* 043 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 044 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 045 */     long partitionEnd;
/* 046 */
/* 047 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 048 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 049 */       range_nextIndex_0 = Long.MAX_VALUE;
/* 050 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 051 */       range_nextIndex_0 = Long.MIN_VALUE;
/* 052 */     } else {
/* 053 */       range_nextIndex_0 = st.longValue();
/* 054 */     }
/* 055 */     range_batchEnd_0 = range_nextIndex_0;
/* 056 */
/* 057 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 058 */     .multiply(step).add(start);
/* 059 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 060 */       partitionEnd = Long.MAX_VALUE;
/* 061 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 062 */       partitionEnd = Long.MIN_VALUE;
/* 063 */     } else {
/* 064 */       partitionEnd = end.longValue();
/* 065 */     }
/* 066 */
/* 067 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 068 */       java.math.BigInteger.valueOf(range_nextIndex_0));
/* 069 */     range_numElementsTodo_0  = startToEnd.divide(step).longValue();
/* 070 */     if (range_numElementsTodo_0 < 0) {
/* 071 */       range_numElementsTodo_0 = 0;
/* 072 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 073 */       range_numElementsTodo_0++;
/* 074 */     }
/* 075 */   }
/* 076 */
/* 077 */   private void bhj_doConsume_0(long bhj_expr_0_0) throws java.io.IOException {
/* 078 */     // generate join key for stream side
/* 079 */
/* 080 */     // find matches from HashedRelation
/* 081 */     UnsafeRow bhj_matched_0 = false ? null: (UnsafeRow)bhj_relation_0.getValue(bhj_expr_0_0);
/* 082 */     if (bhj_matched_0 != null) {
/* 083 */       {
/* 084 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
/* 085 */
/* 086 */         long bhj_value_2 = bhj_matched_0.getLong(0);
/* 087 */         range_mutableStateArray_0[3].reset();
/* 088 */
/* 089 */         range_mutableStateArray_0[3].write(0, bhj_expr_0_0);
/* 090 */
/* 091 */         range_mutableStateArray_0[3].write(1, bhj_value_2);
/* 092 */         append((range_mutableStateArray_0[3].getRow()));
/* 093 */
/* 094 */       }
/* 095 */     }
/* 096 */
/* 097 */   }
/* 098 */
/* 099 */   protected void processNext() throws java.io.IOException {
/* 100 */     // initialize Range
/* 101 */     if (!range_initRange_0) {
/* 102 */       range_initRange_0 = true;
/* 103 */       initRange(partitionIndex);
/* 104 */     }
/* 105 */
/* 106 */     while (true) {
/* 107 */       if (range_nextIndex_0 == range_batchEnd_0) {
/* 108 */         long range_nextBatchTodo_0;
/* 109 */         if (range_numElementsTodo_0 > 1000L) {
/* 110 */           range_nextBatchTodo_0 = 1000L;
/* 111 */           range_numElementsTodo_0 -= 1000L;
/* 112 */         } else {
/* 113 */           range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 114 */           range_numElementsTodo_0 = 0;
/* 115 */           if (range_nextBatchTodo_0 == 0) break;
/* 116 */         }
/* 117 */         range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 118 */       }
/* 119 */
/* 120 */       int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 121 */       for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 122 */         long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 123 */
/* 124 */         bhj_doConsume_0(range_value_0);
/* 125 */
/* 126 */         if (shouldStop()) {
/* 127 */           range_nextIndex_0 = range_value_0 + 1L;
/* 128 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
/* 129 */           range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 130 */           return;
/* 131 */         }
/* 132 */
/* 133 */       }
/* 134 */       range_nextIndex_0 = range_batchEnd_0;
/* 135 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 136 */       range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 137 */       range_taskContext_0.killTaskIfInterrupted();
/* 138 */     }
/* 139 */   }
/* 140 */
/* 141 */ }
```

### Why are the changes needed?

Codegen shuffled hash join can help save CPU cost. We added shuffled hash join codegen internally in our fork, and seeing obvious improvement in benchmark compared to current non-codegen code path.

Test example query in [`JoinBenchmark`](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala#L153), seeing 30% wall clock time improvement compared to existing non-codegen code path:

Enable shuffled hash join code-gen:

```
Running benchmark: shuffle hash join
  Running case: shuffle hash join wholestage off
  Stopped after 2 iterations, 1358 ms
  Running case: shuffle hash join wholestage on
  Stopped after 5 iterations, 2323 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join wholestage off                    649            679          43          6.5         154.7       1.0X
shuffle hash join wholestage on                     436            465          45          9.6         103.9       1.5X
```

Disable shuffled hash join codegen:

```
Running benchmark: shuffle hash join
  Running case: shuffle hash join wholestage off
  Stopped after 2 iterations, 1345 ms
  Running case: shuffle hash join wholestage on
  Stopped after 5 iterations, 2967 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join wholestage off                    646            673          37          6.5         154.1       1.0X
shuffle hash join wholestage on                     549            594          47          7.6         130.9       1.2X
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `WholeStageCodegenSuite`.

Closes #29277 from c21/codegen.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-31 05:51:57 +00:00
HyukjinKwon e1d7321034 [SPARK-32478][R][SQL] Error message to show the schema mismatch in gapply with Arrow vectorization
### What changes were proposed in this pull request?

This PR proposes to:

1. Fix the error message when the output schema is misbatched with R DataFrame from the given function. For example,

    ```R
    df <- createDataFrame(list(list(a=1L, b="2")))
    count(gapply(df, "a", function(key, group) { group }, structType("a int, b int")))
    ```

    **Before:**

    ```
    Error in handleErrors(returnStatus, conn) :
      ...
      java.lang.UnsupportedOperationException
	    ...
    ```

    **After:**

    ```
    Error in handleErrors(returnStatus, conn) :
     ...
     java.lang.AssertionError: assertion failed: Invalid schema from gapply: expected IntegerType, IntegerType, got IntegerType, StringType
        ...
    ```

2. Update documentation about the schema matching for `gapply` and `dapply`.

### Why are the changes needed?

To show which schema is not matched, and let users know what's going on.

### Does this PR introduce _any_ user-facing change?

Yes, error message is updated as above, and documentation is updated.

### How was this patch tested?

Manually tested and unitttests were added.

Closes #29283 from HyukjinKwon/r-vectorized-error.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-30 15:16:02 +09:00
Max Gekk 99a855575c [SPARK-32431][SQL] Check duplicate nested columns in read from in-built datasources
### What changes were proposed in this pull request?
When `spark.sql.caseSensitive` is `false` (by default), check that there are not duplicate column names on the same level (top level or nested levels) in reading from in-built datasources Parquet, ORC, Avro and JSON. If such duplicate columns exist, throw the exception:
```
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema:
```

### Why are the changes needed?
To make handling of duplicate nested columns is similar to handling of duplicate top-level columns i. e. output the same error when `spark.sql.caseSensitive` is `false`:
```Scala
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `camelcase`
```

Checking of top-level duplicates was introduced by https://github.com/apache/spark/pull/17758.

### Does this PR introduce _any_ user-facing change?
Yes. For the example from SPARK-32431:

ORC:
```scala
java.io.IOException: Error reading file: file:/private/var/folders/p3/dfs6mf655d7fnjrsjvldh0tc0000gn/T/spark-c02c2f9a-0cdc-4859-94fc-b9c809ca58b1/part-00001-63e8c3f0-7131-4ec9-be02-30b3fdd276f4-c000.snappy.orc
	at org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1329)
	at org.apache.orc.mapreduce.OrcMapreduceRecordReader.ensureBatch(OrcMapreduceRecordReader.java:78)
...
Caused by: java.io.EOFException: Read past end of RLE integer from compressed stream Stream for column 3 kind DATA position: 6 length: 6 range: 0 offset: 12 limit: 12 range 0 = 0 to 6 uncompressed: 3 to 3
	at org.apache.orc.impl.RunLengthIntegerReaderV2.readValues(RunLengthIntegerReaderV2.java:61)
	at org.apache.orc.impl.RunLengthIntegerReaderV2.next(RunLengthIntegerReaderV2.java:323)
```

JSON:
```scala
+------------+
|StructColumn|
+------------+
|        [,,]|
+------------+
```

Parquet:
```scala
+------------+
|StructColumn|
+------------+
|     [0,, 1]|
+------------+
```

Avro:
```scala
+------------+
|StructColumn|
+------------+
|        [,,]|
+------------+
```

After the changes, Parquet, ORC, JSON and Avro output the same error:
```scala
Found duplicate column(s) in the data schema: `camelcase`;
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the data schema: `camelcase`;
	at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:112)
	at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:51)
	at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:67)
```

### How was this patch tested?
Run modified test suites:
```
$ build/sbt "sql/test:testOnly org.apache.spark.sql.FileBasedDataSourceSuite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.*"
```
and added new UT to `SchemaUtilsSuite`.

Closes #29234 from MaxGekk/nested-case-insensitive-column.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-30 06:05:55 +00:00
Wenchen Fan a025a89f4e [SPARK-32332][SQL] Support columnar exchanges
### What changes were proposed in this pull request?

This PR adds abstract classes for shuffle and broadcast, so that users can provide their columnar implementations.

This PR updates several places to use the abstract exchange classes, and also update `AdaptiveSparkPlanExec` so that the columnar rules can see exchange nodes.

This is an alternative of https://github.com/apache/spark/pull/29134 .
Close https://github.com/apache/spark/pull/29134

### Why are the changes needed?

To allow columnar exchanges.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #29262 from cloud-fan/columnar.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2020-07-29 14:21:47 -05:00
LantaoJin 26e6574d58 [SPARK-32283][CORE] Kryo should support multiple user registrators
### What changes were proposed in this pull request?
`spark.kryo.registrator` in 3.0 has a regression problem. From [SPARK-12080](https://issues.apache.org/jira/browse/SPARK-12080), it supports multiple user registrators by
```scala
private val userRegistrators = conf.get("spark.kryo.registrator", "")
    .split(',').map(_.trim)
    .filter(!_.isEmpty)
```
But it donsn't work in 3.0. Fix it by `toSequence` in `Kryo.scala`

### Why are the changes needed?
In previous Spark version (2.x), it supported multiple user registrators by
```scala
private val userRegistrators = conf.get("spark.kryo.registrator", "")
    .split(',').map(_.trim)
    .filter(!_.isEmpty)
```
But it doesn't work in 3.0. It's should be a regression.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existed unit tests.

Closes #29123 from LantaoJin/SPARK-32283.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-29 03:58:03 +00:00
Terry Kim 45b7212fd3 [SPARK-32401][SQL] Migrate function related commands to use UnresolvedFunc to resolve function identifier
### What changes were proposed in this pull request?

This PR proposes to migrate the following function related commands to use `UnresolvedFunc` to resolve function identifier:
- DROP FUNCTION
- DESCRIBE FUNCTION
- SHOW FUNCTIONS

`DropFunctionStatement`, `DescribeFunctionStatement` and `ShowFunctionsStatement` logical plans are replaced with `DropFunction`, `DescribeFunction` and `ShowFunctions` logical plans respectively, and each contains `UnresolvedFunc` as its child so that it can be resolved in `Analyzer`.

### Why are the changes needed?

Migrating to the new resolution framework, which resolves `UnresolvedFunc` in `Analyzer`.

### Does this PR introduce _any_ user-facing change?

The message of exception thrown when a catalog is resolved to v2 has been merged to:
`function is only supported in v1 catalog`

Previously, it printed out the command used. E.g.,:
`CREATE FUNCTION is only supported in v1 catalog`

### How was this patch tested?

Updated existing tests.

Closes #29198 from imback82/function_framework.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-29 03:55:48 +00:00
Max Gekk b2180c0950 [SPARK-32471][SQL][DOCS][TESTS][PYTHON][SS] Describe JSON option allowNonNumericNumbers
### What changes were proposed in this pull request?
1. Describe the JSON option `allowNonNumericNumbers` which is used in read
2. Add new test cases for allowed JSON field values: NaN, +INF, +Infinity, Infinity, -INF and -Infinity

### Why are the changes needed?
To improve UX with Spark SQL and to provide users full info about the supported option.

### Does this PR introduce _any_ user-facing change?
Yes, in PySpark.

### How was this patch tested?
Added new test to `JsonParsingOptionsSuite`

Closes #29275 from MaxGekk/allowNonNumericNumbers-doc.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-29 12:14:13 +09:00
Max Gekk c28da672f8 [SPARK-32382][SQL] Override table renaming in JDBC dialects
### What changes were proposed in this pull request?
Override the default implementation of `JdbcDialect.renameTable()`:
```scala
s"ALTER TABLE $oldTable RENAME TO $newTable"
```
in the following JDBC dialects according to official documentation:
- DB2
- Derby
- MS SQL Server
- Teradata

Other dialects follow the default implementation:
- MySQL: https://dev.mysql.com/doc/refman/8.0/en/alter-table.html
- Oracle: https://docs.oracle.com/en/database/oracle/oracle-database/19/sqlrf/ALTER-TABLE.html#GUID-552E7373-BF93-477D-9DA3-B2C9386F2877
- PostgreSQL: https://www.postgresql.org/docs/12/sql-altertable.html

### Why are the changes needed?
To have correct implementation of table renaming for all supported JDBC dialects.

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Manually

Closes #29237 from MaxGekk/jdbc-rename-table.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-28 12:34:10 +00:00
yi.wu ca1ecf7f9f [SPARK-32459][SQL] Support WrappedArray as customCollectionCls in MapObjects
### What changes were proposed in this pull request?

This PR supports `WrappedArray` as `customCollectionCls` in `MapObjects`.

### Why are the changes needed?

This helps fix the regression caused by SPARK-31826.  For the following test, it can pass in branch-3.0 but fail in master branch:

```scala
test("WrappedArray") {
    val myUdf = udf((a: WrappedArray[Int]) =>
      WrappedArray.make[Int](Array(a.head + 99)))
    checkAnswer(Seq(Array(1))
      .toDF("col")
      .select(myUdf(Column("col"))),
      Row(ArrayBuffer(100)))
  }
```

In SPARK-31826, we've changed the catalyst-to-scala converter from `CatalystTypeConverters` to `ExpressionEncoder.deserializer`. However, `CatalystTypeConverters` supports `WrappedArray` while `ExpressionEncoder.deserializer` doesn't.

### Does this PR introduce _any_ user-facing change?

No,  SPARK-31826 is merged into master and branch-3.1, which haven't been released.

### How was this patch tested?

Added a new test for `WrappedArray` in `UDFSuite`; Also updated `ObjectExpressionsSuite` for `MapObjects`.

Closes #29261 from Ngone51/fix-wrappedarray.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-28 12:24:15 +00:00
xuewei.linxuewei 12b9787a7f [SPARK-32290][SQL] SingleColumn Null Aware Anti Join Optimize
### What changes were proposed in this pull request?
Normally, a Null aware anti join will be planed into BroadcastNestedLoopJoin which is very time consuming, for instance, in TPCH Query 16.

```
select
    p_brand,
    p_type,
    p_size,
    count(distinct ps_suppkey) as supplier_cnt
from
    partsupp,
    part
where
    p_partkey = ps_partkey
    and p_brand <> 'Brand#45'
    and p_type not like 'MEDIUM POLISHED%'
    and p_size in (49, 14, 23, 45, 19, 3, 36, 9)
    and ps_suppkey not in (
        select
            s_suppkey
        from
            supplier
        where
            s_comment like '%Customer%Complaints%'
    )
group by
    p_brand,
    p_type,
    p_size
order by
    supplier_cnt desc,
    p_brand,
    p_type,
    p_size
```

In above query, will planed into

LeftAnti
    condition Or((ps_suppkey=s_suppkey), IsNull(ps_suppkey=s_suppkey))

Inside BroadcastNestedLoopJoinExec will perform O(M\*N), BUT if there is only single column in NAAJ, we can always change buildSide into a HashSet, and streamedSide just need to lookup in the HashSet, then the calculation will be optimized into O(M).

But this optimize is only targeting on null aware anti join with single column case, because multi-column support is much more complicated, we might be able to support multi-column in future.
After apply this patch, the TPCH Query 16 performance decrease from 41mins to 30s

The semantic of null-aware anti join is:

![image](https://user-images.githubusercontent.com/17242071/88077041-66a39a00-cbad-11ea-8fb6-c235c4d219b4.png)

### Why are the changes needed?
TPCH is a common benchmark for distributed compute engine, all other 21 Query works fine on Spark, except for Query 16, apply this patch will make Spark more competitive among all these popular engine. BTW, this patch has restricted rules and only apply on NAAJ Single Column case, which is safe enough.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
1. SQLQueryTestSuite with NOT IN keyword SQL, add CONFIG_DIM with spark.sql.optimizeNullAwareAntiJoin on and off
2. added case in org.apache.spark.sql.JoinSuite.
3. added case in org.apache.spark.sql.SubquerySuite.
3. Compare performance before and after applying this patch against TPCH Query 16.
4. config combination against e2e test with following

```
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "false",
  "spark.sql.codegen.wholeStage" -> "false"
),
Map(
  "sspark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "false",
  "spark.sql.codegen.wholeStage" -> "true"
),
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.codegen.wholeStage" -> "false"
),
Map(
  "spark.sql.optimizeNullAwareAntiJoin" -> "true",
  "spark.sql.adaptive.enabled" -> "true",
  "spark.sql.codegen.wholeStage" -> "true"
)
```

Closes #29104 from leanken/leanken-SPARK-32290.

Authored-by: xuewei.linxuewei <xuewei.linxuewei@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-28 04:42:15 +00:00
Frank Yin 8323c8eb56 [SPARK-32059][SQL] Allow nested schema pruning thru window/sort plans
### What changes were proposed in this pull request?
This PR is intended to solve schema pruning not working with window functions, as described in SPARK-32059. It also solved schema pruning not working with `Sort`. It also generalizes with `Project->Filter->[any node can be pruned]`.

### Why are the changes needed?
This is needed because of performance issues with nested structures with querying using window functions as well as sorting.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Introduced two tests: 1) optimizer planning level 2) end-to-end tests with SQL queries.

Closes #28898 from frankyin-factual/master.

Authored-by: Frank Yin <frank@factual.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-07-28 10:00:21 +09:00
Kent Yao d315ebf3a7 [SPARK-32424][SQL] Fix silent data change for timestamp parsing if overflow happens
### What changes were proposed in this pull request?

When using `Seconds.toMicros` API to convert epoch seconds to microseconds,

```scala
 /**
     * Equivalent to
     * {link #convert(long, TimeUnit) MICROSECONDS.convert(duration, this)}.
     * param duration the duration
     * return the converted duration,
     * or {code Long.MIN_VALUE} if conversion would negatively
     * overflow, or {code Long.MAX_VALUE} if it would positively overflow.
     */
```
This PR change it to `Math.multiplyExact(epochSeconds, MICROS_PER_SECOND)`

### Why are the changes needed?

fix silent data change between 3.x and 2.x
```
 ~/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200722   bin/spark-sql -S -e "select to_timestamp('300000', 'y');"
+294247-01-10 12:00:54.775807
```
```
 kentyaohulk  ~/Downloads/spark/spark-2.4.5-bin-hadoop2.7  bin/spark-sql -S  -e "select to_timestamp('300000', 'y');"
284550-10-19 15:58:1010.448384
```

### Does this PR introduce _any_ user-facing change?

Yes, we will raise `ArithmeticException` instead of giving the wrong answer if overflow.

### How was this patch tested?

add unit test

Closes #29220 from yaooqinn/SPARK-32424.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-27 17:03:14 +00:00
Cheng Su 548b7db345 [SPARK-32420][SQL] Add handling for unique key in non-codegen hash join
### What changes were proposed in this pull request?

`HashRelation` has two separate code paths for unique key look up and non-unique key look up E.g. in its subclass [`UnsafeHashedRelation`](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala#L144-L177), unique key look up is more efficient as it does not have e.g. extra `Iterator[UnsafeRow].hasNext()/next()` overhead per row.

`BroadcastHashJoinExec` has handled unique key vs non-unique key separately in [code-gen path](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala#L289-L321). But the non-codegen path for broadcast hash join and shuffled hash join do not separate it yet, so adding the support here.

### Why are the changes needed?

Shuffled hash join and non-codegen broadcast hash join still rely on this code path for execution. So this PR will help save CPU for executing this two type of join. Adding codegen for shuffled hash join would be a different topic and I will add it in https://issues.apache.org/jira/browse/SPARK-32421 .

Ran the same query as [`JoinBenchmark`](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala#L153-L167), with enabling and disabling this feature. Verified 20% wall clock time improvement (switch control and test group order as well to verify the improvement to not be the noise).

```
Running benchmark: shuffle hash join
  Running case: shuffle hash join unique key SHJ off
  Stopped after 5 iterations, 4039 ms
  Running case: shuffle hash join unique key SHJ on
  Stopped after 5 iterations, 2898 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join unique key SHJ off                707            808          81          5.9         168.6       1.0X
shuffle hash join unique key SHJ on                 547            580          50          7.7         130.4       1.3X
```

```
Running benchmark: shuffle hash join
  Running case: shuffle hash join unique key SHJ on
  Stopped after 5 iterations, 3333 ms
  Running case: shuffle hash join unique key SHJ off
  Stopped after 5 iterations, 4268 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join unique key SHJ on                 565            667          60          7.4         134.8       1.0X
shuffle hash join unique key SHJ off                774            854          85          5.4         184.4       0.7X
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

* Added test in `OuterJoinSuite` to cover left outer and right outer join.
* Added test in `ExistenceJoinSuite` to cover left semi join, and existence join.
* [Existing `joinSuite` already covered inner join.](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala#L182)
* [Existing `ExistenceJoinSuite` already covered left anti join, and existence join.](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/ExistenceJoinSuite.scala#L228)

Closes #29216 from c21/unique-key.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-27 17:01:03 +00:00
SaurabhChawla 99f33ec30f [SPARK-32234][FOLLOWUP][SQL] Update the description of utility method
### What changes were proposed in this pull request?
As the part of this PR https://github.com/apache/spark/pull/29045 added the helper method. This PR is the FOLLOWUP PR to update the description of helper method.

### Why are the changes needed?
For better readability and understanding of the code

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Since its only change of updating the description , So ran the Spark shell

Closes #29232 from SaurabhChawla100/SPARK-32234-Desc.

Authored-by: SaurabhChawla <s.saurabhtim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-27 08:14:02 +00:00
Cheng Su 01cf8a4ce8 [SPARK-32383][SQL] Preserve hash join (BHJ and SHJ) stream side ordering
### What changes were proposed in this pull request?

Currently `BroadcastHashJoinExec` and `ShuffledHashJoinExec` do not preserve children output ordering information (inherit from `SparkPlan.outputOrdering`, which is Nil). This can add unnecessary sort in complex queries involved multiple joins.

Example:

```
withSQLConf(
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "50") {
      val df1 = spark.range(100).select($"id".as("k1"))
      val df2 = spark.range(100).select($"id".as("k2"))
      val df3 = spark.range(3).select($"id".as("k3"))
      val df4 = spark.range(100).select($"id".as("k4"))
      val plan = df1.join(df2, $"k1" === $"k2")
        .join(df3, $"k1" === $"k3")
        .join(df4, $"k1" === $"k4")
        .queryExecution
        .executedPlan
}
```

Current physical plan (extra sort on `k1` before top sort merge join):

```
*(9) SortMergeJoin [k1#220L], [k4#232L], Inner
:- *(6) Sort [k1#220L ASC NULLS FIRST], false, 0
:  +- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
:     :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
:     :  :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
:     :  :  +- Exchange hashpartitioning(k1#220L, 5), true, [id=#128]
:     :  :     +- *(1) Project [id#218L AS k1#220L]
:     :  :        +- *(1) Range (0, 100, step=1, splits=2)
:     :  +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
:     :     +- Exchange hashpartitioning(k2#224L, 5), true, [id=#134]
:     :        +- *(3) Project [id#222L AS k2#224L]
:     :           +- *(3) Range (0, 100, step=1, splits=2)
:     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#141]
:        +- *(5) Project [id#226L AS k3#228L]
:           +- *(5) Range (0, 3, step=1, splits=2)
+- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(k4#232L, 5), true, [id=#148]
      +- *(7) Project [id#230L AS k4#232L]
         +- *(7) Range (0, 100, step=1, splits=2)
```

Ideal physical plan (no extra sort on `k1` before top sort merge join):

```
*(9) SortMergeJoin [k1#220L], [k4#232L], Inner
:- *(6) BroadcastHashJoin [k1#220L], [k3#228L], Inner, BuildRight
:  :- *(6) SortMergeJoin [k1#220L], [k2#224L], Inner
:  :  :- *(2) Sort [k1#220L ASC NULLS FIRST], false, 0
:  :  :  +- Exchange hashpartitioning(k1#220L, 5), true, [id=#127]
:  :  :     +- *(1) Project [id#218L AS k1#220L]
:  :  :        +- *(1) Range (0, 100, step=1, splits=2)
:  :  +- *(4) Sort [k2#224L ASC NULLS FIRST], false, 0
:  :     +- Exchange hashpartitioning(k2#224L, 5), true, [id=#133]
:  :        +- *(3) Project [id#222L AS k2#224L]
:  :           +- *(3) Range (0, 100, step=1, splits=2)
:  +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#140]
:     +- *(5) Project [id#226L AS k3#228L]
:        +- *(5) Range (0, 3, step=1, splits=2)
+- *(8) Sort [k4#232L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(k4#232L, 5), true, [id=#146]
      +- *(7) Project [id#230L AS k4#232L]
         +- *(7) Range (0, 100, step=1, splits=2)
```

### Why are the changes needed?

To avoid unnecessary sort in query, and it has most impact when users read sorted bucketed table.
Though the unnecessary sort is operating on already sorted data, it would have obvious negative impact on IO and query run time if the data is large and external sorting happens.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `JoinSuite`.

Closes #29181 from c21/ordering.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-27 04:51:32 +00:00