Commit graph

1973 commits

Author SHA1 Message Date
Josh Rosen 4f5f9b670e [SPARK-16925] Master should call schedule() after all executor exit events, not only failures
## What changes were proposed in this pull request?

This patch fixes a bug in Spark's standalone Master which could cause applications to hang if tasks cause executors to exit with zero exit codes.

As an example of the bug, run

```
sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
```

on a standalone cluster which has a single Spark application. This will cause all executors to die but those executors won't be replaced unless another Spark application or worker joins or leaves the cluster (or if an executor exits with a non-zero exit code). This behavior is caused by a bug in how the Master handles the `ExecutorStateChanged` event: the current implementation calls `schedule()` only if the executor exited with a non-zero exit code, so a task which causes a JVM to unexpectedly exit "cleanly" will skip the `schedule()` call.

This patch addresses this by modifying the `ExecutorStateChanged` to always unconditionally call `schedule()`. This should be safe because it should always be safe to call `schedule()`; adding extra `schedule()` calls can only affect performance and should not introduce correctness bugs.

## How was this patch tested?

I added a regression test in `DistributedSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #14510 from JoshRosen/SPARK-16925.
2016-08-06 19:29:19 -07:00
Josh Rosen e9fc0b6a8b [SPARK-16787] SparkContext.addFile() should not throw if called twice with the same file
## What changes were proposed in this pull request?

The behavior of `SparkContext.addFile()` changed slightly with the introduction of the Netty-RPC-based file server, which was introduced in Spark 1.6 (where it was disabled by default) and became the default / only file server in Spark 2.0.0.

Prior to 2.0, calling `SparkContext.addFile()` with files that have the same name and identical contents would succeed. This behavior was never explicitly documented but Spark has behaved this way since very early 1.x versions.

In 2.0 (or 1.6 with the Netty file server enabled), the second `addFile()` call will fail with a requirement error because NettyStreamManager tries to guard against duplicate file registration.

This problem also affects `addJar()` in a more subtle way: the `fileServer.addJar()` call will also fail with an exception but that exception is logged and ignored; I believe that the problematic exception-catching path was mistakenly copied from some old code which was only relevant to very old versions of Spark and YARN mode.

I believe that this change of behavior was unintentional, so this patch weakens the `require` check so that adding the same filename at the same path will succeed.

At file download time, Spark tasks will fail with exceptions if an executor already has a local copy of a file and that file's contents do not match the contents of the file being downloaded / added. As a result, it's important that we prevent files with the same name and different contents from being served because allowing that can effectively brick an executor by preventing it from successfully launching any new tasks. Before this patch's change, this was prevented by forbidding `addFile()` from being called twice on files with the same name. Because Spark does not defensively copy local files that are passed to `addFile` it is vulnerable to files' contents changing, so I think it's okay to rely on an implicit assumption that these files are intended to be immutable (since if they _are_ mutable then this can lead to either explicit task failures or implicit incorrectness (in case new executors silently get newer copies of the file while old executors continue to use an older version)). To guard against this, I have decided to only update the file addition timestamps on the first call to `addFile()`; duplicate calls will succeed but will not update the timestamp. This behavior is fine as long as we assume files are immutable, which seems reasonable given the behaviors described above.

As part of this change, I also improved the thread-safety of the `addedJars` and `addedFiles` maps; this is important because these maps may be concurrently read by a task launching thread and written by a driver thread in case the user's driver code is multi-threaded.

## How was this patch tested?

I added regression tests in `SparkContextSuite`.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #14396 from JoshRosen/SPARK-16787.
2016-08-02 12:02:11 -07:00
Sean Owen 0dc4310b47 [SPARK-16694][CORE] Use for/foreach rather than map for Unit expressions whose side effects are required
## What changes were proposed in this pull request?

Use foreach/for instead of map where operation requires execution of body, not actually defining a transformation

## How was this patch tested?

Jenkins

Author: Sean Owen <sowen@cloudera.com>

Closes #14332 from srowen/SPARK-16694.
2016-07-30 04:42:38 -07:00
Michael Gummelt 266b92faff [SPARK-16637] Unified containerizer
## What changes were proposed in this pull request?

New config var: spark.mesos.docker.containerizer={"mesos","docker" (default)}

This adds support for running docker containers via the Mesos unified containerizer: http://mesos.apache.org/documentation/latest/container-image/

The benefit is losing the dependency on `dockerd`, and all the costs which it incurs.

I've also updated the supported Mesos version to 0.28.2 for support of the required protobufs.

This is blocked on: https://github.com/apache/spark/pull/14167

## How was this patch tested?

- manually testing jobs submitted with both "mesos" and "docker" settings for the new config var.
- spark/mesos integration test suite

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #14275 from mgummelt/unified-containerizer.
2016-07-29 05:50:47 -07:00
Mark Grover 70f846a313 [SPARK-5847][CORE] Allow for configuring MetricsSystem's use of app ID to namespace all metrics
## What changes were proposed in this pull request?
Adding a new property to SparkConf called spark.metrics.namespace that allows users to
set a custom namespace for executor and driver metrics in the metrics systems.

By default, the root namespace used for driver or executor metrics is
the value of `spark.app.id`. However, often times, users want to be able to track the metrics
across apps for driver and executor metrics, which is hard to do with application ID
(i.e. `spark.app.id`) since it changes with every invocation of the app. For such use cases,
users can set the `spark.metrics.namespace` property to another spark configuration key like
`spark.app.name` which is then used to populate the root namespace of the metrics system
(with the app name in our example). `spark.metrics.namespace` property can be set to any
arbitrary spark property key, whose value would be used to set the root namespace of the
metrics system. Non driver and executor metrics are never prefixed with `spark.app.id`, nor
does the `spark.metrics.namespace` property have any such affect on such metrics.

## How was this patch tested?
Added new unit tests, modified existing unit tests.

Author: Mark Grover <mark@apache.org>

Closes #14270 from markgrover/spark-5847.
2016-07-27 10:13:15 -07:00
Dhruve Ashar 0b71d9ae08 [SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable
## What changes were proposed in this pull request?
This change adds a new configuration entry to specify the size of the spark listener bus event queue. The value for this config ("spark.scheduler.listenerbus.eventqueue.size") is set to a default to 10000.

Note:
I haven't currently documented the configuration entry. We can decide whether it would be appropriate to make it a public configuration or keep it as an undocumented one. Refer JIRA for more details.

## How was this patch tested?
Ran existing jobs and verified the event queue size with debug logs and from the Spark WebUI Environment tab.

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #14269 from dhruve/bug/SPARK-15703.
2016-07-26 13:23:33 -05:00
Philipp Hoffmann 0869b3a5f0 [SPARK-15271][MESOS] Allow force pulling executor docker images
## What changes were proposed in this pull request?

(Please fill in changes proposed in this fix)

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Mesos agents by default will not pull docker images which are cached
locally already. In order to run Spark executors from mutable tags like
`:latest` this commit introduces a Spark setting
(`spark.mesos.executor.docker.forcePullImage`). Setting this flag to
true will tell the Mesos agent to force pull the docker image (default is `false` which is consistent with the previous
implementation and Mesos' default
behaviour).

Author: Philipp Hoffmann <mail@philipphoffmann.de>

Closes #14348 from philipphoffmann/force-pull-image.
2016-07-26 16:09:10 +01:00
Tao Lin db36e1e75d [SPARK-15590][WEBUI] Paginate Job Table in Jobs tab
## What changes were proposed in this pull request?

This patch adds pagination support for the Job Tables in the Jobs tab. Pagination is provided for all of the three Job Tables (active, completed, and failed). Interactions (jumping, sorting, and setting page size) for paged tables are also included.

The diff didn't keep track of some lines based on the original ones. The function `makeRow`of the original `AllJobsPage.scala` is reused. They are separated at the beginning of the function `jobRow` (L427-439) and the function `row`(L594-618) in the new `AllJobsPage.scala`.

## How was this patch tested?

Tested manually by using checking the Web UI after completing and failing hundreds of jobs.
Generate completed jobs by:
```scala
val d = sc.parallelize(Array(1,2,3,4,5))
for(i <- 1 to 255){ var b = d.collect() }
```
Generate failed jobs by calling the following code multiple times:
```scala
var b = d.map(_/0).collect()
```
Interactions like jumping, sorting, and setting page size are all tested.

This shows the pagination for completed jobs:
![paginate success jobs](https://cloud.githubusercontent.com/assets/5558370/15986498/efa12ef6-303b-11e6-8b1d-c3382aeb9ad0.png)

This shows the sorting works in job tables:
![sorting](https://cloud.githubusercontent.com/assets/5558370/15986539/98c8a81a-303c-11e6-86f2-8d2bc7924ee9.png)

This shows the pagination for failed jobs and the effect of jumping and setting page size:
![paginate failed jobs](https://cloud.githubusercontent.com/assets/5558370/15986556/d8c1323e-303c-11e6-8e4b-7bdb030ea42b.png)

Author: Tao Lin <nblintao@gmail.com>

Closes #13620 from nblintao/dev.
2016-07-25 17:35:50 -07:00
jerryshao f5ea7fe539 [SPARK-16166][CORE] Also take off-heap memory usage into consideration in log and webui display
## What changes were proposed in this pull request?

Currently in the log and UI display, only on-heap storage memory is calculated and displayed,

```
16/06/27 13:41:52 INFO MemoryStore: Block rdd_5_0 stored as values in memory (estimated size 17.8 KB, free 665.9 MB)
```
<img width="1232" alt="untitled" src="https://cloud.githubusercontent.com/assets/850797/16369960/53fb614e-3c6e-11e6-8fa3-7ffe65abcb49.png">

With [SPARK-13992](https://issues.apache.org/jira/browse/SPARK-13992) off-heap memory is supported for data persistence, so here change to also take off-heap storage memory into consideration.

## How was this patch tested?

Unit test and local verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #13920 from jerryshao/SPARK-16166.
2016-07-25 15:17:06 -07:00
Josh Rosen fc17121d59 Revert "[SPARK-15271][MESOS] Allow force pulling executor docker images"
This reverts commit 978cd5f125.
2016-07-25 12:43:44 -07:00
Philipp Hoffmann 978cd5f125 [SPARK-15271][MESOS] Allow force pulling executor docker images
## What changes were proposed in this pull request?

Mesos agents by default will not pull docker images which are cached
locally already. In order to run Spark executors from mutable tags like
`:latest` this commit introduces a Spark setting
`spark.mesos.executor.docker.forcePullImage`. Setting this flag to
true will tell the Mesos agent to force pull the docker image (default is `false` which is consistent with the previous
implementation and Mesos' default
behaviour).

## How was this patch tested?

I ran a sample application including this change on a Mesos cluster and verified the correct behaviour for both, with and without, force pulling the executor image. As expected the image is being force pulled if the flag is set.

Author: Philipp Hoffmann <mail@philipphoffmann.de>

Closes #13051 from philipphoffmann/force-pull-image.
2016-07-25 20:14:47 +01:00
Brian Cho daace60142 [SPARK-5581][CORE] When writing sorted map output file, avoid open / …
…close between each partition

## What changes were proposed in this pull request?

Replace commitAndClose with separate commit and close to avoid opening and closing
the file between partitions.

## How was this patch tested?

Run existing unit tests, add a few unit tests regarding reverts.

Observed a ~20% reduction in total time in tasks on stages with shuffle
writes to many partitions.

JoshRosen

Author: Brian Cho <bcho@fb.com>

Closes #13382 from dafrista/separatecommit-master.
2016-07-24 19:36:58 -07:00
Michael Gummelt 235cb256d0 [SPARK-16194] Mesos Driver env vars
## What changes were proposed in this pull request?

Added new configuration namespace: spark.mesos.env.*

This allows a user submitting a job in cluster mode to set arbitrary environment variables on the driver.
spark.mesos.driverEnv.KEY=VAL will result in the env var "KEY" being set to "VAL"

I've also refactored the tests a bit so we can re-use code in MesosClusterScheduler.

And I've refactored the command building logic in `buildDriverCommand`.  Command builder values were very intertwined before, and now it's easier to determine exactly how each variable is set.

## How was this patch tested?

unit tests

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #14167 from mgummelt/driver-env-vars.
2016-07-21 18:29:00 +01:00
Marcelo Vanzin 75a06aa256 [SPARK-16272][CORE] Allow config values to reference conf, env, system props.
This allows configuration to be more flexible, for example, when the cluster does
not have a homogeneous configuration (e.g. packages are installed on different
paths in different nodes). By allowing one to reference the environment from
the conf, it becomes possible to work around those in certain cases.

As part of the implementation, ConfigEntry now keeps track of all "known" configs
(i.e. those created through the use of ConfigBuilder), since that list is used
by the resolution code. This duplicates some code in SQLConf, which could potentially
be merged with this now. It will also make it simpler to implement some missing
features such as filtering which configs show up in the UI or in event logs - which
are not part of this change.

Another change is in the way ConfigEntry reads config data; it now takes a string
map and a function that reads env variables, so that it can be called both from
SparkConf and SQLConf. This makes it so both places follow the same read path,
instead of having to replicate certain logic in SQLConf. There are still a
couple of methods in SQLConf that peek into fields of ConfigEntry directly,
though.

Tested via unit tests, and by using the new variable expansion functionality
in a shell session with a custom spark.sql.hive.metastore.jars value.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #14022 from vanzin/SPARK-16272.
2016-07-20 18:24:35 -07:00
Sean Owen 4b079dc396 [SPARK-16613][CORE] RDD.pipe returns values for empty partitions
## What changes were proposed in this pull request?

Document RDD.pipe semantics; don't execute process for empty input partitions.

Note this includes the fix in https://github.com/apache/spark/pull/14256 because it's necessary to even test this. One or the other will merge the fix.

## How was this patch tested?

Jenkins tests including new test.

Author: Sean Owen <sowen@cloudera.com>

Closes #14260 from srowen/SPARK-16613.
2016-07-20 09:48:52 -07:00
Shivaram Venkataraman fc23263623 [SPARK-10683][SPARK-16510][SPARKR] Move SparkR include jar test to SparkSubmitSuite
## What changes were proposed in this pull request?

This change moves the include jar test from R to SparkSubmitSuite and uses a dynamically compiled jar. This helps us remove the binary jar from the R package and solves both the CRAN warnings and the lack of source being available for this jar.

## How was this patch tested?
SparkR unit tests, SparkSubmitSuite, check-cran.sh

Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>

Closes #14243 from shivaram/sparkr-jar-move.
2016-07-19 19:28:08 -07:00
Andrew Duffy 004e29cba5 [SPARK-14702] Make environment of SparkLauncher launched process more configurable
## What changes were proposed in this pull request?

Adds a few public methods to `SparkLauncher` to allow configuring some extra features of the `ProcessBuilder`, including the working directory, output and error stream redirection.

## How was this patch tested?

Unit testing + simple Spark driver programs

Author: Andrew Duffy <root@aduffy.org>

Closes #14201 from andreweduffy/feature/launcher.
2016-07-19 17:08:38 -07:00
Liwei Lin 0bd76e872b [SPARK-16620][CORE] Add back the tokenization process in RDD.pipe(command: String)
## What changes were proposed in this pull request?

Currently `RDD.pipe(command: String)`:
- works only when the command is specified without any options, such as `RDD.pipe("wc")`
- does NOT work when the command is specified with some options, such as `RDD.pipe("wc -l")`

This is a regression from Spark 1.6.

This patch adds back the tokenization process in `RDD.pipe(command: String)` to fix this regression.

## How was this patch tested?
Added a test which:
- would pass in `1.6`
- _[prior to this patch]_ would fail in `master`
- _[after this patch]_ would pass in `master`

Author: Liwei Lin <lwlin7@gmail.com>

Closes #14256 from lw-lin/rdd-pipe.
2016-07-19 10:24:48 -07:00
jerryshao d8220c1e5e [SPARK-16435][YARN][MINOR] Add warning log if initialExecutors is less than minExecutors
## What changes were proposed in this pull request?

Currently if `spark.dynamicAllocation.initialExecutors` is less than `spark.dynamicAllocation.minExecutors`, Spark will automatically pick the minExecutors without any warning. While in 1.6 Spark will throw exception if configured like this. So here propose to add warning log if these parameters are configured invalidly.

## How was this patch tested?

Unit test added to verify the scenario.

Author: jerryshao <sshao@hortonworks.com>

Closes #14149 from jerryshao/SPARK-16435.
2016-07-13 13:24:47 -05:00
Alex Bozarth f156136dae [SPARK-16375][WEB UI] Fixed misassigned var: numCompletedTasks was assigned to numSkippedTasks
## What changes were proposed in this pull request?

I fixed a misassigned var,  numCompletedTasks was assigned to numSkippedTasks in the convertJobData method

## How was this patch tested?

dev/run-tests

Author: Alex Bozarth <ajbozart@us.ibm.com>

Closes #14141 from ajbozarth/spark16375.
2016-07-13 10:45:06 +01:00
Eric Liang d8b06f18dc [SPARK-16432] Empty blocks fail to serialize due to assert in ChunkedByteBuffer
## What changes were proposed in this pull request?

It's possible to also change the callers to not pass in empty chunks, but it seems cleaner to just allow `ChunkedByteBuffer` to handle empty arrays. cc JoshRosen

## How was this patch tested?

Unit tests, also checked that the original reproduction case in https://github.com/apache/spark/pull/11748#issuecomment-230760283 is resolved.

Author: Eric Liang <ekl@databricks.com>

Closes #14099 from ericl/spark-16432.
2016-07-08 20:18:49 -07:00
Tom Magrino ce3ea96980 [SPARK-15885][WEB UI] Provide links to executor logs from stage details page in UI
## What changes were proposed in this pull request?

This moves over old PR https://github.com/apache/spark/pull/13664 to target master rather than branch-1.6.

Added links to logs (or an indication that there are no logs) for entries which list an executor in the stage details page of the UI.

This helps streamline the workflow where a user views a stage details page and determines that they would like to see the associated executor log for further examination.  Previously, a user would have to cross reference the executor id listed on the stage details page with the corresponding entry on the executors tab.

Link to the JIRA: https://issues.apache.org/jira/browse/SPARK-15885

## How was this patch tested?

Ran existing unit tests.
Ran test queries on a platform which did not record executor logs and again on a platform which did record executor logs and verified that the new table column was empty and links to the logs (which were verified as linking to the appropriate files), respectively.

Attached is a screenshot of the UI page with no links, with the new columns highlighted.  Additional screenshot of these columns with the populated links.

Without links:
![updated without logs](https://cloud.githubusercontent.com/assets/1450821/16059721/2b69dbaa-3239-11e6-9eed-e539764ca159.png)

With links:
![updated with logs](https://cloud.githubusercontent.com/assets/1450821/16059725/32c6e316-3239-11e6-90bd-2553f43f7779.png)

This contribution is my original work and I license the work to the project under the Apache Spark project's open source license.

Author: Tom Magrino <tmagrino@fb.com>

Closes #13861 from tmagrino/uilogstweak.
2016-07-07 00:02:39 -07:00
petermaxlee 480357cc6d [SPARK-16304] LinkageError should not crash Spark executor
## What changes were proposed in this pull request?
This patch updates the failure handling logic so Spark executor does not crash when seeing LinkageError.

## How was this patch tested?
Added an end-to-end test in FailureSuite.

Author: petermaxlee <petermaxlee@gmail.com>

Closes #13982 from petermaxlee/SPARK-16304.
2016-07-06 10:46:22 -07:00
Sean Owen 2075bf8ef6 [SPARK-16182][CORE] Utils.scala -- terminateProcess() should call Process.destroyForcibly() if and only if Process.destroy() fails
## What changes were proposed in this pull request?

Utils.terminateProcess should `destroy()` first and only fall back to `destroyForcibly()` if it fails. It's kind of bad that we're force-killing executors -- and only in Java 8. See JIRA for an example of the impact: no shutdown

While here: `Utils.waitForProcess` should use the Java 8 method if available instead of a custom implementation.

## How was this patch tested?

Existing tests, which cover the force-kill case, and Amplab tests, which will cover both Java 7 and Java 8 eventually. However I tested locally on Java 8 and the PR builder will try Java 7 here.

Author: Sean Owen <sowen@cloudera.com>

Closes #13973 from srowen/SPARK-16182.
2016-07-01 09:22:27 +01:00
Imran Rashid fdf9f94f8c [SPARK-15865][CORE] Blacklist should not result in job hanging with less than 4 executors
## What changes were proposed in this pull request?

Before this change, when you turn on blacklisting with `spark.scheduler.executorTaskBlacklistTime`, but you have fewer than `spark.task.maxFailures` executors, you can end with a job "hung" after some task failures.

Whenever a taskset is unable to schedule anything on resourceOfferSingleTaskSet, we check whether the last pending task can be scheduled on *any* known executor.  If not, the taskset (and any corresponding jobs) are failed.
* Worst case, this is O(maxTaskFailures + numTasks).  But unless many executors are bad, this should be small
* This does not fail as fast as possible -- when a task becomes unschedulable, we keep scheduling other tasks.  This is to avoid an O(numPendingTasks * numExecutors) operation
* Also, it is conceivable this fails too quickly.  You may be 1 millisecond away from unblacklisting a place for a task to run, or acquiring a new executor.

## How was this patch tested?

Added unit test which failed before the change, ran new test 5k times manually, ran all scheduler tests manually, and the full suite via jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes #13603 from squito/progress_w_few_execs_and_blacklist.
2016-06-30 13:36:06 -05:00
Sital Kedia 07f46afc73 [SPARK-13850] Force the sorter to Spill when number of elements in th…
## What changes were proposed in this pull request?

Force the sorter to Spill when number of elements in the pointer array reach a certain size. This is to workaround the issue of timSort failing on large buffer size.

## How was this patch tested?

Tested by running a job which was failing without this change due to TimSort bug.

Author: Sital Kedia <skedia@fb.com>

Closes #13107 from sitalkedia/fix_TimSort.
2016-06-30 10:53:18 -07:00
Tom Magrino ae14f36235 [SPARK-16148][SCHEDULER] Allow for underscores in TaskLocation in the Executor ID
## What changes were proposed in this pull request?

Previously, the TaskLocation implementation would not allow for executor ids which include underscores.  This tweaks the string split used to get the hostname and executor id, allowing for underscores in the executor id.

This addresses the JIRA found here: https://issues.apache.org/jira/browse/SPARK-16148

This is moved over from a previous PR against branch-1.6: https://github.com/apache/spark/pull/13857

## How was this patch tested?

Ran existing unit tests for core and streaming.  Manually ran a simple streaming job with an executor whose id contained underscores and confirmed that the job ran successfully.

This is my original work and I license the work to the project under the project's open source license.

Author: Tom Magrino <tmagrino@fb.com>

Closes #13858 from tmagrino/fixtasklocation.
2016-06-28 13:36:41 -07:00
Imran Rashid c15b552dd5 [SPARK-16106][CORE] TaskSchedulerImpl should properly track executors added to existing hosts
## What changes were proposed in this pull request?

TaskSchedulerImpl used to only set `newExecAvailable` when a new *host* was added, not when a new executor was added to an existing host.  It also didn't update some internal state tracking live executors until a task was scheduled on the executor.  This patch changes it to properly update as soon as it knows about a new executor.

## How was this patch tested?

added a unit test, ran everything via jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes #13826 from squito/SPARK-16106_executorByHosts.
2016-06-27 16:38:03 -05:00
Imran Rashid 282158914d [SPARK-16136][CORE] Fix flaky TaskManagerSuite
## What changes were proposed in this pull request?

TaskManagerSuite "Kill other task attempts when one attempt belonging to the same task succeeds" was flaky.  When checking whether a task is speculatable, at least one millisecond must pass since the task was submitted.  Use a manual clock to avoid the problem.

I noticed these tests were leaving lots of threads lying around as well (which prevented me from running the test repeatedly), so I fixed that too.

## How was this patch tested?

Ran the test 1k times on my laptop, passed every time (it failed about 20% of the time before this).

Author: Imran Rashid <irashid@cloudera.com>

Closes #13848 from squito/fix_flaky_taskmanagersuite.
2016-06-27 16:28:59 -05:00
Sital Kedia bf665a9586 [SPARK-15958] Make initial buffer size for the Sorter configurable
## What changes were proposed in this pull request?

Currently the initial buffer size in the sorter is hard coded inside the code and is too small for large workload. As a result, the sorter spends significant time expanding the buffer size and copying the data. It would be useful to have it configurable.

## How was this patch tested?

Tested by running a job on the cluster.

Author: Sital Kedia <skedia@fb.com>

Closes #13699 from sitalkedia/config_sort_buffer_upstream.
2016-06-25 09:13:39 +01:00
Liwei Lin a4851ed050 [SPARK-15963][CORE] Catch TaskKilledException correctly in Executor.TaskRunner
## The problem

Before this change, if either of the following cases happened to a task , the task would be marked as `FAILED` instead of `KILLED`:
- the task was killed before it was deserialized
- `executor.kill()` marked `taskRunner.killed`, but before calling `task.killed()` the worker thread threw the `TaskKilledException`

The reason is, in the `catch` block of the current [Executor.TaskRunner](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L362)'s implementation, we are mistakenly catching:
```scala
case _: TaskKilledException | _: InterruptedException if task.killed => ...
```
the semantics of which is:
- **(**`TaskKilledException` **OR** `InterruptedException`**)** **AND** `task.killed`

Then when `TaskKilledException` is thrown but `task.killed` is not marked, we would mark the task as `FAILED` (which should really be `KILLED`).

## What changes were proposed in this pull request?

This patch alters the catch condition's semantics from:
- **(**`TaskKilledException` **OR** `InterruptedException`**)** **AND** `task.killed`

to

- `TaskKilledException` **OR** **(**`InterruptedException` **AND** `task.killed`**)**

so that we can catch `TaskKilledException` correctly and mark the task as `KILLED` correctly.

## How was this patch tested?

Added unit test which failed before the change, ran new test 1000 times manually

Author: Liwei Lin <lwlin7@gmail.com>

Closes #13685 from lw-lin/fix-task-killed.
2016-06-24 10:09:04 -05:00
peng.zhang f4fd7432fb [SPARK-16125][YARN] Fix not test yarn cluster mode correctly in YarnClusterSuite
## What changes were proposed in this pull request?

Since SPARK-13220(Deprecate "yarn-client" and "yarn-cluster"), YarnClusterSuite doesn't test "yarn cluster" mode correctly.
This pull request fixes it.

## How was this patch tested?
Unit test

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Author: peng.zhang <peng.zhang@xiaomi.com>

Closes #13836 from renozhang/SPARK-16125-test-yarn-cluster-mode.
2016-06-24 08:28:32 +01:00
Ryan Blue 738f134bf4 [SPARK-13723][YARN] Change behavior of --num-executors with dynamic allocation.
## What changes were proposed in this pull request?

This changes the behavior of --num-executors and spark.executor.instances when using dynamic allocation. Instead of turning dynamic allocation off, it uses the value for the initial number of executors.

This changes was discussed on [SPARK-13723](https://issues.apache.org/jira/browse/SPARK-13723). I highly recommend using it while we can change the behavior for 2.0.0. In practice, the 1.x behavior causes unexpected behavior for users (it is not clear that it disables dynamic allocation) and wastes cluster resources because users rarely notice the log message.

## How was this patch tested?

This patch updates tests and adds a test for Utils.getDynamicAllocationInitialExecutors.

Author: Ryan Blue <blue@apache.org>

Closes #13338 from rdblue/SPARK-13723-num-executors-with-dynamic-allocation.
2016-06-23 14:03:46 -05:00
Dongjoon Hyun 5eef1e6c6a [SPARK-15660][CORE] Update RDD variance/stdev description and add popVariance/popStdev
## What changes were proposed in this pull request?

In Spark-11490, `variance/stdev` are redefined as the **sample** `variance/stdev` instead of population ones. This PR updates the other old documentations to prevent users from misunderstanding. This will update the following Scala/Java API docs.

- http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.api.java.JavaDoubleRDD
- http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.rdd.DoubleRDDFunctions
- http://spark.apache.org/docs/2.0.0-preview/api/scala/index.html#org.apache.spark.util.StatCounter
- http://spark.apache.org/docs/2.0.0-preview/api/java/org/apache/spark/api/java/JavaDoubleRDD.html
- http://spark.apache.org/docs/2.0.0-preview/api/java/org/apache/spark/rdd/DoubleRDDFunctions.html
- http://spark.apache.org/docs/2.0.0-preview/api/java/org/apache/spark/util/StatCounter.html

Also, this PR adds them `popVariance` and `popStdev` functions clearly.

## How was this patch tested?

Pass the updated Jenkins tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13403 from dongjoon-hyun/SPARK-15660.
2016-06-23 11:07:34 +01:00
Eric Liang 6f915c9ec2 [SPARK-16003] SerializationDebugger runs into infinite loop
## What changes were proposed in this pull request?

This fixes SerializationDebugger to not recurse forever when `writeReplace` returns an object of the same class, which is the case for at least the `SQLMetrics` class.

See also the OpenJDK unit tests on the behavior of recursive `writeReplace()`:
f4d80957e8/test/java/io/Serializable/nestedReplace/NestedReplace.java

cc davies cloud-fan

## How was this patch tested?

Unit tests for SerializationDebugger.

Author: Eric Liang <ekl@databricks.com>

Closes #13814 from ericl/spark-16003.
2016-06-22 12:12:34 -07:00
Imran Rashid cf1995a976 [SPARK-15783][CORE] Fix Flakiness in BlacklistIntegrationSuite
## What changes were proposed in this pull request?

Three changes here -- first two were causing failures w/ BlacklistIntegrationSuite

1. The testing framework didn't include the reviveOffers thread, so the test which involved delay scheduling might never submit offers late enough for the delay scheduling to kick in.  So added in the periodic revive offers, just like the real scheduler.

2. `assertEmptyDataStructures` would occasionally fail, because it appeared there was still an active job.  This is because in DAGScheduler, the jobWaiter is notified of the job completion before the data structures are cleaned up.  Most of the time the test code that is waiting on the jobWaiter won't become active until after the data structures are cleared, but occasionally the race goes the other way, and the assertions fail.

3. `DAGSchedulerSuite` was not stopping all the inner parts it was setting up, so each test was leaking a number of threads.  So we stop those parts too.

4. Turns out that `assertMapOutputAvailable` is not terribly useful in this framework -- most of the places I was trying to use it suffer from some race.

5. When there is an exception in the backend, try to improve the error msg a little bit.  Before the exception was printed to the console, but the test would fail w/ a timeout, and the logs wouldn't show anything.

## How was this patch tested?

I ran all the tests in `BlacklistIntegrationSuite` 5k times and everything in `DAGSchedulerSuite` 1k times on my laptop.  Also I ran a full jenkins build with `BlacklistIntegrationSuite` 500 times and `DAGSchedulerSuite` 50 times, see https://github.com/apache/spark/pull/13548.  (I tried more times but jenkins timed out.)

To check for more leaked threads, I added some code to dump the list of all threads at the end of each test in DAGSchedulerSuite, which is how I discovered the mapOutputTracker and eventLoop were leaking threads.  (I removed that code from the final pr, just part of the testing.)

And I'll run Jenkins on this a couple of times to do one more check.

Author: Imran Rashid <irashid@cloudera.com>

Closes #13565 from squito/blacklist_extra_tests.
2016-06-22 08:35:41 -05:00
Shixiong Zhu 62d8fe2089 [SPARK-16017][CORE] Send hostname from CoarseGrainedExecutorBackend to driver
## What changes were proposed in this pull request?

[SPARK-15395](https://issues.apache.org/jira/browse/SPARK-15395) changes the behavior that how the driver gets the executor host and the driver will get the executor IP address instead of the host name. This PR just sends the hostname from executors to driver so that driver can pass it to TaskScheduler.

## How was this patch tested?

Existing unit tests.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13741 from zsxwing/SPARK-16017.
2016-06-17 15:48:17 -07:00
Kay Ousterhout c8809db5a5 [SPARK-15926] Improve readability of DAGScheduler stage creation methods
## What changes were proposed in this pull request?

This pull request refactors parts of the DAGScheduler to improve readability, focusing on the code around stage creation.  One goal of this change it to make it clearer which functions may create new stages (as opposed to looking up stages that already exist).  There are no functionality changes in this pull request.  In more detail:

* shuffleToMapStage was renamed to shuffleIdToMapStage (when reading the existing code I have sometimes struggled to remember what the key is -- is it a stage? A stage id? This change is intended to avoid that confusion)
* Cleaned up the code to create shuffle map stages.  Previously, creating a shuffle map stage involved 3 different functions (newOrUsedShuffleStage, newShuffleMapStage, and getShuffleMapStage), and it wasn't clear what the purpose of each function was.  With the new code, a single function (getOrCreateShuffleMapStage) is responsible for getting a stage (if it already exists) or creating new shuffle map stages and any missing ancestor stages, and it delegates to createShuffleMapStage when new stages need to be created.  There's some remaining confusion here because the getOrCreateParentStages call in createShuffleMapStage may recursively create ancestor stages; this is an issue I plan to fix in a future pull request, because it's trickier to fix and involves a slight functionality change.
* newResultStage was renamed to createResultStage, for consistency with naming around shuffle map stages.
* getParentStages has been renamed to getOrCreateParentStages, to make it clear that this function will sometimes create missing ancestor stages.
* The only *slight* functionality change is that on line 478, updateJobIdStageIdMaps now uses a stage's parents instance variable rather than re-calculating them (I couldn't see any reason why they'd need to be re-calculated, and suspect this is just leftover from older code).
* getAncestorShuffleDependencies was renamed to getMissingAncestorShuffleDependencies, to make it clear that this only returns dependencies that have not yet been run.

cc squito markhamstra JoshRosen (who requested more DAG scheduler commenting long ago -- an issue this pull request tries, in part, to address)

FYI rxin

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #13677 from kayousterhout/SPARK-15926.
2016-06-17 12:12:46 -07:00
Nezih Yigitbasi 63470afc99 [SPARK-15782][YARN] Fix spark.jars and spark.yarn.dist.jars handling
When `--packages` is specified with spark-shell the classes from those packages cannot be found, which I think is due to some of the changes in SPARK-12343.

Tested manually with both scala 2.10 and 2.11 repls.

vanzin davies can you guys please review?

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: Nezih Yigitbasi <nyigitbasi@netflix.com>

Closes #13709 from nezihyigitbasi/SPARK-15782.
2016-06-16 18:20:16 -07:00
Sean Owen 457126e420 [SPARK-15796][CORE] Reduce spark.memory.fraction default to avoid overrunning old gen in JVM default config
## What changes were proposed in this pull request?

Reduce `spark.memory.fraction` default to 0.6 in order to make it fit within default JVM old generation size (2/3 heap). See JIRA discussion. This means a full cache doesn't spill into the new gen. CC andrewor14

## How was this patch tested?

Jenkins tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #13618 from srowen/SPARK-15796.
2016-06-16 23:04:10 +02:00
Davies Liu a153e41c08 Revert "[SPARK-15782][YARN] Set spark.jars system property in client mode"
This reverts commit 4df8df5c2e.
2016-06-15 15:55:07 -07:00
Imran Rashid cafc696d09 [HOTFIX][CORE] fix flaky BasicSchedulerIntegrationTest
## What changes were proposed in this pull request?

SPARK-15927 exacerbated a race in BasicSchedulerIntegrationTest, so it went from very unlikely to fairly frequent.  The issue is that stage numbering is not completely deterministic, but these tests treated it like it was.  So turn off the tests.

## How was this patch tested?

on my laptop the test failed abotu 10% of the time before this change, and didn't fail in 500 runs after the change.

Author: Imran Rashid <irashid@cloudera.com>

Closes #13688 from squito/hotfix_basic_scheduler.
2016-06-15 16:44:18 -05:00
Nezih Yigitbasi 4df8df5c2e [SPARK-15782][YARN] Set spark.jars system property in client mode
## What changes were proposed in this pull request?

When `--packages` is specified with `spark-shell` the classes from those packages cannot be found, which I think is due to some of the changes in `SPARK-12343`. In particular `SPARK-12343` removes a line that sets the `spark.jars` system property in client mode, which is used by the repl main class to set the classpath.

## How was this patch tested?

Tested manually.

This system property is used by the repl to populate its classpath. If
this is not set properly the classes for external packages cannot be
found.

tgravescs vanzin as you may be familiar with this part of the code.

Author: Nezih Yigitbasi <nyigitbasi@netflix.com>

Closes #13527 from nezihyigitbasi/repl-fix.
2016-06-15 14:07:36 -07:00
Tejas Patil 279bd4aa5f [SPARK-15826][CORE] PipedRDD to allow configurable char encoding
## What changes were proposed in this pull request?

Link to jira which describes the problem: https://issues.apache.org/jira/browse/SPARK-15826

The fix in this PR is to allow users specify encoding in the pipe() operation. For backward compatibility,
keeping the default value to be system default.

## How was this patch tested?

Ran existing unit tests

Author: Tejas Patil <tejasp@fb.com>

Closes #13563 from tejasapatil/pipedrdd_utf8.
2016-06-15 12:03:00 -07:00
Liwei Lin 9b234b55d1 [SPARK-15518][CORE][FOLLOW-UP] Rename LocalSchedulerBackendEndpoint -> LocalSchedulerBackend
## What changes were proposed in this pull request?

This patch is a follow-up to https://github.com/apache/spark/pull/13288 completing the renaming:
 - LocalScheduler -> LocalSchedulerBackend~~Endpoint~~

## How was this patch tested?

Updated test cases to reflect the name change.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #13683 from lw-lin/rename-backend.
2016-06-15 11:52:36 -07:00
Kay Ousterhout 5d50d4f0f9 [SPARK-15927] Eliminate redundant DAGScheduler code.
To try to eliminate redundant code to traverse the RDD dependency graph,
this PR creates a new function getShuffleDependencies that returns
shuffle dependencies that are immediate parents of a given RDD.  This
new function is used by getParentStages and
getAncestorShuffleDependencies.

Author: Kay Ousterhout <kayousterhout@gmail.com>

Closes #13646 from kayousterhout/SPARK-15927.
2016-06-14 17:27:01 -07:00
Sean Owen 6151d2641f [MINOR] Clean up several build warnings, mostly due to internal use of old accumulators
## What changes were proposed in this pull request?

Another PR to clean up recent build warnings. This particularly cleans up several instances of the old accumulator API usage in tests that are straightforward to update. I think this qualifies as "minor".

## How was this patch tested?

Jenkins

Author: Sean Owen <sowen@cloudera.com>

Closes #13642 from srowen/BuildWarnings.
2016-06-14 09:40:07 -07:00
Sean Owen 0a6f090837 [SPARK-15876][CORE] Remove support for "zk://" master URL
## What changes were proposed in this pull request?

Remove deprecated support for `zk://` master (`mesos://zk//` remains supported)

