Commit graph

26032 commits

Author SHA1 Message Date
“attilapiros” fd2bf55aba [SPARK-27651][CORE] Avoid the network when shuffle blocks are fetched from the same host
## What changes were proposed in this pull request?

Before this PR `ShuffleBlockFetcherIterator` was partitioning the block fetches into two distinct sets: local reads and remote fetches. Within this PR (when the feature is enabled by "spark.shuffle.readHostLocalDisk.enabled") a new category is introduced: host-local reads. They are shuffle block fetches where although the block manager is different they are running on the same host along with the requester.

Moreover to get the local directories of the other executors/block managers a new RPC message is introduced `GetLocalDirs` which is sent the the block manager master where it is answered as `BlockManagerLocalDirs`. In `BlockManagerMasterEndpoint` for answering this request the `localDirs` is extracted from the `BlockManagerInfo` and stored separately in a hash map called `executorIdLocalDirs`. Because the earlier used `blockManagerInfo` contains data for the alive block managers (see `org.apache.spark.storage.BlockManagerMasterEndpoint#removeBlockManager`).

Now `executorIdLocalDirs` knows all the local dirs up to the application start (like the external shuffle service does) so in case of an RDD recalculation both host-local shuffle blocks and disk persisted RDD blocks on the same host can be served by reading the files behind the blocks directly.

## How was this patch tested?

### Unit tests

`ExternalShuffleServiceSuite`:
- "SPARK-27651: host local disk reading avoids external shuffle service on the same node"

`ShuffleBlockFetcherIteratorSuite`:
- "successful 3 local reads + 4 host local reads + 2 remote reads"

And with extending existing suites where shuffle metrics was tested.

### Manual tests

Running Spark on YARN in a 4 nodes cluster with 6 executors and having 12 shuffle blocks.

```
$ grep host-local experiment.log
19/07/30 03:57:12 INFO storage.ShuffleBlockFetcherIterator: Getting 12 (1496.8 MB) non-empty blocks including 2 (299.4 MB) local blocks and 2 (299.4 MB) host-local blocks and 8 (1197.4 MB) remote blocks
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Start fetching host-local blocks: shuffle_0_2_1, shuffle_0_6_1
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Got host-local blocks in 38 ms
19/07/30 03:57:12 INFO storage.ShuffleBlockFetcherIterator: Getting 12 (1496.8 MB) non-empty blocks including 2 (299.4 MB) local blocks and 2 (299.4 MB) host-local blocks and 8 (1197.4 MB) remote blocks
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Start fetching host-local blocks: shuffle_0_0_0, shuffle_0_8_0
19/07/30 03:57:12 DEBUG storage.ShuffleBlockFetcherIterator: Got host-local blocks in 35 ms
```

Closes #25299 from attilapiros/SPARK-27651.

Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-11-26 11:02:25 -08:00
Sean Owen e23c135e56 [SPARK-29293][BUILD] Move scalafmt to Scala 2.12 profile; bump to 0.12
### What changes were proposed in this pull request?

Move scalafmt to Scala 2.12 profile; bump to 0.12.

### Why are the changes needed?

To facilitate a future Scala 2.13 build.

### Does this PR introduce any user-facing change?

None.

### How was this patch tested?

This isn't covered by tests, it's a convenience for contributors.

Closes #26655 from srowen/SPARK-29293.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-26 09:59:19 -08:00
Kent Yao ed0c33fdd4 [SPARK-30026][SQL] Whitespaces can be identified as delimiters in interval string
### What changes were proposed in this pull request?

We are now able to handle whitespaces for integral and fractional types, and the leading or trailing whitespaces for interval, date, and timestamps. But the current interval parser is not able to identify whitespaces as separates as PostgreSQL can do.

This PR makes the whitespaces handling be consistent for nterval values.
Typed interval literal, multi-unit representation, and casting from strings are all supported.

```sql
postgres=# select interval E'1 \t day';
 interval
----------
 1 day
(1 row)

postgres=# select interval E'1\t' day;
 interval
----------
 1 day
(1 row)
```

### Why are the changes needed?

Whitespace handling should be consistent for interval value, and across different types in Spark.
PostgreSQL feature parity.

### Does this PR introduce any user-facing change?
Yes, the interval string of multi-units values which separated by whitespaces can be valid now.

### How was this patch tested?
add ut.

Closes #26662 from yaooqinn/SPARK-30026.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-27 01:20:38 +08:00
Sean Owen 29018025ba [SPARK-30009][CORE][SQL] Support different floating-point Ordering for Scala 2.12 / 2.13
### What changes were proposed in this pull request?

Make separate source trees for Scala 2.12/2.13 in order to accommodate mutually-incompatible support for Ordering of double, float.

Note: This isn't the last change that will need a split source tree for 2.13. But this particular change could go several ways:

- (Split source tree)
- Inline the Scala 2.12 implementation
- Reflection

For this change alone any are possible, and splitting the source tree is a bit overkill. But if it will be necessary for other JIRAs (see umbrella SPARK-25075), then it might be the easiest way to implement this.

### Why are the changes needed?

Scala 2.13 split Ordering.Double into Ordering.Double.TotalOrdering and Ordering.Double.IeeeOrdering. Neither can be used in a single build that supports 2.12 and 2.13.

TotalOrdering works like java.lang.Double.compare. IeeeOrdering works like Scala 2.12 Ordering.Double. They differ in how NaN is handled - compares always above other values? or always compares as 'false'? In theory they have different uses: TotalOrdering is important if floating-point values are sorted. IeeeOrdering behaves like 2.12 and JVM comparison operators.

I chose TotalOrdering as I think we care more about stable sorting, and because elsewhere we rely on java.lang comparisons. It is also possible to support with two methods.

### Does this PR introduce any user-facing change?

Pending tests, will see if it obviously affects any sort order. We need to see if it changes NaN sort order.

### How was this patch tested?

Existing tests so far.

Closes #26654 from srowen/SPARK-30009.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-26 08:25:53 -08:00
wuyi 7b1b60c758 [SPARK-28574][CORE][FOLLOW-UP] Several minor improvements for event queue capacity config
### What changes were proposed in this pull request?

* Replace hard-coded conf `spark.scheduler.listenerbus.eventqueue` with a constant variable(`LISTENER_BUS_EVENT_QUEUE_PREFIX `) defined in `config/package.scala`.

* Update documentation for `spark.scheduler.listenerbus.eventqueue.capacity` in both `config/package.scala` and `docs/configuration.md`.

### Why are the changes needed?

* Better code maintainability

* Better user guidance of the conf

### Does this PR introduce any user-facing change?

No behavior changes but user will see the updated document.

### How was this patch tested?

Pass Jenkins.

Closes #26676 from Ngone51/SPARK-28574-followup.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-26 08:20:26 -08:00
Dongjoon Hyun c2d513f8e9 [SPARK-30035][BUILD] Upgrade to Apache Commons Lang 3.9
### What changes were proposed in this pull request?

This PR aims to upgrade to `Apache Commons Lang 3.9`.

### Why are the changes needed?

`Apache Commons Lang 3.9` is the first official release to support JDK9+. The following is the full release note.
- https://commons.apache.org/proper/commons-lang/release-notes/RELEASE-NOTES-3.9.txt

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins with the existing tests.

Closes #26672 from dongjoon-hyun/SPARK-30035.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-11-26 21:31:02 +09:00
Dongjoon Hyun 9b9d130f15 [SPARK-30030][BUILD][FOLLOWUP] Remove unused org.apache.commons.lang
### What changes were proposed in this pull request?

This PR aims to remove the unused test dependency `commons-lang:commons-lang` from `core` module.

### Why are the changes needed?

