Commit graph

6544 commits

Author SHA1 Message Date
Marcelo Vanzin 1122754bd9 [SPARK-24653][TESTS] Avoid cross-job pollution in TestUtils / SpillListener.
There is a narrow race in this code that is caused when the code being
run in assertSpilled / assertNotSpilled runs more than a single job.

SpillListener assumed that only a single job was run, and so would only
block waiting for that single job to finish when `numSpilledStages` was
called. But some tests (like SQL tests that call `checkAnswer`) run more
than one job, and so that wait was basically a no-op.

This could cause the next test to install a listener to receive events
from the previous job. Which could cause test failures in certain cases.

The change fixes that race, and also uninstalls listeners after the
test runs, so they don't accumulate when the SparkContext is shared
among multiple tests.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21639 from vanzin/SPARK-24653.
2018-08-01 15:47:46 +08:00
Xingbo Jiang 3695ba5773 [MINOR][CORE][TEST] Fix afterEach() in TastSetManagerSuite and TaskSchedulerImplSuite
## What changes were proposed in this pull request?

In the `afterEach()` method of both `TastSetManagerSuite` and `TaskSchedulerImplSuite`, `super.afterEach()` shall be called at the end, because it shall stop the SparkContext.

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93706/testReport/org.apache.spark.scheduler/TaskSchedulerImplSuite/_It_is_not_a_test_it_is_a_sbt_testing_SuiteSelector_/
The test failure is caused by the above reason, the newly added `barrierCoordinator` required `rpcEnv` which has been stopped before `TaskSchedulerImpl` doing cleanup.

## How was this patch tested?
Existing tests.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21908 from jiangxb1987/afterEach.
2018-07-30 09:58:28 +08:00
Hieu Huynh 5828f41a52 [SPARK-13343] speculative tasks that didn't commit shouldn't be marked as success
**Description**
Currently Speculative tasks that didn't commit can show up as success (depending on timing of commit). This is a bit confusing because that task didn't really succeed in the sense it didn't write anything.
I think these tasks should be marked as KILLED or something that is more obvious to the user exactly what happened. it is happened to hit the timing where it got a commit denied exception then it shows up as failed and counts against your task failures. It shouldn't count against task failures since that failure really doesn't matter.
MapReduce handles these situation so perhaps we can look there for a model.

<img width="1420" alt="unknown" src="https://user-images.githubusercontent.com/15680678/42013170-99db48c2-7a61-11e8-8c7b-ef94c84e36ea.png">

**How can this issue happen?**
When both attempts of a task finish before the driver sends command to kill one of them, both of them send the status update FINISHED to the driver. The driver calls TaskSchedulerImpl to handle one successful task at a time. When it handles the first successful task, it sends the command to kill the other copy of the task, however, because that task is already finished, the executor will ignore the command. After finishing handling the first attempt, it processes the second one, although all actions on the result of this task are skipped, this copy of the task is still marked as SUCCESS. As a result, even though this issue does not affect the result of the job, it might cause confusing to user because both of them appear to be successful.

**How does this PR fix the issue?**
The simple way to fix this issue is that when taskSetManager handles successful task, it checks if any other attempt succeeded. If this is the case, it will call handleFailedTask with state==KILLED and reason==TaskKilled(“another attempt succeeded”) to handle this task as begin killed.

**How was this patch tested?**
I tested this manually by running applications, that caused the issue before, a few times, and observed that the issue does not happen again. Also, I added a unit test in TaskSetManagerSuite to test that if we call handleSuccessfulTask to handle status update for 2 copies of a task, only the one that is handled first will be mark as SUCCESS

Author: Hieu Huynh <“Hieu.huynh@oath.com”>
Author: hthuynh2 <hthieu96@gmail.com>

Closes #21653 from hthuynh2/SPARK_13343.
2018-07-27 12:34:14 -05:00
Imran Rashid 2c82745686 [SPARK-24307][CORE] Add conf to revert to old code.
In case there are any issues in converting FileSegmentManagedBuffer to
ChunkedByteBuffer, add a conf to go back to old code path.

Followup to 7e847646d1

Author: Imran Rashid <irashid@cloudera.com>

Closes #21867 from squito/SPARK-24307-p2.
2018-07-26 12:13:27 -07:00
Xingbo Jiang e3486e1b95 [SPARK-24795][CORE] Implement barrier execution mode
## What changes were proposed in this pull request?

Propose new APIs and modify job/task scheduling to support barrier execution mode, which requires all tasks in a same barrier stage start at the same time, and retry all tasks in case some tasks fail in the middle. The barrier execution mode is useful for some ML/DL workloads.

The proposed API changes include:

- `RDDBarrier` that marks an RDD as barrier (Spark must launch all the tasks together for the current stage).
- `BarrierTaskContext` that support global sync of all tasks in a barrier stage, and provide extra `BarrierTaskInfo`s.

In DAGScheduler, we retry all tasks of a barrier stage in case some tasks fail in the middle, this is achieved by unregistering map outputs for a shuffleId (for ShuffleMapStage) or clear the finished partitions in an active job (for ResultStage).

## How was this patch tested?