## How was this patch tested?

Jenkins

Author: Sean Owen <sowen@cloudera.com>

Closes #13625 from srowen/SPARK-15876.
2016-06-12 11:46:33 -07:00
Sean Owen f51dfe616b [SPARK-15086][CORE][STREAMING] Deprecate old Java accumulator API
## What changes were proposed in this pull request?

- Deprecate old Java accumulator API; should use Scala now
- Update Java tests and examples
- Don't bother testing old accumulator API in Java 8 (too)
- (fix a misspelling too)

## How was this patch tested?

Jenkins tests

Author: Sean Owen <sowen@cloudera.com>

Closes #13606 from srowen/SPARK-15086.
2016-06-12 11:44:33 -07:00
Imran Rashid 8cc22b0085 [SPARK-15878][CORE][TEST] fix cleanup in EventLoggingListenerSuite and ReplayListenerSuite
## What changes were proposed in this pull request?

These tests weren't properly using `LocalSparkContext` so weren't cleaning up correctly when tests failed.

## How was this patch tested?

Jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes #13602 from squito/SPARK-15878_cleanup_replaylistener.
2016-06-12 12:54:57 +01:00
Eric Liang e1f986c7a3 [SPARK-15860] Metrics for codegen size and perf
## What changes were proposed in this pull request?

