Commit graph

27787 commits

Author SHA1 Message Date
Kousuke Saruta 4e267f3eb9 [SPARK-32538][CORE][TEST] Use local time zone for the timestamp logged in unit-tests.log
### What changes were proposed in this pull request?

This PR lets the logger log timestamp based on local time zone during test.
`SparkFunSuite` fixes the default time zone to America/Los_Angeles so the timestamp logged in unit-tests.log is also based on the fixed time zone.

### Why are the changes needed?

It's confusable for developers whose time zone is not America/Los_Angeles.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Run existing tests and confirmed uint-tests.log.
If your local time zone is America/Los_Angeles, you can test by setting the environment variable `TZ` like as follows.
```
$ TZ=Asia/Tokyo build/sbt "testOnly org.apache.spark.executor.ExecutorSuite"
$ tail core/target/unit-tests.log
```

Closes #29356 from sarutak/fix-unit-test-log-timezone.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2020-08-07 11:29:18 +09:00
Huaxin Gao 75c2c53e93 [SPARK-32506][TESTS] Flaky test: StreamingLinearRegressionWithTests
### What changes were proposed in this pull request?
The test creates 10 batches of data  to train the model and expects to see error on test data improves as model is trained. If the difference between the 2nd error and the 10th error is smaller than 2, the assertion fails:
```
FAIL: test_train_prediction (pyspark.mllib.tests.test_streaming_algorithms.StreamingLinearRegressionWithTests)
Test that error on test data improves as model is trained.
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 466, in test_train_prediction
    eventually(condition, timeout=180.0)
  File "/home/runner/work/spark/spark/python/pyspark/testing/utils.py", line 81, in eventually
    lastValue = condition()
  File "/home/runner/work/spark/spark/python/pyspark/mllib/tests/test_streaming_algorithms.py", line 461, in condition
    self.assertGreater(errors[1] - errors[-1], 2)
AssertionError: 1.672640157855923 not greater than 2
```
I saw this quite a few time on Jenkins but was not able to reproduce this on my local. These are the ten errors I got:
```
4.517395047937127
4.894265404350079
3.0392090466559876
1.8786361640757654
0.8973106042078115
0.3715780507684368
0.20815690742907672
0.17333033743125845
0.15686783249863873
0.12584413600569616
```
I am thinking of having 15 batches of data instead of 10, so the model can be trained for a longer time. Hopefully the 15th error - 2nd error will always be larger than 2 on Jenkins. These are the 15 errors I got on my local:
```
4.517395047937127
4.894265404350079
3.0392090466559876
1.8786361640757658
0.8973106042078115
0.3715780507684368
0.20815690742907672
0.17333033743125845
0.15686783249863873
0.12584413600569616
0.11883853835108477
0.09400261862100823
0.08887491447353497
0.05984929624986607
0.07583948141520978
```

### Why are the changes needed?
Fix flaky test

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested

Closes #29380 from huaxingao/flaky_test.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Huaxin Gao <huaxing@us.ibm.com>
2020-08-06 13:54:15 -07:00
Max Gekk 6664e282f6 [SPARK-32546][SQL][FOLLOWUP] Add .toSeq to tableNames in HiveClientImpl.listTablesByType
### What changes were proposed in this pull request?
Explicitly convert `tableNames` to `Seq` in `HiveClientImpl.listTablesByType` as it was done by c28a6fa511 (diff-6fd847124f8eae45ba2de1cf7d6296feR769)

### Why are the changes needed?
See this PR https://github.com/apache/spark/pull/29111, to compile by Scala 2.13. The changes were discarded by https://github.com/apache/spark/pull/29363 accidentally.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Compiling by Scala 2.13

Closes #29379 from MaxGekk/fix-listTablesByType-for-views-followup.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-06 14:23:56 +00:00
Max Gekk dc96f2f8d6 [SPARK-32546][SQL] Get table names directly from Hive tables
### What changes were proposed in this pull request?
Get table names directly from a sequence of Hive tables in `HiveClientImpl.listTablesByType()` by skipping conversions Hive tables to Catalog tables.

### Why are the changes needed?
A Hive metastore can be shared across many clients. A client can create tables using a SerDe which is not available on other clients, for instance `ROW FORMAT SERDE "com.ibm.spss.hive.serde2.xml.XmlSerDe"`. In the current implementation, other clients get the following exception while getting views:
```
java.lang.RuntimeException: MetaException(message:java.lang.ClassNotFoundException Class com.ibm.spss.hive.serde2.xml.XmlSerDe not found)
```
when `com.ibm.spss.hive.serde2.xml.XmlSerDe` is not available.

### Does this PR introduce _any_ user-facing change?
Yes. For example, `SHOW VIEWS` returns a list of views instead of throwing an exception.

### How was this patch tested?
- By existing test suites like:
```
$ build/sbt -Phive-2.3 "test:testOnly org.apache.spark.sql.hive.client.VersionsSuite"
```
- And manually:

1. Build Spark with Hive 1.2: `./build/sbt package -Phive-1.2 -Phive -Dhadoop.version=2.8.5`