Add `RDDBarrierSuite` to ensure we convert RDDs correctly;
Add new test cases in `DAGSchedulerSuite` to ensure we do task scheduling correctly;
Add new test cases in `SparkContextSuite` to ensure the barrier execution mode actually works (both under local mode and local cluster mode).
Add new test cases in `TaskSchedulerImplSuite` to ensure we schedule tasks for barrier taskSet together.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21758 from jiangxb1987/barrier-execution-mode.
2018-07-26 12:09:01 -07:00
Imran Rashid 15fff79032 [SPARK-24297][CORE] Fetch-to-disk by default for > 2gb
Fetch-to-mem is guaranteed to fail if the message is bigger than 2 GB,
so we might as well use fetch-to-disk in that case.  The message includes
some metadata in addition to the block data itself (in particular
UploadBlock has a lot of metadata), so we leave a little room.

Author: Imran Rashid <irashid@cloudera.com>

Closes #21474 from squito/SPARK-24297.
2018-07-25 09:08:42 +08:00
Yuanjian Li 7db81ac8a2 [SPARK-24195][CORE] Ignore the files with "local" scheme in SparkContext.addFile
## What changes were proposed in this pull request?

In Spark "local" scheme means resources are already on the driver/executor nodes, this pr ignore the files with "local" scheme in `SparkContext.addFile` for fixing potential bug.

## How was this patch tested?

Existing tests.

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes #21533 from xuanyuanking/SPARK-24195.
2018-07-20 11:25:51 +08:00
Imran Rashid 7e847646d1 [SPARK-24307][CORE] Support reading remote cached partitions > 2gb
(1) Netty's ByteBuf cannot support data > 2gb.  So to transfer data from a
ChunkedByteBuffer over the network, we use a custom version of
FileRegion which is backed by the ChunkedByteBuffer.

(2) On the receiving end, we need to expose all the data in a
FileSegmentManagedBuffer as a ChunkedByteBuffer.  We do that by memory
mapping the entire file in chunks.

Added unit tests.  Ran the randomized test a couple of hundred times on my laptop.  Tests cover the equivalent of SPARK-24107 for the ChunkedByteBufferFileRegion.  Also tested on a cluster with remote cache reads >2gb (in memory and on disk).

Author: Imran Rashid <irashid@cloudera.com>