Adds codahale metrics for the codegen source text size and how long it takes to compile. The size is particularly interesting, since the JVM does have hard limits on how large methods can get.

To simplify, I added the metrics under a statically-initialized source that is always registered with SparkEnv.

## How was this patch tested?

Unit tests

Author: Eric Liang <ekl@databricks.com>

Closes #13586 from ericl/spark-15860.
2016-06-11 23:16:21 -07:00
Eric Liang c06c58bbbb [SPARK-14851][CORE] Support radix sort with nullable longs
## What changes were proposed in this pull request?

This adds support for radix sort of nullable long fields. When a sort field is null and radix sort is enabled, we keep nulls in a separate region of the sort buffer so that radix sort does not need to deal with them. This also has performance benefits when sorting smaller integer types, since the current representation of nulls in two's complement (Long.MIN_VALUE) otherwise forces a full-width radix sort.

This strategy for nulls does mean the sort is no longer stable. cc davies

## How was this patch tested?

Existing randomized sort tests for correctness. I also tested some TPCDS queries and there does not seem to be any significant regression for non-null sorts.

Some test queries (best of 5 runs each).
Before change:
scala> val start = System.nanoTime; spark.range(5000000).selectExpr("if(id > 5, cast(hash(id) as long), NULL) as h").coalesce(1).orderBy("h").collect(); (System.nanoTime - start) / 1e6
start: Long = 3190437233227987
res3: Double = 4716.471091