SPARK-30030 already removed all usage of `Apache Commons Lang2` in `core`.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins.

Closes #26673 from dongjoon-hyun/SPARK-30030-2.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-11-26 20:55:02 +09:00
Huaxin Gao 373c2c3f44 [SPARK-29862][SQL] CREATE (OR REPLACE) ... VIEW should look up catalog/table like v2 commands
### What changes were proposed in this pull request?
Add CreateViewStatement and make CREARE VIEW  go through the same catalog/table resolution framework of v2 commands.

### Why are the changes needed?
It's important to make all the commands have the same table resolution behavior, to avoid confusing end-users. e.g.
```
USE my_catalog
DESC v // success and describe the view v from my_catalog
CREATE VIEW v AS SELECT 1 // report view not found as there is no view v in the session catalog
```
### Does this PR introduce any user-facing change?
Yes. When running CREATE VIEW ...  Spark fails the command if the current catalog is set to a v2 catalog, or the view name specified a v2 catalog.

### How was this patch tested?
unit tests

Closes #26649 from huaxingao/spark-29862.

Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-26 14:10:46 +08:00
wuyi 780555bf60 [MINOR][CORE] Make EventLogger codec be consistent between EventLogFileWriter and SparkContext
### What changes were proposed in this pull request?

Use the same function (`codecName(conf: SparkConf)`) between `EventLogFileWriter` and `SparkContext` to get the consistent codec name for EventLogger.

### Why are the changes needed?

#24921 added a new conf for EventLogger's compression codec. We should reflect this change into `SparkContext` as well. Though I didn't find any places that `SparkContext.eventLogCodec` really takes an effect, I think it'd be better to have it as a right value.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass Jenkins.

Closes #26665 from Ngone51/consistent-eventLogCodec.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-26 12:54:34 +08:00
Kent Yao 8b0121bea8 [MINOR][DOC] Fix the CalendarIntervalType description
### What changes were proposed in this pull request?

fix the overdue and incorrect description about CalendarIntervalType

### Why are the changes needed?

api doc correctness

### Does this PR introduce any user-facing change?

no
### How was this patch tested?

no

Closes #26659 from yaooqinn/intervaldoc.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-26 12:49:56 +08:00
Dongjoon Hyun 53e19f3678 [SPARK-30032][BUILD] Upgrade to ORC 1.5.8
### What changes were proposed in this pull request?

This PR aims to upgrade to Apache ORC 1.5.8.

### Why are the changes needed?

This will bring the latest bug fixes. The following is the full release note.
- https://issues.apache.org/jira/projects/ORC/versions/12346462

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins with the existing tests.

Closes #26669 from dongjoon-hyun/SPARK-ORC-1.5.8.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-25 20:08:11 -08:00
Dongjoon Hyun 2a28c73d81 [SPARK-30031][BUILD][SQL] Remove hive-2.3 profile from sql/hive module
### What changes were proposed in this pull request?

This PR aims to remove `hive-2.3` profile from `sql/hive` module.

### Why are the changes needed?

Currently, we need `-Phive-1.2` or `-Phive-2.3` additionally to build `hive` or `hive-thriftserver` module. Without specifying it, the build fails like the following. This PR will recover it.
```
$ build/mvn -DskipTests compile --pl sql/hive
...
[ERROR] [Error] /Users/dongjoon/APACHE/spark-merge/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala:32: object serde is not a member of package org.apache.hadoop.hive
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

1. Pass GitHub Action dependency check with no manifest change.
2. Pass GitHub Action build for all combinations.
3. Pass the Jenkins UT.

Closes #26668 from dongjoon-hyun/SPARK-30031.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-25 15:17:27 -08:00
Dongjoon Hyun 38240a74dc [SPARK-30030][INFRA] Use RegexChecker instead of TokenChecker to check org.apache.commons.lang.
### What changes were proposed in this pull request?

This PR replace `TokenChecker` with `RegexChecker` in `scalastyle` and fixes the missed instances.

### Why are the changes needed?

This will remove the old `comons-lang2` dependency from `core` module

**BEFORE**
```
$ dev/scalastyle
Scalastyle checks failed at following occurrences:
[error] /Users/dongjoon/PRS/SPARK-SerializationUtils/core/src/test/scala/org/apache/spark/util/PropertiesCloneBenchmark.scala:23:7: Use Commons Lang 3 classes (package org.apache.commons.lang3.*) instead
[error]     of Commons Lang 2 (package org.apache.commons.lang.*)
[error] Total time: 23 s, completed Nov 25, 2019 11:47:44 AM
```

**AFTER**
```
$ dev/scalastyle
Scalastyle checks passed.
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the GitHub Action linter.

Closes #26666 from dongjoon-hyun/SPARK-29081-2.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-25 12:03:15 -08:00
Dongjoon Hyun 1466863cee [SPARK-30015][BUILD] Move hive-storage-api dependency from hive-2.3 to sql/core
# What changes were proposed in this pull request?

This PR aims to relocate the following internal dependencies to compile `sql/core` without `-Phive-2.3` profile.

1. Move the `hive-storage-api` to `sql/core` which is using `hive-storage-api` really.

**BEFORE (sql/core compilation)**
```
$ ./build/mvn -DskipTests --pl sql/core --am compile
...
[ERROR] [Error] /Users/dongjoon/APACHE/spark/sql/core/v2.3/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFilters.scala:21: object hive is not a member of package org.apache.hadoop
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
```
**AFTER (sql/core compilation)**
```
$ ./build/mvn -DskipTests --pl sql/core --am compile
...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  02:04 min
[INFO] Finished at: 2019-11-25T00:20:11-08:00
[INFO] ------------------------------------------------------------------------
```

2. For (1), add `commons-lang:commons-lang` test dependency to `spark-core` module to manage the dependency explicitly. Without this, `core` module fails to build the test classes.

```
$ ./build/mvn -DskipTests --pl core --am package -Phadoop-3.2
...
[INFO] --- scala-maven-plugin:4.3.0:testCompile (scala-test-compile-first)  spark-core_2.12 ---
[INFO] Using incremental compilation using Mixed compile order
[INFO] Compiler bridge file: /Users/dongjoon/.sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.12-1.3.1-bin_2.12.10__52.0-1.3.1_20191012T045515.jar
[INFO] Compiling 271 Scala sources and 26 Java sources to /spark/core/target/scala-2.12/test-classes ...
[ERROR] [Error] /spark/core/src/test/scala/org/apache/spark/util/PropertiesCloneBenchmark.scala:23: object lang is not a member of package org.apache.commons
[ERROR] [Error] /spark/core/src/test/scala/org/apache/spark/util/PropertiesCloneBenchmark.scala:49: not found: value SerializationUtils
[ERROR] two errors found
```

**BEFORE (commons-lang:commons-lang)**
The following is the previous `core` module's `commons-lang:commons-lang` dependency.

1. **branch-2.4**
```
$ mvn dependency:tree -Dincludes=commons-lang:commons-lang
[INFO] --- maven-dependency-plugin:3.0.2:tree (default-cli)  spark-core_2.11 ---
[INFO] org.apache.spark:spark-core_2.11🫙2.4.5-SNAPSHOT
[INFO] \- org.spark-project.hive:hive-exec:jar:1.2.1.spark2:provided
[INFO]    \- commons-lang:commons-lang:jar:2.6:compile
```

2. **v3.0.0-preview (-Phadoop-3.2)**
```
$ mvn dependency:tree -Dincludes=commons-lang:commons-lang -Phadoop-3.2
[INFO] --- maven-dependency-plugin:3.1.1:tree (default-cli)  spark-core_2.12 ---
[INFO] org.apache.spark:spark-core_2.12🫙3.0.0-preview
[INFO] \- org.apache.hive:hive-storage-api:jar:2.6.0:compile
[INFO]    \- commons-lang:commons-lang:jar:2.6:compile
```