2. Run spark-shell with a custom Hive SerDe, for instance download [json-serde-1.3.8-jar-with-dependencies.jar](https://github.com/cdamak/Twitter-Hive/blob/master/json-serde-1.3.8-jar-with-dependencies.jar) from https://github.com/cdamak/Twitter-Hive:
```
$ ./bin/spark-shell --jars ../Downloads/json-serde-1.3.8-jar-with-dependencies.jar
```

3. Create a Hive table using this SerDe:
```scala
scala> :paste
// Entering paste mode (ctrl-D to finish)

sql(s"""
  |CREATE TABLE json_table2(page_id INT NOT NULL)
  |ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
  |""".stripMargin)

// Exiting paste mode, now interpreting.
res0: org.apache.spark.sql.DataFrame = []

scala> sql("SHOW TABLES").show
+--------+-----------+-----------+
|database|  tableName|isTemporary|
+--------+-----------+-----------+
| default|json_table2|      false|
+--------+-----------+-----------+

scala> sql("SHOW VIEWS").show
+---------+--------+-----------+
|namespace|viewName|isTemporary|
+---------+--------+-----------+
+---------+--------+-----------+
```

4. Quit from the current `spark-shell` and run it without jars:
```
$ ./bin/spark-shell
```

5. Show views. Without the fix, it throws the exception:
```scala
scala> sql("SHOW VIEWS").show
20/08/06 10:53:36 ERROR log: error in initSerDe: java.lang.ClassNotFoundException Class org.openx.data.jsonserde.JsonSerDe not found
java.lang.ClassNotFoundException: Class org.openx.data.jsonserde.JsonSerDe not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2273)
	at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:385)
	at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:276)
	at org.apache.hadoop.hive.ql.metadata.Table.getDeserializer(Table.java:258)
	at org.apache.hadoop.hive.ql.metadata.Table.getCols(Table.java:605)
```

After the fix:
```scala
scala> sql("SHOW VIEWS").show
+---------+--------+-----------+
|namespace|viewName|isTemporary|
+---------+--------+-----------+
+---------+--------+-----------+
```

Closes #29363 from MaxGekk/fix-listTablesByType-for-views.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-06 08:35:59 +00:00
Gengliang Wang e93b8f02cd [SPARK-32539][INFRA] Disallow FileSystem.get(Configuration conf) in style check by default
### What changes were proposed in this pull request?

Disallow `FileSystem.get(Configuration conf)` in Scala style check by default and suggest developers use `FileSystem.get(URI uri, Configuration conf)` or `Path.getFileSystem()` instead.

### Why are the changes needed?

The method `FileSystem.get(Configuration conf)` will return a default FileSystem instance if the conf `fs.file.impl` is not set. This can cause file not found exception on reading a target path of non-default file system, e.g. S3. It is hard to discover such a mistake via unit tests.
If we disallow it in Scala style check by default and suggest developers use `FileSystem.get(URI uri, Configuration conf)` or `Path.getFileSystem(Configuration conf)`, we can reduce potential regression and PR review effort.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually run scala style check and test.

Closes #29357 from gengliangwang/newStyleRule.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-06 05:56:59 +00:00
yi.wu 7f275ee597 [SPARK-32518][CORE] CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should consider all kinds of resources
### What changes were proposed in this pull request?

1.  Make `CoarseGrainedSchedulerBackend.maxNumConcurrentTasks()` considers all kinds of resources when calculating the max concurrent tasks

2. Refactor `calculateAvailableSlots()` to make it be able to be used for both `CoarseGrainedSchedulerBackend` and `TaskSchedulerImpl`

### Why are the changes needed?

Currently, `CoarseGrainedSchedulerBackend.maxNumConcurrentTasks()` only considers the CPU for the max concurrent tasks. This can cause the application to hang when a barrier stage requires extra custom resources but the cluster doesn't have enough corresponding resources. Because, without the checking for other custom resources in `maxNumConcurrentTasks`, the barrier stage can be submitted to the `TaskSchedulerImpl`. But the `TaskSchedulerImpl` won't launch tasks for the barrier stage due to the insufficient task slots calculated by `TaskSchedulerImpl.calculateAvailableSlots` (which does check all kinds of resources).

The application hang issue can be reproduced by the added unit test.

### Does this PR introduce _any_ user-facing change?

Yes. In case of a barrier stage requires more custom resources than the cluster has, the application can get hang before this PR but can fail due to insufficient resources at the end after this PR.

### How was this patch tested?

Added a unit test.

Closes #29332 from Ngone51/fix-slots.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-06 05:39:47 +00:00
Holden Karau 375d348a83 [SPARK-31197][CORE] Shutdown executor once we are done decommissioning
### What changes were proposed in this pull request?

Exit the executor when it has been asked to decommission and there is nothing left for it to do.

This is a rebase of https://github.com/apache/spark/pull/28817

### Why are the changes needed?

If we want to use decommissioning in Spark's own scale down we should terminate the executor once finished.
Furthermore, in graceful shutdown it makes sense to release resources we no longer need if we've been asked to shutdown by the cluster manager instead of always holding the resources as long as possible.

### Does this PR introduce _any_ user-facing change?

The decommissioned executors will exit and the end of decommissioning. This is sort of a user facing change, however decommissioning hasn't been in any releases yet.

### How was this patch tested?

I changed the unit test to not send the executor exit message and still wait on the executor exited message.

Closes #29211 from holdenk/SPARK-31197-exit-execs-redone.

Authored-by: Holden Karau <hkarau@apple.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
2020-08-05 16:28:14 -07:00
Yan Xiaole c1d17df826 [SPARK-32529][CORE] Fix Historyserver log scan aborted by application status change
# What changes were proposed in this pull request?
This PR adds a `FileNotFoundException` try catch block while adding a new entry to history server application listing to skip the non-existing path.

### Why are the changes needed?
If there are a large number (>100k) of applications log dir, listing the log dir will take a few seconds. After getting the path list some applications might have finished already, and the filename will change from `foo.inprogress` to `foo`.

It leads to a problem when adding an entry to the listing, querying file status like `fileSizeForLastIndex` will throw out a `FileNotFoundException` exception if the application was finished. And the exception will abort current loop, in a busy cluster, it will make history server couldn't list and load any application log.

```
20/08/03 15:17:23 ERROR FsHistoryProvider: Exception in checking for event log updates
 java.io.FileNotFoundException: File does not exist: hdfs://xx/logs/spark/application_11111111111111.lz4.inprogress
 at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1527)
 at org.apache.hadoop.hdfs.DistributedFileSystem$29.doCall(DistributedFileSystem.java:1520)
 at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
 at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1520)
 at org.apache.spark.deploy.history.SingleFileEventLogFileReader.status$lzycompute(EventLogFileReaders.scala:170)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
1. setup another script keeps changing the filename of applications under history log dir
2. launch the history server
3. check whether the `File does not exist` error log was gone.

Closes #29350 from yanxiaole/SPARK-32529.

Authored-by: Yan Xiaole <xiaole.yan@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-05 10:57:11 -07:00
HyukjinKwon 42219af906 [SPARK-32543][R] Remove arrow::as_tibble usage in SparkR
### What changes were proposed in this pull request?

SparkR increased the minimal version of Arrow R version to 1.0.0 at SPARK-32452, and Arrow R 0.14 dropped `as_tibble`. We can remove the usage in SparkR.

### Why are the changes needed?

To remove codes unused anymore.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

GitHub Actions will test them out.

Closes #29361 from HyukjinKwon/SPARK-32543.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-05 10:35:03 -07:00
Michael Munday 4a0427cbc1 [SPARK-32485][SQL][TEST] Fix endianness issues in tests in RecordBinaryComparatorSuite
### What changes were proposed in this pull request?
PR #26548 means that RecordBinaryComparator now uses big endian
byte order for long comparisons. However, this means that some of
the constants in the regression tests no longer map to the same
values in the comparison that they used to.

For example, one of the tests does a comparison between
Long.MIN_VALUE and 1 in order to trigger an overflow condition that
existed in the past (i.e. Long.MIN_VALUE - 1). These constants
correspond to the values 0x80..00 and 0x00..01. However on a
little-endian machine the bytes in these values are now swapped
before they are compared. This means that we will now be comparing
0x00..80 with 0x01..00. 0x00..80 - 0x01..00 does not overflow
therefore missing the original purpose of the test.

To fix this the constants are now explicitly written out in big
endian byte order to match the byte order used in the comparison.
This also fixes the tests on big endian machines (which would
otherwise get a different comparison result to the little-endian
machines).

### Why are the changes needed?
The regression tests no longer serve their initial purposes and also fail on big-endian systems.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Tests run on big-endian system (s390x).

Closes #29259 from mundaym/fix-endian.

Authored-by: Michael Munday <mike.munday@ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-05 16:11:09 +00:00
Kent Yao 1b6f482adb [SPARK-32492][SQL][FOLLOWUP][TEST-MAVEN] Fix jenkins maven jobs
### What changes were proposed in this pull request?

The newly added test fails Jenkins maven jobs, see https://github.com/apache/spark/pull/29303#discussion_r464729021

We move the test from `ThriftServerWithSparkContextSuite` to `SparkMetadataOperationSuite`, the former uses an embedded thrift server where the server and the client are in the same JVM process and the latter forks a new process to start the server where the server and client are isolated.
The sbt runner seems to be fine with the test in the `ThriftServerWithSparkContextSuite`, but the maven runner with `scalates`t plugin will face the classloader issue as we will switch classloader to the one in the `sharedState` which is not the one that hive uses to load some classes. This is more like an issue that belongs to the maven runner or the `scalatest`.
So in this PR, we simply move it to bypass the issue.

BTW, we should test against the way of using embedded thrift server to verify whether it is just a maven issue or not, there could be some use cases with this API.

### Why are the changes needed?

Jenkins recovery

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

modified uts

Closes #29347 from yaooqinn/SPARK-32492-F.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-05 13:56:11 +00:00
Max Gekk 3a437ed22b [SPARK-32501][SQL] Convert null to "null" in structs, maps and arrays while casting to strings
### What changes were proposed in this pull request?
Convert `NULL` elements of maps, structs and arrays to the `"null"` string while converting maps/struct/array values to strings. The SQL config `spark.sql.legacy.omitNestedNullInCast.enabled` controls the behaviour. When it is `true`, `NULL` elements of structs/maps/arrays will be omitted otherwise, when it is `false`, `NULL` elements will be converted to `"null"`.

### Why are the changes needed?
1. It is impossible to distinguish empty string and null, for instance:
```scala
scala> Seq(Seq(""), Seq(null)).toDF().show
+-----+
|value|
+-----+
|   []|
|   []|
+-----+
```
2. Inconsistent NULL conversions for top-level values and nested columns, for instance:
```scala
scala> sql("select named_struct('c', null), null").show
+---------------------+----+
|named_struct(c, NULL)|NULL|
+---------------------+----+
|                   []|null|
+---------------------+----+
```
3. `.show()` is different from conversions to Hive strings, and as a consequence its output is different from `spark-sql` (sql tests):
```sql
spark-sql> select named_struct('c', null) as struct;
{"c":null}
```
```scala
scala> sql("select named_struct('c', null) as struct").show
+------+
|struct|
+------+
|    []|
+------+
```

4. It is impossible to distinguish empty struct/array from struct/array with null in the current implementation:
```scala
scala> Seq[Seq[String]](Seq(), Seq(null)).toDF.show()
+-----+
|value|
+-----+
|   []|
|   []|
+-----+
```

### Does this PR introduce _any_ user-facing change?
Yes, before:
```scala
scala> Seq(Seq(""), Seq(null)).toDF().show
+-----+
|value|
+-----+
|   []|
|   []|
+-----+
```

After:
```scala
scala> Seq(Seq(""), Seq(null)).toDF().show
+------+
| value|
+------+
|    []|
|[null]|
+------+
```

### How was this patch tested?
By existing test suite `CastSuite`.

Closes #29311 from MaxGekk/nested-null-to-string.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-05 12:03:36 +00:00
Huaxin Gao b14a1e2816 [SPARK-32402][SQL] Implement ALTER TABLE in JDBC Table Catalog
### What changes were proposed in this pull request?
Implement ALTER TABLE in JDBC Table Catalog
The following ALTER TABLE are implemented:
```
ALTER TABLE table_name ADD COLUMNS ( column_name datatype [ , ... ] );
ALTER TABLE table_name RENAME COLUMN old_column_name TO new_column_name;
ALTER TABLE table_name DROP COLUMN column_name;
ALTER TABLE table_name ALTER COLUMN column_name TYPE new_type;
ALTER TABLE table_name ALTER COLUMN column_name SET NOT NULL;
```
I haven't checked ALTER TABLE syntax for all the databases yet. I will check. If there are different syntax, I will have a follow-up to override the dialect.

Seems most of the databases don't support updating comments and column position, so I didn't implement UpdateColumnComment and UpdateColumnPosition.

### Why are the changes needed?
Complete the JDBCTableCatalog implementation

### Does this PR introduce _any_ user-facing change?
Yes
`JDBCTableCatalog.alterTable`

### How was this patch tested?
add new tests

Closes #29324 from huaxingao/alter_table.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-05 11:57:33 +00:00
HyukjinKwon 15b73339d9 [SPARK-32507][DOCS][PYTHON] Add main page for PySpark documentation
### What changes were proposed in this pull request?

This PR proposes to write the main page of PySpark documentation. The base work is finished at https://github.com/apache/spark/pull/29188.

### Why are the changes needed?

For better usability and readability in PySpark documentation.

### Does this PR introduce _any_ user-facing change?

Yes, it creates a new main page as below:

![Screen Shot 2020-07-31 at 10 02 44 PM](https://user-images.githubusercontent.com/6477701/89037618-d2d68880-d379-11ea-9a44-562f2aa0e3fd.png)

### How was this patch tested?

Manually built the PySpark documentation.

```bash
cd python
make clean html
```

Closes #29320 from HyukjinKwon/SPARK-32507.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-05 11:14:14 +09:00
Kousuke Saruta 0660a0501d [SPARK-32525][DOCS] The layout of monitoring.html is broken
### What changes were proposed in this pull request?

This PR fixes the layout of monitoring.html broken after SPARK-31566(#28354).
The cause is there are 2 `<td>` tags not closed in `monitoring.md`.

### Why are the changes needed?

This is a bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Build docs and the following screenshots are before/after.

* Before fixed
![broken-doc](https://user-images.githubusercontent.com/4736016/89257873-fba09b80-d661-11ea-90da-06cbc0783011.png)

* After fixed.
![fixed-doc2](https://user-images.githubusercontent.com/4736016/89257910-0fe49880-d662-11ea-9a85-7a1ecb1d38d6.png)

Of course, the table is still rendered correctly.
![fixed-doc1](https://user-images.githubusercontent.com/4736016/89257948-225ed200-d662-11ea-80fd-d9254b44d4a0.png)

Closes #29345 from sarutak/fix-monitoring.md.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
2020-08-04 23:27:05 +08:00
Max Gekk 7eb6f45688 [SPARK-32499][SQL] Use {} in conversions maps and structs to strings
### What changes were proposed in this pull request?
Change casting of map and struct values to strings by using the `{}` brackets instead of `[]`. The behavior is controlled by the SQL config `spark.sql.legacy.castComplexTypesToString.enabled`. When it is `true`, `CAST` wraps maps and structs by `[]` in casting to strings. Otherwise, if this is `false`, which is the default, maps and structs are wrapped by `{}`.

### Why are the changes needed?
- To distinguish structs/maps from arrays.
- To make `show`'s output consistent with Hive and conversions to Hive strings.
- To display dataframe content in the same form by `spark-sql` and `show`
- To be consistent with the `*.sql` tests

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
By existing test suite `CastSuite`.

Closes #29308 from MaxGekk/show-struct-map.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-04 14:57:09 +00:00
Terry Kim 171b7d5d71 [SPARK-23431][CORE] Expose stage level peak executor metrics via REST API
### What changes were proposed in this pull request?

Note that this PR is forked from #23340 originally written by edwinalu.

This PR proposes to expose the peak executor metrics at the stage level via the REST APIs:
* `/applications/<application_id>/stages/`: peak values of executor metrics for **each stage**
* `/applications/<application_id>/stages/<stage_id>/< stage_attempt_id >`: peak values of executor metrics for **each executor** for the stage, followed by peak values of executor metrics for the stage

### Why are the changes needed?

The stage level peak executor metrics can help better understand your application's resource utilization.

### Does this PR introduce _any_ user-facing change?

1. For the `/applications/<application_id>/stages/` API, you will see the following new info for **each stage**:
```JSON
  "peakExecutorMetrics" : {
    "JVMHeapMemory" : 213367864,
    "JVMOffHeapMemory" : 189011656,
    "OnHeapExecutionMemory" : 0,
    "OffHeapExecutionMemory" : 0,
    "OnHeapStorageMemory" : 2133349,
    "OffHeapStorageMemory" : 0,
    "OnHeapUnifiedMemory" : 2133349,
    "OffHeapUnifiedMemory" : 0,
    "DirectPoolMemory" : 282024,
    "MappedPoolMemory" : 0,
    "ProcessTreeJVMVMemory" : 0,
    "ProcessTreeJVMRSSMemory" : 0,
    "ProcessTreePythonVMemory" : 0,
    "ProcessTreePythonRSSMemory" : 0,
    "ProcessTreeOtherVMemory" : 0,
    "ProcessTreeOtherRSSMemory" : 0,
    "MinorGCCount" : 13,
    "MinorGCTime" : 115,
    "MajorGCCount" : 4,
    "MajorGCTime" : 339
  }
```

2. For the `/applications/<application_id>/stages/<stage_id>/<stage_attempt_id>` API, you will see the following new info for **each executor** under `executorSummary`:
```JSON
  "peakMemoryMetrics" : {
    "JVMHeapMemory" : 0,
    "JVMOffHeapMemory" : 0,
    "OnHeapExecutionMemory" : 0,
    "OffHeapExecutionMemory" : 0,
    "OnHeapStorageMemory" : 0,
    "OffHeapStorageMemory" : 0,
    "OnHeapUnifiedMemory" : 0,
    "OffHeapUnifiedMemory" : 0,
    "DirectPoolMemory" : 0,
    "MappedPoolMemory" : 0,
    "ProcessTreeJVMVMemory" : 0,
    "ProcessTreeJVMRSSMemory" : 0,
    "ProcessTreePythonVMemory" : 0,
    "ProcessTreePythonRSSMemory" : 0,
    "ProcessTreeOtherVMemory" : 0,
    "ProcessTreeOtherRSSMemory" : 0,
    "MinorGCCount" : 0,
    "MinorGCTime" : 0,
    "MajorGCCount" : 0,
    "MajorGCTime" : 0
  }
```
, and the following at the stage level:
```JSON
"peakExecutorMetrics" : {
    "JVMHeapMemory" : 213367864,
    "JVMOffHeapMemory" : 189011656,
    "OnHeapExecutionMemory" : 0,
    "OffHeapExecutionMemory" : 0,
    "OnHeapStorageMemory" : 2133349,
    "OffHeapStorageMemory" : 0,
    "OnHeapUnifiedMemory" : 2133349,
    "OffHeapUnifiedMemory" : 0,
    "DirectPoolMemory" : 282024,
    "MappedPoolMemory" : 0,
    "ProcessTreeJVMVMemory" : 0,
    "ProcessTreeJVMRSSMemory" : 0,
    "ProcessTreePythonVMemory" : 0,
    "ProcessTreePythonRSSMemory" : 0,
    "ProcessTreeOtherVMemory" : 0,
    "ProcessTreeOtherRSSMemory" : 0,
    "MinorGCCount" : 13,
    "MinorGCTime" : 115,
    "MajorGCCount" : 4,
    "MajorGCTime" : 339
  }
```

### How was this patch tested?

Added tests.

Closes #29020 from imback82/metrics.

Lead-authored-by: Terry Kim <yuminkim@gmail.com>
Co-authored-by: edwinalu <edwina.lu@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
2020-08-04 21:11:00 +08:00
fqaiser94@gmail.com 6d69068057 [SPARK-32521][SQL] Bug-fix: WithFields Expression should not be foldable
### What changes were proposed in this pull request?

Make WithFields Expression not foldable.

### Why are the changes needed?

The following query currently fails on master brach:
```
sql("SELECT named_struct('a', 1, 'b', 2) a")
.select($"a".withField("c", lit(3)).as("a"))
.show(false)
// java.lang.UnsupportedOperationException: Cannot evaluate expression: with_fields(named_struct(a, 1, b, 2), c, 3)
```
This happens because the Catalyst optimizer sees that the WithFields Expression is foldable and tries to statically evaluate the WithFields Expression (via the ConstantFolding rule), however it cannot do so because WithFields Expression is Unevaluable.

### Does this PR introduce _any_ user-facing change?

Yes, queries like the one shared above will now succeed.
That said, this bug was introduced in Spark 3.1.0 which has yet to be released.

### How was this patch tested?

A new unit test was added.

Closes #29338 from fqaiser94/SPARK-32521.

Lead-authored-by: fqaiser94@gmail.com <fqaiser94@gmail.com>
Co-authored-by: fqaiser94 <fqaiser94@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-04 12:11:04 +00:00
Dongjoon Hyun 7fec6e0c16 [SPARK-32524][SQL][TESTS] CachedBatchSerializerSuite should clean up InMemoryRelation.ser
### What changes were proposed in this pull request?

This PR aims to clean up `InMemoryRelation.ser` in `CachedBatchSerializerSuite`.

### Why are the changes needed?

SPARK-32274 makes SQL cache serialization pluggable.
```
[SPARK-32274][SQL] Make SQL cache serialization pluggable
```

This causes UT failures.
```
$ build/sbt "sql/testOnly *.CachedBatchSerializerSuite *.CachedTableSuite"
...
[info]   Cause: java.lang.IllegalStateException: This does not work. This is only for testing
[info]   at org.apache.spark.sql.execution.columnar.TestSingleIntColumnarCachedBatchSerializer.convertInternalRowToCachedBatch(CachedBatchSerializerSuite.scala:49)
...
[info] *** 30 TESTS FAILED ***
[error] Failed: Total 51, Failed 30, Errors 0, Passed 21
[error] Failed tests:
[error] 	org.apache.spark.sql.CachedTableSuite
[error] (sql/test:testOnly) sbt.TestsFailedException: Tests unsuccessful
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually.
```
$ build/sbt "sql/testOnly *.CachedBatchSerializerSuite *.CachedTableSuite"
[info] Tests: succeeded 51, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[info] Passed: Total 51, Failed 0, Errors 0, Passed 51
```

Closes #29346 from dongjoon-hyun/SPARK-32524-3.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-04 17:49:52 +09:00
Jungtaek Lim (HeartSaVioR) 005ef3a5b8 [SPARK-32468][SS][TESTS][FOLLOWUP] Provide "default.api.timeout.ms" as well when specifying "request.timeout.ms" on replacing "default.api.timeout.ms"
### What changes were proposed in this pull request?

This patch is a follow-up to fill the gap in #29272 which missed to also provide `default.api.timeout.ms` as well.  #29272 unintentionally changed the behavior on Kafka side timeout which is incompatible with the test timeout. (`default.api.timeout.ms` gets default value which is 60 seconds, longer than test timeout.)

### Why are the changes needed?

We realized the PR for SPARK-32468 (#29272) doesn't work as we expect. See https://github.com/apache/spark/pull/29272#issuecomment-668333483 for more details.

### Does this PR introduce _any_ user-facing change?

No, as it only touches the tests.

### How was this patch tested?

Will trigger builds from Jenkins or Github Action multiple time and confirm.

Closes #29343 from HeartSaVioR/SPARK-32468-FOLLOWUP.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-04 14:51:25 +09:00
gengjiaan 1597d8fcd4 [SPARK-30276][SQL] Support Filter expression allows simultaneous use of DISTINCT
### What changes were proposed in this pull request?
This PR is related to https://github.com/apache/spark/pull/26656.
https://github.com/apache/spark/pull/26656 only support use FILTER clause on aggregate expression without DISTINCT.
This PR will enhance this feature when one or more DISTINCT aggregate expressions which allows the use of the FILTER clause.
Such as:
```
select sum(distinct id) filter (where sex = 'man') from student;
select class_id, sum(distinct id) filter (where sex = 'man') from student group by class_id;
select count(id) filter (where class_id = 1), sum(distinct id) filter (where sex = 'man') from student;
select class_id, count(id) filter (where class_id = 1), sum(distinct id) filter (where sex = 'man') from student group by class_id;
select sum(distinct id), sum(distinct id) filter (where sex = 'man') from student;
select class_id, sum(distinct id), sum(distinct id) filter (where sex = 'man') from student group by class_id;
select class_id, count(id), count(id) filter (where class_id = 1), sum(distinct id), sum(distinct id) filter (where sex = 'man') from student group by class_id;
```

### Why are the changes needed?
Spark SQL only support use FILTER clause on aggregate expression without DISTINCT.
This PR support Filter expression allows simultaneous use of DISTINCT

### Does this PR introduce _any_ user-facing change?
Yes

### How was this patch tested?
Exists and new UT

Closes #29291 from beliefer/support-distinct-with-filter.

Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: beliefer <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-04 04:41:19 +00:00
Takuya UESHIN 7deb67c28f [SPARK-32160][CORE][PYSPARK][FOLLOWUP] Change the config name to switch allow/disallow SparkContext in executors
### What changes were proposed in this pull request?

This is a follow-up of #29278.
This PR changes the config name to switch allow/disallow `SparkContext` in executors as per the comment https://github.com/apache/spark/pull/29278#pullrequestreview-460256338.

### Why are the changes needed?

The config name `spark.executor.allowSparkContext` is more reasonable.

### Does this PR introduce _any_ user-facing change?

Yes, the config name is changed.

### How was this patch tested?

Updated tests.

Closes #29340 from ueshin/issues/SPARK-32160/change_config_name.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-04 12:45:06 +09:00
Max Gekk 9bbe8c7418 [MINOR][SQL] Fix versions in the SQL migration guide for Spark 3.1
### What changes were proposed in this pull request?
Change _To restore the behavior before Spark **3.0**_ to _To restore the behavior before Spark **3.1**_ in the SQL migration guide while telling about the behaviour before new version 3.1.

### Why are the changes needed?
To have correct info in the SQL migration guide.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
N/A

Closes #29336 from MaxGekk/fix-version-in-sql-migration.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-08-04 11:23:28 +09:00
Max Gekk f3b10f526b [SPARK-32290][SQL][FOLLOWUP] Add version for the SQL config spark.sql.optimizeNullAwareAntiJoin
### What changes were proposed in this pull request?
Add the version `3.1.0` for the SQL config `spark.sql.optimizeNullAwareAntiJoin`.

### Why are the changes needed?
To inform users when the config was added, for example on the page http://spark.apache.org/docs/latest/configuration.html.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By compiling and running `./dev/scalastyle`.

Closes #29335 from MaxGekk/leanken-SPARK-32290-followup.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 16:05:54 +00:00
Huaxin Gao bc7885901d [SPARK-32310][ML][PYSPARK] ML params default value parity in feature and tuning
### What changes were proposed in this pull request?
set params default values in trait Params for feature and tuning in both Scala and Python.

### Why are the changes needed?
Make ML has the same default param values between estimator and its corresponding transformer, and also between Scala and Python.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing and modified tests

Closes #29153 from huaxingao/default2.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Huaxin Gao <huaxing@us.ibm.com>
2020-08-03 08:50:34 -07:00
Takeshi Yamamuro c6109ba918 [SPARK-32257][SQL] Reports explicit errors for invalid usage of SET/RESET command
### What changes were proposed in this pull request?

This PR modified the parser code to handle invalid usages of a SET/RESET command.
For example;
```
SET spark.sql.ansi.enabled true
```
The above SQL command does not change the configuration value and it just tries to display the value of the configuration
`spark.sql.ansi.enabled true`. This PR disallows using special characters including spaces in the configuration name and reports a user-friendly error instead. In the error message, it tells users a workaround to use quotes or a string literal if they still needs to specify a configuration with them. 

Before this PR:
```
scala> sql("SET spark.sql.ansi.enabled true").show(1, -1)
+---------------------------+-----------+
|key                        |value      |
+---------------------------+-----------+
|spark.sql.ansi.enabled true|<undefined>|
+---------------------------+-----------+
```

After this PR:
```
scala> sql("SET spark.sql.ansi.enabled true")
org.apache.spark.sql.catalyst.parser.ParseException:
Expected format is 'SET', 'SET key', or 'SET key=value'. If you want to include special characters in key, please use quotes, e.g., SET `ke y`=value.(line 1, pos 0)

== SQL ==
SET spark.sql.ansi.enabled true
^^^
```

### Why are the changes needed?

For better user-friendly errors.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests in `SparkSqlParserSuite`.

Closes #29146 from maropu/SPARK-32257.

Lead-authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Co-authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 13:00:07 +00:00
Kent Yao 7f5326c082 [SPARK-32492][SQL] Fulfill missing column meta information COLUMN_SIZE /DECIMAL_DIGITS/NUM_PREC_RADIX/ORDINAL_POSITION for thriftserver client tools
### What changes were proposed in this pull request?

This PR fulfills some missing fields for SparkGetColumnsOperation including COLUMN_SIZE /DECIMAL_DIGITS/NUM_PREC_RADIX/ORDINAL_POSITION

and improve the test coverage.

### Why are the changes needed?

make jdbc tools happier

### Does this PR introduce _any_ user-facing change?

yes,

#### before
![image](https://user-images.githubusercontent.com/8326978/88911764-e78b2180-d290-11ea-8abb-96f137f9c3c4.png)

#### after

![image](https://user-images.githubusercontent.com/8326978/88911709-d04c3400-d290-11ea-90ab-02bda3e628e9.png)

![image](https://user-images.githubusercontent.com/8326978/88912007-39cc4280-d291-11ea-96d6-1ef3abbbddec.png)

### How was this patch tested?

add unit tests

Closes #29303 from yaooqinn/SPARK-32492.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 12:49:34 +00:00
Kent Yao 3deb59d5c2 [SPARK-31709][SQL] Proper base path for database/table location when it is a relative path
### What changes were proposed in this pull request?

Currently, the user home directory is used as the base path for the database and table locations when their locationa are specified with a relative paths, e.g.
```sql
> set spark.sql.warehouse.dir;
spark.sql.warehouse.dir	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200512/spark-warehouse/
spark-sql> create database loctest location 'loctestdbdir';

spark-sql> desc database loctest;
Database Name	loctest
Comment
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200512/loctestdbdir
Owner	kentyao

spark-sql> create table loctest(id int) location 'loctestdbdir';
spark-sql> desc formatted loctest;
id	int	NULL

# Detailed Table Information
Database	default
Table	loctest
Owner	kentyao
Created Time	Thu May 14 16:29:05 CST 2020
Last Access	UNKNOWN
Created By	Spark 3.1.0-SNAPSHOT
Type	EXTERNAL
Provider	parquet
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-20200512/loctestdbdir
Serde Library	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
```
The user home is not always warehouse-related, unchangeable in runtime, and shared both by database and table as the parent directory. Meanwhile, we use the table path as the parent directory for relative partition locations.

The config `spark.sql.warehouse.dir` represents `the default location for managed databases and tables`.
For databases, the case above seems not to follow its semantics, because it should use ` `spark.sql.warehouse.dir` as the base path instead.

For tables, it seems to be right but here I suggest enriching the meaning that lets it also be the for external tables with relative paths for locations.

With changes in this PR,

The location of a database will be `warehouseDir/dbpath` when `dbpath` is relative.
The location of a table will be `dbpath/tblpath` when `tblpath` is relative.

### Why are the changes needed?

bugfix and improvement

Firstly, the databases with relative locations should be created under the default location specified by `spark.sql.warehouse.dir`.

Secondly, the external tables with relative paths may also follow this behavior for consistency.

At last, the behavior for database, tables and partitions with relative paths to choose base paths should be the same.

### Does this PR introduce _any_ user-facing change?

Yes, this PR changes the `createDatabase`, `alterDatabase`, `createTable` and `alterTable` APIs and related DDLs. If the LOCATION clause is followed by a relative path, the root path will be `spark.sql.warehouse.dir` for databases, and `spark.sql.warehouse.dir` / `dbPath` for tables.

e.g.

#### after
```sql
spark-sql> desc database loctest;
Database Name	loctest
Comment
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/spark-warehouse/loctest
Owner	kentyao
spark-sql> use loctest;
spark-sql> create table loctest(id int) location 'loctest';
20/05/14 18:18:02 WARN InMemoryFileIndex: The directory file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/loctest was not found. Was it deleted very recently?
20/05/14 18:18:02 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
20/05/14 18:18:03 WARN HiveConf: HiveConf of name hive.internal.ss.authz.settings.applied.marker does not exist
20/05/14 18:18:03 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
20/05/14 18:18:03 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
spark-sql> desc formatted loctest;
id	int	NULL

# Detailed Table Information
Database	loctest
Table	loctest
Owner	kentyao
Created Time	Thu May 14 18:18:03 CST 2020
Last Access	UNKNOWN
Created By	Spark 3.1.0-SNAPSHOT
Type	EXTERNAL
Provider	parquet
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/spark-warehouse/loctest/loctest
Serde Library	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
spark-sql> alter table loctest set location 'loctest2'
         > ;
spark-sql> desc formatted loctest;
id	int	NULL

# Detailed Table Information
Database	loctest
Table	loctest
Owner	kentyao
Created Time	Thu May 14 18:18:03 CST 2020
Last Access	UNKNOWN
Created By	Spark 3.1.0-SNAPSHOT
Type	EXTERNAL
Provider	parquet
Location	file:/Users/kentyao/Downloads/spark/spark-3.1.0-SNAPSHOT-bin-SPARK-31709/spark-warehouse/loctest/loctest2
Serde Library	org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat	org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
```
### How was this patch tested?

Add unit tests.

Closes #28527 from yaooqinn/SPARK-31709.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 12:48:22 +00:00
beliefer 42f9ee4c7d [SPARK-24884][SQL] Support regexp function regexp_extract_all
### What changes were proposed in this pull request?
`regexp_extract_all` is a very useful function expanded the capabilities of `regexp_extract`.
There are some description of this function.
```
SELECT regexp_extract('1a 2b 14m', '\d+', 0); -- 1
SELECT regexp_extract_all('1a 2b 14m', '\d+', 0); -- [1, 2, 14]
SELECT regexp_extract('1a 2b 14m', '(\d+)([a-z]+)', 2); -- 'a'
SELECT regexp_extract_all('1a 2b 14m', '(\d+)([a-z]+)', 2); -- ['a', 'b', 'm']
```
There are some mainstream database support the syntax.
**Presto:**
https://prestodb.io/docs/current/functions/regexp.html

**Pig:**
https://pig.apache.org/docs/latest/api/org/apache/pig/builtin/REGEX_EXTRACT_ALL.html

Note: This PR pick up the work of https://github.com/apache/spark/pull/21985
### Why are the changes needed?
`regexp_extract_all` is a very useful function and make work easier.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
New UT

Closes #27507 from beliefer/support-regexp_extract_all.

Lead-authored-by: beliefer <beliefer@163.com>
Co-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 06:03:55 +00:00
Prakhar Jain 7a09e71198 [SPARK-32509][SQL] Ignore unused DPP True Filter in Canonicalization
### What changes were proposed in this pull request?
This PR fixes issues relate to Canonicalization of FileSourceScanExec when it contains unused DPP Filter.

### Why are the changes needed?

As part of PlanDynamicPruningFilter rule, the unused DPP Filter are simply replaced by `DynamicPruningExpression(TrueLiteral)` so that they can be avoided. But these unnecessary`DynamicPruningExpression(TrueLiteral)` partition filter inside the FileSourceScanExec affects the canonicalization of the node and so in many cases, this can prevent ReuseExchange from happening.

This PR fixes this issue by ignoring the unused DPP filter in the `def doCanonicalize` method.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT.

Closes #29318 from prakharjain09/SPARK-32509_df_reuse.

Authored-by: Prakhar Jain <prakharjain09@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 03:26:03 +00:00
Max Gekk fda397d9c8 [SPARK-32510][SQL] Check duplicate nested columns in read from JDBC datasource
### What changes were proposed in this pull request?
Check that there are not duplicate column names on the same level (top level or nested levels) in reading from JDBC datasource. If such duplicate columns exist, throw the exception:
```
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the customSchema option value:
```
The check takes into account the SQL config `spark.sql.caseSensitive` (`false` by default).

### Why are the changes needed?
To make handling of duplicate nested columns is similar to handling of duplicate top-level columns i. e. output the same error:
```Scala
org.apache.spark.sql.AnalysisException: Found duplicate column(s) in the customSchema option value: `camelcase`
```

Checking of top-level duplicates was introduced by https://github.com/apache/spark/pull/17758, and duplicates in nested structures by https://github.com/apache/spark/pull/29234.

### Does this PR introduce _any_ user-facing change?
Yes.

### How was this patch tested?
Added new test suite `JdbcNestedDataSourceSuite`.

Closes #29317 from MaxGekk/jdbc-dup-nested-columns.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 03:20:17 +00:00
Robert (Bobby) Evans 713124d5e3 [SPARK-32274][SQL] Make SQL cache serialization pluggable
### What changes were proposed in this pull request?

Add a config to let users change how SQL/Dataframe data is compressed when cached.

This adds a few new classes/APIs for use with this config.

1. `CachedBatch` is a trait used to tag data that is intended to be cached. It has a few APIs that lets us keep the compression/serialization of the data separate from the metrics about it.
2. `CachedBatchSerializer` provides the APIs that must be implemented to cache data.
    * `convertForCache` is an API that runs a cached spark plan and turns its result into an `RDD[CachedBatch]`.  The actual caching is done outside of this API
   * `buildFilter` is an API that takes a set of predicates and builds a filter function that can be used to filter the `RDD[CachedBatch]` returned by `convertForCache`
   * `decompressColumnar` decompresses an `RDD[CachedBatch]` into an `RDD[ColumnarBatch]` This is only used for a limited set of data types.  These data types may expand in the future.  If they do we can add in a new API with a default value that says which data types this serializer supports.
   * `decompressToRows` decompresses an `RDD[CachedBatch]` into an `RDD[InternalRow]` this API, like `decompressColumnar` decompresses the data in `CachedBatch` but turns it into `InternalRow`s, typically using code generation for performance reasons.

There is also an API that lets you reuse the current filtering based on min/max values. `SimpleMetricsCachedBatch` and `SimpleMetricsCachedBatchSerializer`.

### Why are the changes needed?

This lets users explore different types of compression and compression ratios.

### Does this PR introduce _any_ user-facing change?

This adds in a single config, and exposes some developer API classes described above.

### How was this patch tested?

I ran the unit tests around this and I also did some manual performance tests.  I could find any performance difference between the old and new code, and if there is any it is within error.

Closes #29067 from revans2/pluggable_cache_serializer.

Authored-by: Robert (Bobby) Evans <bobby@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-08-03 03:15:54 +00:00
yangjie01 0693d8bbf2 [SPARK-32490][BUILD] Upgrade netty-all to 4.1.51.Final
### What changes were proposed in this pull request?
This PR aims to bring the bug fixes from the latest netty version.

### Why are the changes needed?
- 4.1.48.Final: [https://github.com/netty/netty/milestone/223?closed=1](https://github.com/netty/netty/milestone/223?closed=1)(14 patches or issues)
- 4.1.49.Final: [https://github.com/netty/netty/milestone/224?closed=1](https://github.com/netty/netty/milestone/224?closed=1)(48 patches or issues)
- 4.1.50.Final: [https://github.com/netty/netty/milestone/225?closed=1](https://github.com/netty/netty/milestone/225?closed=1)(38 patches or issues)
- 4.1.51.Final: [https://github.com/netty/netty/milestone/226?closed=1](https://github.com/netty/netty/milestone/226?closed=1)(53 patches or issues)

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass the Jenkins with the existing tests.

Closes #29299 from LuciferYang/upgrade-netty-version.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-02 16:46:11 -07:00
Gengliang Wang 71aea02e9f [SPARK-32467][UI] Avoid encoding URL twice on https redirect
### What changes were proposed in this pull request?

When https is enabled for Spark UI, an HTTP request will be redirected as an encoded HTTPS URL: https://github.com/apache/spark/pull/10238/files#diff-f79a5ead735b3d0b34b6b94486918e1cR312

When we create the redirect url, we will call getRequestURI and getQueryString. Both two methods may return an encoded string. However, we pass them directly to the following URI constructor
```
URI(String scheme, String authority, String path, String query, String fragment)
```
As this URI constructor assumes both path and query parameters are decoded strings, it will encode them again. This makes the redirect URL encoded twice.

This problem is on stage page with HTTPS enabled. The URL of "/taskTable" contains query parameter `order%5B0%5D%5Bcolumn%5D`. After encoded it becomes  `order%255B0%255D%255Bcolumn%255D` and it will be decoded as `order%5B0%5D%5Bcolumn%5D` instead of `order[0][dir]`.  When the parameter `order[0][dir]` is missing, there will be an excetpion from:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala#L176
and the stage page fail to load.

To fix the problem, we can try decoding the query parameters before encoding it. This is to make sure we encode the URL

### Why are the changes needed?

Fix a UI issue when HTTPS is enabled

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new Unit test + manually test on a cluster

Closes #29271 from gengliangwang/urlEncode.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
2020-08-01 13:09:26 +08:00
Wenchen Fan 1c6dff7b5f [SPARK-32083][SQL] AQE coalesce should at least return one partition
### What changes were proposed in this pull request?

This PR updates the AQE framework to at least return one partition during coalescing.

This PR also updates `ShuffleExchangeExec.canChangeNumPartitions` to not coalesce for `SinglePartition`.

### Why are the changes needed?

It's a bit risky to return 0 partitions, as sometimes it's different from empty data. For example, global aggregate will return one result row even if the input table is empty. If there is 0 partition, no task will be run and no result will be returned. More specifically, the global aggregate requires `AllTuples` and we can't coalesce to 0 partitions.

This is not a real bug for now. The global aggregate will be planned as partial and final physical agg nodes. The partial agg will return at least one row, so that the shuffle still have data. But it's better to fix this issue to avoid potential bugs in the future.

According to https://github.com/apache/spark/pull/28916, this change also fix some perf problems.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

updated test.

Closes #29307 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-31 14:20:20 +00:00
Yuanjian Li 354313b6bc [SPARK-31894][SS][FOLLOW-UP] Rephrase the config doc
### What changes were proposed in this pull request?
Address comment in https://github.com/apache/spark/pull/28707#discussion_r461102749

### Why are the changes needed?
Hide the implementation details in the config doc.

### Does this PR introduce _any_ user-facing change?
Config doc change.

### How was this patch tested?
Document only.

Closes #29315 from xuanyuanking/SPARK-31894-follow.

Authored-by: Yuanjian Li <yuanjian.li@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-31 14:16:41 +00:00
Venkata krishnan Sowrirajan 4eaf3a0a23 [SPARK-31418][CORE][FOLLOW-UP][MINOR] Fix log messages to print stage id instead of the object name
### What changes were proposed in this pull request?
Just few log lines fixes which are logging the object name instead of the stage IDs

### Why are the changes needed?
This would make it easier later for debugging.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Just log messages. Existing tests should be enough

Closes #29279 from venkata91/SPARK-31418-follow-up.

Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-31 22:12:24 +09:00
Kent Yao f4800406a4 [SPARK-32406][SQL][FOLLOWUP] Make RESET fail against static and core configs
### What changes were proposed in this pull request?

This followup addresses comments from https://github.com/apache/spark/pull/29202#discussion_r462054784

1. make RESET static SQL configs/spark core configs fail as same as the SET command. Not that, for core ones, they have to be pre-registered, otherwise, they are still able to be SET/RESET

2. add test cases for configurations w/ optional default values

### Why are the changes needed?

behavior change with suggestions from PMCs

### Does this PR introduce _any_ user-facing change?

Yes, RESET will fail after this PR, before it just does nothing because the static ones are static.

### How was this patch tested?

add more tests.

Closes #29297 from yaooqinn/SPARK-32406-F.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-31 22:11:05 +09:00
Takuya UESHIN 8014b0b5d6 [SPARK-32160][CORE][PYSPARK] Add a config to switch allow/disallow to create SparkContext in executors
### What changes were proposed in this pull request?

This is a follow-up of #28986.
This PR adds a config to switch allow/disallow to create `SparkContext` in executors.

- `spark.driver.allowSparkContextInExecutors`

### Why are the changes needed?

Some users or libraries actually create `SparkContext` in executors.
We shouldn't break their workloads.

### Does this PR introduce _any_ user-facing change?

Yes, users will be able to create `SparkContext` in executors with the config enabled.

### How was this patch tested?

More tests are added.

Closes #29278 from ueshin/issues/SPARK-32160/add_configs.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-31 17:28:35 +09:00
Gabor Somogyi 813532d103 [SPARK-32468][SS][TESTS] Fix timeout config issue in Kafka connector tests
### What changes were proposed in this pull request?
While I'm implementing SPARK-32032 I've found a bug in Kafka: https://issues.apache.org/jira/browse/KAFKA-10318. This will cause issues only later when it's fixed but it would be good to fix it now because SPARK-32032 would like to bring in `AdminClient` where the code blows up with the mentioned `ConfigException`. This would reduce the code changes in the mentioned jira. In this PR I've changed `default.api.timeout.ms` to `request.timeout.ms` which fulfils this condition.

### Why are the changes needed?
Solve later problems and reduce SPARK-32032 PR size.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing unit tests.

Closes #29272 from gaborgsomogyi/SPARK-32468.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
2020-07-31 14:52:33 +09:00
Cheng Su ae82768c13 [SPARK-32421][SQL] Add code-gen for shuffled hash join
### What changes were proposed in this pull request?

Adding codegen for shuffled hash join. Shuffled hash join codegen is very similar to broadcast hash join codegen. So most of code change is to refactor existing codegen in `BroadcastHashJoinExec` to `HashJoin`.

Example codegen for query in [`JoinBenchmark`](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala#L153):

```
  def shuffleHashJoin(): Unit = {
    val N: Long = 4 << 20
    withSQLConf(
      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10000000",
      SQLConf.PREFER_SORTMERGEJOIN.key -> "false") {
      codegenBenchmark("shuffle hash join", N) {
        val df1 = spark.range(N).selectExpr(s"id as k1")
        val df2 = spark.range(N / 3).selectExpr(s"id * 3 as k2")
        val df = df1.join(df2, col("k1") === col("k2"))
        assert(df.queryExecution.sparkPlan.find(_.isInstanceOf[ShuffledHashJoinExec]).isDefined)
        df.noop()
      }
    }
  }
```

Shuffled hash join codegen:

```
== Subtree 3 / 3 (maxMethodCodeSize:113; maxConstantPoolSize:126(0.19% used); numInnerClasses:0) ==
*(3) ShuffledHashJoin [k1#2L], [k2#6L], Inner, BuildRight
:- *(1) Project [id#0L AS k1#2L]
:  +- *(1) Range (0, 4194304, step=1, splits=1)
+- *(2) Project [(id#4L * 3) AS k2#6L]
   +- *(2) Range (0, 1398101, step=1, splits=1)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage3(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=3
/* 006 */ final class GeneratedIteratorForCodegenStage3 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private scala.collection.Iterator inputadapter_input_0;
/* 010 */   private org.apache.spark.sql.execution.joins.HashedRelation shj_relation_0;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] shj_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
/* 012 */
/* 013 */   public GeneratedIteratorForCodegenStage3(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     inputadapter_input_0 = inputs[0];
/* 021 */     shj_relation_0 = ((org.apache.spark.sql.execution.joins.ShuffledHashJoinExec) references[0] /* plan */).buildHashedRelation(inputs[1]);
/* 022 */     shj_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 023 */
/* 024 */   }
/* 025 */
/* 026 */   private void shj_doConsume_0(InternalRow inputadapter_row_0, long shj_expr_0_0) throws java.io.IOException {
/* 027 */     // generate join key for stream side
/* 028 */
/* 029 */     // find matches from HashRelation
/* 030 */     scala.collection.Iterator shj_matches_0 = false ?
/* 031 */     null : (scala.collection.Iterator)shj_relation_0.get(shj_expr_0_0);
/* 032 */     if (shj_matches_0 != null) {
/* 033 */       while (shj_matches_0.hasNext()) {
/* 034 */         UnsafeRow shj_matched_0 = (UnsafeRow) shj_matches_0.next();
/* 035 */         {
/* 036 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numOutputRows */).add(1);
/* 037 */
/* 038 */           long shj_value_1 = shj_matched_0.getLong(0);
/* 039 */           shj_mutableStateArray_0[0].reset();
/* 040 */
/* 041 */           shj_mutableStateArray_0[0].write(0, shj_expr_0_0);
/* 042 */
/* 043 */           shj_mutableStateArray_0[0].write(1, shj_value_1);
/* 044 */           append((shj_mutableStateArray_0[0].getRow()).copy());
/* 045 */
/* 046 */         }
/* 047 */       }
/* 048 */     }
/* 049 */
/* 050 */   }
/* 051 */
/* 052 */   protected void processNext() throws java.io.IOException {
/* 053 */     while ( inputadapter_input_0.hasNext()) {
/* 054 */       InternalRow inputadapter_row_0 = (InternalRow) inputadapter_input_0.next();
/* 055 */
/* 056 */       long inputadapter_value_0 = inputadapter_row_0.getLong(0);
/* 057 */
/* 058 */       shj_doConsume_0(inputadapter_row_0, inputadapter_value_0);
/* 059 */       if (shouldStop()) return;
/* 060 */     }
/* 061 */   }
/* 062 */
/* 063 */ }
```

Broadcast hash join codegen for the same query (for reference here):

```
== Subtree 2 / 2 (maxMethodCodeSize:280; maxConstantPoolSize:218(0.33% used); numInnerClasses:0) ==
*(2) BroadcastHashJoin [k1#2L], [k2#6L], Inner, BuildRight, false
:- *(2) Project [id#0L AS k1#2L]
:  +- *(2) Range (0, 4194304, step=1, splits=1)
+- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]),false), [id=#22]
   +- *(1) Project [(id#4L * 3) AS k2#6L]
      +- *(1) Range (0, 1398101, step=1, splits=1)

Generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage2(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=2
/* 006 */ final class GeneratedIteratorForCodegenStage2 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean range_initRange_0;
/* 010 */   private long range_nextIndex_0;
/* 011 */   private TaskContext range_taskContext_0;
/* 012 */   private InputMetrics range_inputMetrics_0;
/* 013 */   private long range_batchEnd_0;
/* 014 */   private long range_numElementsTodo_0;
/* 015 */   private org.apache.spark.sql.execution.joins.LongHashedRelation bhj_relation_0;
/* 016 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] range_mutableStateArray_0 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4];
/* 017 */
/* 018 */   public GeneratedIteratorForCodegenStage2(Object[] references) {
/* 019 */     this.references = references;
/* 020 */   }
/* 021 */
/* 022 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 023 */     partitionIndex = index;
/* 024 */     this.inputs = inputs;
/* 025 */
/* 026 */     range_taskContext_0 = TaskContext.get();
/* 027 */     range_inputMetrics_0 = range_taskContext_0.taskMetrics().inputMetrics();
/* 028 */     range_mutableStateArray_0[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 029 */     range_mutableStateArray_0[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 030 */     range_mutableStateArray_0[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 0);
/* 031 */
/* 032 */     bhj_relation_0 = ((org.apache.spark.sql.execution.joins.LongHashedRelation) ((org.apache.spark.broadcast.TorrentBroadcast) references[1] /* broadcast */).value()).asReadOnlyCopy();
/* 033 */     incPeakExecutionMemory(bhj_relation_0.estimatedSize());
/* 034 */
/* 035 */     range_mutableStateArray_0[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 0);
/* 036 */
/* 037 */   }
/* 038 */
/* 039 */   private void initRange(int idx) {
/* 040 */     java.math.BigInteger index = java.math.BigInteger.valueOf(idx);
/* 041 */     java.math.BigInteger numSlice = java.math.BigInteger.valueOf(1L);
/* 042 */     java.math.BigInteger numElement = java.math.BigInteger.valueOf(4194304L);
/* 043 */     java.math.BigInteger step = java.math.BigInteger.valueOf(1L);
/* 044 */     java.math.BigInteger start = java.math.BigInteger.valueOf(0L);
/* 045 */     long partitionEnd;
/* 046 */
/* 047 */     java.math.BigInteger st = index.multiply(numElement).divide(numSlice).multiply(step).add(start);
/* 048 */     if (st.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 049 */       range_nextIndex_0 = Long.MAX_VALUE;
/* 050 */     } else if (st.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 051 */       range_nextIndex_0 = Long.MIN_VALUE;
/* 052 */     } else {
/* 053 */       range_nextIndex_0 = st.longValue();
/* 054 */     }
/* 055 */     range_batchEnd_0 = range_nextIndex_0;
/* 056 */
/* 057 */     java.math.BigInteger end = index.add(java.math.BigInteger.ONE).multiply(numElement).divide(numSlice)
/* 058 */     .multiply(step).add(start);
/* 059 */     if (end.compareTo(java.math.BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
/* 060 */       partitionEnd = Long.MAX_VALUE;
/* 061 */     } else if (end.compareTo(java.math.BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
/* 062 */       partitionEnd = Long.MIN_VALUE;
/* 063 */     } else {
/* 064 */       partitionEnd = end.longValue();
/* 065 */     }
/* 066 */
/* 067 */     java.math.BigInteger startToEnd = java.math.BigInteger.valueOf(partitionEnd).subtract(
/* 068 */       java.math.BigInteger.valueOf(range_nextIndex_0));
/* 069 */     range_numElementsTodo_0  = startToEnd.divide(step).longValue();
/* 070 */     if (range_numElementsTodo_0 < 0) {
/* 071 */       range_numElementsTodo_0 = 0;
/* 072 */     } else if (startToEnd.remainder(step).compareTo(java.math.BigInteger.valueOf(0L)) != 0) {
/* 073 */       range_numElementsTodo_0++;
/* 074 */     }
/* 075 */   }
/* 076 */
/* 077 */   private void bhj_doConsume_0(long bhj_expr_0_0) throws java.io.IOException {
/* 078 */     // generate join key for stream side
/* 079 */
/* 080 */     // find matches from HashedRelation
/* 081 */     UnsafeRow bhj_matched_0 = false ? null: (UnsafeRow)bhj_relation_0.getValue(bhj_expr_0_0);
/* 082 */     if (bhj_matched_0 != null) {
/* 083 */       {
/* 084 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1);
/* 085 */
/* 086 */         long bhj_value_2 = bhj_matched_0.getLong(0);
/* 087 */         range_mutableStateArray_0[3].reset();
/* 088 */
/* 089 */         range_mutableStateArray_0[3].write(0, bhj_expr_0_0);
/* 090 */
/* 091 */         range_mutableStateArray_0[3].write(1, bhj_value_2);
/* 092 */         append((range_mutableStateArray_0[3].getRow()));
/* 093 */
/* 094 */       }
/* 095 */     }
/* 096 */
/* 097 */   }
/* 098 */
/* 099 */   protected void processNext() throws java.io.IOException {
/* 100 */     // initialize Range
/* 101 */     if (!range_initRange_0) {
/* 102 */       range_initRange_0 = true;
/* 103 */       initRange(partitionIndex);
/* 104 */     }
/* 105 */
/* 106 */     while (true) {
/* 107 */       if (range_nextIndex_0 == range_batchEnd_0) {
/* 108 */         long range_nextBatchTodo_0;
/* 109 */         if (range_numElementsTodo_0 > 1000L) {
/* 110 */           range_nextBatchTodo_0 = 1000L;
/* 111 */           range_numElementsTodo_0 -= 1000L;
/* 112 */         } else {
/* 113 */           range_nextBatchTodo_0 = range_numElementsTodo_0;
/* 114 */           range_numElementsTodo_0 = 0;
/* 115 */           if (range_nextBatchTodo_0 == 0) break;
/* 116 */         }
/* 117 */         range_batchEnd_0 += range_nextBatchTodo_0 * 1L;
/* 118 */       }
/* 119 */
/* 120 */       int range_localEnd_0 = (int)((range_batchEnd_0 - range_nextIndex_0) / 1L);
/* 121 */       for (int range_localIdx_0 = 0; range_localIdx_0 < range_localEnd_0; range_localIdx_0++) {
/* 122 */         long range_value_0 = ((long)range_localIdx_0 * 1L) + range_nextIndex_0;
/* 123 */
/* 124 */         bhj_doConsume_0(range_value_0);
/* 125 */
/* 126 */         if (shouldStop()) {
/* 127 */           range_nextIndex_0 = range_value_0 + 1L;
/* 128 */           ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localIdx_0 + 1);
/* 129 */           range_inputMetrics_0.incRecordsRead(range_localIdx_0 + 1);
/* 130 */           return;
/* 131 */         }
/* 132 */
/* 133 */       }
/* 134 */       range_nextIndex_0 = range_batchEnd_0;
/* 135 */       ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(range_localEnd_0);
/* 136 */       range_inputMetrics_0.incRecordsRead(range_localEnd_0);
/* 137 */       range_taskContext_0.killTaskIfInterrupted();
/* 138 */     }
/* 139 */   }
/* 140 */
/* 141 */ }
```

### Why are the changes needed?

Codegen shuffled hash join can help save CPU cost. We added shuffled hash join codegen internally in our fork, and seeing obvious improvement in benchmark compared to current non-codegen code path.

Test example query in [`JoinBenchmark`](https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/JoinBenchmark.scala#L153), seeing 30% wall clock time improvement compared to existing non-codegen code path:

Enable shuffled hash join code-gen:

```
Running benchmark: shuffle hash join
  Running case: shuffle hash join wholestage off
  Stopped after 2 iterations, 1358 ms
  Running case: shuffle hash join wholestage on
  Stopped after 5 iterations, 2323 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join wholestage off                    649            679          43          6.5         154.7       1.0X
shuffle hash join wholestage on                     436            465          45          9.6         103.9       1.5X
```

Disable shuffled hash join codegen:

```
Running benchmark: shuffle hash join
  Running case: shuffle hash join wholestage off
  Stopped after 2 iterations, 1345 ms
  Running case: shuffle hash join wholestage on
  Stopped after 5 iterations, 2967 ms

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join wholestage off                    646            673          37          6.5         154.1       1.0X
shuffle hash join wholestage on                     549            594          47          7.6         130.9       1.2X
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added unit test in `WholeStageCodegenSuite`.

Closes #29277 from c21/codegen.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2020-07-31 05:51:57 +00:00
Gabor Somogyi f6027827a4 [SPARK-32482][SS][TESTS] Eliminate deprecated poll(long) API calls to avoid infinite wait in tests
### What changes were proposed in this pull request?
Structured Streaming Kafka connector tests are now using a deprecated `poll(long)` API which could cause infinite wait. In this PR I've eliminated these calls and replaced them with `AdminClient`.

### Why are the changes needed?
Deprecated `poll(long)` API calls.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing unit tests.

Closes #29289 from gaborgsomogyi/SPARK-32482.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
2020-07-31 13:40:33 +09:00
Kousuke Saruta 9d7b1d935f [SPARK-32175][SPARK-32175][FOLLOWUP] Remove flaky test added in
### What changes were proposed in this pull request?

This PR removes a test added in SPARK-32175(#29002).

### Why are the changes needed?

That test is flaky. It can be mitigated by increasing the timeout but it would rather be simpler to remove the test.
See also the [discussion](https://github.com/apache/spark/pull/29002#issuecomment-666746857).

### Does this PR introduce _any_ user-facing change?

No.

Closes #29314 from sarutak/remove-flaky-test.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
2020-07-31 10:37:05 +09:00
Devesh Agrawal 6032c5b032 [SPARK-32417] Fix flakyness of BlockManagerDecommissionIntegrationSuite
### What changes were proposed in this pull request?

This test tries to fix the flakyness of BlockManagerDecommissionIntegrationSuite.

### Description of the problem

Make the block manager decommissioning test be less flaky

An interesting failure happens when migrateDuring = true (and persist or shuffle is true):
- We schedule the job with tasks on executors 0, 1, 2.
- We wait 300 ms and decommission executor 0.
- If the task is not yet done on executor 0, it will now fail because
   the block manager won't be able to save the block. This condition is
   easy to trigger on a loaded machine where the github checks run.
- The task with retry on a different executor (1 or 2) and its shuffle
   blocks will land there.
- No actual block migration happens here because the decommissioned
   executor technically failed before it could even produce a block.

To remove the above race, this change replaces the fixed wait for 300 ms to wait for an actual task to succeed. When a task has succeeded, we know its blocks would have been written for sure and thus its executor would certainly be forced to migrate those blocks when it is decommissioned.

The change always decommissions an executor on which a real task finished successfully instead of picking the first executor. Because the system may choose to schedule nothing on the first executor and instead run the two tasks on one executor.

### Why are the changes needed?

I have had bad luck with BlockManagerDecommissionIntegrationSuite and it has failed several times on my PRs. So fixing it.

### Does this PR introduce _any_ user-facing change?

No, unit test only change.

### How was this patch tested?

Github checks. Ran this test 100 times, 10 at a time in parallel in a script.

Closes #29226 from agrawaldevesh/block-manager-decom-flaky.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
2020-07-30 12:00:19 -07:00
Devesh Agrawal 366a178933 [SPARK-32199][SPARK-32198] Reduce job failures during decommissioning
### What changes were proposed in this pull request?

This PR reduces the prospect of a job loss during decommissioning. It
fixes two holes in the current decommissioning framework:

- (a) Loss of decommissioned executors is not treated as a job failure:
We know that the decommissioned executor would be dying soon, so its death is
clearly not caused by the application.

- (b) Shuffle files on the decommissioned host are cleared when the
first fetch failure is detected from a decommissioned host: This is a
bit tricky in terms of when to clear the shuffle state ? Ideally you
want to clear it the millisecond before the shuffle service on the node
dies (or the executor dies when there is no external shuffle service) --
too soon and it could lead to some wastage and too late would lead to
fetch failures.

  The approach here is to do this clearing when the very first fetch
failure is observed on the decommissioned block manager, without waiting for
other blocks to also signal a failure.

### Why are the changes needed?

Without them decommissioning a lot of executors at a time leads to job failures.

### Code overview

The task scheduler tracks the executors that were decommissioned along with their
`ExecutorDecommissionInfo`. This information is used by: (a) For handling a `ExecutorProcessLost` error, or (b) by the `DAGScheduler` when handling a fetch failure.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new unit test `DecommissionWorkerSuite` to test the new behavior by exercising the Master-Worker decommissioning. I chose to add a new test since the setup logic was quite different from the existing `WorkerDecommissionSuite`. I am open to changing the name of the newly added test suite :-)

### Questions for reviewers
- Should I add a feature flag to guard these two behaviors ? They seem safe to me that they should only get triggered by decommissioning, but you never know :-).

Closes #29014 from agrawaldevesh/decom_harden.

Authored-by: Devesh Agrawal <devesh.agrawal@gmail.com>
Signed-off-by: Holden Karau <hkarau@apple.com>
2020-07-30 11:58:11 -07:00
Dongjoon Hyun 7cf3b54a2a [SPARK-32489][CORE] Pass core module UTs in Scala 2.13
### What changes were proposed in this pull request?

So far, we fixed many stuffs in `core` module. This PR fixes the remaining UT failures in Scala 2.13.

- `OneApplicationResource.environmentInfo` will return a deterministic result for `sparkProperties`, `hadoopProperties`, `systemProperties`, and `classpathEntries`.
- `SubmitRestProtocolSuite` has Scala 2.13 answer in addition to the existing Scala 2.12 answer, and uses the expected answer based on the Scala runtime version.

### Why are the changes needed?

To support Scala 2.13.

### Does this PR introduce _any_ user-facing change?

Yes, `environmentInfo` is changed, but this fixes the indeterministic behavior.

### How was this patch tested?

- Scala 2.12: Pass the Jenkins or GitHub Action
- Scala 2.13: Do the following.
```
$ dev/change-scala-version.sh 2.13
$ build/mvn test -pl core --am -Pscala-2.13
```

**BEFORE**
```
Tests: succeeded 2612, failed 3, canceled 1, ignored 8, pending 0
*** 3 TESTS FAILED ***
```

**AFTER**
```
Tests: succeeded 2615, failed 0, canceled 1, ignored 8, pending 0
All tests passed.
```

Closes #29298 from dongjoon-hyun/SPARK-32489.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-07-30 10:59:26 -07:00
HyukjinKwon 32f4ef005f [SPARK-32497][INFRA] Installs qpdf package for CRAN check in GitHub Actions
### What changes were proposed in this pull request?

CRAN check fails due to the size of the generated PDF docs as below:

```
...
 WARNING
‘qpdf’ is needed for checks on size reduction of PDFs
...
Status: 1 WARNING, 1 NOTE
See
  ‘/home/runner/work/spark/spark/R/SparkR.Rcheck/00check.log’
for details.
```

This PR proposes to install `qpdf` in GitHub Actions.

Note that I cannot reproduce in my local with the same R version so I am not documenting it for now.

Also, while I am here, I piggyback to install SparkR when the module includes `sparkr`. it is rather a followup of SPARK-32491.

### Why are the changes needed?

To fix SparkR CRAN check failure.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

GitHub Actions will test it out.

Closes #29306 from HyukjinKwon/SPARK-32497.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-31 00:57:24 +09:00
Warren Zhu 7437720952 [SPARK-32227] Fix regression bug in load-spark-env.cmd with Spark 3.0.0
### What changes were proposed in this pull request?
Fix regression bug in load-spark-env.cmd with Spark 3.0.0

### Why are the changes needed?
cmd doesn't support set env twice. So set `SPARK_ENV_CMD=%SPARK_CONF_DIR%\%SPARK_ENV_CMD%` doesn't take effect, which caused regression.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested.
1. Create a spark-env.cmd under conf folder. Inside this, `echo spark-env.cmd`
2. Run old load-spark-env.cmd, nothing printed in the output
2. Run fixed load-spark-env.cmd, `spark-env.cmd` showed in the output.

Closes #29044 from warrenzhu25/32227.

Lead-authored-by: Warren Zhu <zhonzh@microsoft.com>
Co-authored-by: Warren Zhu <warren.zhu25@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-30 21:44:49 +09:00
HyukjinKwon 12f443cd99 [SPARK-32496][INFRA] Include GitHub Action file as the changes in testing
### What changes were proposed in this pull request?

https://github.com/apache/spark/pull/26556 excluded `.github/workflows/master.yml`. So tests are skipped if the GitHub Actions configuration file is changed.

As of SPARK-32245, we now run the regular tests via the testing script. We should include it to test to make sure GitHub Actions build does not break due to some changes such as Python versions.

### Why are the changes needed?

For better test coverage in GitHub Actions build.

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

GitHub Actions in this PR will test.

Closes #29305 from HyukjinKwon/SPARK-32496.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-30 21:07:31 +09:00
HyukjinKwon e0c8bd07af [SPARK-32493][INFRA] Manually install R instead of using setup-r in GitHub Actions
### What changes were proposed in this pull request?

This PR proposes to manually install R instead of using `setup-r` which seems broken. Currently, GitHub Actions uses its default R 3.4.4 installed, which we dropped as of SPARK-32073.

While I am here, I am also upgrading R version to 4.0. Jenkins will test the old version and GitHub Actions tests the new version. AppVeyor uses R 4.0 but it does not check CRAN which is important when we make a release.

### Why are the changes needed?

To recover GitHub Actions build.

### Does this PR introduce _any_ user-facing change?

No, dev-only

### How was this patch tested?

Manually tested at https://github.com/HyukjinKwon/spark/pull/15

Closes #29302 from HyukjinKwon/SPARK-32493.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2020-07-30 20:06:35 +09:00