After change:
scala> val start = System.nanoTime; spark.range(5000000).selectExpr("if(id > 5, cast(hash(id) as long), NULL) as h").coalesce(1).orderBy("h").collect(); (System.nanoTime - start) / 1e6
start: Long = 3190367870952791
res4: Double = 2981.143045

Author: Eric Liang <ekl@databricks.com>

Closes #13161 from ericl/sc-2998.
2016-06-11 15:42:58 -07:00
Reynold Xin 254bc8c34e [SPARK-15866] Rename listAccumulator collectionAccumulator
## What changes were proposed in this pull request?
SparkContext.listAccumulator, by Spark's convention, makes it sound like "list" is a verb and the method should return a list of accumulators. This patch renames the method and the class collection accumulator.

## How was this patch tested?
Updated test case to reflect the names.

Author: Reynold Xin <rxin@databricks.com>

Closes #13594 from rxin/SPARK-15866.
2016-06-10 11:08:39 -07:00
Eric Liang b914e1930f [SPARK-15794] Should truncate toString() of very wide plans
## What changes were proposed in this pull request?

With very wide tables, e.g. thousands of fields, the plan output is unreadable and often causes OOMs due to inefficient string processing. This truncates all struct and operator field lists to a user configurable threshold to limit performance impact.

It would also be nice to optimize string generation to avoid these sort of O(n^2) slowdowns entirely (i.e. use StringBuilder everywhere including expressions), but this is probably too large of a change for 2.0 at this point, and truncation has other benefits for usability.