Closes #21440 from squito/chunked_bb_file_region.
2018-07-20 11:16:53 +08:00
Hieu Huynh 8d707b0600 [SPARK-24755][CORE] Executor loss can cause task to not be resubmitted
**Description**
As described in [SPARK-24755](https://issues.apache.org/jira/browse/SPARK-24755), when speculation is enabled, there is scenario that executor loss can cause task to not be resubmitted.
This patch changes the variable killedByOtherAttempt to keeps track of the taskId of tasks that are killed by other attempt. By doing this, we can still prevent resubmitting task killed by other attempt while resubmit successful attempt when executor lost.

**How was this patch tested?**
A UT is added based on the UT written by xuanyuanking with modification to simulate the scenario described in SPARK-24755.

Author: Hieu Huynh <“Hieu.huynh@oath.com”>

Closes #21729 from hthuynh2/SPARK_24755.
2018-07-19 09:52:07 -05:00
sychen c8bee932cb [SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap
## What changes were proposed in this pull request?
When speculation is enabled,
TaskSetManager#markPartitionCompleted should write successful task duration to MedianHeap,
not just increase tasksSuccessful.

Otherwise when TaskSetManager#checkSpeculatableTasks,tasksSuccessful non-zero, but MedianHeap is empty.
Then throw an exception successfulTaskDurations.median java.util.NoSuchElementException: MedianHeap is empty.
Finally led to stopping SparkContext.
## How was this patch tested?
TaskSetManagerSuite.scala
unit test:[SPARK-24677] MedianHeap should not be empty when speculation is enabled

Author: sychen <sychen@ctrip.com>

Closes #21656 from cxzl25/fix_MedianHeap_empty.
2018-07-18 13:24:41 -05:00
Nihar Sheth 2694dd2bf0 [MINOR][CORE] Add test cases for RDD.cartesian
## What changes were proposed in this pull request?

While looking through the codebase, it appeared that the scala code for RDD.cartesian does not have any tests for correctness. This adds a couple basic tests to verify cartesian yields correct values. While the implementation for RDD.cartesian is pretty simple, it always helps to have a few tests!

## How was this patch tested?

The new test cases pass, and the scala style tests from running dev/run-tests all pass.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Nihar Sheth <niharrsheth@gmail.com>

Closes #21765 from NiharS/cartesianTests.
2018-07-18 09:14:36 -05:00
sandeep-katta 2603ae30be [SPARK-24558][CORE] wrong Idle Timeout value is used in case of the cacheBlock.
It is corrected as per the configuration.

## What changes were proposed in this pull request?
IdleTimeout info used to print in the logs is taken based on the cacheBlock. If it is cacheBlock then cachedExecutorIdleTimeoutS is considered else executorIdleTimeoutS

## How was this patch tested?
Manual Test
spark-sql> cache table sample;
2018-05-15 14:44:02 INFO  DAGScheduler:54 - Submitting 3 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[8] at processCmd at CliDriver.java:376) (first 15 tasks are for partitions Vector(0, 1, 2))
2018-05-15 14:44:02 INFO  YarnScheduler:54 - Adding task set 0.0 with 3 tasks
2018-05-15 14:44:03 INFO  ExecutorAllocationManager:54 - Requesting 1 new executor because tasks are backlogged (new desired total will be 1)
...
...
2018-05-15 14:46:10 INFO  YarnClientSchedulerBackend:54 - Actual list of executor(s) to be killed is 1
2018-05-15 14:46:10 INFO  **ExecutorAllocationManager:54 - Removing executor 1 because it has been idle for 120 seconds (new desired total will be 0)**
2018-05-15 14:46:11 INFO  YarnSchedulerBackend$YarnDriverEndpoint:54 - Disabling executor 1.
2018-05-15 14:46:11 INFO  DAGScheduler:54 - Executor lost: 1 (epoch 1)

Author: sandeep-katta <sandeep.katta2007@gmail.com>

Closes #21565 from sandeep-katta/loginfoBug.
2018-07-16 14:52:49 +08:00
Maxim Gekk 69993217fc [SPARK-24807][CORE] Adding files/jars twice: output a warning and add a note
## What changes were proposed in this pull request?

In the PR, I propose to output an warning if the `addFile()` or `addJar()` methods are callled more than once for the same path. Currently, overwriting of already added files is not supported. New comments and warning are reflected the existing behaviour.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21771 from MaxGekk/warning-on-adding-file.
2018-07-14 22:07:49 -07:00
Dhruve Ashar 1055c94cdf [SPARK-24610] fix reading small files via wholeTextFiles
## What changes were proposed in this pull request?
The `WholeTextFileInputFormat` determines the `maxSplitSize` for the file/s being read using the `wholeTextFiles` method. While this works well for large files, for smaller files where the maxSplitSize is smaller than the defaults being used with configs like hive-site.xml or explicitly passed in the form of `mapreduce.input.fileinputformat.split.minsize.per.node` or `mapreduce.input.fileinputformat.split.minsize.per.rack` , it just throws up an exception.

```java
java.io.IOException: Minimum split size pernode 123456 cannot be larger than maximum split size 9962
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:200)
at org.apache.spark.rdd.WholeTextFileRDD.getPartitions(WholeTextFileRDD.scala:50)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2096)
at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
... 48 elided
`

This change checks the maxSplitSize against the minSplitSizePerNode and minSplitSizePerRack and set them if `maxSplitSize < minSplitSizePerNode/Rack`

## How was this patch tested?
Test manually setting the conf while launching the job and added unit test.

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #21601 from dhruve/bug/SPARK-24610.
2018-07-12 15:36:02 -05:00
Kazuaki Ishizaki 5ad4735bda [SPARK-24529][BUILD][TEST-MAVEN] Add spotbugs into maven build process
## What changes were proposed in this pull request?

This PR enables a Java bytecode check tool [spotbugs](https://spotbugs.github.io/) to avoid possible integer overflow at multiplication. When an violation is detected, the build process is stopped.
Due to the tool limitation, some other checks will be enabled. In this PR, [these patterns](http://spotbugs-in-kengo-toda.readthedocs.io/en/lqc-list-detectors/detectors.html#findpuzzlers) in `FindPuzzlers` can be detected.

This check is enabled at `compile` phase. Thus, `mvn compile` or `mvn package` launches this check.

## How was this patch tested?

Existing UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21542 from kiszk/SPARK-24529.
2018-07-12 09:52:23 +08:00
Rekha Joshi 290c30a53f [SPARK-24470][CORE] RestSubmissionClient to be robust against 404 & non json responses
## What changes were proposed in this pull request?
Added check for 404, to avoid json parsing on not found response and to avoid returning malformed or bad request when it was a not found http response.
Not sure if I need to add an additional check on non json response [if(connection.getHeaderField("Content-Type").contains("text/html")) then exception] as non-json is a subset of malformed json and covered in flow.

## How was this patch tested?
./dev/run-tests

Author: Rekha Joshi <rekhajoshm@gmail.com>

Closes #21684 from rekhajoshm/SPARK-24470.
2018-07-11 13:48:28 -05:00
sharkdtu 6fe32869cc [SPARK-24678][SPARK-STREAMING] Give priority in use of 'PROCESS_LOCAL' for spark-streaming
## What changes were proposed in this pull request?

Currently, `BlockRDD.getPreferredLocations`  only get hosts info of blocks, which results in subsequent schedule level is not better than 'NODE_LOCAL'. We can just make a small changes, the schedule level can be improved to 'PROCESS_LOCAL'

## How was this patch tested?

manual test

Author: sharkdtu <sharkdtu@tencent.com>

Closes #21658 from sharkdtu/master.
2018-07-10 20:18:34 +08:00
jerryshao e2c7e09f74 [SPARK-24646][CORE] Minor change to spark.yarn.dist.forceDownloadSchemes to support wildcard '*'
## What changes were proposed in this pull request?

In the case of getting tokens via customized `ServiceCredentialProvider`, it is required that `ServiceCredentialProvider` be available in local spark-submit process classpath. In this case, all the configured remote sources should be forced to download to local.

For the ease of using this configuration, here propose to add wildcard '*' support to `spark.yarn.dist.forceDownloadSchemes`, also clarify the usage of this configuration.

## How was this patch tested?

New UT added.

Author: jerryshao <sshao@hortonworks.com>

Closes #21633 from jerryshao/SPARK-21917-followup.
2018-07-09 10:21:40 +08:00
Michael Mior e58dadb77e [SPARK-23820][CORE] Enable use of long form of callsite in logs
This adds an option to event logging to include the long form of the callsite instead of the short form.

Author: Michael Mior <mmior@uwaterloo.ca>

Closes #21433 from michaelmior/long-callsite.
2018-07-05 08:32:20 -05:00
Stan Zhai 772060d094 [SPARK-24704][WEBUI] Fix the order of stages in the DAG graph
## What changes were proposed in this pull request?

Before:

![wx20180630-155537](https://user-images.githubusercontent.com/1438757/42123357-2c2e2d84-7c83-11e8-8abd-1c2860f38783.png)

After:

![wx20180630-155604](https://user-images.githubusercontent.com/1438757/42123359-32fae990-7c83-11e8-8a7b-cdcee94f9123.png)

## How was this patch tested?

Manual tests.

Author: Stan Zhai <mail@stanzhai.site>

Closes #21680 from stanzhai/fix-dag-graph.
2018-07-04 10:12:36 +02:00
DB Tsai 5585c5765f
[SPARK-24420][BUILD] Upgrade ASM to 6.1 to support JDK9+
## What changes were proposed in this pull request?

Upgrade ASM to 6.1 to support JDK9+

## How was this patch tested?

Existing tests.

Author: DB Tsai <d_tsai@apple.com>

Closes #21459 from dbtsai/asm.
2018-07-03 10:13:48 -07:00
mcheah 42815548c7 [SPARK-24683][K8S] Fix k8s no resource
## What changes were proposed in this pull request?

Make SparkSubmit pass in the main class even if `SparkLauncher.NO_RESOURCE` is the primary resource.

## How was this patch tested?

New integration test written to capture this case.

Author: mcheah <mcheah@palantir.com>

Closes #21660 from mccheah/fix-k8s-no-resource.
2018-07-02 10:24:04 -07:00
xueyu f71e8da5ef [SPARK-24566][CORE] Fix spark.storage.blockManagerSlaveTimeoutMs default config
This PR use spark.network.timeout in place of spark.storage.blockManagerSlaveTimeoutMs when it is not configured, as configuration doc said

manual test

Author: xueyu <278006819@qq.com>

Closes #21575 from xueyumusic/slaveTimeOutConfig.
2018-06-29 10:44:49 -07:00
Xingbo Jiang 5b05966488 [SPARK-24564][TEST] Add test suite for RecordBinaryComparator
## What changes were proposed in this pull request?

Add a new test suite to test RecordBinaryComparator.

## How was this patch tested?

New test suite.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21570 from jiangxb1987/rbc-test.
2018-06-28 14:19:50 +08:00
Kallman, Steven c5aa54d54b [SPARK-24553][WEB-UI] http 302 fixes for href redirect
## What changes were proposed in this pull request?

Updated URL/href links to include a '/' before '?id' to make links consistent and avoid http 302 redirect errors within UI port 4040 tabs.

## How was this patch tested?

Built a runnable distribution and executed jobs. Validated that http 302 redirects are no longer encountered when clicking on links within UI port 4040 tabs.

Author: Steven Kallman <SJKallmangmail.com>

Author: Kallman, Steven <Steven.Kallman@CapitalOne.com>

Closes #21600 from SJKallman/{Spark-24553}{WEB-UI}-redirect-href-fixes.
2018-06-27 15:36:59 -07:00
Marco Gaido 776befbfd5 [SPARK-24660][SHS] Show correct error pages when downloading logs
## What changes were proposed in this pull request?

SHS is showing bad errors when trying to download logs is not successful. This may happen because the requested application doesn't exist or the user doesn't have permissions for it, for instance.

The PR fixes the response when errors occur, so that they are displayed properly.

## How was this patch tested?

manual tests

**Before the patch:**
 1. Unauthorized user
![screen shot 2018-06-26 at 3 53 33 pm](https://user-images.githubusercontent.com/8821783/41918118-f8b37e70-795b-11e8-91e8-d0250239f09d.png)

 2. Non-existing application
![screen shot 2018-06-26 at 3 25 19 pm](https://user-images.githubusercontent.com/8821783/41918082-e3034c72-795b-11e8-970e-cee4a1eae77f.png)

**After the patch**
 1. Unauthorized user
![screen shot 2018-06-26 at 3 41 29 pm](https://user-images.githubusercontent.com/8821783/41918155-0d950476-795c-11e8-8d26-7b7ce73e6fe1.png)

 2. Non-existing application
![screen shot 2018-06-26 at 3 40 37 pm](https://user-images.githubusercontent.com/8821783/41918175-1a14bb88-795c-11e8-91ab-eadf29190a02.png)

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21644 from mgaido91/SPARK-24660.
2018-06-27 14:26:08 -07:00
Marcelo Vanzin 6d16b9885d [SPARK-24552][CORE][SQL] Use task ID instead of attempt number for writes.
This passes the unique task attempt id instead of attempt number to v2 data sources because attempt number is reused when stages are retried. When attempt numbers are reused, sources that track data by partition id and attempt number may incorrectly clean up data because the same attempt number can be both committed and aborted.

For v1 / Hadoop writes, generate a unique ID based on available attempt numbers to avoid a similar problem.

Closes #21558

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Ryan Blue <blue@apache.org>

Closes #21606 from vanzin/SPARK-24552.2.
2018-06-25 16:54:57 -07:00
Takeshi Yamamuro f596ebe4d3 [SPARK-24327][SQL] Verify and normalize a partition column name based on the JDBC resolved schema
## What changes were proposed in this pull request?
This pr modified JDBC datasource code to verify and normalize a partition column based on the JDBC resolved schema before building `JDBCRelation`.

Closes #20370

## How was this patch tested?
Added tests in `JDBCSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21379 from maropu/SPARK-24327.
2018-06-24 23:14:42 -07:00
jerryshao 33e77fa89b [SPARK-24518][CORE] Using Hadoop credential provider API to store password
## What changes were proposed in this pull request?

In our distribution,  because we don't do such fine-grained access control of config file, also configuration file is world readable shared between different components, so password may leak to different users.

Hadoop credential provider API support storing password in a secure way, in which Spark could read it in a secure way, so here propose to add support of using credential provider API to get password.

## How was this patch tested?

Adding tests and verified locally.

Author: jerryshao <sshao@hortonworks.com>

Closes #21548 from jerryshao/SPARK-24518.
2018-06-22 10:14:12 -07:00
Hieu Huynh 39dfaf2fd1 [SPARK-24519] Make the threshold for highly compressed map status configurable
**Problem**
MapStatus uses hardcoded value of 2000 partitions to determine if it should use highly compressed map status. We should make it configurable to allow users to more easily tune their jobs with respect to this without having for them to modify their code to change the number of partitions.  Note we can leave this as an internal/undocumented config for now until we have more advise for the users on how to set this config.
Some of my reasoning:
The config gives you a way to easily change something without the user having to change code, redeploy jar, and then run again. You can simply change the config and rerun. It also allows for easier experimentation. Changing the # of partitions has other side affects, whether good or bad is situation dependent. It can be worse are you could be increasing # of output files when you don't want to be, affects the # of tasks needs and thus executors to run in parallel, etc.
There have been various talks about this number at spark summits where people have told customers to increase it to be 2001 partitions. Note if you just do a search for spark 2000 partitions you will fine various things all talking about this number.  This shows that people are modifying their code to take this into account so it seems to me having this configurable would be better.
Once we have more advice for users we could expose this and document information on it.

**What changes were proposed in this pull request?**
I make the hardcoded value mentioned above to be configurable under the name _SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS_, which has default value to be 2000. Users can set it to the value they want by setting the property name _spark.shuffle.minNumPartitionsToHighlyCompress_

**How was this patch tested?**
I wrote a unit test to make sure that the default value is 2000, and  _IllegalArgumentException_ will be thrown if user set it to a non-positive value. The unit test also checks that highly compressed map status is correctly used when the number of partition is greater than _SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS_.

Author: Hieu Huynh <“Hieu.huynh@oath.com”>

Closes #21527 from hthuynh2/spark_branch_1.
2018-06-22 09:16:14 -05:00
Marcelo Vanzin c8e909cd49 [SPARK-24589][CORE] Correctly identify tasks in output commit coordinator.
When an output stage is retried, it's possible that tasks from the previous
attempt are still running. In that case, there would be a new task for the
same partition in the new attempt, and the coordinator would allow both
tasks to commit their output since it did not keep track of stage attempts.

The change adds more information to the stage state tracked by the coordinator,
so that only one task is allowed to commit the output in the above case.
The stage state in the coordinator is also maintained across stage retries,
so that a stray speculative task from a previous stage attempt is not allowed
to commit.

This also removes some code added in SPARK-18113 that allowed for duplicate
commit requests; with the RPC code used in Spark 2, that situation cannot
happen, so there is no need to handle it.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21577 from vanzin/SPARK-24552.
2018-06-21 13:25:15 -05:00
“attilapiros” b56e9c613f [SPARK-16630][YARN] Blacklist a node if executors won't launch on it
## What changes were proposed in this pull request?

This change extends YARN resource allocation handling with blacklisting functionality.
This handles cases when node is messed up or misconfigured such that a container won't launch on it. Before this change backlisting only focused on task execution but this change introduces YarnAllocatorBlacklistTracker which tracks allocation failures per host (when enabled via "spark.yarn.blacklist.executor.launch.blacklisting.enabled").

## How was this patch tested?

### With unit tests

Including a new suite: YarnAllocatorBlacklistTrackerSuite.

#### Manually

It was tested on a cluster by deleting the Spark jars on one of the node.

#### Behaviour before these changes

Starting Spark as:
```
spark2-shell --master yarn --deploy-mode client --num-executors 4  --conf spark.executor.memory=4g --conf "spark.yarn.max.executor.failures=6"
```

Log is:
```
18/04/12 06:49:36 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 11, (reason: Max number of executor failures (6) reached)
18/04/12 06:49:39 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: Max number of executor failures (6) reached)
18/04/12 06:49:39 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
18/04/12 06:49:39 INFO yarn.ApplicationMaster: Deleting staging directory hdfs://apiros-1.gce.test.com:8020/user/systest/.sparkStaging/application_1523459048274_0016
18/04/12 06:49:39 INFO util.ShutdownHookManager: Shutdown hook called
```

#### Behaviour after these changes

Starting Spark as:
```
spark2-shell --master yarn --deploy-mode client --num-executors 4  --conf spark.executor.memory=4g --conf "spark.yarn.max.executor.failures=6" --conf "spark.yarn.blacklist.executor.launch.blacklisting.enabled=true"
```

And the log is:
```
18/04/13 05:37:43 INFO yarn.YarnAllocator: Will request 1 executor container(s), each with 1 core(s) and 4505 MB memory (including 409 MB of overhead)
18/04/13 05:37:43 INFO yarn.YarnAllocator: Submitted 1 unlocalized container requests.
18/04/13 05:37:43 INFO yarn.YarnAllocator: Launching container container_1523459048274_0025_01_000008 on host apiros-4.gce.test.com for executor with ID 6
18/04/13 05:37:43 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
18/04/13 05:37:43 INFO yarn.YarnAllocator: Completed container container_1523459048274_0025_01_000007 on host: apiros-4.gce.test.com (state: COMPLETE, exit status: 1)
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: blacklisting host as YARN allocation failed: apiros-4.gce.test.com
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: adding nodes to YARN application master's blacklist: List(apiros-4.gce.test.com)
18/04/13 05:37:43 WARN yarn.YarnAllocator: Container marked as failed: container_1523459048274_0025_01_000007 on host: apiros-4.gce.test.com. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_1523459048274_0025_01_000007
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)
        at org.apache.hadoop.util.Shell.run(Shell.java:507)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:789)
        at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
```

Where the most important part is:

```
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: blacklisting host as YARN allocation failed: apiros-4.gce.test.com
18/04/13 05:37:43 INFO yarn.YarnAllocatorBlacklistTracker: adding nodes to YARN application master's blacklist: List(apiros-4.gce.test.com)
```

And execution was continued (no shutdown called).

### Testing the backlisting of the whole cluster

Starting Spark with YARN blacklisting enabled then removing a the Spark core jar one by one from all the cluster nodes. Then executing a simple spark job which fails checking the yarn log the expected exit status is contained:

```
18/06/15 01:07:10 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 11, (reason: Due to executor failures all available nodes are blacklisted)
18/06/15 01:07:13 INFO util.ShutdownHookManager: Shutdown hook called
```

Author: “attilapiros” <piros.attila.zsolt@gmail.com>

Closes #21068 from attilapiros/SPARK-16630.
2018-06-21 09:17:18 -05:00
Marco Gaido bc111463a7 [SPARK-23778][CORE] Avoid unneeded shuffle when union gets an empty RDD
## What changes were proposed in this pull request?

When a `union` is invoked on several RDDs of which one is an empty RDD, the result of the operation is a `UnionRDD`. This causes an unneeded extra-shuffle when all the other RDDs have the same partitioning.

The PR ignores incoming empty RDDs in the union method.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21333 from mgaido91/SPARK-23778.
2018-06-19 22:29:00 -07:00
Kazuaki Ishizaki 90da7dc241 [SPARK-24452][SQL][CORE] Avoid possible overflow in int add or multiple
## What changes were proposed in this pull request?

This PR fixes possible overflow in int add or multiply. In particular, their overflows in multiply are detected by [Spotbugs](https://spotbugs.github.io/)

The following assignments may cause overflow in right hand side. As a result, the result may be negative.
```
long = int * int
long = int + int
```

To avoid this problem, this PR performs cast from int to long in right hand side.

## How was this patch tested?

Existing UTs.

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21481 from kiszk/SPARK-24452.
2018-06-15 13:47:48 -07:00
Jacek Laskowski 495d8cf09a [SPARK-24490][WEBUI] Use WebUI.addStaticHandler in web UIs
`WebUI` defines `addStaticHandler` that web UIs don't use (and simply introduce duplication). Let's clean them up and remove duplications.

Local build and waiting for Jenkins

Author: Jacek Laskowski <jacek@japila.pl>

Closes #21510 from jaceklaskowski/SPARK-24490-Use-WebUI.addStaticHandler.
2018-06-15 09:59:02 -07:00
mcheah 270a9a3cac [SPARK-24248][K8S] Use level triggering and state reconciliation in scheduling and lifecycle
## What changes were proposed in this pull request?

Previously, the scheduler backend was maintaining state in many places, not only for reading state but also writing to it. For example, state had to be managed in both the watch and in the executor allocator runnable. Furthermore, one had to keep track of multiple hash tables.

We can do better here by:

1. Consolidating the places where we manage state. Here, we take inspiration from traditional Kubernetes controllers. These controllers tend to follow a level-triggered mechanism. This means that the controller will continuously monitor the API server via watches and polling, and on periodic passes, the controller will reconcile the current state of the cluster with the desired state. We implement this by introducing the concept of a pod snapshot, which is a given state of the executors in the Kubernetes cluster. We operate periodically on snapshots. To prevent overloading the API server with polling requests to get the state of the cluster (particularly for executor allocation where we want to be checking frequently to get executors to launch without unbearably bad latency), we use watches to populate snapshots by applying observed events to a previous snapshot to get a new snapshot. Whenever we do poll the cluster, the polled state replaces any existing snapshot - this ensures eventual consistency and mirroring of the cluster, as is desired in a level triggered architecture.

2. Storing less specialized in-memory state in general. Previously we were creating hash tables to represent the state of executors. Instead, it's easier to represent state solely by the snapshots.

## How was this patch tested?

Integration tests should test there's no regressions end to end. Unit tests to be updated, in particular focusing on different orderings of events, particularly accounting for when events come in unexpected ordering.

Author: mcheah <mcheah@palantir.com>

Closes #21366 from mccheah/event-queue-driven-scheduling.
2018-06-14 15:56:21 -07:00
Xingbo Jiang 534065efeb [MINOR][CORE][TEST] Remove unnecessary sort in UnsafeInMemorySorterSuite
## What changes were proposed in this pull request?

We don't require specific ordering of the input data, the sort action is not necessary and misleading.

## How was this patch tested?

Existing test suite.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21536 from jiangxb1987/sorterSuite.
2018-06-14 14:20:48 +08:00
Jungtaek Lim 4c388bccf1 [SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider
## What changes were proposed in this pull request?

This patch measures and logs elapsed time for each operation which communicate with file system (mostly remote HDFS in production) in HDFSBackedStateStoreProvider to help investigating any latency issue.

## How was this patch tested?

Manually tested.

Author: Jungtaek Lim <kabhwan@gmail.com>

Closes #21506 from HeartSaVioR/SPARK-24485.
2018-06-13 12:36:20 +08:00
Li Jin 9786ce66c5 [SPARK-22239][SQL][PYTHON] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames
## What changes were proposed in this pull request?
This PR enables using a grouped aggregate pandas UDFs as window functions. The semantics is the same as using SQL aggregation function as window functions.

```
       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
       >>> from pyspark.sql import Window
       >>> df = spark.createDataFrame(
       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
       ...     ("id", "v"))
       >>> pandas_udf("double", PandasUDFType.GROUPED_AGG)
       ... def mean_udf(v):
       ...     return v.mean()
       >>> w = Window.partitionBy('id')
       >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
       +---+----+------+
       | id|   v|mean_v|
       +---+----+------+
       |  1| 1.0|   1.5|
       |  1| 2.0|   1.5|
       |  2| 3.0|   6.0|
       |  2| 5.0|   6.0|
       |  2|10.0|   6.0|
       +---+----+------+
```

The scope of this PR is somewhat limited in terms of:
(1) Only supports unbounded window, which acts essentially as group by.
(2) Only supports aggregation functions, not "transform" like window functions (n -> n mapping)

Both of these are left as future work. Especially, (1) needs careful thinking w.r.t. how to pass rolling window data to python efficiently. (2) is a bit easier but does require more changes therefore I think it's better to leave it as a separate PR.

## How was this patch tested?

WindowPandasUDFTests

Author: Li Jin <ice.xelloss@gmail.com>

Closes #21082 from icexelloss/SPARK-22239-window-udf.
2018-06-13 09:10:52 +08:00
Marco Gaido f53818d35b [SPARK-24506][UI] Add UI filters to tabs added after binding
## What changes were proposed in this pull request?

Currently, `spark.ui.filters` are not applied to the handlers added after binding the server. This means that every page which is added after starting the UI will not have the filters configured on it. This can allow unauthorized access to the pages.

The PR adds the filters also to the handlers added after the UI starts.

## How was this patch tested?

manual tests (without the patch, starting the thriftserver with `--conf spark.ui.filters=org.apache.hadoop.security.authentication.server.AuthenticationFilter --conf spark.org.apache.hadoop.security.authentication.server.AuthenticationFilter.params="type=simple"` you can access `http://localhost:4040/sqlserver`; with the patch, 401 is the response as for the other pages).

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21523 from mgaido91/SPARK-24506.
2018-06-12 16:42:44 -07:00
Fangshi Li cc88d7fad1 [SPARK-24216][SQL] Spark TypedAggregateExpression uses getSimpleName that is not safe in scala
## What changes were proposed in this pull request?

When user create a aggregator object in scala and pass the aggregator to Spark Dataset's agg() method, Spark's will initialize TypedAggregateExpression with the nodeName field as aggregator.getClass.getSimpleName. However, getSimpleName is not safe in scala environment, depending on how user creates the aggregator object. For example, if the aggregator class full qualified name is "com.my.company.MyUtils$myAgg$2$", the getSimpleName will throw java.lang.InternalError "Malformed class name". This has been reported in scalatest https://github.com/scalatest/scalatest/pull/1044 and discussed in many scala upstream jiras such as SI-8110, SI-5425.

To fix this issue, we follow the solution in https://github.com/scalatest/scalatest/pull/1044 to add safer version of getSimpleName as a util method, and TypedAggregateExpression will invoke this util method rather than getClass.getSimpleName.

## How was this patch tested?
added unit test

Author: Fangshi Li <fli@linkedin.com>

Closes #21276 from fangshil/SPARK-24216.
2018-06-12 12:10:08 -07:00
Jonathan Kelly 9b6f24202f [MINOR][CORE] Log committer class used by HadoopMapRedCommitProtocol
## What changes were proposed in this pull request?

When HadoopMapRedCommitProtocol is used (e.g., when using saveAsTextFile() or
saveAsHadoopFile() with RDDs), it's not easy to determine which output committer
class was used, so this PR simply logs the class that was used, similarly to what
is done in SQLHadoopMapReduceCommitProtocol.

## How was this patch tested?

Built Spark then manually inspected logging when calling saveAsTextFile():

```scala
scala> sc.setLogLevel("INFO")
scala> sc.textFile("README.md").saveAsTextFile("/tmp/out")
...
18/05/29 10:06:20 INFO HadoopMapRedCommitProtocol: Using output committer class org.apache.hadoop.mapred.FileOutputCommitter
```

Author: Jonathan Kelly <jonathak@amazon.com>

Closes #21452 from ejono/master.
2018-06-11 16:41:15 -05:00
Ilan Filonenko 1a644afbac [SPARK-23984][K8S] Initial Python Bindings for PySpark on K8s
## What changes were proposed in this pull request?

Introducing Python Bindings for PySpark.

- [x] Running PySpark Jobs
- [x] Increased Default Memory Overhead value
- [ ] Dependency Management for virtualenv/conda

## How was this patch tested?

This patch was tested with

- [x] Unit Tests
- [x] Integration tests with [this addition](https://github.com/apache-spark-on-k8s/spark-integration/pull/46)
```
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- Run SparkPi with a test secret mounted into the driver and executor pods
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
Run completed in 4 minutes, 28 seconds.
Total number of tests run: 11
Suites: completed 2, aborted 0
Tests: succeeded 11, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Author: Ilan Filonenko <if56@cornell.edu>
Author: Ilan Filonenko <ifilondz@gmail.com>

Closes #21092 from ifilonenko/master.
2018-06-08 11:18:34 -07:00
xueyu a2166ecdda [SPARK-24455][CORE] fix typo in TaskSchedulerImpl comment
change runTasks to submitTasks  in the TaskSchedulerImpl.scala 's comment

Author: xueyu <xueyu@yidian-inc.com>
Author: Xue Yu <278006819@qq.com>

Closes #21485 from xueyumusic/fixtypo1.
2018-06-04 08:10:49 +07:00
Xingbo Jiang 8ef167a5f9 [SPARK-24340][CORE] Clean up non-shuffle disk block manager files following executor exits on a Standalone cluster
## What changes were proposed in this pull request?

Currently we only clean up the local directories on application removed. However, when executors die and restart repeatedly, many temp files are left untouched in the local directories, which is undesired behavior and could cause disk space used up gradually.

We can detect executor death in the Worker, and clean up the non-shuffle files (files not ended with ".index" or ".data") in the local directories, we should not touch the shuffle files since they are expected to be used by the external shuffle service.

Scope of this PR is limited to only implement the cleanup logic on a Standalone cluster, we defer to experts familiar with other cluster managers(YARN/Mesos/K8s) to determine whether it's worth to add similar support.

## How was this patch tested?

Add new test suite to cover.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #21390 from jiangxb1987/cleanupNonshuffleFiles.
2018-06-01 13:46:05 -07:00
hyukjinkwon 2c9c8629b7 [MINOR][YARN] Add YARN-specific credential providers in debug logging message
This PR adds a debugging log for YARN-specific credential providers which is loaded by service loader mechanism.

It took me a while to debug if it's actually loaded or not. I had to explicitly set the deprecated configuration and check if that's actually being loaded.

The change scope is manually tested. Logs are like:

```
Using the following builtin delegation token providers: hadoopfs, hive, hbase.
Using the following YARN-specific credential providers: yarn-test.
```

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21466 from HyukjinKwon/minor-log.

Change-Id: I18e2fb8eeb3289b148f24c47bb3130a560a881cf
2018-06-01 08:44:57 +08:00
Tathagata Das 223df5d9d4 [SPARK-24397][PYSPARK] Added TaskContext.getLocalProperty(key) in Python
## What changes were proposed in this pull request?

This adds a new API `TaskContext.getLocalProperty(key)` to the Python TaskContext. It mirrors the Java TaskContext API of returning a string value if the key exists, or None if the key does not exist.

## How was this patch tested?
New test added.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21437 from tdas/SPARK-24397.
2018-05-31 11:23:57 -07:00
Marcelo Vanzin 7a82e93b34 [SPARK-24414][UI] Calculate the correct number of tasks for a stage.
This change takes into account all non-pending tasks when calculating
the number of tasks to be shown. This also means that when the stage
is pending, the task table (or, in fact, most of the data in the stage
page) will not be rendered.

I also fixed the label when the known number of tasks is larger than
the recorded number of tasks (it was inverted).

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21457 from vanzin/SPARK-24414.
2018-05-31 10:05:20 -07:00
Sean Owen 698b9a0981 [WEBUI] Avoid possibility of script in query param keys
As discussed separately, this avoids the possibility of XSS on certain request param keys.

CC vanzin

Author: Sean Owen <srowen@gmail.com>

Closes #21464 from srowen/XSS2.
2018-05-31 09:34:39 -07:00
William Sheu 0053e153fa [SPARK-24337][CORE] Improve error messages for Spark conf values
## What changes were proposed in this pull request?

Improve the exception messages when retrieving Spark conf values to include the key name when the value is invalid.

## How was this patch tested?

Unit tests for all get* operations in SparkConf that require a specific value format

Author: William Sheu <william.sheu@databricks.com>

Closes #21454 from PenguinToast/SPARK-24337-spark-config-errors.
2018-05-30 22:37:27 -07:00