3. **v3.0.0-preview(default)**
```
$ mvn dependency:tree -Dincludes=commons-lang:commons-lang
[INFO] --- maven-dependency-plugin:3.1.1:tree (default-cli)  spark-core_2.12 ---
[INFO] org.apache.spark:spark-core_2.12🫙3.0.0-preview
[INFO] \- org.apache.hadoop:hadoop-client:jar:2.7.4:compile
[INFO]    \- org.apache.hadoop:hadoop-common:jar:2.7.4:compile
[INFO]       \- commons-lang:commons-lang:jar:2.6:compile
```

**AFTER (commons-lang:commons-lang)**
```
$ mvn dependency:tree -Dincludes=commons-lang:commons-lang
[INFO] --- maven-dependency-plugin:3.1.1:tree (default-cli)  spark-core_2.12 ---
[INFO] org.apache.spark:spark-core_2.12🫙3.0.0-SNAPSHOT
[INFO] \- commons-lang:commons-lang:jar:2.6:test
```

Since we wanted to verify that this PR doesn't change `hive-1.2` profile, we merged
[SPARK-30005 Update `test-dependencies.sh` to check `hive-1.2/2.3` profile](a1706e2fa7) before this PR.

### Why are the changes needed?

- Apache Spark 2.4's `sql/core` is using `Apache ORC (nohive)` jars including shaded `hive-storage-api` to access ORC data sources.

- Apache Spark 3.0's `sql/core` is using `Apache Hive` jars directly. Previously, `-Phadoop-3.2` hid this `hive-storage-api` dependency. Now, we are using `-Phive-2.3` instead. As I mentioned [previously](https://github.com/apache/spark/pull/26619#issuecomment-556926064), this PR is required to compile `sql/core` module without `-Phive-2.3`.

- For `sql/hive` and `sql/hive-thriftserver`, it's natural that we need `-Phive-1.2` or `-Phive-2.3`.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

This will pass the Jenkins (with the dependency check and unit tests).

We need to check manually with `./build/mvn -DskipTests --pl sql/core --am compile`.

This closes #26657 .

Closes #26658 from dongjoon-hyun/SPARK-30015.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-25 10:54:14 -08:00
shahid bec2068ae8 [SPARK-26260][CORE] For disk store tasks summary table should show only successful tasks summary
…sks metrics for disk store

### What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/23088 task Summary table in the stage page shows successful tasks metrics for lnMemory store. In this PR, it added for disk store also.

### Why are the changes needed?

Now both InMemory and disk store will be consistent in showing the task summary table in the UI, if there are non successful tasks

### Does this PR introduce any user-facing change?

no
### How was this patch tested?

Added UT. Manually verified

Test steps:
1. add the config in spark-defaults.conf -> **spark.history.store.path /tmp/store**
2. sbin/start-hitoryserver
3. bin/spark-shell
4. `sc.parallelize(1 to 1000, 2).map(x => throw new Exception("fail")).count`

![Screenshot 2019-11-14 at 3 51 39 AM](https://user-images.githubusercontent.com/23054875/68809546-268d2e80-0692-11ea-8b2c-bee767478135.png)

Closes #26508 from shahidki31/task.

Authored-by: shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-11-25 10:04:25 -08:00
fuwhu 29ebd9336c [SPARK-29979][SQL] Add basic/reserved property key constants in TableCatalog and SupportsNamespaces
### What changes were proposed in this pull request?
Add "comment" and "location" property key constants in TableCatalog and SupportNamespaces.
And update code of implementation classes to use these constants instead of hard code.

### Why are the changes needed?
Currently, some basic/reserved keys (eg. "location", "comment") of table and namespace properties are hard coded or defined in specific logical plan implementation class.
These keys can be centralized in TableCatalog and SupportsNamespaces interface and shared across different implementation classes.

### Does this PR introduce any user-facing change?
no

### How was this patch tested?
Existing unit test

Closes #26617 from fuwhu/SPARK-29979.

Authored-by: fuwhu <bestwwg@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-26 01:24:43 +08:00
Terry Kim f09c1a36c4 [SPARK-29890][SQL] DataFrameNaFunctions.fill should handle duplicate columns
### What changes were proposed in this pull request?

`DataFrameNaFunctions.fill` doesn't handle duplicate columns even when column names are not specified.

```Scala
val left = Seq(("1", null), ("3", "4")).toDF("col1", "col2")
val right = Seq(("1", "2"), ("3", null)).toDF("col1", "col2")
val df = left.join(right, Seq("col1"))
df.printSchema
df.na.fill("hello").show
```
produces
```
root
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col2: string (nullable = true)

org.apache.spark.sql.AnalysisException: Reference 'col2' is ambiguous, could be: col2, col2.;
  at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:259)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveQuoted(LogicalPlan.scala:121)
  at org.apache.spark.sql.Dataset.resolve(Dataset.scala:221)
  at org.apache.spark.sql.Dataset.col(Dataset.scala:1268)
```
The reason for the above failure is that columns are looked up with `DataSet.col()` which tries to resolve a column by name and if there are multiple columns with the same name, it will fail due to ambiguity.

This PR updates `DataFrameNaFunctions.fill` such that if the columns to fill are not specified, it will resolve ambiguity gracefully by applying `fill` to all the eligible columns. (Note that if the user specifies the columns, it will still continue to fail due to ambiguity).

### Why are the changes needed?

If column names are not specified, `fill` should not fail due to ambiguity since it should still be able to apply `fill` to the eligible columns.

### Does this PR introduce any user-facing change?

Yes, now the above example displays the following:
```
+----+-----+-----+
|col1| col2| col2|
+----+-----+-----+
|   1|hello|    2|
|   3|    4|hello|
+----+-----+-----+

```

### How was this patch tested?

Added new unit tests.

Closes #26593 from imback82/na_fill.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-26 00:06:19 +08:00
Thomas Graves 2d5de25a99 [SPARK-29415][CORE] Stage Level Sched: Add base ResourceProfile and Request classes
### What changes were proposed in this pull request?

This PR is adding the base classes needed for Stage level scheduling. Its adding a ResourceProfile and the executor and task resource request classes.  These are made private for now until we get all the parts implemented, at which point this will become public interfaces.  I am adding them first as all the other subtasks for this feature require these classes.  If people have better ideas on breaking this feature up please let me know.

See https://issues.apache.org/jira/browse/SPARK-29415 for more detailed design.

### Why are the changes needed?

New API for stage level scheduling.  Its easier to add these first because the other jira for this features will all use them.

### Does this PR introduce any user-facing change?

Yes adds API to create a ResourceProfile with executor/task resources, see the spip jira https://issues.apache.org/jira/browse/SPARK-27495

Example of the api:
val rp = new ResourceProfile()
rp.require(new ExecutorResourceRequest("cores", 2))
rp.require(new ExecutorResourceRequest("gpu", 1, Some("/opt/gpuScripts/getGpus")))
rp.require(new TaskResourceRequest("gpu", 1))

### How was this patch tested?

Tested using Unit tests added with this PR.

Closes #26284 from tgravescs/SPARK-29415.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
2019-11-25 09:36:39 -06:00
Wenchen Fan bd9ce83063 [SPARK-29975][SQL][FOLLOWUP] document --CONFIG_DIM
### What changes were proposed in this pull request?

add document to address https://github.com/apache/spark/pull/26612#discussion_r349844327

### Why are the changes needed?

help people understand how to use --CONFIG_DIM

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

N/A

Closes #26661 from cloud-fan/test.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
2019-11-25 20:45:31 +09:00
Kent Yao de21f28f8a [SPARK-29986][SQL] casting string to date/timestamp/interval should trim all whitespaces
### What changes were proposed in this pull request?

A java like string trim method trims all whitespaces that less or equal than 0x20. currently, our UTF8String handle the space =0x20 ONLY. This is not suitable for many cases in Spark, like trim for interval strings, date, timestamps, PostgreSQL like cast string to boolean.

### Why are the changes needed?

improve the white spaces handling in UTF8String, also with some bugs fixed

### Does this PR introduce any user-facing change?

yes,
string with `control character` at either end can be convert to date/timestamp and interval now

### How was this patch tested?

add ut

Closes #26626 from yaooqinn/SPARK-29986.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-25 14:37:04 +08:00
wuyi 456cfe6e46 [SPARK-29939][CORE] Add spark.shuffle.mapStatus.compression.codec conf
### What changes were proposed in this pull request?

Add a new conf named `spark.shuffle.mapStatus.compression.codec` for user to decide which codec should be used(default by `zstd`) for `MapStatus` compression.

### Why are the changes needed?

We already have this functionality for `broadcast`/`rdd`/`shuffle`/`shuflleSpill`,
so it might be better to have the same functionality for `MapStatus` as well.

### Does this PR introduce any user-facing change?

Yes, user now could use `spark.shuffle.mapStatus.compression.codec` to decide which codec should be used during `MapStatus` compression.
### How was this patch tested?

N/A

Closes #26611 from Ngone51/SPARK-29939.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-24 21:21:19 -08:00
Kent Yao 5cf475d288 [SPARK-30000][SQL] Trim the string when cast string type to decimals
### What changes were proposed in this pull request?

https://bugs.openjdk.java.net/browse/JDK-8170259
https://bugs.openjdk.java.net/browse/JDK-8170563

When we cast string type to decimal type, we rely on java.math. BigDecimal. It can't accept leading and training spaces, as you can see in the above links. This behavior is not consistent with other numeric types now. we need to fix it and keep consistency.

### Why are the changes needed?

make string to numeric types be consistent

### Does this PR introduce any user-facing change?

yes, string removed trailing or leading white spaces will be able to convert to decimal if the trimmed is valid

### How was this patch tested?

1. modify ut

#### Benchmark
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.benchmark

import org.apache.spark.benchmark.Benchmark

/**
 * Benchmark trim the string when casting string type to Boolean/Numeric types.
 * To run this benchmark:
 * {{{
 *   1. without sbt:
 *      bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/CastBenchmark-results.txt".
 * }}}
 */
object CastBenchmark extends SqlBasedBenchmark {

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    val title = "Cast String to Integral"
    runBenchmark(title) {
      withTempPath { dir =>
        val N = 500L << 14
        val df = spark.range(N)
        val types = Seq("decimal")
        (1 to 5).by(2).foreach { i =>
          df.selectExpr(s"concat(id, '${" " * i}') as str")
            .write.mode("overwrite").parquet(dir + i.toString)
        }

        val benchmark = new Benchmark(title, N, minNumIters = 5, output = output)
        Seq(true, false).foreach { trim =>
          types.foreach { t =>
            val str = if (trim) "trim(str)" else "str"
            val expr = s"cast($str as $t) as c_$t"
            (1 to 5).by(2).foreach { i =>
              benchmark.addCase(expr + s" - with $i spaces") { _ =>
                spark.read.parquet(dir + i.toString).selectExpr(expr).collect()
              }
            }
          }
        }
        benchmark.run()
      }
    }
  }
}