## How was this patch tested?

Added a microbenchmark that covers this case particularly well. I also ran the microbenchmark while varying the truncation threshold.

```
numFields = 5
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem)            2336 / 2558          0.0       23364.4       0.1X

numFields = 25
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem)            4237 / 4465          0.0       42367.9       0.1X

numFields = 100
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
2000 wide x 50 rows (write in-mem)          10458 / 11223          0.0      104582.0       0.0X

numFields = Infinity
wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
[info]   java.lang.OutOfMemoryError: Java heap space
```

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #13537 from ericl/truncated-string.
2016-06-09 18:05:16 -07:00
Imran Rashid 36d3dfa59a [SPARK-15783][CORE] still some flakiness in these blacklist tests so ignore for now
## What changes were proposed in this pull request?

There is still some flakiness in BlacklistIntegrationSuite, so turning it off for the moment to avoid breaking more builds -- will turn it back with more fixes.

## How was this patch tested?

jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes #13528 from squito/ignore_blacklist.
2016-06-06 12:53:11 -07:00
Zheng RuiFeng fd8af39713 [MINOR] Fix Typos 'an -> a'
## What changes were proposed in this pull request?

`an -> a`

Use cmds like `find . -name '*.R' | xargs -i sh -c "grep -in ' an [^aeiou]' {} && echo {}"` to generate candidates, and review them one by one.

## How was this patch tested?
manual tests

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #13515 from zhengruifeng/an_a.
2016-06-06 09:35:47 +01:00
Brett Randall 4e767d0f90 [SPARK-15723] Fixed local-timezone-brittle test where short-timezone form "EST" is …
## What changes were proposed in this pull request?

