Commit graph

7318 commits

Author SHA1 Message Date
root1 39fff9258a [SPARK-29452][WEBUI] Improve Storage tab tooltip
### What changes were proposed in this pull request?
Added tooltips for each column in the Storage tab of the Web UI.

### Why are the changes needed?
Tooltips will help users understand the columns of the Storage tab.

### Does this PR introduce any user-facing change?
Yes

### How was this patch tested?
Manually Tested.

Closes #26226 from iRakson/storage_tooltip.

Authored-by: root1 <raksonrakesh@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-11-01 08:27:34 -05:00
Xingbo Jiang 8207c835b4 Revert "Prepare Spark release v3.0.0-preview-rc2"
This reverts commit 007c873ae3.
2019-10-30 17:45:44 -07:00
Xingbo Jiang 007c873ae3 Prepare Spark release v3.0.0-preview-rc2
### What changes were proposed in this pull request?

To push the built jars to the Maven release repository, we need to remove the 'SNAPSHOT' tag from the version name.

Made the following changes in this PR:
* Update all the `3.0.0-SNAPSHOT` version name to `3.0.0-preview`
* Update the sparkR version number check logic to allow jvm version like `3.0.0-preview`

**Please note these changes were generated by the release script in the past, but this time, since we manually added tags on the master branch, we need to manually apply these changes too.**

We shall revert the changes after the 3.0.0-preview release has passed.

### Why are the changes needed?

To make the Maven release repository accept the built jars.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

N/A
2019-10-30 17:42:59 -07:00
Gabor Somogyi 9c817a83fc [SPARK-29637][CORE] Add description to Job SHS web API
### Why are the changes needed?
Starting from Spark 2.3, the SHS REST API endpoint `/applications/<app_id>/jobs/` does not include `description` in the returned JobData. This was not the case up to Spark 2.2.

In this PR I've added the mentioned field.

### Does this PR introduce any user-facing change?
Yes.

Old API response:
```
[ {
  "jobId" : 0,
  "name" : "foreach at <console>:26",
  "submissionTime" : "2019-10-28T12:41:54.301GMT",
  "completionTime" : "2019-10-28T12:41:54.731GMT",
  "stageIds" : [ 0 ],
  "jobGroup" : "test",
  "status" : "SUCCEEDED",
  "numTasks" : 1,
  "numActiveTasks" : 0,
  "numCompletedTasks" : 1,
  "numSkippedTasks" : 0,
  "numFailedTasks" : 0,
  "numKilledTasks" : 0,
  "numCompletedIndices" : 1,
  "numActiveStages" : 0,
  "numCompletedStages" : 1,
  "numSkippedStages" : 0,
  "numFailedStages" : 0,
  "killedTasksSummary" : { }
} ]
```
New API response:
```
[ {
  "jobId" : 0,
  "name" : "foreach at <console>:26",
  "description" : "job",                            <= This is the addition here
  "submissionTime" : "2019-10-28T13:37:24.107GMT",
  "completionTime" : "2019-10-28T13:37:24.613GMT",
  "stageIds" : [ 0 ],
  "jobGroup" : "test",
  "status" : "SUCCEEDED",
  "numTasks" : 1,
  "numActiveTasks" : 0,
  "numCompletedTasks" : 1,
  "numSkippedTasks" : 0,
  "numFailedTasks" : 0,
  "numKilledTasks" : 0,
  "numCompletedIndices" : 1,
  "numActiveStages" : 0,
  "numCompletedStages" : 1,
  "numSkippedStages" : 0,
  "numFailedStages" : 0,
  "killedTasksSummary" : { }
} ]
```

### How was this patch tested?
Extended + existing unit tests.

Manually:
* Open spark-shell
```
scala> sc.setJobGroup("test", "job", false);
scala> val foo = sc.textFile("/user/foo.txt");
foo: org.apache.spark.rdd.RDD[String] = /user/foo.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> foo.foreach(println);
```
* Access REST API `http://SHS-host:port/api/v1/applications/<app-id>/jobs/`

Closes #26295 from gaborgsomogyi/SPARK-29637.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-10-29 12:00:52 -07:00
Xingbo Jiang b33a58c0c6 Revert "Prepare Spark release v3.0.0-preview-rc1"
This reverts commit 5eddbb5f1d.
2019-10-28 22:32:34 -07:00
Xingbo Jiang 5eddbb5f1d Prepare Spark release v3.0.0-preview-rc1
### What changes were proposed in this pull request?

To push the built jars to the Maven release repository, we need to remove the 'SNAPSHOT' tag from the version name.

Made the following changes in this PR:
* Update all the `3.0.0-SNAPSHOT` version name to `3.0.0-preview`
* Update the PySpark version from `3.0.0.dev0` to `3.0.0`

**Please note these changes were generated by the release script in the past, but this time, since we manually added tags on the master branch, we need to manually apply these changes too.**

We shall revert the changes after the 3.0.0-preview release has passed.

### Why are the changes needed?

To make the Maven release repository accept the built jars.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

N/A

Closes #26243 from jiangxb1987/3.0.0-preview-prepare.

Lead-authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Co-authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
2019-10-28 22:31:29 -07:00
Liang-Chi Hsieh ae5b60da32 [SPARK-29182][CORE][FOLLOWUP] Cache preferred locations of checkpointed RDD
### What changes were proposed in this pull request?

This is a followup to #25856. It fixes the documentation of the config value of `spark.rdd.checkpoint.cachePreferredLocsExpireTime`.

### Why are the changes needed?

The documentation is not correct: `spark.rdd.checkpoint.cachePreferredLocsExpireTime` cannot be 0.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

This is a documentation-only change.

Closes #26251 from viirya/SPARK-29182-followup.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-25 11:13:06 -07:00
Pavithra Ramachandran 1296bbb8ac [SPARK-29504][WEBUI] Toggle full job description on click
### What changes were proposed in this pull request?
When clicking a job description on the Jobs page, the description was not shown in full.
This adds a handler for the click event on the description.

### Why are the changes needed?
When there is a long description of a job, it cannot be seen in full in the UI.
The feature was added in https://github.com/apache/spark/pull/24145
but it was lost after https://github.com/apache/spark/pull/25374

Before change:
![Screenshot from 2019-10-23 11-23-00](https://user-images.githubusercontent.com/51401130/67361914-827b0080-f587-11e9-9181-d49a6a836046.png)
After change: on double-click over the description
![Screenshot from 2019-10-23 11-20-02](https://user-images.githubusercontent.com/51401130/67361936-932b7680-f587-11e9-9e59-d290abed4b70.png)

### Does this PR introduce any user-facing change?

No

### How was this patch tested?
Manually tested.

Closes #26222 from PavithraRamachandran/jobs_description_tooltip.

Authored-by: Pavithra Ramachandran <pavi.rams@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
2019-10-24 11:14:31 +02:00
DB Tsai fd899d6331 [SPARK-29576][CORE] Use Spark's CompressionCodec for Ser/Deser of MapOutputStatus
### What changes were proposed in this pull request?
Instead of using the ZStd codec directly, we use Spark's CompressionCodec, which wraps the ZStd codec in a buffered stream to avoid the excessive overhead of JNI calls when compressing/decompressing small amounts of data.

Also, by using Spark's CompressionCodec, we can easily make it configurable in the future if needed.

### Why are the changes needed?
Faster performance.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #26235 from dbtsai/optimizeDeser.

Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-23 18:17:37 -07:00
Xianyang Liu 0a7095156b [SPARK-29499][CORE][PYSPARK] Add mapPartitionsWithIndex for RDDBarrier
### What changes were proposed in this pull request?

Add mapPartitionsWithIndex for RDDBarrier.

### Why are the changes needed?

There is only one method in `RDDBarrier`. We often use the partition index as a label for the current partition, but with `mapPartitions` we have to fetch the index from `TaskContext`, which is not convenient (see the sketch below).
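
A minimal usage sketch (assuming a spark-shell context where `sc` is available; the call shape follows the description above and is illustrative):

```scala
// Tag each record with its partition index inside a barrier stage,
// without having to read the index from TaskContext.
val rdd = sc.parallelize(1 to 100, 4)
val tagged = rdd.barrier().mapPartitionsWithIndex { (index, iter) =>
  iter.map(value => (index, value))
}
tagged.collect()
```
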
### Does this PR introduce any user-facing change?

No

### How was this patch tested?

New UT.

Closes #26148 from ConeyLiu/barrier-index.

Authored-by: Xianyang Liu <xianyang.liu@intel.com>
Signed-off-by: Xingbo Jiang <xingbo.jiang@databricks.com>
2019-10-23 13:46:09 +02:00
Sean Owen 80094688fd [SPARK-29556][CORE] Avoid putting request path in error response in ErrorServlet
### What changes were proposed in this pull request?

Don't include `$path` from user query in the error response.

### Why are the changes needed?

The path could contain input that is then rendered as HTML in the error response. It's not clear whether it's exploitable, but better safe than sorry as the path info really isn't that important in this context.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #26211 from srowen/SPARK-29556.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-22 14:14:59 -07:00
chenjuanni 2036a8cca7 [SPARK-29488][WEBUI] In Web UI, stage page has js error when sort table
### What changes were proposed in this pull request?
In Web UI, stage page has js error when sort table.
https://issues.apache.org/jira/browse/SPARK-29488

### Why are the changes needed?
In Web UI, follow the steps below, get js error "Uncaught TypeError: Failed to execute 'removeChild' on 'Node': parameter 1 is not of type 'Node'.".
 1) Click "Summary Metrics..." 's tablehead "Min"
 2) Click "Aggregated Metrics by Executor" 's tablehead "Task Time"
 3) Click "Summary Metrics..." 's tablehead "Min"(the same as step 1.)

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
In Web UI, follow the steps below, no error occur.
 1) Click "Summary Metrics..." 's tablehead "Min"
 2) Click "Aggregated Metrics by Executor" 's tablehead "Task Time"
 3) Click "Summary Metrics..." 's tablehead "Min"(the same as step 1.)
![image](https://user-images.githubusercontent.com/7802338/66899878-464b1b80-f02e-11e9-9660-6cdaab283491.png)

Closes #26136 from cjn082030/SPARK-1.

Authored-by: chenjuanni <chenjuanni@inspur.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-10-22 08:58:12 -05:00
DB Tsai f4d5aa4213 [SPARK-29434][CORE] Improve the MapStatuses Serialization Performance
### What changes were proposed in this pull request?
Instead of using GZIP to compress the serialized `MapStatuses`, use ZStd, which provides a better compression ratio and faster compression.

The original approach serializes and writes data directly into `GZIPOutputStream` in one step; however, compression is faster if a bigger chunk of the data is processed by the codec at once. As a result, in this PR the serialized data is written into an uncompressed byte array first, and then the data is compressed. For smaller `MapStatuses`, we find it's 2x faster.
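
As a rough illustration of the two-step idea (a sketch, not Spark's actual implementation; the serializer and codec calls here are stand-ins):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import com.github.luben.zstd.ZstdOutputStream

// Step 1: serialize into an uncompressed in-memory buffer.
// Step 2: compress the whole buffer in one pass, so the codec sees one big
// chunk instead of many small writes through the serialization stream.
def serializeThenCompress(statuses: Array[java.io.Serializable]): Array[Byte] = {
  val raw = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(raw)
  oos.writeObject(statuses)
  oos.close()

  val out = new ByteArrayOutputStream()
  val zstd = new ZstdOutputStream(out)
  zstd.write(raw.toByteArray)
  zstd.close()
  out.toByteArray
}
```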

Here is the benchmark result.

#### 20k map outputs, and each has 500 blocks
1. ZStd two steps in this PR: 0.402 ops/ms, 89,066 bytes
2. ZStd one step as the original approach: 0.370 ops/ms, 89,069 bytes
3. GZip: 0.092 ops/ms, 217,345 bytes

#### 20k map outputs, and each has 5 blocks
1. ZStd two steps in this PR: 0.9 ops/ms, 75,449 bytes
2. ZStd one step as the original approach: 0.38 ops/ms, 75,452 bytes
3. GZip: 0.21 ops/ms, 160,094 bytes

### Why are the changes needed?
Decrease the time for serializing the `MapStatuses` in large-scale jobs.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #26085 from dbtsai/mapStatus.

Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-20 13:56:23 -07:00
Dongjoon Hyun e4b4a35de2 [SPARK-29466][WEBUI] Show Duration for running drivers in Standalone master web UI
### What changes were proposed in this pull request?

This PR aims to add a new `Duration` column for running drivers in the Apache Spark `Standalone` master web UI in order to improve UX. This helps users in the same way as the other `Duration` columns in the `Running` and `Completed` application tables.

### Why are the changes needed?

When we use `--supervise`, the drivers can survive longer.
Technically, the `Duration` column is not the same. (Please see the image below.)

### Does this PR introduce any user-facing change?

Yes. The red box is added newly.

<img width="1312" alt="Screen Shot 2019-10-14 at 12 53 43 PM" src="https://user-images.githubusercontent.com/9700541/66779127-50301b80-ee82-11e9-853f-72222cd011ac.png">

### How was this patch tested?

Manual since this is a UI column. After starting standalone cluster and jobs, kill the `DriverWrapper` and see the UI.

```
$ sbin/start-master.sh
$ sbin/start-slave.sh spark://$(hostname):7077
$ bin/spark-submit --master spark://(hostname):7077 --deploy-mode cluster --supervise --class org.apache.spark.examples.JavaSparkPi examples/target/scala-2.12/jars/spark-examples_2.12-3.0.0-SNAPSHOT.jar 1000
$ jps
41521 DriverWrapper
...
$ kill -9 41521   // kill the `DriverWrapper`.
```

Closes #26113 from dongjoon-hyun/SPARK-29466.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-18 15:39:44 -07:00
DB Tsai 23f45f1822 [SPARK-29515][CORE] MapStatuses SerDeser Benchmark

### What changes were proposed in this pull request?
Add benchmark code for MapStatuses serialization & deserialization performance.

### Why are the changes needed?
For comparing the performance differences against optimization.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
No test is required.

Closes #26169 from dbtsai/benchmark.

Lead-authored-by: DB Tsai <d_tsai@apple.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Co-authored-by: DB Tsai <dbtsai@dbtsai.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-10-18 21:30:36 +00:00
Yuanjian Li 8616109061 [SPARK-9853][CORE][FOLLOW-UP] Regularize all the shuffle configurations related to adaptive execution
### What changes were proposed in this pull request?
1. Regularize all the shuffle configurations related to adaptive execution.
2. Add default value for `BlockStoreShuffleReader.shouldBatchFetch`.

### Why are the changes needed?
It's a follow-up PR for #26040.
Regularize the existing `spark.sql.adaptive.shuffle` namespace in SQLConf.

### Does this PR introduce any user-facing change?
Renames one released user config, `spark.sql.adaptive.minNumPostShufflePartitions`, to `spark.sql.adaptive.shuffle.minNumPostShufflePartitions`; the other changed configs are not released yet.

### How was this patch tested?
Existing UT.

Closes #26147 from xuanyuanking/SPARK-9853.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-10-18 15:39:35 +08:00
Ivan Gozali 00347a3c78 [SPARK-28762][CORE] Read JAR main class if JAR is not located in local file system
### What changes were proposed in this pull request?

JIRA: https://issues.apache.org/jira/browse/SPARK-28762

TL;DR: Automatically read the `Main-Class` from a JAR's manifest even if the JAR isn't in the local file system (e.g. it is in S3 or HDFS).

### Why are the changes needed?

When deploying a fat JAR (e.g. using `sbt-assembly`) to S3/HDFS, users might choose to include the main class for the JAR in its manifest. This change allows the user to `spark-submit` the JAR without having to specify the main class again via the `--class` argument.

### Does this PR introduce any user-facing change?

Yes. Previously, if the primary resource was a JAR not located in the local file system, it would fail with the error:
```
$ spark-submit s3a://nonexistent.jar
Exception in thread "main" org.apache.spark.SparkException: Cannot load main class from JAR s3a://nonexistent.jar with URI s3a. Please specify a class through --class.
    ...
```

With this PR, the main class will be read from the manifest, assuming the classpath contains the appropriate JAR to read the file system.

### How was this patch tested?

Added some tests in `core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala`.

Closes #25910 from igozali/SPARK-28762.

Authored-by: Ivan Gozali <gozaliivan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-10-17 14:36:01 -07:00
Jungtaek Lim (HeartSaVioR) 100fc58da5 [SPARK-28869][CORE] Roll over event log files
### What changes were proposed in this pull request?

This patch is a part of [SPARK-28594](https://issues.apache.org/jira/browse/SPARK-28594) and design doc for SPARK-28594 is linked here: https://docs.google.com/document/d/12bdCC4nA58uveRxpeo8k7kGOI2NRTXmXyBOweSi4YcY/edit?usp=sharing

This patch proposes adding a new feature to event logging: rolling event log files over at a configured file size.

Previously, event logging was done with a single file, and the related codebase (`EventLoggingListener`/`FsHistoryProvider`) was tightly coupled with it. This patch adds a layer on both the reader (`EventLogFileReader`) and writer (`EventLogFileWriter`) side to decouple the implementation details of "handling events" from "how to read/write events from/to a file".

This patch adds two properties, `spark.eventLog.rollLog` and `spark.eventLog.rollLog.maxFileSize`, which make the rolling behavior configurable. The feature is disabled by default, as we only expect huge event logs for huge/long-running applications; for other cases a single event log file is sufficient and simpler.
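
A minimal configuration sketch using the property names described above (the size value is illustrative, not the default):

```
spark.eventLog.enabled                true
spark.eventLog.rollLog                true
spark.eventLog.rollLog.maxFileSize    128m
```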

### Why are the changes needed?

This is a part of SPARK-28594 which addresses event log growing infinitely for long-running application.

This patch also provides an option for the situation where the event log file gets huge and consumes storage. End users may give up replaying their events and want to delete the event log file, but while the application is still running and writing the file, it is not safe to delete it. With rolling enabled, end users will be able to delete some of the old files.

### Does this PR introduce any user-facing change?

No, as the new feature is turned off by default.

### How was this patch tested?

Added unit tests, as well as basic manual tests.

Basic manual tests - ran SHS, ran structured streaming query with roll event log enabled, verified split files are generated as well as SHS can load these files, with handling app status as incomplete/complete.

Closes #25670 from HeartSaVioR/SPARK-28869.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-10-17 11:15:25 -07:00
Marcelo Vanzin 2f0a38cb50 [SPARK-29398][CORE] Support dedicated thread pools for RPC endpoints
The current RPC backend in Spark supports single- and multi-threaded
message delivery to endpoints, but they all share the same underlying
thread pool. So an RPC endpoint that blocks a dispatcher thread can
negatively affect other endpoints.

This can be more pronounced with configurations that limit the number
of RPC dispatch threads based on configuration and / or running
environment. And exposing the RPC layer to other code (for example
with something like SPARK-29396) could make it easy to affect normal
Spark operation with a badly written RPC handler.

This change adds a new RPC endpoint type that tells the RPC env to
create dedicated dispatch threads, so that those effects are minimised.
Other endpoints will still need CPU to process their messages, but
they won't be able to actively block the dispatch thread of these
isolated endpoints.

As part of the change, I've changed the most important Spark endpoints
(the driver, executor and block manager endpoints) to be isolated from
others. This means a couple of extra threads are created on the driver
and executor for these endpoints.

Tested with existing unit tests, which hammer the RPC system extensively,
and also by running applications on a cluster (with a prototype of
SPARK-29396).

Closes #26059 from vanzin/SPARK-29398.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
2019-10-17 13:14:32 -05:00
Yuanjian Li 239ee3f561 [SPARK-9853][CORE] Optimize shuffle fetch of continuous partition IDs
This PR takes over #19788. After we split the shuffle fetch protocol from `OpenBlock` in #24565, this optimization can be extended in the new shuffle protocol. Credit to yucai, closes #19788.

### What changes were proposed in this pull request?
This PR adds the support for continuous shuffle block fetching in batch:

- Shuffle client changes:
    - Add new feature tag `spark.shuffle.fetchContinuousBlocksInBatch`, implement the decision logic in `BlockStoreShuffleReader`.
    - Merge the continuous shuffle block ids in batch if needed in ShuffleBlockFetcherIterator.
- Shuffle server changes:
    - Add support in `ExternalBlockHandler` for the external shuffle service side.
    - Make `ShuffleBlockResolver.getBlockData` accept getting block data by range.
- Protocol changes:
    - Add new block id type `ShuffleBlockBatchId` represent continuous shuffle block ids.
    - Extend `FetchShuffleBlocks` and `OneForOneBlockFetcher`.
    - After the new shuffle fetch protocol completed in #24565, the backward compatibility for external shuffle service can be controlled by `spark.shuffle.useOldFetchProtocol`.

### Why are the changes needed?
In adaptive execution, one reducer may fetch multiple continuous shuffle blocks from one map output file. However, with the original approach, each reducer needs to fetch those blocks one by one, which requires many IOs and hurts performance. This PR supports fetching those continuous shuffle blocks in one IO (batch way). See the example below:

The shuffle block is stored like below:
![image](https://user-images.githubusercontent.com/2989575/51654634-c37fbd80-1fd3-11e9-935e-5652863676c3.png)
The ShuffleId format is s"shuffle_$shuffleId_$mapId_$reduceId", referring to BlockId.scala.

In adaptive execution, one reducer may want to read output for reducer 5 to 14, whose block Ids are from shuffle_0_x_5 to shuffle_0_x_14.
Before this PR, Spark needs 10 disk IOs + 10 network IOs for each output file.
After this PR, Spark only needs 1 disk IO and 1 network IO. This way can reduce IO dramatically.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Add new UT.
Integrate test with setting `spark.sql.adaptive.enabled=true`.

Closes #26040 from xuanyuanking/SPARK-9853.

Lead-authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Co-authored-by: yucai <yyu1@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-10-17 14:47:56 +08:00
prasha2 57edb42582 [SPARK-27259][CORE] Allow setting -1 as length for FileBlock
### What changes were proposed in this pull request?

This PR aims to update the validation check on `length` from `length >= 0` to `length >= -1` in order to allow setting `-1` to keep the default value.

### Why are the changes needed?

At Apache Spark 2.2.0, [SPARK-18702](https://github.com/apache/spark/pull/16133/files#diff-2c5519b1cf4308d77d6f12212971544fR27-R38) adds `class FileBlock` with the default `length` value, `-1`, initially.

There is no way to set `filePath` only while keeping `length` as `-1`.
```scala
  def set(filePath: String, startOffset: Long, length: Long): Unit = {
     require(filePath != null, "filePath cannot be null")
     require(startOffset >= 0, s"startOffset ($startOffset) cannot be negative")
     require(length >= 0, s"length ($length) cannot be negative")
     inputBlock.set(new FileBlock(UTF8String.fromString(filePath), startOffset, length))
   }
```

For compressed files (like GZ), the size of a split can be set to -1. This was allowed up to Spark 2.1 but regressed starting with Spark 2.2.x. Note that a split length of -1 also means the length was unknown, which is a valid scenario. Thus, a split length of -1 should be acceptable, as it was before Spark 2.2.
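
A sketch of the relaxed check this PR proposes (illustrative; only the lower bound changes, and the message text is an assumption):

```scala
// -1 is now accepted and means "length unknown / keep the default value".
require(length >= -1, s"length ($length) cannot be smaller than -1")
```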

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

This updates a corner case of the requirement check. Manually checked the code.

Closes #26123 from praneetsharma/fix-SPARK-27259.

Authored-by: prasha2 <prasha22@mail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-15 22:22:37 -07:00
Wenchen Fan 51f10ed90f [SPARK-28560][SQL][FOLLOWUP] code cleanup for local shuffle reader
### What changes were proposed in this pull request?

A followup of https://github.com/apache/spark/pull/25295

This PR proposes a few code cleanups:
1. rename the special `getMapSizesByExecutorId` to `getMapSizesByMapIndex`
2. rename the parameter `mapId` to `mapIndex` as that's really a mapper index.
3. `BlockStoreShuffleReader` should take `blocksByAddress` directly instead of a map id.
4. rename `getMapReader` to `getReaderForOneMapper` to be clearer.

### Why are the changes needed?

make code easier to understand

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

existing tests

Closes #26128 from cloud-fan/followup.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-10-16 11:19:16 +08:00
Dongjoon Hyun 39d53d3e74 [SPARK-29470][BUILD] Update plugins to latest versions
### What changes were proposed in this pull request?

This PR updates plugins to latest versions.

### Why are the changes needed?

This brings bug fixes like the following.
- https://issues.apache.org/jira/projects/MCOMPILER/versions/12343484 (maven-compiler-plugin)
- https://issues.apache.org/jira/projects/MJAVADOC/versions/12345060 (maven-javadoc-plugin)
- https://issues.apache.org/jira/projects/MCHECKSTYLE/versions/12342397 (maven-checkstyle-plugin)
- https://checkstyle.sourceforge.io/releasenotes.html#Release_8.25 (checkstyle)
- https://checkstyle.sourceforge.io/releasenotes.html#Release_8.24 (checkstyle)

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Pass the Jenkins building and testing with the existing code.

Closes #26117 from dongjoon-hyun/SPARK-29470.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-15 11:55:52 -07:00
Liang-Chi Hsieh 4ecbdbb6a7 [SPARK-29182][CORE] Cache preferred locations of checkpointed RDD
### What changes were proposed in this pull request?

This proposes adding a Spark config to control the caching behavior of ReliableCheckpointRDD.getPreferredLocations. If it is enabled, getPreferredLocations will only compute the preferred locations once and cache them for later use.

The drawback of caching the preferred locations is that when the cached locations are outdated, we may lose data locality. This is documented in the config description. To mitigate it, this patch also adds a config to set an expiration time (default 60 minutes) for the cache. Once the time expires, the cache is invalidated and the updated location info is queried again.
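
A configuration sketch for the expiration behavior described here (the property name matches the follow-up commit earlier in this log; the value is illustrative):

```
spark.rdd.checkpoint.cachePreferredLocsExpireTime   60min
```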

This adds a test case. The most suitable test suite looks to be CheckpointCompressionSuite, so this renames CheckpointCompressionSuite to CheckpointStorageSuite and puts the test case there.

### Why are the changes needed?

One Spark job in our cluster fits many ALS models in parallel. The fitting goes well, but afterwards, when we union all factors, the union operation is very slow.

By looking into the driver stack dump, it looks like the driver spends a lot of time computing preferred locations. As we checkpoint the training data before fitting ALS, the time is spent in ReliableCheckpointRDD.getPreferredLocations. This method calls the DFS interface to query file status and block locations. As we have a big number of partitions derived from the checkpointed RDD, the union spends a lot of time querying the same information.

This reduces the time of the huge union from a few hours to dozens of minutes.

This issue is not limited to ALS, so this change is not specific to ALS. Checkpointing data is actually common usage in Spark, to increase reliability and cut RDD lineage, so Spark operations on checkpointed data will benefit.

### Does this PR introduce any user-facing change?

Yes. This adds a Spark config users can use to control the cache behavior of preferred locations of checkpointed RDD.

### How was this patch tested?

Unit test added and manual test on development cluster.

Closes #25856 from viirya/cache-checkpoint-preferredloc.

Authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Liang-Chi Hsieh <liangchi@uber.com>
2019-10-15 10:45:18 -07:00
Yifei Huang 2e28622d8a [SPARK-28211][CORE][SHUFFLE] Propose Shuffle Driver Components API
### What changes were proposed in this pull request?

This is the next step of the SPARK-25299 work proposing a new shuffle storage API. This patch includes the components of the plugin that hook into the driver, including driver shuffle initialization, application cleanup, and shuffle cleanup.

### How was this patch tested?
Existing unit tests, plus an additional test for testing the interactions between the driver and executor initialization.

Closes #25823 from yifeih/yh/upstream/driver-lifecycle.

Lead-authored-by: Yifei Huang <yifeih@palantir.com>
Co-authored-by: mccheah <mcheah@palantir.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
2019-10-15 12:26:49 -05:00
jiake 9ac4b2dbc5 [SPARK-28560][SQL] Optimize shuffle reader to local shuffle reader when smj converted to bhj in adaptive execution
## What changes were proposed in this pull request?
Implement a rule in the new adaptive execution framework introduced in [SPARK-23128](https://issues.apache.org/jira/browse/SPARK-23128). This rule is used to optimize the shuffle reader to local shuffle reader when smj is converted to bhj in adaptive execution.

## How was this patch tested?
Existing tests

Closes #25295 from JkSelf/localShuffleOptimization.

Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-10-15 21:51:15 +08:00
Marcelo Vanzin 857f109c47 [SPARK-10614][CORE] Add monotonic time to Clock interface
This change adds a new method to the Clock interface that returns
the time from a monotonic time source, so that code that needs that
feature can also mock the Clock in tests.
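
A rough sketch of the shape of the change (the method name for the monotonic source is an assumption for illustration, not necessarily Spark's exact API):

```scala
trait Clock {
  def getTimeMillis(): Long                 // wall-clock time, unchanged
  def waitTillTime(targetTime: Long): Long  // wall-clock wait, unchanged
  def nanoTime(): Long                      // new monotonic source (assumed name)
}

class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
  override def nanoTime(): Long = System.nanoTime()
  override def waitTillTime(targetTime: Long): Long = {
    var now = getTimeMillis()
    while (now < targetTime) {
      Thread.sleep(math.min(targetTime - now, 100L))
      now = getTimeMillis()
    }
    now
  }
}
```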

The original getTimeMillis and waitTillTime methods are unchanged, since
streaming code that uses the Clock interface seems to rely on wall clock
semantics, not monotonic clock. So, in a way, this doesn't directly
address the problem raised in the bug, that waitTillTime can be affected
by drift, but then the places being modified to use the new API don't
really rely on that API.

The dynamic allocation code was modified to use the new time source,
since they shouldn't be basing their decisions on wall clock time.

For a longer discussion on how monotonic clocks work on Linux/x64, the
following blog post (and links within) shed a lot of light on the safety of
`System.nanoTime()`:

http://btorpey.github.io/blog/2014/02/18/clock-sources-in-linux/

Tested with unit test and also running apps with dynamic allocation on.

Closes #26058 from vanzin/SPARK-10614.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-14 19:38:04 -07:00
Thomas Graves a42d894a40 [SPARK-29417][CORE] Resource Scheduling - add TaskContext.resource java api
### What changes were proposed in this pull request?
We added a TaskContext.resources() API, but I realized it returns a Scala Map, which is not ideal for access from Java. Here I add a resourcesJMap function which returns a java.util.Map to make it easily accessible from Java.
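
A usage sketch (assuming an existing `rdd` and that the call runs inside a task, mirroring the existing `resources()` accessor):

```scala
import org.apache.spark.TaskContext

rdd.foreachPartition { _ =>
  // java.util.Map view of the task's resources, convenient for Java callers.
  val resources = TaskContext.get().resourcesJMap()
  println(s"resource types visible to this task: ${resources.size()}")
}
```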

### Why are the changes needed?
Java API access

### Does this PR introduce any user-facing change?
Yes, new TaskContext function to access from Java

### How was this patch tested?
new unit test

Closes #26083 from tgravescs/SPARK-29417.

Lead-authored-by: Thomas Graves <tgraves@ngvpn01-168-221.dyn.scz.us.nvidia.com>
Co-authored-by: Thomas Graves <tgraves@TGRAVES-MLT.local>
Co-authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
2019-10-14 13:27:34 -07:00
sandeep katta ba04562234 [SPARK-29435][CORE] MapId in Shuffle Block is inconsistent at the writer and reader part when spark.shuffle.useOldFetchProtocol=true
### What changes were proposed in this pull request?
Shuffle Block Construction during Shuffle Write and Read is wrong

Shuffle Map Task  (Shuffle Write)
19/10/11 22:07:32| ERROR| [Executor task launch worker for task 3] org.apache.spark.shuffle.IndexShuffleBlockResolver: ####### For Debug ############ /tmp/hadoop-root1/nm-local-dir/usercache/root1/appcache/application_1570422377362_0008/blockmgr-6d03250d-6e7c-4bc2-bbb7-22b8e3981c35/0d/**shuffle_0_3_0.index**

Result Task (Shuffle Read)
19/10/11 22:07:32| ERROR| [Executor task launch worker for task 6] org.apache.spark.storage.ShuffleBlockFetcherIterator: Error occurred while fetching local blocks
java.nio.file.NoSuchFileException: /tmp/hadoop-root1/nm-local-dir/usercache/root1/appcache/application_1570422377362_0008/blockmgr-6d03250d-6e7c-4bc2-bbb7-22b8e3981c35/30/**shuffle_0_0_0.index**
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

As per [SPARK-25341](https://issues.apache.org/jira/browse/SPARK-25341), the `mapId` of `SortShuffleManager.getWriter` changed from `partitionId` to `context.taskAttemptId()`.

[code]( https://github.com/apache/spark/pull/25620/files#diff-363c53ca5a72cfdc37dac4a723309638R54)

But `MapOutputTracker.convertMapStatuses` returns the wrong ShuffleBlock: if `spark.shuffle.useOldFetchProtocol` is enabled, it returns `partitionId` as the `mapId`, which is wrong. [Code](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/MapOutputTracker.scala#L912)

### Why are the changes needed?
The MapStatus returned by the ShuffleWriter already has the mapId, e.g. [code here](https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java#L134), so it's nice to use `status.mapTaskId`.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Existing UT, and manually tested with `spark.shuffle.useOldFetchProtocol` as true and false.

![image](https://user-images.githubusercontent.com/35216143/66716530-4f4caa80-edec-11e9-833d-7131a9fbd442.png)

Closes #26095 from sandeep-katta/shuffleIssue.

Lead-authored-by: sandeep katta <sandeep.katta2007@gmail.com>
Co-authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-10-15 00:03:13 +08:00
Pablo 782a94d289 [SPARK-29433][WEBUI] Fix tooltip stages table
### What changes were proposed in this pull request?
In the Web UI Stages table, the tooltips of the Input and Output columns are not correct.

Actual tooltip messages:

- Bytes and records read from Hadoop or from Spark storage.
- Bytes and records written to Hadoop.

In these columns we only show bytes, not records.

![image](https://user-images.githubusercontent.com/12819544/66608286-85a0e480-ebb6-11e9-812a-9760bea53664.png)
![image](https://user-images.githubusercontent.com/12819544/66608323-96515a80-ebb6-11e9-9e5f-e3f2cc99a3b3.png)
![image](https://user-images.githubusercontent.com/12819544/66608450-de707d00-ebb6-11e9-84e3-0917b5cfe6f6.png)
![image](https://user-images.githubusercontent.com/12819544/66608468-eaf4d580-ebb6-11e9-8c5b-2a9a290bea9c.png)

### Why are the changes needed?
Simple correction of a tooltip

### Does this PR introduce any user-facing change?
Yes, tooltip correction

### How was this patch tested?
Manual testing

Closes #26084 from planga82/feature/SPARK-29433_Tooltip_correction.

Lead-authored-by: Pablo <soypab@gmail.com>
Co-authored-by: Unknown <soypab@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-10-12 10:34:29 -05:00
liucht e94abd7f16 [SPARK-29323][WEBUI] Add tooltip for The Executors Tab's column names in the Spark history server Page
### What changes were proposed in this pull request?

This PR adds tooltips for the Executors tab's column names, including RDD Blocks, Disk Used, Cores, Active Tasks, Failed Tasks, Complete Tasks, and Total Tasks, in the history server page.
https://issues.apache.org/jira/browse/SPARK-29323

![image](https://user-images.githubusercontent.com/28332082/66017759-b6c24a80-e50e-11e9-807b-5b076f701d2f.png)

I have modified the following code in executorspage-template.html.
Before:
<th>RDD Blocks</th>
<th>Disk Used</th>
<th>Cores</th>
<th>Active Tasks</th>
<th>Failed Tasks</th>
<th>Complete Tasks</th>
<th>Total Tasks</th>

![image](https://user-images.githubusercontent.com/28332082/66018111-4ddbd200-e510-11e9-9cfc-19f3eae81e76.png)

After:

<th><span data-toggle="tooltip" data-placement="top" title="RDD Blocks">RDD Blocks</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Active Tasks">Active Tasks</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Failed Tasks">Failed Tasks</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Complete Tasks">Complete Tasks</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Total Tasks">Total Tasks</span></th>

![image](https://user-images.githubusercontent.com/28332082/66018157-79f75300-e510-11e9-96ba-6230aa0940c7.png)

### Why are the changes needed?

On the Executors tab of the Spark history server page, the Summary section shows a row of column titles, but the formatting is inconsistent.
Some column names have tooltips, such as Storage Memory, Task Time (GC Time), Input, Shuffle Read,
Shuffle Write and Blacklisted, but some column names still have no tooltip: RDD Blocks, Disk Used, Cores, Active Tasks, Failed Tasks, Complete Tasks and Total Tasks. Oddly, in the Executors section below, all of these column names do have tooltips.
It's important for open source projects to have a consistent style and a user-friendly UI, and I'm working on keeping it consistent and more user-friendly.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Manual tests for Chrome, Firefox and Safari

Authored-by: liucht-inspur <liucht@inspur.com>
Signed-off-by: liucht-inspur <liucht@inspur.com>

Closes #25994 from liucht-inspur/master.

Authored-by: liucht <liucht@inspur.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-10-12 09:40:31 -05:00
Dongjoon Hyun e946104c42 [SPARK-29400][CORE] Improve PrometheusResource to use labels
### What changes were proposed in this pull request?

[SPARK-29064](https://github.com/apache/spark/pull/25770) introduced `PrometheusResource` to expose `ExecutorSummary`. This PR aims to make it even more `Prometheus`-friendly by using [Prometheus labels](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels).

### Why are the changes needed?

**BEFORE**
```
metrics_app_20191008151432_0000_driver_executor_rddBlocks_Count 0
metrics_app_20191008151432_0000_driver_executor_memoryUsed_Count 0
metrics_app_20191008151432_0000_driver_executor_diskUsed_Count 0
```

**AFTER**
```
$ curl -s http://localhost:4040/metrics/executors/prometheus/ | head -n3
metrics_executor_rddBlocks_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_memoryUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_diskUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
```

### Does this PR introduce any user-facing change?

No, but `Prometheus` understands the new format and displays it more intelligently.

<img width="735" alt="ui" src="https://user-images.githubusercontent.com/9700541/66438279-1756f900-e9e1-11e9-91c7-c04c6ce9172f.png">

### How was this patch tested?

Manually.

**SETUP**
```
$ sbin/start-master.sh
$ sbin/start-slave.sh spark://`hostname`:7077
$ bin/spark-shell --master spark://`hostname`:7077 --conf spark.ui.prometheus.enabled=true
```

**RESULT**
```
$ curl -s http://localhost:4040/metrics/executors/prometheus/ | head -n3
metrics_executor_rddBlocks_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_memoryUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
metrics_executor_diskUsed_Count{application_id="app-20191008151625-0000", application_name="Spark shell", executor_id="driver"} 0
```

Closes #26060 from dongjoon-hyun/SPARK-29400.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-10 08:47:12 -07:00
Sean Owen cc7493fa21 [SPARK-29416][CORE][ML][SQL][MESOS][TESTS] Use .sameElements to compare arrays, instead of .deep (gone in 2.13)
### What changes were proposed in this pull request?

Use `.sameElements` to compare (non-nested) arrays, as `Arrays.deep` is removed in 2.13 and wasn't the best way to do this in the first place.
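
A tiny before/after illustration:

```scala
val a = Array(1, 2, 3)
val b = Array(1, 2, 3)

// Arrays compare by reference, so == is false here.
assert(!(a == b))

// 2.12-era pattern, gone in 2.13:
//   assert(a.deep == b.deep)

// Cross-version replacement for non-nested arrays:
assert(a.sameElements(b))
```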

### Why are the changes needed?

To compile with 2.13.

### Does this PR introduce any user-facing change?

None.

### How was this patch tested?

Existing tests.

Closes #26073 from srowen/SPARK-29416.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-09 17:00:48 -07:00
Sean Owen 4d93fb7dfa [SPARK-29413][CORE] Rewrite ThreadUtils.parmap to avoid TraversableLike, gone in 2.13
### What changes were proposed in this pull request?

Rewrite declaration of internal `ThreadUtils.parmap` method to avoid `TraversableLike`, which is removed in Scala 2.13.

### Why are the changes needed?

To compile in Scala 2.13.

### Does this PR introduce any user-facing change?

None.

### How was this patch tested?

Existing tests.

Closes #26072 from srowen/SPARK-29413.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-09 10:53:51 -07:00
Sean Owen 3b0bca42ac [SPARK-29401][FOLLOWUP] Additional cases where a .parallelize call with Array is ambiguous in 2.13
This is just a followup on https://github.com/apache/spark/pull/26062 -- see it for more detail.

I think we will eventually find more cases of this. It's hard to get them all at once as there are many different types of compile errors in earlier modules. I'm trying to address them in as big a chunk as possible.

Closes #26074 from srowen/SPARK-29401.2.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-09 10:27:05 -07:00
Sean Owen fa95a5c395 [SPARK-29411][CORE][ML][SQL][DSTREAM] Replace use of Unit object with () for Scala 2.13
### What changes were proposed in this pull request?

Replace `Unit` with equivalent `()` where code refers to the `Unit` companion object.
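
A tiny illustration of the substitution (a sketch; actual call sites vary):

```scala
// Before: some call sites referred to the Unit companion object as a value,
// which (as noted above) does not compile in Scala 2.13:
//   val done: Unit = Unit
// After: use the unit value itself.
val done: Unit = ()
```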

### Why are the changes needed?

It doesn't compile otherwise in Scala 2.13.
- https://github.com/scala/scala/blob/v2.13.0/src/library/scala/Unit.scala#L30

### Does this PR introduce any user-facing change?

Should be no behavior change at all.

### How was this patch tested?

Existing tests.

Closes #26070 from srowen/SPARK-29411.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-09 10:24:13 -07:00
Sean Owen ee83d09b53 [SPARK-29401][CORE][ML][SQL][GRAPHX][TESTS] Replace calls to .parallelize Arrays of tuples, ambiguous in Scala 2.13, with Seqs of tuples
### What changes were proposed in this pull request?

Invocations like `sc.parallelize(Array((1,2)))` cause a compile error in 2.13, like:
```
[ERROR] [Error] /Users/seanowen/Documents/spark_2.13/core/src/test/scala/org/apache/spark/ShuffleSuite.scala:47: overloaded method value apply with alternatives:
  (x: Unit,xs: Unit*)Array[Unit] <and>
  (x: Double,xs: Double*)Array[Double] <and>
  (x: Float,xs: Float*)Array[Float] <and>
  (x: Long,xs: Long*)Array[Long] <and>
  (x: Int,xs: Int*)Array[Int] <and>
  (x: Char,xs: Char*)Array[Char] <and>
  (x: Short,xs: Short*)Array[Short] <and>
  (x: Byte,xs: Byte*)Array[Byte] <and>
  (x: Boolean,xs: Boolean*)Array[Boolean]
 cannot be applied to ((Int, Int), (Int, Int), (Int, Int), (Int, Int))
```
Using a `Seq` instead appears to resolve it, and is effectively equivalent.
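
For example (in a spark-shell context where `sc` is available):

```scala
// Ambiguous in Scala 2.13 because of Array.apply's overloads:
//   val rdd = sc.parallelize(Array((1, 2), (3, 4)))
// Unambiguous and equivalent in both 2.12 and 2.13:
val rdd = sc.parallelize(Seq((1, 2), (3, 4)))
rdd.count()
```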

### Why are the changes needed?

To better cross-build for 2.13.

### Does this PR introduce any user-facing change?

None.

### How was this patch tested?

Existing tests.

Closes #26062 from srowen/SPARK-29401.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-08 20:22:02 -07:00
Sean Owen 2d871ad0e7 [SPARK-29392][CORE][SQL][STREAMING] Remove symbol literal syntax 'foo, deprecated in Scala 2.13, in favor of Symbol("foo")
### What changes were proposed in this pull request?

Syntax like `'foo` is deprecated in Scala 2.13. Replace usages with `Symbol("foo")`
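
For example:

```scala
// Deprecated symbol-literal syntax (warns on 2.13):
val before = 'foo
// Replacement used throughout this change:
val after = Symbol("foo")
assert(before == after)
```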

### Why are the changes needed?

Avoids ~50 deprecation warnings when attempting to build with 2.13.

### Does this PR introduce any user-facing change?

None, should be no functional change at all.

### How was this patch tested?

Existing tests.

Closes #26061 from srowen/SPARK-29392.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-08 20:15:37 -07:00
Imran Rashid 0da667d314 [SPARK-28917][CORE] Synchronize access to RDD mutable state
RDD dependencies and partitions can be simultaneously
accessed and mutated by user threads and spark's scheduler threads, so
access must be thread-safe.  In particular, as partitions and
dependencies are lazily-initialized, before this change they could get
initialized multiple times, which would lead to the scheduler having an
inconsistent view of the pendings stages and get stuck.

Tested with existing unit tests.

Closes #25951 from squito/SPARK-28917.

Authored-by: Imran Rashid <irashid@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-10-08 11:35:54 -07:00
Marcelo Vanzin d2f21b0199 [SPARK-27468][CORE] Track correct storage level of RDDs and partitions
Previously, the RDD level would change depending on the status reported
by executors for the block they were storing, and individual blocks would
reflect that. That is wrong because different blocks may be stored differently
in different executors.

So now the RDD tracks the user-provided storage level, while the individual
partitions reflect the current storage level of that particular block,
including the current number of replicas.

Closes #25779 from vanzin/SPARK-27468.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
2019-10-07 16:07:00 -05:00
Xingbo Jiang 80afc79d89 [SPARK-29263][SCHEDULER][FOLLOWUP][TEST] Update FakeTask.createTaskSet() method
### What changes were proposed in this pull request?

Update the `FakeTask.createTaskSet()` method so that the generated TaskSet makes use of the `priority` param.

### How was this patch tested?

All the places referring to this method pass `priority = 0`, but we still need to fix this to avoid future test failures.

Closes #26030 from jiangxb1987/SPARK-292630-FOLLOWUP.

Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-05 14:44:58 -07:00
DB Tsai 8b71e545d5 [SPARK-29351][CORE] Avoid Full Synchronization in ShuffleMapStage
### What changes were proposed in this pull request?

This PR use read/write locks instead of `synchronized`.

### Why are the changes needed?

In one of our production streaming jobs, which has more than 1k executors, each with 20 cores, Spark spends a significant portion of time (30s) sending out the `ShuffleStatus`. We found there are two issues.

1. In the driver's message loop, it calls `serializedMapStatus`, which is in a synchronized block. When the job scales really big, this can cause contention.
2. When the job is big, the `MapStatus` is huge as well, and the serialization and compression are slow.

This PR aims to address the first problem.
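
A generic sketch of the locking pattern (not the actual ShuffleStatus code): readers that only serialize the current state share the read lock, while updates take the exclusive write lock.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

class ReadWriteLockSketch {
  private val lock = new ReentrantReadWriteLock()

  // Many threads may read/serialize concurrently.
  def withReadLock[T](body: => T): T = {
    lock.readLock().lock()
    try body finally lock.readLock().unlock()
  }

  // Mutations are exclusive.
  def withWriteLock[T](body: => T): T = {
    lock.writeLock().lock()
    try body finally lock.writeLock().unlock()
  }
}
```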

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Test with existing test cases.

Closes #26017 from dbtsai/readWriteLock.

Authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2019-10-04 03:14:10 +00:00
zhouyongjin aedf090ab7 [SPARK-25468][WEBUI][FOLLOWUP] Current page index keep style with dataTable in the spark UI
### What changes were proposed in this pull request?
Make the current page index keep a consistent style with the dataTable in the Spark UI.
https://issues.apache.org/jira/browse/SPARK-25468

move
.paginate_button.active > a {
    color: #999999;
    text-decoration: underline;
}

add
.paginate_button.active {
  border: 1px solid #979797 !important;
  background: white linear-gradient(to bottom, #fff 0%, #dcdcdc 100%);
}

### How was this patch tested?
![image](https://user-images.githubusercontent.com/8166352/65872683-b5303f80-e3b3-11e9-9785-9e1d15b0b3cf.png)
![image](https://user-images.githubusercontent.com/8166352/65872692-ba8d8a00-e3b3-11e9-8cf6-db6f905d3387.png)

Closes #25976 from YongjinZhou/master.

Authored-by: zhouyongjin <zhouyongjin@inspur.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-10-03 14:03:21 -05:00
Sean Owen 7aca0dd658 [SPARK-29296][BUILD][CORE] Remove use of .par to make 2.13 support easier; add scala-2.13 profile to enable pulling in par collections library separately, for the future
### What changes were proposed in this pull request?

Scala 2.13 moves the parallel collections classes to a separate library, so first, this establishes a `scala-2.13` profile to bring them back, for future use.

However the library enables use of `.par` implicit conversions via a new class that is not in 2.12, which makes cross-building hard. This implements a suggested workaround from https://github.com/scala/scala-parallel-collections/issues/22 to avoid `.par` entirely.

### Why are the changes needed?

To compile for 2.13 and, later, to work with 2.13.

### Does this PR introduce any user-facing change?

Should not, no.

### How was this patch tested?

Existing tests.

Closes #25980 from srowen/SPARK-29296.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
2019-10-03 08:56:08 -05:00
Jungtaek Lim (HeartSaVioR) e44d191bf7 [SPARK-29322][CORE] Enable closeFrameOnFlush on ZstdOutputStream for event log file
### What changes were proposed in this pull request?

This patch proposes enabling `closeFrameOnFlush` for the ZstdOutputStream used by the event logger, so that a continuous zstd input stream does not get stuck when reading an "inprogress" event log file.

The issue seems to have been introduced by [SPARK-26283](https://issues.apache.org/jira/browse/SPARK-26283), which addressed a bug by reading the event log file with continuous mode enabled, but changed the behavior of the input stream to read an open frame, which seems to wait for the frame to be closed. Enabling `closeFrameOnFlush` closes the frame whenever flush is called, so the input stream can read the frame sooner.

As a pair of `compressedContinuousInputStream`, this patch adds `compressedContinuousOutputStream` which will be only used for event logging.

### Why are the changes needed?

Without this patch, the reader thread in SHS is stuck reading an in-progress event log file compressed with zstd until the application finishes.

### Does this PR introduce any user-facing change?

It might bring some overhead on each flush when writing a zstd-compressed event log, so some performance hit could be introduced. I've restricted the change to event logging only.

### How was this patch tested?

Manually tested, via setting Spark configuration as below:

```
spark.eventLog.enabled                     true
spark.eventLog.compress                  true
spark.eventLog.compression.codec zstd
```

and start Spark application. While the application is running, load the application in SHS webpage.

Before this patch, it may succeed in replaying the event log, but most likely it will get stuck and the loading page will also be stuck. After this patch, SHS can properly read the in-progress event log file.

Closes #25996 from HeartSaVioR/SPARK-29322.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-10-02 20:48:38 -07:00
huangweiyi 85dafabeb4 [SPARK-29273][CORE] Save peakExecutionMemory value when writing task end to event log
In TaskMetrics there is an exposed metric, peakExecutionMemory, but its value is never set. When a task finishes, it generates a SparkListenerTaskEnd event, including the metrics values. The peakExecutionMemory is actually stored in the Accumulables, which is a member of TaskInfo.

So when parsing the SparkListenerTaskEnd event, we can get the `internal.metrics.peakExecutionMemory` value from the parsed TaskInfo object and set it back on the TaskMetricsInfo by supplying a setPeakExecutionMemory method.

Closes #25949 from 012huang/fix-peakExecutionMemory-metrics-value.

Lead-authored-by: huangweiyi <huangweiyi_2006@qq.com>
Co-authored-by: hwy <huangweiyi_2006@qq.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-10-02 08:45:12 -07:00
Josh Rosen c6938eab57 [SPARK-29310][CORE][TESTS] TestMemoryManager should implement getExecutionMemoryUsageForTask()
### What changes were proposed in this pull request?

This PR updates `TestMemoryManager`, a class used only in unit tests, to override the `getExecutionMemoryUsageForTask()` and `releaseAllExecutionMemoryForTask()` methods. I also `synchronized` its state-accessing methods (to make the class thread-safe) and added some additional assertions to guard against freeing more memory than has been allocated.

### Why are the changes needed?

Spark uses a `TestMemoryManager` class to mock out memory manager functionality in tests, allowing test authors to exercise control over certain behaviors (e.g. to simulate OOMs).

Several of Spark's test suites have memory-leak detection to ensure that all allocated memory is cleaned up at the end of each test case; this helps to guard against bugs that could cause production memory leaks. For example, see `testWithMemoryLeakDetection` in `UnsafeFixedWidthAggregationMapSuite`.

Unfortunately, however, this leak-detection logic is broken for tests which use TestMemoryManager because it does not override the `getExecutionMemoryUsageForTask()` method that is used by the leak-detection checks.

This PR fixes that problem, thereby strengthening our existing tests.

I spotted this problem while reviewing #25953: I tried introducing a change to remove a `freePage()` call (purposely inducing a memory leak) but no tests failed.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added a new `TestMemoryManagerSuite`, with tests covering `TestMemoryManager` itself.

Ran a subset of existing tests on my laptop and uncovered a bug in one test's `free()` calls, plus missing cleanup calls in another test suite; both of these issues are fixed in this PR.

Closes #25985 from JoshRosen/SPARK-29310-testmemorymanager-getExecutionMemoryUsageForTask.

Lead-authored-by: Josh Rosen <rosenville@gmail.com>
Co-authored-by: joshrosen-stripe <48632449+joshrosen-stripe@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-10-02 11:07:19 +08:00
Liang-Chi Hsieh 0cd436be66 [SPARK-29244][CORE] Prevent freed page in BytesToBytesMap free again
### What changes were proposed in this pull request?

When BytesToBytesMap cannot allocate a page, the allocated page is freed by TaskMemoryManager. In this case, we should not keep the reference in the longArray field of BytesToBytesMap; otherwise it might be freed again in the task completion listener of UnsafeFixedWidthAggregationMap and cause a confusing error.

Note that because we catch Throwable when invoking completion listeners, this error should not affect other listeners, only the current one. The completion listener of UnsafeFixedWidthAggregationMap only performs BytesToBytesMap.free().

BytesToBytesMap.free() does two things: freeing allocated pages and deleting spilled files. When it tries to free an already-freed page, this error is hit and the remaining pages (Executor.cleanUpAllAllocatedMemory will guard against the memory leak) and spilled files are skipped.
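
A minimal sketch of the hazard and the fix, using simplified stand-ins rather than the real `BytesToBytesMap`/`TaskMemoryManager` API:

```
// Sketch only: simplified stand-ins for the page and consumer involved in the double free.
object DoubleFreeSketch {
  final class Page { var freed: Boolean = false }

  def freePage(page: Page): Unit = {
    assert(!page.freed, "Called freePage() on a memory block that has already been freed")
    page.freed = true
  }

  class FakeBytesToBytesMap {
    private var longArray: Page = _   // analogue of BytesToBytesMap.longArray

    def allocate(): Unit = {
      val page = new Page
      longArray = page
      try {
        // Simulate the follow-up allocation failing.
        throw new OutOfMemoryError("Unable to acquire 4096 bytes of memory, got 0")
      } catch {
        case e: OutOfMemoryError =>
          freePage(page)    // the memory manager frees the page on allocation failure...
          longArray = null  // ...the fix: drop the stale reference so free() won't free it again
          throw e
      }
    }

    // Called from the task-completion listener. Without `longArray = null` above,
    // this would trip the "already been freed" assertion a second time.
    def free(): Unit = {
      if (longArray != null) {
        freePage(longArray)
        longArray = null
      }
    }
  }
}
```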

### Why are the changes needed?

By chance, it is possible that we free an already-freed page in BytesToBytesMap. Because we have some guards when freeing a page, a confusing error is hit:

```
16:07:33.550 ERROR org.apache.spark.TaskContextImpl: Error in TaskCompletionListener
java.lang.AssertionError: Called freePage() on a memory block that has already been freed
        at org.apache.spark.memory.TaskMemoryManager.freePage(TaskMemoryManager.java:332)
        at org.apache.spark.memory.MemoryConsumer.freePage(MemoryConsumer.java:129)
        at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:107)
        at org.apache.spark.unsafe.map.BytesToBytesMap.free(BytesToBytesMap.java:806)
        at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.free(UnsafeFixedWidthAggregationMap.java:226)
        at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.lambda$new$0(UnsafeFixedWidthAggregationMap.java:112)
        at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:119)
        at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:119)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:132)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:130)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:130)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:119)
        at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMapSuite.$anonfun$new$19(UnsafeFixedWidthAggregationMapSuite.scala:425)
        at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMapSuite.$anonfun$testWithMemoryLeakDetection$1(UnsafeFixedWidthAggregationMapSuite.scala:87)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
        at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
        at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
        at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
        at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
        at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
        at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
        at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
        at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
        at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
        at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
        at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
        at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
        at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
        at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
        at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
        at org.scalatest.Suite.run(Suite.scala:1147)
        at org.scalatest.Suite.run$(Suite.scala:1129)
        at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
        at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
        at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
        at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
        at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
        at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
        at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
        at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
        at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
        at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
        at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:507)
        at sbt.ForkMain$Run$2.call(ForkMain.java:296)
        at sbt.ForkMain$Run$2.call(ForkMain.java:286)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[info] - SPARK-29244 *** FAILED *** (16 milliseconds)
[info]   org.apache.spark.util.TaskCompletionListenerException: Called freePage() on a memory block that has already been freed
[info]
[info] Previous exception in task: Unable to acquire 4096 bytes of memory, got 0
[info]  org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
[info]  org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:97)
[info]  org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:790)
[info]  org.apache.spark.unsafe.map.BytesToBytesMap.reset(BytesToBytesMap.java:893)
[info]  org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:170)
[info]  org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:249)
[info]  org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMapSuite.$anonfun$new$19(UnsafeFixedWidthAggregationMapSuite.scala:421)
[info]  org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMapSuite.$anonfun$testWithMemoryLeakDetection$1(UnsafeFixedWidthAggregationMapSuite.scala:87)
```

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Added unit test.

Closes #25953 from viirya/SPARK-29244.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Liang-Chi Hsieh <liangchi@uber.com>
2019-10-01 11:50:31 -07:00
Jungtaek Lim (HeartSaVioR) a4601cb44e [SPARK-29055][CORE] Update driver/executors' storage memory when block is removed from BlockManager
### What changes were proposed in this pull request?

This patch proposes to fix the issue that storage memory does not decrease even when a block is removed from BlockManager. The issue was originally found when removing a broadcast was not reflected in the storage memory on driver/executors.

AppStatusListener expects the memory value in block-update events to be a "delta" and adjusts the driver's/executors' storage memory based on that delta, but when a block is removed, BlockManager reports the delta as 0, so the storage memory is not decreased. `BlockManager.dropFromMemory` deals with this correctly, so along some memory-freeing paths the value is already updated correctly.
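
A minimal sketch of the delta-based accounting described above (names simplified; not the actual AppStatusListener code): if a block removal reports a delta of 0 instead of a negative value, the tracked storage memory never shrinks.

```
// Sketch only: a tracker that, like AppStatusListener, treats the reported memory
// size in a block update as a delta.
class StorageMemoryTracker {
  private var usedStorageMemory: Long = 0L

  def onBlockUpdated(memoryDelta: Long): Unit = {
    usedStorageMemory = math.max(0L, usedStorageMemory + memoryDelta)
  }

  def current: Long = usedStorageMemory
}

val tracker = new StorageMemoryTracker
tracker.onBlockUpdated(100L)   // block stored: +100 bytes
tracker.onBlockUpdated(-100L)  // block removed: must be reported as -100, not 0
assert(tracker.current == 0L)  // only holds if the removal reported the negative delta
```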

### Why are the changes needed?

The storage memory metric in AppStatusListener gets out of sync, which can easily mislead end users into thinking a memory leak is happening.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Modified UTs. Also manually tested by running a simple query repeatedly and observing the executor page of the Spark UI to verify that the storage memory value decreases as well.

Please refer to the description of [SPARK-29055](https://issues.apache.org/jira/browse/SPARK-29055) for a simple reproducer.

Closes #25973 from HeartSaVioR/SPARK-29055.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
2019-10-01 09:41:51 -07:00
Dongjoon Hyun bd031c2173 [SPARK-29307][BUILD][TESTS] Remove scalatest deprecation warnings
### What changes were proposed in this pull request?

This PR aims to remove `scalatest` deprecation warnings with the following changes (see the usage sketch after the list).
- `org.scalatest.mockito.MockitoSugar` -> `org.scalatestplus.mockito.MockitoSugar`
- `org.scalatest.selenium.WebBrowser` -> `org.scalatestplus.selenium.WebBrowser`
- `org.scalatest.prop.Checkers` -> `org.scalatestplus.scalacheck.Checkers`
- `org.scalatest.prop.GeneratorDrivenPropertyChecks` -> `org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks`
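
An illustrative suite using the non-deprecated `scalatestplus` imports listed above; the suite name and test body are made up for this sketch:

```
import org.scalatest.FunSuite
import org.scalatestplus.mockito.MockitoSugar
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks

// Illustrative suite mixing in the replacement traits instead of the deprecated ones.
class ExampleSuite extends FunSuite with MockitoSugar with ScalaCheckDrivenPropertyChecks {
  test("addition is commutative") {
    forAll { (a: Int, b: Int) => assert(a + b == b + a) }
  }
}
```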

### Why are the changes needed?

According to the Jenkins logs, there are 118 warnings about this.
```
 grep "is deprecated" ~/consoleText | grep scalatest | wc -l
     118
```

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

After Jenkins passes, we need to check the Jenkins log.

Closes #25982 from dongjoon-hyun/SPARK-29307.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
2019-09-30 21:00:11 -07:00