```

#### string trim vs not trim
```java
[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_231-b11 on Mac OS X 10.15.1
[info] Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
[info] Cast String to Integral:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] cast(trim(str) as decimal) as c_decimal - with 1 spaces           3362           5486         NaN          2.4         410.4       1.0X
[info] cast(trim(str) as decimal) as c_decimal - with 3 spaces           3251           5655         NaN          2.5         396.8       1.0X
[info] cast(trim(str) as decimal) as c_decimal - with 5 spaces           3208           5725         NaN          2.6         391.7       1.0X
[info] cast(str as decimal) as c_decimal - with 1 spaces          13962          16233        1354          0.6        1704.3       0.2X
[info] cast(str as decimal) as c_decimal - with 3 spaces          14273          14444         179          0.6        1742.4       0.2X
[info] cast(str as decimal) as c_decimal - with 5 spaces          14318          14535         125          0.6        1747.8       0.2X
```
#### string trim vs this fix
```java
[info] Java HotSpot(TM) 64-Bit Server VM 1.8.0_231-b11 on Mac OS X 10.15.1
[info] Intel(R) Core(TM) i9-9980HK CPU  2.40GHz
[info] Cast String to Integral:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] cast(trim(str) as decimal) as c_decimal - with 1 spaces           3265           6299         NaN          2.5         398.6       1.0X
[info] cast(trim(str) as decimal) as c_decimal - with 3 spaces           3183           6241         693          2.6         388.5       1.0X
[info] cast(trim(str) as decimal) as c_decimal - with 5 spaces           3167           5923        1151          2.6         386.7       1.0X
[info] cast(str as decimal) as c_decimal - with 1 spaces           3161           5838        1126          2.6         385.9       1.0X
[info] cast(str as decimal) as c_decimal - with 3 spaces           3046           3457         837          2.7         371.8       1.1X
[info] cast(str as decimal) as c_decimal - with 5 spaces           3053           4445         NaN          2.7         372.7       1.1X
[info]
```

Closes #26640 from yaooqinn/SPARK-30000.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-25 12:47:07 +08:00
Sean Owen 13896e4eae [SPARK-30013][SQL] For scala 2.13, omit parens in various BigDecimal value() methods
### What changes were proposed in this pull request?

Omit parens on calls like BigDecimal.longValue()

### Why are the changes needed?

For some reason, this won't compile in Scala 2.13. The calls are otherwise equivalent in 2.12.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing tests

Closes #26653 from srowen/SPARK-30013.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-24 18:23:34 -08:00
ulysses a8d907ce94 [SPARK-29937][SQL] Make FileSourceScanExec class fields lazy
### What changes were proposed in this pull request?

Since JIRA SPARK-28346,PR [25111](https://github.com/apache/spark/pull/25111), QueryExecution will copy all node stage-by-stage. This make all node instance twice almost. So we should make all class fields lazy to avoid create more unexpected object.

### Why are the changes needed?

Avoid create more unexpected object.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Exists UT.

Closes #26565 from ulysses-you/make-val-lazy.

Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-24 16:32:09 -08:00
Jungtaek Lim (HeartSaVioR) 0d3d46db21 [SPARK-29999][SS] Handle FileStreamSink metadata correctly for empty partition
### What changes were proposed in this pull request?

This patch checks the existence of output file for each task while committing the task, so that it doesn't throw FileNotFoundException while creating SinkFileStatus. The check is newly required for DSv2 implementation of FileStreamSink, as it is changed to create the output file lazily (as an improvement).

JSON writer for example: 9ec2a4e58c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonOutputWriter.scala (L49-L60)

### Why are the changes needed?

Without this patch, FileStreamSink throws FileNotFoundException when writing empty partition.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added UT.

Closes #26639 from HeartSaVioR/SPARK-29999.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-24 15:31:06 -08:00
Dongjoon Hyun cb68e58f88 [MINOR][INFRA] Use GitHub Action Cache for build
### What changes were proposed in this pull request?

This PR adds `GitHub Action Cache` task on `build` directory.

### Why are the changes needed?

This will replace the Maven downloading with the cache.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manually check the GitHub Action log of this PR.

Closes #26652 from dongjoon-hyun/SPARK-MAVEN-CACHE.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-24 12:35:57 -08:00
Dongjoon Hyun a1706e2fa7 [SPARK-30005][INFRA] Update test-dependencies.sh to check hive-1.2/2.3 profile
### What changes were proposed in this pull request?

This PR aims to update `test-dependencies.sh` to validate all available `Hadoop/Hive` combination.

### Why are the changes needed?

Previously, we have been checking only `Hadoop2.7/Hive1.2` and `Hadoop3.2/Hive2.3`.
We need to validate `Hadoop2.7/Hive2.3` additionally for Apache Spark 3.0.

### Does this PR introduce any user-facing change?

No. (This is a dev-only change).

### How was this patch tested?

Pass the GitHub Action (Linter) with the newly updated manifest because this is only dependency check.

Closes #26646 from dongjoon-hyun/SPARK-30005.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-24 10:14:02 -08:00
Takeshi Yamamuro 3f3a18fff1 [SPARK-24690][SQL] Add a config to control plan stats computation in LogicalRelation
### What changes were proposed in this pull request?

This pr proposes a new independent config so that `LogicalRelation` could use `rowCount` to compute data statistics in logical plans even if CBO disabled. In the master, we currently cannot enable `StarSchemaDetection.reorderStarJoins` because we need to turn off CBO to enable it but `StarSchemaDetection` internally references the `rowCount` that is used in LogicalRelation if CBO disabled.

### Why are the changes needed?

Plan stats are pretty useful other than CBO, e.g., star-schema detector and dynamic partition pruning.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests in `DataFrameJoinSuite`.

Closes #21668 from maropu/PlanStatsConf.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-24 08:30:24 -08:00
uncleGen 3d740901d6 [SPARK-29973][SS] Make processedRowsPerSecond calculated more accurately and meaningfully
### What changes were proposed in this pull request?

Give `processingTimeSec` 0.001 when a micro-batch completed under 1ms.

### Why are the changes needed?

The `processingTimeSec` of batch may be less than 1 ms.  As `processingTimeSec` is calculated in ms, so `processingTimeSec` equals 0L. If there is no data in this batch, the `processedRowsPerSecond` equals `0/0.0d`, i.e. `Double.NaN`. If there are some data in this batch, the `processedRowsPerSecond` equals `N/0.0d`, i.e. `Double.Infinity`.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Add new UT

Closes #26610 from uncleGen/SPARK-29973.

Authored-by: uncleGen <hustyugm@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-11-24 08:08:15 -06:00
Dongjoon Hyun a60da23d64 [SPARK-30007][INFRA] Publish snapshot/release artifacts with -Phive-2.3 only
### What changes were proposed in this pull request?

This PR aims to add `-Phive-2.3` to publish profiles.
Since Apache Spark 3.0.0, Maven artifacts will be publish with Apache Hive 2.3 profile only.

This PR also will recover `SNAPSHOT` publishing Jenkins job.
- https://amplab.cs.berkeley.edu/jenkins/view/Spark%20Packaging/job/spark-master-maven-snapshots/

We will provide the pre-built distributions (with Hive 1.2.1 also) like Apache Spark 2.4.
SPARK-29989 will update the release script to generate all combinations.

### Why are the changes needed?

This will reduce the explicit dependency on the illegitimate Hive fork in Maven repository.

### Does this PR introduce any user-facing change?

Yes, but this is dev only changes.

### How was this patch tested?

Manual.

Closes #26648 from dongjoon-hyun/SPARK-30007.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-23 22:34:21 -08:00
Dongjoon Hyun 13338eaa95 [SPARK-29554][SQL][FOLLOWUP] Rename Version to SparkVersion
### What changes were proposed in this pull request?

This is a follow-up of https://github.com/apache/spark/pull/26209 .
This renames class `Version` to class `SparkVersion`.

### Why are the changes needed?

According to the review comment, this uses more specific class name.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins with the existing tests.

Closes #26647 from dongjoon-hyun/SPARK-29554.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-23 19:53:52 -08:00
Dilip Biswal 564826d960 [SPARK-28812][SQL][DOC] Document SHOW PARTITIONS in SQL Reference
### What changes were proposed in this pull request?
Document SHOW PARTITIONS statement in SQL Reference Guide.

### Why are the changes needed?
Currently Spark lacks documentation on the supported SQL constructs causing
confusion among users who sometimes have to look at the code to understand the
usage. This is aimed at addressing this issue.

### Does this PR introduce any user-facing change?
Yes.

**Before**
**After**
![image](https://user-images.githubusercontent.com/14225158/69405056-89468180-0cb3-11ea-8eb7-93046eaf551c.png)
![image](https://user-images.githubusercontent.com/14225158/69405067-93688000-0cb3-11ea-810a-11cab9e4a041.png)
![image](https://user-images.githubusercontent.com/14225158/69405120-c01c9780-0cb3-11ea-91c0-91eeaa9238a0.png)

Closes #26635 from dilipbiswal/show_partitions.

Authored-by: Dilip Biswal <dkbiswal@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-23 19:34:19 -08:00
Prakhar Jain 6898be9f02 [SPARK-29681][WEBUI] Support column sorting in Environment tab
### What changes were proposed in this pull request?
Add extra classnames to table headers in EnvironmentPage tables in Spark UI.

### Why are the changes needed?
SparkUI uses sorttable.js to provide the sort functionality in different tables. This library tries to guess the datatype of each column during initialization phase - numeric/alphanumeric etc and uses it to sort the columns whenever user clicks on a column. That way it guesses incorrect data type for environment tab.

sorttable.js has way to hint the datatype of table columns explicitly. This is done by passing custom HTML class attribute.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Manually tested sorting in tables in Environment tab in Spark UI.

![Annotation 2019-11-22 154058](https://user-images.githubusercontent.com/2551496/69417432-a8d6bc00-0d3e-11ea-865b-f8017976c6f4.png)
![Annotation 2019-11-22 153600](https://user-images.githubusercontent.com/2551496/69417433-a8d6bc00-0d3e-11ea-9a75-8e1f4d66107e.png)
![Annotation 2019-11-22 153841](https://user-images.githubusercontent.com/2551496/69417435-a96f5280-0d3e-11ea-85f6-9f61b015e161.png)

Closes #26638 from prakharjain09/SPARK-29681-SPARK-UI-SORT.

Authored-by: Prakhar Jain <prakjai@microsoft.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-23 18:09:02 -08:00
Kousuke Saruta 6cd6d5f57e [SPARK-29970][WEBUI] Preserver open/close state of Timelineview
### What changes were proposed in this pull request?

Fix a bug related to Timelineview that does not preserve open/close state properly.

### Why are the changes needed?

To preserve open/close state is originally intended but it doesn't work.

### Does this PR introduce any user-facing change?

Yes. open/close state for Timeineview is to be preserved so if you open Timelineview in Stage page and go to another page, and then go back to Stage page, Timelineview should keep open.

### How was this patch tested?

Manual test.

Closes #26607 from sarutak/fix-timeline-view-state.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-23 16:16:24 -08:00
Dongjoon Hyun 6625b69027 [SPARK-29981][BUILD][FOLLOWUP] Change hive.version.short
### What changes were proposed in this pull request?

This is a follow-up according to liancheng 's advice.
- https://github.com/apache/spark/pull/26619#discussion_r349326090

### Why are the changes needed?

Previously, we chose the full version to be carefully. As of today, it seems that `Apache Hive 2.3` branch seems to become stable.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the compile combination on GitHub Action.
1. hadoop-2.7/hive-1.2/JDK8
2. hadoop-2.7/hive-2.3/JDK8
3. hadoop-3.2/hive-2.3/JDK8
4. hadoop-3.2/hive-2.3/JDK11

Also, pass the Jenkins with `hadoop-2.7` and `hadoop-3.2` for (1) and (4).
(2) and (3) is not ready in Jenkins.

Closes #26645 from dongjoon-hyun/SPARK-RENAME-HIVE-DIRECTORY.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-23 12:50:50 -08:00
Dongjoon Hyun c98e5eb339 [SPARK-29981][BUILD] Add hive-1.2/2.3 profiles
### What changes were proposed in this pull request?

This PR aims the followings.
- Add two profiles, `hive-1.2` and `hive-2.3` (default)
- Validate if we keep the existing combination at least. (Hadoop-2.7 + Hive 1.2 / Hadoop-3.2 + Hive 2.3).

For now, we assumes that `hive-1.2` is explicitly used with `hadoop-2.7` and `hive-2.3` with `hadoop-3.2`. The followings are beyond the scope of this PR.

- SPARK-29988 Adjust Jenkins jobs for `hive-1.2/2.3` combination
- SPARK-29989 Update release-script for `hive-1.2/2.3` combination
- SPARK-29991 Support `hive-1.2/2.3` in PR Builder

### Why are the changes needed?

This will help to switch our dependencies to update the exposed dependencies.

### Does this PR introduce any user-facing change?

This is a dev-only change that the build profile combinations are changed.
- `-Phadoop-2.7` => `-Phadoop-2.7 -Phive-1.2`
- `-Phadoop-3.2` => `-Phadoop-3.2 -Phive-2.3`

### How was this patch tested?

Pass the Jenkins with the dependency check and tests to make it sure we don't change anything for now.

- [Jenkins (-Phadoop-2.7 -Phive-1.2)](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/114192/consoleFull)
- [Jenkins (-Phadoop-3.2 -Phive-2.3)](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/114192/consoleFull)

Also, from now, GitHub Action validates the following combinations.
![gha](https://user-images.githubusercontent.com/9700541/69355365-822d5e00-0c36-11ea-93f7-e00e5459e1d0.png)

Closes #26619 from dongjoon-hyun/SPARK-29981.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-23 10:02:22 -08:00
HyukjinKwon fc7a37b147 [SPARK-30003][SQL] Do not throw stack overflow exception in non-root unknown hint resolution
### What changes were proposed in this pull request?
This is rather a followup of https://github.com/apache/spark/pull/25464 (see https://github.com/apache/spark/pull/25464/files#r349543286)

It will cause an infinite recursion via mapping children - we should return the hint rather than its parent plan in unknown hint resolution.

### Why are the changes needed?

Prevent Stack over flow during hint resolution.

### Does this PR introduce any user-facing change?

Yes, it avoids stack overflow exception It was caused by https://github.com/apache/spark/pull/25464 and this is only in the master.

No behaviour changes to end users as it happened only in the master.

### How was this patch tested?

Unittest was added.

Closes #26642 from HyukjinKwon/SPARK-30003.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-11-23 17:24:56 +09:00
Norman Maurer f28eab2de7 [SPARK-29971][CORE] Fix buffer leaks in TransportFrameDecoder/TransportCipher
### What changes were proposed in this pull request?

- Correctly release `ByteBuf` in `TransportCipher` in all cases
- Move closing / releasing logic to `handlerRemoved(...)` so we are guaranteed that is always called.
- Correctly release `frameBuf` it is not null when the handler is removed (and so also when the channel becomes inactive)

### Why are the changes needed?

We need to carefully manage the ownership / lifecycle of `ByteBuf` instances so we don't leak any of these. We did not correctly do this in all cases:
 - when end up in invalid cipher state.
 - when partial data was received and the channel is closed before the full frame is decoded

Fixes https://github.com/netty/netty/issues/9784.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the newly added UTs.

Closes #26609 from normanmaurer/fix_leaks.

Authored-by: Norman Maurer <norman_maurer@apple.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-11-22 15:20:54 -08:00
Liang-Chi Hsieh 6b0e391aa4 [SPARK-29427][SQL] Add API to convert RelationalGroupedDataset to KeyValueGroupedDataset
### What changes were proposed in this pull request?

This PR proposes to add `as` API to RelationalGroupedDataset. It creates KeyValueGroupedDataset instance using given grouping expressions, instead of a typed function in groupByKey API. Because it can leverage existing columns, it can use existing data partition, if any, when doing operations like cogroup.

### Why are the changes needed?

Currently if users want to do cogroup on DataFrames, there is no good way to do except for KeyValueGroupedDataset.

1. KeyValueGroupedDataset ignores existing data partition if any. That is a problem.
2. groupByKey calls typed function to create additional keys. You can not reuse existing columns, if you just need grouping by them.

```scala
// df1 and df2 are certainly partitioned and sorted.
val df1 = Seq((1, 2, 3), (2, 3, 4)).toDF("a", "b", "c")
  .repartition($"a").sortWithinPartitions("a")