Stop using the abbreviated and ambiguous timezone "EST" in a test, since it is machine-local default timezone dependent, and fails in different timezones.  Fixed [SPARK-15723](https://issues.apache.org/jira/browse/SPARK-15723).

## How was this patch tested?

Note that to reproduce this problem in any locale/timezone, you can modify the scalatest-maven-plugin argLine to add a timezone:

    <argLine>-ea -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=${CodeCacheSize} -Duser.timezone="Australia/Sydney"</argLine>

and run

    $ mvn test -DwildcardSuites=org.apache.spark.status.api.v1.SimpleDateParamSuite -Dtest=none. Equally this will fix it in an effected timezone:

    <argLine>-ea -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=${CodeCacheSize} -Duser.timezone="America/New_York"</argLine>

To test the fix, apply the above change to `pom.xml` to set test TZ to `Australia/Sydney`, and confirm the test now passes.

Author: Brett Randall <javabrett@gmail.com>

Closes #13462 from javabrett/SPARK-15723-SimpleDateParamSuite.
2016-06-05 15:31:56 +01:00
Davies Liu 3074f575a3 [SPARK-15391] [SQL] manage the temporary memory of timsort
## What changes were proposed in this pull request?

Currently, the memory for temporary buffer used by TimSort is always allocated as on-heap without bookkeeping, it could cause OOM both in on-heap and off-heap mode.

This PR will try to manage that by preallocate it together with the pointer array, same with RadixSort. It both works for on-heap and off-heap mode.

This PR also change the loadFactor of BytesToBytesMap to 0.5 (it was 0.70), it enables use to radix sort also makes sure that we have enough memory for timsort.

## How was this patch tested?

Existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #13318 from davies/fix_timsort.
2016-06-03 16:45:09 -07:00
Xin Wu 28ad0f7b0d [SPARK-15681][CORE] allow lowercase or mixed case log level string when calling sc.setLogLevel
## What changes were proposed in this pull request?
Currently `SparkContext API setLogLevel(level: String) `can not handle lower case or mixed case input string. But `org.apache.log4j.Level.toLevel` can take lowercase or mixed case.

This PR is to allow case-insensitive user input for the log level.

## How was this patch tested?
A unit testcase is added.

Author: Xin Wu <xinwu@us.ibm.com>

Closes #13422 from xwu0226/reset_loglevel.
2016-06-03 14:26:48 -07:00
Imran Rashid c2f0cb4f63 [SPARK-15714][CORE] Fix flaky o.a.s.scheduler.BlacklistIntegrationSuite
## What changes were proposed in this pull request?

BlacklistIntegrationSuite (introduced by SPARK-10372) is a bit flaky because of some race conditions:
1. Failed jobs might have non-empty results, because the resultHandler will be invoked for successful tasks (if there are task successes before failures)
2. taskScheduler.taskIdToTaskSetManager must be protected by a lock on taskScheduler

(1) has failed a handful of jenkins builds recently.  I don't think I've seen (2) in jenkins, but I've run into with some uncommitted tests I'm working on where there are lots more tasks.

While I was in there, I also made an unrelated fix to `runningTasks`in the test framework -- there was a pointless `O(n)` operation to remove completed tasks, could be `O(1)`.

## How was this patch tested?

I modified the o.a.s.scheduler.BlacklistIntegrationSuite to have it run the tests 1k times on my laptop.  It failed 11 times before this change, and none with it.  (Pretty sure all the failures were problem (1), though I didn't check all of them).

Also the full suite of tests via jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes #13454 from squito/SPARK-15714.
2016-06-03 11:49:33 -05:00
Josh Rosen 229f902257 [SPARK-15736][CORE] Gracefully handle loss of DiskStore files
If an RDD partition is cached on disk and the DiskStore file is lost, then reads of that cached partition will fail and the missing partition is supposed to be recomputed by a new task attempt. In the current BlockManager implementation, however, the missing file does not trigger any metadata updates / does not invalidate the cache, so subsequent task attempts will be scheduled on the same executor and the doomed read will be repeatedly retried, leading to repeated task failures and eventually a total job failure.

In order to fix this problem, the executor with the missing file needs to properly mark the corresponding block as missing so that it stops advertising itself as a cache location for that block.

This patch fixes this bug and adds an end-to-end regression test (in `FailureSuite`) and a set of unit tests (`in BlockManagerSuite`).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #13473 from JoshRosen/handle-missing-cache-files.
2016-06-02 17:36:31 -07:00
hyukjinkwon 252417fa21 [SPARK-15322][SQL][FOLLOWUP] Use the new long accumulator for old int accumulators.
## What changes were proposed in this pull request?

This PR corrects the remaining cases for using old accumulators.

This does not change some old accumulator usages below:

- `ImplicitSuite.scala` - Tests dedicated to old accumulator, for implicits with `AccumulatorParam`

- `AccumulatorSuite.scala` -  Tests dedicated to old accumulator

- `JavaSparkContext.scala` - For supporting old accumulators for Java API.

- `debug.package.scala` - Usage with `HashSet[String]`. Currently, it seems no implementation for this. I might be able to write an anonymous class for this but I didn't because I think it is not worth writing a lot of codes only for this.

- `SQLMetricsSuite.scala` - This uses the old accumulator for checking type boxing. It seems new accumulator does not require type boxing for this case whereas the old one requires (due to the use of generic).

## How was this patch tested?

Existing tests cover this.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #13434 from HyukjinKwon/accum.
2016-06-02 11:16:24 -05:00
Tejas Patil ac38bdc756 [SPARK-15601][CORE] CircularBuffer's toString() to print only the contents written if buffer isn't full
## What changes were proposed in this pull request?

1. The class allocated 4x space than needed as it was using `Int` to store the `Byte` values

2. If CircularBuffer isn't full, currently toString() will print some garbage chars along with the content written as is tries to print the entire array allocated for the buffer. The fix is to keep track of buffer getting full and don't print the tail of the buffer if it isn't full (suggestion by sameeragarwal over https://github.com/apache/spark/pull/12194#discussion_r64495331)

3. Simplified `toString()`

## How was this patch tested?

Added new test case

Author: Tejas Patil <tejasp@fb.com>

Closes #13351 from tejasapatil/circular_buffer.
2016-05-31 19:52:22 -05:00
Devaraj K 5b21139dbf [SPARK-10530][CORE] Kill other task attempts when one taskattempt belonging the same task is succeeded in speculation
## What changes were proposed in this pull request?

With this patch, TaskSetManager kills other running attempts when any one of the attempt succeeds for the same task. Also killed tasks will not be considered as failed tasks and they get listed separately in the UI and also shows the task state as KILLED instead of FAILED.

## How was this patch tested?

core\src\test\scala\org\apache\spark\ui\jobs\JobProgressListenerSuite.scala
core\src\test\scala\org\apache\spark\util\JsonProtocolSuite.scala

I have verified this patch manually by enabling spark.speculation as true, when any attempt gets succeeded then other running attempts are getting killed for the same task and other pending tasks are getting assigned in those. And also when any attempt gets killed then they are considered as KILLED tasks and not considered as FAILED tasks. Please find the attached screen shots for the reference.

![stage-tasks-table](https://cloud.githubusercontent.com/assets/3174804/14075132/394c6a12-f4f4-11e5-8638-20ff7b8cc9bc.png)
![stages-table](https://cloud.githubusercontent.com/assets/3174804/14075134/3b60f412-f4f4-11e5-9ea6-dd0dcc86eb03.png)

Ref : https://github.com/apache/spark/pull/11916

Author: Devaraj K <devaraj@apache.org>

Closes #11996 from devaraj-kavali/SPARK-10530.
2016-05-30 14:29:27 -07:00
Reynold Xin 73178c7556 [SPARK-15633][MINOR] Make package name for Java tests consistent
## What changes were proposed in this pull request?
This is a simple patch that makes package names for Java 8 test suites consistent. I moved everything to test.org.apache.spark to we can test package private APIs properly. Also added "java8" as the package name so we can easily run all the tests related to Java 8.

## How was this patch tested?
This is a test only change.

Author: Reynold Xin <rxin@databricks.com>

Closes #13364 from rxin/SPARK-15633.
2016-05-27 21:20:02 -07:00
dding3 88c9c467a3 [SPARK-15562][ML] Delete temp directory after program exit in DataFrameExample
## What changes were proposed in this pull request?
Temp directory used to save records is not deleted after program exit in DataFrameExample. Although it called deleteOnExit, it doesn't work as the directory is not empty. Similar things happend in ContextCleanerSuite. Update the code to make sure temp directory is deleted after program exit.

## How was this patch tested?

unit tests and local build.

Author: dding3 <ding.ding@intel.com>

Closes #13328 from dding3/master.
2016-05-27 21:01:50 -05:00
Sital Kedia ce756daa4f [SPARK-15569] Reduce frequency of updateBytesWritten function in Disk…
## What changes were proposed in this pull request?

Profiling a Spark job spilling large amount of intermediate data we found that significant portion of time is being spent in DiskObjectWriter.updateBytesWritten function. Looking at the code, we see that the function is being called too frequently to update the number of bytes written to disk. We should reduce the frequency to avoid this.

## How was this patch tested?

Tested by running the job on cluster and saw 20% CPU gain  by this change.

Author: Sital Kedia <skedia@fb.com>

Closes #13332 from sitalkedia/DiskObjectWriter.
2016-05-27 11:22:39 -07:00
Sameer Agarwal fe6de16f78 [SPARK-8428][SPARK-13850] Fix integer overflows in TimSort
## What changes were proposed in this pull request?

This patch fixes a few integer overflows in `UnsafeSortDataFormat.copyRange()` and `ShuffleSortDataFormat copyRange()` that seems to be the most likely cause behind a number of `TimSort` contract violation errors seen in Spark 2.0 and Spark 1.6 while sorting large datasets.

## How was this patch tested?

Added a test in `ExternalSorterSuite` that instantiates a large array of the form of [150000000, 150000001, 150000002, ...., 300000000, 0, 1, 2, ..., 149999999] that triggers a `copyRange` in `TimSort.mergeLo` or `TimSort.mergeHi`. Note that the input dataset should contain at least 268.43 million rows with a certain data distribution for an overflow to occur.

Author: Sameer Agarwal <sameer@databricks.com>

Closes #13336 from sameeragarwal/timsort-bug.
2016-05-26 15:49:16 -07:00
Imran Rashid dfc9fc02cc [SPARK-10372] [CORE] basic test framework for entire spark scheduler
This is a basic framework for testing the entire scheduler.  The tests this adds aren't very interesting -- the point of this PR is just to setup the framework, to keep the initial change small, but it can be built upon to test more features (eg., speculation, killing tasks, blacklisting, etc.).

Author: Imran Rashid <irashid@cloudera.com>

Closes #8559 from squito/SPARK-10372-scheduler-integs.
2016-05-26 00:29:09 -05:00
Takuya UESHIN 698ef762f8 [SPARK-14269][SCHEDULER] Eliminate unnecessary submitStage() call.
## What changes were proposed in this pull request?

Currently a method `submitStage()` for waiting stages is called on every iteration of the event loop in `DAGScheduler` to submit all waiting stages, but most of them are not necessary because they are not related to Stage status.
The case we should try to submit waiting stages is only when their parent stages are successfully completed.

This elimination can improve `DAGScheduler` performance.

## How was this patch tested?

Added some checks and other existing tests, and our projects.

We have a project bottle-necked by `DAGScheduler`, having about 2000 stages.

Before this patch the almost all execution time in `Driver` process was spent to process `submitStage()` of `dag-scheduler-event-loop` thread but after this patch the performance was improved as follows:

|        | total execution time | `dag-scheduler-event-loop` thread time | `submitStage()` |
|--------|---------------------:|---------------------------------------:|----------------:|
| Before |              760 sec |                                710 sec |         667 sec |
| After  |              440 sec |                                 14 sec |          10 sec |

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #12060 from ueshin/issues/SPARK-14269.
2016-05-25 13:57:25 -07:00
Lukasz b120fba6ae [SPARK-9044] Fix "Storage" tab in UI so that it reflects RDD name change.
## What changes were proposed in this pull request?

1. Making 'name' field of RDDInfo mutable.
2. In StorageListener: catching the fact that RDD's name was changed and updating it in RDDInfo.

## How was this patch tested?

1. Manual verification - the 'Storage' tab now behaves as expected.
2. The commit also contains a new unit test which verifies this.

Author: Lukasz <lgieron@gmail.com>

Closes #13264 from lgieron/SPARK-9044.
2016-05-25 10:24:21 -07:00
Reynold Xin 14494da87b [SPARK-15518] Rename various scheduler backend for consistency
## What changes were proposed in this pull request?
This patch renames various scheduler backends to make them consistent:

- LocalScheduler -> LocalSchedulerBackend
- AppClient -> StandaloneAppClient
- AppClientListener -> StandaloneAppClientListener
- SparkDeploySchedulerBackend -> StandaloneSchedulerBackend
- CoarseMesosSchedulerBackend -> MesosCoarseGrainedSchedulerBackend
- MesosSchedulerBackend -> MesosFineGrainedSchedulerBackend

## How was this patch tested?
Updated test cases to reflect the name change.

Author: Reynold Xin <rxin@databricks.com>

Closes #13288 from rxin/SPARK-15518.
2016-05-24 20:55:47 -07:00
Dongjoon Hyun f08bf587b1 [SPARK-15512][CORE] repartition(0) should raise IllegalArgumentException
## What changes were proposed in this pull request?

Previously, SPARK-8893 added the constraints on positive number of partitions for repartition/coalesce operations in general. This PR adds one missing part for that and adds explicit two testcases.

**Before**
```scala
scala> sc.parallelize(1 to 5).coalesce(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> sc.parallelize(1 to 5).repartition(0).collect()
res1: Array[Int] = Array()   // empty
scala> spark.sql("select 1").coalesce(0)
res2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [1: int]
scala> spark.sql("select 1").coalesce(0).collect()
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
scala> spark.sql("select 1").repartition(0)
res3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [1: int]
scala> spark.sql("select 1").repartition(0).collect()
res4: Array[org.apache.spark.sql.Row] = Array()  // empty
```

**After**
```scala
scala> sc.parallelize(1 to 5).coalesce(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> sc.parallelize(1 to 5).repartition(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> spark.sql("select 1").coalesce(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
scala> spark.sql("select 1").repartition(0)
java.lang.IllegalArgumentException: requirement failed: Number of partitions (0) must be positive.
...
```

## How was this patch tested?

Pass the Jenkins tests with new testcases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13282 from dongjoon-hyun/SPARK-15512.
2016-05-24 18:55:23 -07:00
Dongjoon Hyun be99a99fe7 [MINOR][CORE][TEST] Update obsolete takeSample test case.
## What changes were proposed in this pull request?

This PR fixes some obsolete comments and assertion in `takeSample` testcase of `RDDSuite.scala`.

## How was this patch tested?

This fixes the testcase only.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13260 from dongjoon-hyun/SPARK-15481.
2016-05-24 11:09:54 -07:00
Xin Wu 01659bc50c [SPARK-15431][SQL] Support LIST FILE(s)|JAR(s) command natively
## What changes were proposed in this pull request?
Currently command `ADD FILE|JAR <filepath | jarpath>` is supported natively in SparkSQL. However, when this command is run, the file/jar is added to the resources that can not be looked up by `LIST FILE(s)|JAR(s)` command because the `LIST` command is passed to Hive command processor in Spark-SQL or simply not supported in Spark-shell. There is no way users can find out what files/jars are added to the spark context.
Refer to [Hive commands](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Cli)

This PR is to support following commands:
`LIST (FILE[s] [filepath ...] | JAR[s] [jarfile ...])`

### For example:
##### LIST FILE(s)
```
scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt")
res1: org.apache.spark.sql.DataFrame = []
scala> spark.sql("add file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt")
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("list file hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt").show(false)
+----------------------------------------------+
|result                                        |
+----------------------------------------------+
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt|
+----------------------------------------------+

scala> spark.sql("list files").show(false)
+----------------------------------------------+
|result                                        |
+----------------------------------------------+
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test1.txt|
|hdfs://bdavm009.svl.ibm.com:8020/tmp/test.txt |
+----------------------------------------------+
```

##### LIST JAR(s)
```
scala> spark.sql("add jar /Users/xinwu/spark/core/src/test/resources/TestUDTF.jar")
res9: org.apache.spark.sql.DataFrame = [result: int]

scala> spark.sql("list jar TestUDTF.jar").show(false)
+---------------------------------------------+
|result                                       |
+---------------------------------------------+
|spark://192.168.1.234:50131/jars/TestUDTF.jar|
+---------------------------------------------+

scala> spark.sql("list jars").show(false)
+---------------------------------------------+
|result                                       |
+---------------------------------------------+
|spark://192.168.1.234:50131/jars/TestUDTF.jar|
+---------------------------------------------+
```
## How was this patch tested?
New test cases are added for Spark-SQL, Spark-Shell and SparkContext API code path.

Author: Xin Wu <xinwu@us.ibm.com>
Author: xin Wu <xinwu@us.ibm.com>

Closes #13212 from xwu0226/list_command.
2016-05-23 17:32:01 -07:00
Shixiong Zhu 305263954a Fix the compiler error introduced by #13153 for Scala 2.10 2016-05-19 12:36:44 -07:00
Shixiong Zhu 4e3cb7a5d9 [SPARK-15317][CORE] Don't store accumulators for every task in listeners
## What changes were proposed in this pull request?

In general, the Web UI doesn't need to store the Accumulator/AccumulableInfo for every task. It only needs the Accumulator values.

In this PR, it creates new UIData classes to store the necessary fields and make `JobProgressListener` store only these new classes, so that `JobProgressListener` won't store Accumulator/AccumulableInfo and the size of `JobProgressListener` becomes pretty small. I also eliminates `AccumulableInfo` from `SQLListener` so that we don't keep any references for those unused `AccumulableInfo`s.

## How was this patch tested?

I ran two tests reported in JIRA locally:

The first one is:
```
val data = spark.range(0, 10000, 1, 10000)
data.cache().count()
```
The retained size of JobProgressListener decreases from 60.7M to 6.9M.

The second one is:
```
import org.apache.spark.ml.CC
import org.apache.spark.sql.SQLContext
val sqlContext = SQLContext.getOrCreate(sc)
CC.runTest(sqlContext)
```

This test won't cause OOM after applying this patch.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #13153 from zsxwing/memory.
2016-05-19 12:05:17 -07:00
Davies Liu ad182086cc [SPARK-15300] Fix writer lock conflict when remove a block
## What changes were proposed in this pull request?

A writer lock could be acquired when 1) create a new block 2) remove a block 3) evict a block to disk. 1) and 3) could happen in the same time within the same task, all of them could happen in the same time outside a task. It's OK that when someone try to grab the write block for a block, but the block is acquired by another one that has the same task attempt id.

This PR remove the check.

## How was this patch tested?

Updated existing tests.

Author: Davies Liu <davies@databricks.com>

Closes #13082 from davies/write_lock_conflict.
2016-05-19 11:47:17 -07:00
Sandeep Singh 3facca5152 [CORE][MINOR] Remove redundant set master in OutputCommitCoordinatorIntegrationSuite
## What changes were proposed in this pull request?
Remove redundant set master in OutputCommitCoordinatorIntegrationSuite, as we are already setting it in SparkContext below on line 43.

## How was this patch tested?
existing tests

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #13168 from techaddict/minor-1.
2016-05-19 10:44:26 +01:00
Davies Liu 8fb1d1c7f3 [SPARK-15357] Cooperative spilling should check consumer memory mode
## What changes were proposed in this pull request?

Since we support forced spilling for Spillable, which only works in OnHeap mode, different from other SQL operators (could be OnHeap or OffHeap), we should considering the mode of consumer before calling trigger forced spilling.

## How was this patch tested?

Add new test.

Author: Davies Liu <davies@databricks.com>

Closes #13151 from davies/fix_mode.
2016-05-18 09:44:21 -07:00
WeichenXu 2f9047b5eb [SPARK-15322][MLLIB][CORE][SQL] update deprecate accumulator usage into accumulatorV2 in spark project
## What changes were proposed in this pull request?

I use Intellj-IDEA to search usage of deprecate SparkContext.accumulator in the whole spark project, and update the code.(except those test code for accumulator method itself)

## How was this patch tested?

Exisiting unit tests

Author: WeichenXu <WeichenXu123@outlook.com>

Closes #13112 from WeichenXu123/update_accuV2_in_mllib.
2016-05-18 11:48:46 +01:00
Takuya UESHIN a57aadae84 [SPARK-13902][SCHEDULER] Make DAGScheduler not to create duplicate stage.
## What changes were proposed in this pull request?

`DAGScheduler`sometimes generate incorrect stage graph.

Suppose you have the following DAG:

```
[A] <--(s_A)-- [B] <--(s_B)-- [C] <--(s_C)-- [D]
            \                /
              <-------------
```

Note: [] means an RDD, () means a shuffle dependency.

Here, RDD `B` has a shuffle dependency on RDD `A`, and RDD `C` has shuffle dependency on both `B` and `A`. The shuffle dependency IDs are numbers in the `DAGScheduler`, but to make the example easier to understand, let's call the shuffled data from `A` shuffle dependency ID `s_A` and the shuffled data from `B` shuffle dependency ID `s_B`.
The `getAncestorShuffleDependencies` method in `DAGScheduler` (incorrectly) does not check for duplicates when it's adding ShuffleDependencies to the parents data structure, so for this DAG, when `getAncestorShuffleDependencies` gets called on `C` (previous of the final RDD), `getAncestorShuffleDependencies` will return `s_A`, `s_B`, `s_A` (`s_A` gets added twice: once when the method "visit"s RDD `C`, and once when the method "visit"s RDD `B`). This is problematic because this line of code: https://github.com/apache/spark/blob/8ef3399/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L289 then generates a new shuffle stage for each dependency returned by `getAncestorShuffleDependencies`, resulting in duplicate map stages that compute the map output from RDD `A`.

As a result, `DAGScheduler` generates the following stages and their parents for each shuffle:

| | stage | parents |
|----|----|----|
| s_A | ShuffleMapStage 2 | List() |
| s_B | ShuffleMapStage 1 | List(ShuffleMapStage 0) |
| s_C | ShuffleMapStage 3 | List(ShuffleMapStage 1, ShuffleMapStage 2) |
| - | ResultStage 4 | List(ShuffleMapStage 3) |

The stage for s_A should be `ShuffleMapStage 0`, but the stage for `s_A` is generated twice as `ShuffleMapStage 2` and `ShuffleMapStage 0` is overwritten by `ShuffleMapStage 2`, and the stage `ShuffleMap Stage1` keeps referring the old stage `ShuffleMapStage 0`.

This patch is fixing it.

## How was this patch tested?

I added the sample RDD graph to show the illegal stage graph to `DAGSchedulerSuite`.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #12655 from ueshin/issues/SPARK-13902.
2016-05-12 12:36:18 -07:00
Sandeep Singh ff92eb2e80 [SPARK-15080][CORE] Break copyAndReset into copy and reset
## What changes were proposed in this pull request?
Break copyAndReset into two methods copy and reset instead of just one.

## How was this patch tested?
Existing Tests

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #12936 from techaddict/SPARK-15080.
2016-05-12 11:12:09 +08:00
Andrew Or bb88ad4e0e [SPARK-15260] Atomically resize memory pools
## What changes were proposed in this pull request?

When we acquire execution memory, we do a lot of things between shrinking the storage memory pool and enlarging the execution memory pool. In particular, we call `memoryStore.evictBlocksToFreeSpace`, which may do a lot of I/O and can throw exceptions. If an exception is thrown, the pool sizes on that executor will be in a bad state.

This patch minimizes the things we do between the two calls to make the resizing more atomic.

## How was this patch tested?

Jenkins.

Author: Andrew Or <andrew@databricks.com>

Closes #13039 from andrewor14/safer-pool.
2016-05-11 12:58:57 -07:00
Eric Liang 6d0368ab8d [SPARK-15259] Sort time metric should not include spill and record insertion time
## What changes were proposed in this pull request?

After SPARK-14669 it seems the sort time metric includes both spill and record insertion time. This makes it not very useful since the metric becomes close to the total execution time of the node.

We should track just the time spent for in-memory sort, as before.

## How was this patch tested?

Verified metric in the UI, also unit test on UnsafeExternalRowSorter.

cc davies

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #13035 from ericl/fix-metrics.
2016-05-11 11:25:46 -07:00
Wenchen Fan bcfee153b1 [SPARK-12837][CORE] reduce network IO for accumulators
Sending un-updated accumulators back to driver makes no sense, as merging a zero value accumulator is a no-op. We should only send back updated accumulators, to save network IO.

new test in `TaskContextSuite`

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12899 from cloud-fan/acc.
2016-05-10 11:16:56 -07:00
Marcelo Vanzin 0b9cae4242 [SPARK-11249][LAUNCHER] Throw error if app resource is not provided.
Without this, the code would build an invalid spark-submit command line,
and a more cryptic error would be presented to the user. Also, expose
a constant that allows users to set a dummy resource in cases where
they don't need an actual resource file; for backwards compatibility,
that uses the same "spark-internal" resource that Spark itself uses.

Tested via unit tests, run-example, spark-shell, and running the
thrift server with mixed spark and hive command line arguments.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #12909 from vanzin/SPARK-11249.
2016-05-10 10:35:54 -07:00
Sital Kedia a019e6efb7 [SPARK-14542][CORE] PipeRDD should allow configurable buffer size for…
## What changes were proposed in this pull request?

Currently PipedRDD internally uses PrintWriter to write data to the stdin of the piped process, which by default uses a BufferedWriter of buffer size 8k. In our experiment, we have seen that 8k buffer size is too small and the job spends significant amount of CPU time in system calls to copy the data. We should have a way to configure the buffer size for the writer.

## How was this patch tested?
Ran PipedRDDSuite tests.

Author: Sital Kedia <skedia@fb.com>

Closes #12309 from sitalkedia/bufferedPipedRDD.
2016-05-10 15:28:35 +01:00
Alex Bozarth c3e23bc0c3 [SPARK-10653][CORE] Remove unnecessary things from SparkEnv
## What changes were proposed in this pull request?

Removed blockTransferService and sparkFilesDir from SparkEnv since they're rarely used and don't need to be in stored in the env. Edited their few usages to accommodate the change.

## How was this patch tested?

ran dev/run-tests locally

Author: Alex Bozarth <ajbozart@us.ibm.com>

Closes #12970 from ajbozarth/spark10653.
2016-05-09 11:51:37 -07:00
Thomas Graves cc95f1ed5f [SPARK-1239] Improve fetching of map output statuses
The main issue we are trying to solve is the memory bloat of the Driver when tasks request the map output statuses.  This means with a large number of tasks you either need a huge amount of memory on Driver or you have to repartition to smaller number.  This makes it really difficult to run over say 50000 tasks.