val df2 = Seq((1, 2, 4), (2, 3, 5)).toDF("a", "b", "c")
  .repartition($"a").sortWithinPartitions("a")
```
```scala
// This groupBy.as.cogroup won't unnecessarily repartition the data
val df3 = df1.groupBy("a").as[Int]
  .cogroup(df2.groupBy("a").as[Int]) { case (key, data1, data2) =>
    data1.zip(data2).map { p =>
      p._1.getInt(2) + p._2.getInt(2)
    }
}
```

```
== Physical Plan ==
*(5) SerializeFromObject [input[0, int, false] AS value#11247]
+- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4922/12067092816eec1b6f, a#11209: int, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [a#11209], [a#11225], [a#11209, b#11210, c#11211], [a#11225, b#11226, c#11227], obj#11246: int
   :- *(2) Sort [a#11209 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(a#11209, 5), false, [id=#10218]
   :     +- *(1) Project [_1#11202 AS a#11209, _2#11203 AS b#11210, _3#11204 AS c#11211]
   :        +- *(1) LocalTableScan [_1#11202, _2#11203, _3#11204]
   +- *(4) Sort [a#11225 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(a#11225, 5), false, [id=#10223]
         +- *(3) Project [_1#11218 AS a#11225, _2#11219 AS b#11226, _3#11220 AS c#11227]
            +- *(3) LocalTableScan [_1#11218, _2#11219, _3#11220]
```

```scala
// Current approach creates additional AppendColumns and repartition data again
val df4 = df1.groupByKey(r => r.getInt(0)).cogroup(df2.groupByKey(r => r.getInt(0))) {
  case (key, data1, data2) =>
    data1.zip(data2).map { p =>
      p._1.getInt(2) + p._2.getInt(2)
  }
}
```

```
== Physical Plan ==
*(7) SerializeFromObject [input[0, int, false] AS value#11257]
+- CoGroup org.apache.spark.sql.DataFrameSuite$$Lambda$4933/138102700737171997, value#11252: int, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [value#11252], [value#11254], [a#11209, b#11210, c#11211], [a#11225, b#11226, c#11227], obj#11256: int
   :- *(3) Sort [value#11252 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(value#11252, 5), true, [id=#10302]
   :     +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4930/19529195347ce07f47, createexternalrow(a#11209, b#11210, c#11211, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11252]
   :        +- *(2) Sort [a#11209 ASC NULLS FIRST], false, 0
   :           +- Exchange hashpartitioning(a#11209, 5), false, [id=#10297]
   :              +- *(1) Project [_1#11202 AS a#11209, _2#11203 AS b#11210, _3#11204 AS c#11211]
   :                 +- *(1) LocalTableScan [_1#11202, _2#11203, _3#11204]
   +- *(6) Sort [value#11254 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(value#11254, 5), true, [id=#10312]
         +- AppendColumns org.apache.spark.sql.DataFrameSuite$$Lambda$4932/15265288491f0e0c1f, createexternalrow(a#11225, b#11226, c#11227, StructField(a,IntegerType,false), StructField(b,IntegerType,false), StructField(c,IntegerType,false)), [input[0, int, false] AS value#11254]
            +- *(5) Sort [a#11225 ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(a#11225, 5), false, [id=#10307]
                  +- *(4) Project [_1#11218 AS a#11225, _2#11219 AS b#11226, _3#11220 AS c#11227]
                     +- *(4) LocalTableScan [_1#11218, _2#11219, _3#11220]
```

### Does this PR introduce any user-facing change?

Yes, this adds a new `as` API to RelationalGroupedDataset. Users can use it to create KeyValueGroupedDataset and do cogroup.

### How was this patch tested?

Unit tests.

Closes #26509 from viirya/SPARK-29427-2.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-11-22 10:34:26 -08:00
Wenchen Fan 6e581cf164 [SPARK-29893][SQL][FOLLOWUP] code cleanup for local shuffle reader
### What changes were proposed in this pull request?

A few cleanups for https://github.com/apache/spark/pull/26516:
1. move the calculating of partition start indices from the RDD to the rule. We can reuse code from "shrink number of reducers" in the future if we split partitions by size.
2. only check extra shuffles when adding local readers to the probe side.
3. add comments.
4. simplify the config name: `optimizedLocalShuffleReader` -> `localShuffleReader`

### Why are the changes needed?

make code more maintainable.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

existing tests

Closes #26625 from cloud-fan/aqe.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
2019-11-22 10:26:54 -08:00
Kent Yao 2dd6807e42 [SPARK-28023][SQL] Add trim logic in UTF8String's toInt/toLong to make it consistent with other string-numeric casting
### What changes were proposed in this pull request?

Modify `UTF8String.toInt/toLong` to support trim spaces for both sides before converting it to byte/short/int/long.

With this kind of "cheap" trim can help improve performance for casting string to integrals. The idea is from https://github.com/apache/spark/pull/24872#issuecomment-556917834

### Why are the changes needed?

make the behavior consistent.

### Does this PR introduce any user-facing change?
yes, cast string to an integral type, and binary comparison between string and integrals will trim spaces first. their behavior will be consistent with float and double.
### How was this patch tested?
1. add ut.
2. benchmark tests
 the benchmark is modified based on https://github.com/apache/spark/pull/24872#issuecomment-503827016

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.benchmark

import org.apache.spark.benchmark.Benchmark

/**
 * Benchmark trim the string when casting string type to Boolean/Numeric types.
 * To run this benchmark:
 * {{{
 *   1. without sbt:
 *      bin/spark-submit --class <this class> --jars <spark core test jar> <spark sql test jar>
 *   2. build/sbt "sql/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
 *      Results will be written to "benchmarks/CastBenchmark-results.txt".
 * }}}
 */
object CastBenchmark extends SqlBasedBenchmark {
This conversation was marked as resolved by yaooqinn

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    val title = "Cast String to Integral"
    runBenchmark(title) {
      withTempPath { dir =>
        val N = 500L << 14
        val df = spark.range(N)
        val types = Seq("int", "long")
        (1 to 5).by(2).foreach { i =>
          df.selectExpr(s"concat(id, '${" " * i}') as str")
            .write.mode("overwrite").parquet(dir + i.toString)
        }

        val benchmark = new Benchmark(title, N, minNumIters = 5, output = output)
        Seq(true, false).foreach { trim =>
          types.foreach { t =>
            val str = if (trim) "trim(str)" else "str"
            val expr = s"cast($str as $t) as c_$t"
            (1 to 5).by(2).foreach { i =>
              benchmark.addCase(expr + s" - with $i spaces") { _ =>
                spark.read.parquet(dir + i.toString).selectExpr(expr).collect()
              }
            }
          }
        }
        benchmark.run()
      }
    }
  }
}
```
#### benchmark result.
normal trim v.s. trim in toInt/toLong
```java
================================================================================================
Cast String to Integral
================================================================================================

Java HotSpot(TM) 64-Bit Server VM 1.8.0_231-b11 on Mac OS X 10.15.1
Intel(R) Core(TM) i5-5287U CPU  2.90GHz
Cast String to Integral:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
cast(trim(str) as int) as c_int - with 1 spaces          10220          12994        1337          0.8        1247.5       1.0X
cast(trim(str) as int) as c_int - with 3 spaces           4763           8356         357          1.7         581.4       2.1X
cast(trim(str) as int) as c_int - with 5 spaces           4791           8042         NaN          1.7         584.9       2.1X
cast(trim(str) as long) as c_long - with 1 spaces           4014           6755         NaN          2.0         490.0       2.5X
cast(trim(str) as long) as c_long - with 3 spaces           4737           6938         NaN          1.7         578.2       2.2X
cast(trim(str) as long) as c_long - with 5 spaces           4478           6919        1404          1.8         546.6       2.3X
cast(str as int) as c_int - with 1 spaces           4443           6222         NaN          1.8         542.3       2.3X
cast(str as int) as c_int - with 3 spaces           3659           3842         170          2.2         446.7       2.8X
cast(str as int) as c_int - with 5 spaces           4372           7996         NaN          1.9         533.7       2.3X
cast(str as long) as c_long - with 1 spaces           3866           5838         NaN          2.1         471.9       2.6X
cast(str as long) as c_long - with 3 spaces           3793           5449         NaN          2.2         463.0       2.7X
cast(str as long) as c_long - with 5 spaces           4947           5961        1198          1.7         603.9       2.1X
```

Closes #26622 from yaooqinn/cheapstringtrim.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-22 19:32:27 +08:00
LantaoJin 9ec2a4e58c [SPARK-29911][SQL][FOLLOWUP] Move related unit test to ThriftServerWithSparkContextSuite
### What changes were proposed in this pull request?
This is follow up of #26543

See https://github.com/apache/spark/pull/26543#discussion_r348934276

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Exist UT.

Closes #26628 from LantaoJin/SPARK-29911_FOLLOWUP.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-11-22 18:36:50 +09:00
Wenchen Fan e2f056f4a8 [SPARK-29975][SQL] introduce --CONFIG_DIM directive
### What changes were proposed in this pull request?

allow the sql test files to specify different dimensions of config sets during testing. For example,
```
--CONFIG_DIM1 a=1
--CONFIG_DIM1 b=2,c=3

--CONFIG_DIM2 x=1
--CONFIG_DIM2 y=1,z=2
```

This example defines 2 config dimensions, and each dimension defines 2 config sets. We will run the queries 4 times:
1. a=1, x=1
2. a=1, y=1, z=2
3. b=2, c=3, x=1
4. b=2, c=3, y=1, z=2

### Why are the changes needed?

Currently `SQLQueryTestSuite` takes a long time. This is because we run each test at least 3 times, to check with different codegen modes. This is not necessary for most of the tests, e.g. DESC TABLE. We should only check these codegen modes for certain tests.

With the --CONFIG_DIM directive, we can do things like: test different join operator(broadcast or shuffle join) X different codegen modes.

After reducing testing time, we should be able to run thrifter server SQL tests with config settings.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

test only

Closes #26612 from cloud-fan/test.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-11-22 10:56:28 +09:00
Wenchen Fan 6b4b6a87cd [SPARK-29558][SQL] ResolveTables and ResolveRelations should be order-insensitive
### What changes were proposed in this pull request?

Make `ResolveRelations` call `ResolveTables` at the beginning, and make `ResolveTables` call `ResolveTempViews`(newly added) at the beginning, to ensure the relation resolution priority.

### Why are the changes needed?

To resolve an `UnresolvedRelation`, the general process is:
1. try to resolve to (global) temp view first. If it's not a temp view, move on
2. if the table name specifies a catalog, lookup the table from the specified catalog. Otherwise, lookup table from the current catalog.
3. when looking up table from session catalog, return a v1 relation if the table provider is v1.

Currently, this process is done by 2 rules: `ResolveTables` and `ResolveRelations`. To avoid rule conflicts, we add a lot of checks:
1. `ResolveTables` only resolves `UnresolvedRelation` if it's not a temp view and the resolved table is not v1.
2. `ResolveRelations` only resolves `UnresolvedRelation` if the table name has less than 2 parts.

This requires to run `ResolveTables` before `ResolveRelations`, otherwise we may resolve a v2 table to a v1 relation.

To clearly guarantee the resolution priority, and avoid massive changes, this PR proposes to call one rule in another rule to ensure the rule execution order. Now the process is simple:
1. first run `ResolveTempViews`, see if we can resolve relation to temp view
2. then run `ResolveTables`, see if we can resolve relation to v2 tables.
3. finally run `ResolveRelations`, see if we can resolve relation to v1 tables.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

existing tests

Closes #26214 from cloud-fan/resolve.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Ryan Blue <blue@apache.org>
2019-11-21 09:47:42 -08:00
Ximo Guanter 54c5087a3a [SPARK-29248][SQL] provider number of partitions when creating v2 data writer factory
### What changes were proposed in this pull request?
When implementing a ScanBuilder, we require the implementor to provide the schema of the data and the number of partitions.

However, when someone is implementing WriteBuilder we only pass them the schema, but not the number of partitions. This is an asymetrical developer experience.

This PR adds a PhysicalWriteInfo interface that is passed to createBatchWriterFactory and createStreamingWriterFactory that adds the number of partitions of the data that is going to be written.

### Why are the changes needed?
Passing in the number of partitions on the WriteBuilder would enable data sources to provision their write targets before starting to write. For example:

it could be used to provision a Kafka topic with a specific number of partitions
it could be used to scale a microservice prior to sending the data to it
it could be used to create a DsV2 that sends the data to another spark cluster (currently not possible since the reader wouldn't be able to know the number of partitions)
### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Tests passed

Closes #26591 from edrevo/temp.

Authored-by: Ximo Guanter <joaquin.guantergonzalbez@telefonica.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-22 00:19:25 +08:00
Takeshi Yamamuro cdcd43cbf2 [SPARK-29977][SQL] Remove newMutableProjection/newOrdering/newNaturalAscendingOrdering from SparkPlan
### What changes were proposed in this pull request?

This is to refactor `SparkPlan` code; it mainly removed `newMutableProjection`/`newOrdering`/`newNaturalAscendingOrdering` from `SparkPlan`.
The other modifications are listed below;
 - Move `BaseOrdering` from `o.a.s.sqlcatalyst.expressions.codegen.GenerateOrdering.scala` to `o.a.s.sqlcatalyst.expressions.ordering.scala`
 - `RowOrdering` extends `CodeGeneratorWithInterpretedFallback ` for `BaseOrdering`
 - Remove the unused variables (`subexpressionEliminationEnabled` and `codeGenFallBack`) from `SparkPlan`

### Why are the changes needed?

For better code/test coverage.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Existing.

Closes #26615 from maropu/RefactorOrdering.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-21 23:51:12 +08:00
angerszhu 6146dc4562 [SPARK-29874][SQL] Optimize Dataset.isEmpty()
### What changes were proposed in this pull request?
In  origin way to judge if a DataSet is empty by
```
 def isEmpty: Boolean = withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan =>
    plan.executeCollect().head.getLong(0) == 0
  }
```
will add two shuffles by `limit()`, `groupby() and count()`, then collect all data to driver.
In this way we can avoid `oom` when collect data to driver. But it will trigger all partitions calculated and add more shuffle process.

We change it to
```
  def isEmpty: Boolean = withAction("isEmpty", select().queryExecution) { plan =>
    plan.executeTake(1).isEmpty
  }
```
After these pr, we will add a column pruning to origin LogicalPlan and use `executeTake()` API.
then we won't add more shuffle process and just compute only one partition's data in last stage.
In this way we can reduce cost when we call `DataSet.isEmpty()` and won't bring memory issue to driver side.

### Why are the changes needed?
Optimize Dataset.isEmpty()

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Origin UT

Closes #26500 from AngersZhuuuu/SPARK-29874.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-11-21 18:43:21 +08:00
zhengruifeng 0f40d2a6ee [SPARK-29960][ML][PYSPARK] MulticlassClassificationEvaluator support hammingLoss
### What changes were proposed in this pull request?
MulticlassClassificationEvaluator support hammingLoss

### Why are the changes needed?
1, it is an easy to compute hammingLoss based on confusion matrix
2, scikit-learn supports it

### Does this PR introduce any user-facing change?
yes

### How was this patch tested?
added testsuites

Closes #26597 from zhengruifeng/multi_class_hamming_loss.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
2019-11-21 18:32:28 +08:00
zhengruifeng 297cbab98e [SPARK-29942][ML] Impl Complement Naive Bayes Classifier
### What changes were proposed in this pull request?
Impl Complement Naive Bayes Classifier as a `modelType` option in `NaiveBayes`

### Why are the changes needed?
1, it is a better choice for text classification: it is said in [scikit-learn](https://scikit-learn.org/stable/modules/naive_bayes.html#complement-naive-bayes) that 'CNB regularly outperforms MNB (often by a considerable margin) on text classification tasks.'
2, CNB is highly similar to existing MNB, only a small part of existing MNB need to be changed, so it is a easy win to support CNB.

### Does this PR introduce any user-facing change?
yes, a new `modelType` is supported

### How was this patch tested?
added testsuites

Closes #26575 from zhengruifeng/cnb.

Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
2019-11-21 18:22:05 +08:00
gengjiaan 85c004d5b0 [SPARK-29885][PYTHON][CORE] Improve the exception message when reading the daemon port
### What changes were proposed in this pull request?
In production environment, my PySpark application occurs an exception and it's message as below:
```
19/10/28 16:15:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
	at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:204)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:122)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:95)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
	at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
```
At first, I think a physical node has many ports are occupied by a large number of processes.
But I found the total number of ports in use is only 671.
```
[yarnr1115 ~]$ netstat -a | wc -l 671
671
```
I  checked the code of PythonWorkerFactory in line 204 and found:
```
 daemon = pb.start()
 val in = new DataInputStream(daemon.getInputStream)
 try {
 daemonPort = in.readInt()
 } catch {
 case _: EOFException =>
 throw new SparkException(s"No port number in $daemonModule's stdout")
 }
```
I added some code here:
```
logError("Meet EOFException, daemon is alive: ${daemon.isAlive()}")
logError("Exit value: ${daemon.exitValue()}")
```
Then I recurrent the exception and it's message as below:
```
19/10/28 16:15:03 ERROR PythonWorkerFactory: Meet EOFException, daemon is alive: false
19/10/28 16:15:03 ERROR PythonWorkerFactory: Exit value: 139
19/10/28 16:15:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
 at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:206)
 at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:122)
 at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:95)
 at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
 at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
 at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
 at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
 at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
 at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
 at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:121)
 at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)
```
I think the exception message has caused me a lot of confusion.
This PR will add meaningful log for exception information.

### Why are the changes needed?
In order to clarify the exception and try three times default.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Exists UT.

Closes #26510 from beliefer/improve-except-message.

Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-11-21 16:13:42 +09:00