The main issues that cause the memory bloat are:
1) no flow control on sending the map output status responses.  We serialize the map status output  and then hand off to netty to send.  netty is sending asynchronously and it can't send them fast enough to keep up with incoming requests so we end up with lots of copies of the serialized map output statuses sitting there and this causes huge bloat when you have 10's of thousands of tasks and map output status is in the 10's of MB.
2) When initial reduce tasks are started up, they all request the map output statuses from the Driver. These requests are handled by multiple threads in parallel so even though we check to see if we have a cached version, initially when we don't have a cached version yet, many of initial requests can all end up serializing the exact same map output statuses.

This patch does a couple of things:
- When the map output status size is over a threshold (default 512K) then it uses broadcast to send the map statuses.  This means we no longer serialize a large map output status and thus we don't have issues with memory bloat.  the messages sizes are now in the 300-400 byte range and the map status output are broadcast. If its under the threadshold it sends it as before, the message contains the DIRECT indicator now.
- synchronize the incoming requests to allow one thread to cache the serialized output and broadcast the map output status  that can then be used by everyone else.  This ensures we don't create multiple broadcast variables when we don't need to.  To ensure this happens I added a second thread pool which the Dispatcher hands the requests to so that those threads can block without blocking the main dispatcher threads (which would cause things like heartbeats and such not to come through)

Note that some of design and code was contributed by mridulm

## How was this patch tested?

Unit tests and a lot of manually testing.
Ran with akka and netty rpc. Ran with both dynamic allocation on and off.

one of the large jobs I used to test this was a join of 15TB of data.  it had 200,000 map tasks, and  20,000 reduce tasks. Executors ranged from 200 to 2000.  This job ran successfully with 5GB of memory on the driver with these changes. Without these changes I was using 20GB and only had 500 reduce tasks.  The job has 50mb of serialized map output statuses and took roughly the same amount of time for the executors to get the map output statuses as before.

Ran a variety of other jobs, from large wordcounts to small ones not using broadcasts.

Author: Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com>

Closes #12113 from tgravescs/SPARK-1239.
2016-05-06 19:31:26 -07:00
Ryan Blue 08db491265 [SPARK-9926] Parallelize partition logic in UnionRDD.
This patch has the new logic from #8512 that uses a parallel collection to compute partitions in UnionRDD. The rest of #8512 added an alternative code path for calculating splits in S3, but that isn't necessary to get the same speedup. The underlying problem wasn't that bulk listing wasn't used, it was that an extra FileStatus was retrieved for each file. The fix was just committed as [HADOOP-12810](https://issues.apache.org/jira/browse/HADOOP-12810). (I think the original commit also used a single prefix to enumerate all paths, but that isn't always helpful and it was removed in later versions so there is no need for SparkS3Utils.)

I tested this using the same table that piapiaozhexiu was using. Calculating splits for a 10-day period took 25 seconds with this change and HADOOP-12810, which is on par with the results from #8512.

Author: Ryan Blue <blue@apache.org>
Author: Cheolsoo Park <cheolsoop@netflix.com>

Closes #11242 from rdblue/SPARK-9926-parallelize-union-rdd.
2016-05-05 14:40:37 -07:00
Sebastien Rainville eb019af9a9 [SPARK-13001][CORE][MESOS] Prevent getting offers when reached max cores
Similar to https://github.com/apache/spark/pull/8639

This change rejects offers for 120s when reached `spark.cores.max` in coarse-grained mode to mitigate offer starvation. This prevents Mesos to send us offers again and again, starving other frameworks. This is especially problematic when running many small frameworks on the same Mesos cluster, e.g. many small Sparks streaming jobs, and cause the bigger spark jobs to stop receiving offers. By rejecting the offers for a long period of time, they become available to those other frameworks.

Author: Sebastien Rainville <sebastien@hopper.com>

Closes #10924 from sebastienrainville/master.
2016-05-04 14:32:36 -07:00
Bryan Cutler cf2e9da612 [SPARK-12299][CORE] Remove history serving functionality from Master
Remove history server functionality from standalone Master.  Previously, the Master process rebuilt a SparkUI once the application was completed which sometimes caused problems, such as OOM, when the application event log is large (see SPARK-6270).  Keeping this functionality out of the Master will help to simplify the process and increase stability.

Testing for this change included running core unit tests and manually running an application on a standalone cluster to verify that it completed successfully and that the Master UI functioned correctly.  Also added 2 unit tests to verify killing an application and driver from MasterWebUI makes the correct request to the Master.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #10991 from BryanCutler/remove-history-master-SPARK-12299.
2016-05-04 14:29:54 -07:00
Reynold Xin 6274a520fa [SPARK-15115][SQL] Reorganize whole stage codegen benchmark suites
## What changes were proposed in this pull request?
We currently have a single suite that is very large, making it difficult to maintain and play with specific primitives. This patch reorganizes the file by creating multiple benchmark suites in a single package.

Most of the changes are straightforward move of code. On top of the code moving, I did:
1. Use SparkSession instead of SQLContext.
2. Turned most benchmark scenarios into a their own test cases, rather than having multiple scenarios in a single test case, which takes forever to run.

## How was this patch tested?
This is a test only change.

Author: Reynold Xin <rxin@databricks.com>

Closes #12891 from rxin/SPARK-15115.
2016-05-04 11:00:01 -07:00
Dhruve Ashar a45647746d [SPARK-4224][CORE][YARN] Support group acls
## What changes were proposed in this pull request?
Currently only a list of users can be specified for view and modify acls. This change enables a group of admins/devs/users to be provisioned for viewing and modifying Spark jobs.

**Changes Proposed in the fix**
Three new corresponding config entries have been added where the user can specify the groups to be given access.

```
spark.admin.acls.groups
spark.modify.acls.groups
spark.ui.view.acls.groups
```

New config entries were added because specifying the users and groups explicitly is a better and cleaner way compared to specifying them in the existing config entry using a delimiter.

A generic trait has been introduced to provide the user to group mapping which makes it pluggable to support a variety of mapping protocols - similar to the one used in hadoop. A default unix shell based implementation has been provided.
Custom user to group mapping protocol can be specified and configured by the entry ```spark.user.groups.mapping```

**How the patch was Tested**
We ran different spark jobs setting the config entries in combinations of admin, modify and ui acls. For modify acls we tried killing the job stages from the ui and using yarn commands. For view acls we tried accessing the UI tabs and the logs. Headless accounts were used to launch these jobs and different users tried to modify and view the jobs to ensure that the groups mapping applied correctly.

Additional Unit tests have been added without modifying the existing ones. These test for different ways of setting the acls through configuration and/or API and validate the expected behavior.

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #12760 from dhruve/impr/SPARK-4224.
2016-05-04 08:45:43 -05:00
Reynold Xin 695f0e9195 [SPARK-15107][SQL] Allow varying # iterations by test case in Benchmark
## What changes were proposed in this pull request?
This patch changes our micro-benchmark util to allow setting different iteration numbers for different test cases. For some of our benchmarks, turning off whole-stage codegen can make the runtime 20X slower, making it very difficult to run a large number of times without substantially shortening the input cardinality.

With this change, I set the default num iterations to 2 for whole stage codegen off, and 5 for whole stage codegen on. I also updated some results.

## How was this patch tested?
N/A - this is a test util.

Author: Reynold Xin <rxin@databricks.com>

Closes #12884 from rxin/SPARK-15107.
2016-05-03 22:56:40 -07:00
Thomas Graves 83ee92f603 [SPARK-11316] coalesce doesn't handle UnionRDD with partial locality properly
## What changes were proposed in this pull request?

coalesce doesn't handle UnionRDD with partial locality properly.  I had a user who had a UnionRDD that was made up of mapPartitionRDD without preferred locations and a checkpointedRDD with preferred locations (getting from hdfs).  It took the driver over 20 minutes to setup the groups and put the partitions into those groups before it even started any tasks.  Even perhaps worse is it didn't end up with the number of partitions he was asking for because it didn't put a partition in each of the groups properly.

The changes in this patch get rid of a n^2 while loop that was causing the 20 minutes, it properly distributes the partitions to have at least one per group, and it changes from using the rotation iterator which got the preferred locations many times to get all the preferred locations once up front.

Note that the n^2 while loop that I removed in setupGroups took so long because all of the partitions with preferred locations were already assigned to group, so it basically looped through every single one and wasn't ever able to assign it.  At the time I had 960 partitions with preferred locations and 1020 without and did the outer while loop 319 times because that is the # of groups left to create.  Note that each of those times through the inner while loop is going off to hdfs to get the block locations, so this is extremely inefficient.

## How was the this patch tested?

Added unit tests for this case and ran existing ones that applied to make sure no regressions.
Also manually tested on the users production job to make sure it fixed their issue.  It created the proper number of partitions and now it takes about 6 seconds rather then 20 minutes.
 I did also run some basic manual tests with spark-shell doing coalesced to smaller number, same number, and then greater with shuffle.

Author: Thomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com>

Closes #11327 from tgravescs/SPARK-11316.
2016-05-03 13:43:20 -07:00
Sandeep Singh 84b3a4a873 [SPARK-15082][CORE] Improve unit test coverage for AccumulatorV2
## What changes were proposed in this pull request?
Added tests for ListAccumulator and LegacyAccumulatorWrapper, test for ListAccumulator is one similar to old Collection Accumulators

## How was this patch tested?
Ran tests locally.

cc rxin

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #12862 from techaddict/SPARK-15082.
2016-05-03 11:45:51 -07:00
Sandeep Singh ca813330c7 [SPARK-15087][CORE][SQL] Remove AccumulatorV2.localValue and keep only value
## What changes were proposed in this pull request?
Remove AccumulatorV2.localValue and keep only value

## How was this patch tested?
existing tests

Author: Sandeep Singh <sandeep@techaddict.me>

Closes #12865 from techaddict/SPARK-15087.
2016-05-03 11:38:43 -07:00
Reynold Xin d557a5e01e [SPARK-15081] Move AccumulatorV2 and subclasses into util package
## What changes were proposed in this pull request?
This patch moves AccumulatorV2 and subclasses into util package.

## How was this patch tested?
Updated relevant tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12863 from rxin/SPARK-15081.
2016-05-03 19:45:12 +08:00