Commit graph

339 commits

Author SHA1 Message Date
Michael Mior 1437e344ec [SPARK-22050][CORE] Allow BlockUpdated events to be optionally logged to the event log
## What changes were proposed in this pull request?

I see that block updates are not logged to the event log.
This makes sense as a default for performance reasons.
However, I find it helpful when trying to get a better understanding of caching for a job to be able to log these updates.
This PR adds a configuration setting `spark.eventLog.blockUpdates` (defaulting to false) which allows block updates to be recorded in the log.
This contribution is original work which is licensed to the Apache Spark project.

## How was this patch tested?

Current and additional unit tests.

Author: Michael Mior <mmior@uwaterloo.ca>

Closes #19263 from michaelmior/log-block-updates.
2017-10-17 14:30:52 -07:00
jerryshao e1960c3d6f [SPARK-22062][CORE] Spill large block to disk in BlockManager's remote fetch to avoid OOM
## What changes were proposed in this pull request?

In the current BlockManager's `getRemoteBytes`, it will call `BlockTransferService#fetchBlockSync` to get remote block. In the `fetchBlockSync`, Spark will allocate a temporary `ByteBuffer` to store the whole fetched block. This will potentially lead to OOM if block size is too big or several blocks are fetched simultaneously in this executor.

So here leveraging the idea of shuffle fetch, to spill the large block to local disk before consumed by upstream code. The behavior is controlled by newly added configuration, if block size is smaller than the threshold, then this block will be persisted in memory; otherwise it will first spill to disk, and then read from disk file.

To achieve this feature, what I did is:

1. Rename `TempShuffleFileManager` to `TempFileManager`, since now it is not only used by shuffle.
2. Add a new `TempFileManager` to manage the files of fetched remote blocks, the files are tracked by weak reference, will be deleted when no use at all.

## How was this patch tested?

This was tested by adding UT, also manual verification in local test to perform GC to clean the files.

Author: jerryshao <sshao@hortonworks.com>

Closes #19476 from jerryshao/SPARK-22062.
2017-10-17 22:54:38 +08:00
liuxian b8a08f25cc [SPARK-21506][DOC] The description of "spark.executor.cores" may be not correct
## What changes were proposed in this pull request?

The number of cores assigned to each executor is configurable. When this is not explicitly set,  multiple executors from the same application may be launched on the same worker too.

## How was this patch tested?
N/A

Author: liuxian <liu.xian3@zte.com.cn>

Closes #18711 from 10110346/executorcores.
2017-10-10 20:44:33 +08:00
Sanket Chintapalli 1662e93119 [SPARK-21501] Change CacheLoader to limit entries based on memory footprint
Right now the spark shuffle service has a cache for index files. It is based on a # of files cached (spark.shuffle.service.index.cache.entries). This can cause issues if people have a lot of reducers because the size of each entry can fluctuate based on the # of reducers.
We saw an issues with a job that had 170000 reducers and it caused NM with spark shuffle service to use 700-800MB or memory in NM by itself.
We should change this cache to be memory based and only allow a certain memory size used. When I say memory based I mean the cache should have a limit of say 100MB.

https://issues.apache.org/jira/browse/SPARK-21501

Manual Testing with 170000 reducers has been performed with cache loaded up to max 100MB default limit, with each shuffle index file of size 1.3MB. Eviction takes place as soon as the total cache size reaches the 100MB limit and the objects will be ready for garbage collection there by avoiding NM to crash. No notable difference in runtime has been observed.

Author: Sanket Chintapalli <schintap@yahoo-inc.com>

Closes #18940 from redsanket/SPARK-21501.
2017-08-23 11:51:11 -05:00
hzyaoqin 41568e9a0f [SPARK-21637][SPARK-21451][SQL] get spark.hadoop.* properties from sysProps to hiveconf
## What changes were proposed in this pull request?
When we use `bin/spark-sql` command configuring `--conf spark.hadoop.foo=bar`, the `SparkSQLCliDriver` initializes an instance of  hiveconf, it does not add `foo->bar` to it.
this pr gets `spark.hadoop.*` properties from sysProps to this hiveconf

## How was this patch tested?
UT

Author: hzyaoqin <hzyaoqin@corp.netease.com>
Author: Kent Yao <yaooqinn@hotmail.com>

Closes #18668 from yaooqinn/SPARK-21451.
2017-08-05 17:30:47 -07:00
Sean Owen b1d59e60de [SPARK-21593][DOCS] Fix 2 rendering errors on configuration page
## What changes were proposed in this pull request?

Fix 2 rendering errors on configuration doc page, due to SPARK-21243 and SPARK-15355.

## How was this patch tested?

Manually built and viewed docs with jekyll

Author: Sean Owen <sowen@cloudera.com>

Closes #18793 from srowen/SPARK-21593.
2017-08-01 19:05:55 +01:00
jinxing cfb25b27c0 [SPARK-21530] Update description of spark.shuffle.maxChunksBeingTransferred.
## What changes were proposed in this pull request?

Update the description of `spark.shuffle.maxChunksBeingTransferred` to include that the new coming connections will be closed when the max is hit and client should have retry mechanism.

Author: jinxing <jinxing6042@126.com>

Closes #18735 from jinxing64/SPARK-21530.
2017-07-27 11:55:48 +08:00
jinxing 799e13161e [SPARK-21175] Reject OpenBlocks when memory shortage on shuffle service.
## What changes were proposed in this pull request?

A shuffle service can serves blocks from multiple apps/tasks. Thus the shuffle service can suffers high memory usage when lots of shuffle-reads happen at the same time. In my cluster, OOM always happens on shuffle service. Analyzing heap dump, memory cost by Netty(ChannelOutboundBufferEntry) can be up to 2~3G. It might make sense to reject "open blocks" request when memory usage is high on shuffle service.

93dd0c518d and 85c6ce6193 tried to alleviate the memory pressure on shuffle service but cannot solve the root cause. This pr proposes to control currency of shuffle read.

## How was this patch tested?
Added unit test.

Author: jinxing <jinxing6042@126.com>

Closes #18388 from jinxing64/SPARK-21175.
2017-07-25 20:52:07 +08:00
Dhruve Ashar ef61775586 [SPARK-21243][Core] Limit no. of map outputs in a shuffle fetch
## What changes were proposed in this pull request?
For configurations with external shuffle enabled, we have observed that if a very large no. of blocks are being fetched from a remote host, it puts the NM under extra pressure and can crash it. This change introduces a configuration `spark.reducer.maxBlocksInFlightPerAddress` , to limit the no. of map outputs being fetched from a given remote address. The changes applied here are applicable for both the scenarios - when external shuffle is enabled as well as disabled.

## How was this patch tested?
Ran the job with the default configuration which does not change the existing behavior and ran it with few configurations of lower values -10,20,50,100. The job ran fine and there is no change in the output. (I will update the metrics related to NM in some time.)

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #18487 from dhruve/impr/SPARK-21243.
2017-07-19 15:53:28 -05:00
jerryshao 457dc9ccbf [MINOR][DOC] Improve the docs about how to correctly set configurations
## What changes were proposed in this pull request?

Spark provides several ways to set configurations, either from configuration file, or from `spark-submit` command line options, or programmatically through `SparkConf` class. It may confuses beginners why some configurations set through `SparkConf` cannot take affect. So here add some docs to address this problems and let beginners know how to correctly set configurations.

## How was this patch tested?

N/A

Author: jerryshao <sshao@hortonworks.com>

Closes #18552 from jerryshao/improve-doc.
2017-07-10 11:22:28 +08:00
jinxing 062c336d06 [SPARK-21343] Refine the document for spark.reducer.maxReqSizeShuffleToMem.
## What changes were proposed in this pull request?

In current code, reducer can break the old shuffle service when `spark.reducer.maxReqSizeShuffleToMem` is enabled. Let's refine document.

Author: jinxing <jinxing6042@126.com>

Closes #18566 from jinxing64/SPARK-21343.
2017-07-09 00:27:58 +08:00
jerryshao 5800144a54 [SPARK-21012][SUBMIT] Add glob support for resources adding to Spark
Current "--jars (spark.jars)", "--files (spark.files)", "--py-files (spark.submit.pyFiles)" and "--archives (spark.yarn.dist.archives)" only support non-glob path. This is OK for most of the cases, but when user requires to add more jars, files into Spark, it is too verbose to list one by one. So here propose to add glob path support for resources.

Also improving the code of downloading resources.

## How was this patch tested?

UT added, also verified manually in local cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #18235 from jerryshao/SPARK-21012.
2017-07-06 15:32:49 +08:00
sadikovi 960298ee66 [SPARK-20858][DOC][MINOR] Document ListenerBus event queue size
## What changes were proposed in this pull request?

This change adds a new configuration option `spark.scheduler.listenerbus.eventqueue.size` to the configuration docs to specify the capacity of the spark listener bus event queue. Default value is 10000.

This is doc PR for [SPARK-15703](https://issues.apache.org/jira/browse/SPARK-15703).

I added option to the `Scheduling` section, however it might be more related to `Spark UI` section.

## How was this patch tested?

Manually verified correct rendering of configuration option.

Author: sadikovi <ivan.sadikov@lincolnuni.ac.nz>
Author: Ivan Sadikov <ivan.sadikov@team.telstra.com>

Closes #18476 from sadikovi/SPARK-20858.
2017-07-05 14:40:44 +01:00
Shixiong Zhu 80f7ac3a60 [SPARK-21253][CORE] Disable spark.reducer.maxReqSizeShuffleToMem
## What changes were proposed in this pull request?

Disable spark.reducer.maxReqSizeShuffleToMem because it breaks the old shuffle service.

Credits to wangyum

Closes #18466

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>
Author: Yuming Wang <wgyumg@gmail.com>

Closes #18467 from zsxwing/SPARK-21253.
2017-06-30 11:02:22 +08:00
jerryshao 9e50a1d37a [SPARK-13669][SPARK-20898][CORE] Improve the blacklist mechanism to handle external shuffle service unavailable situation
## What changes were proposed in this pull request?

Currently we are running into an issue with Yarn work preserving enabled + external shuffle service.
In the work preserving enabled scenario, the failure of NM will not lead to the exit of executors, so executors can still accept and run the tasks. The problem here is when NM is failed, external shuffle service is actually inaccessible, so reduce tasks will always complain about the “Fetch failure”, and the failure of reduce stage will make the parent stage (map stage) rerun. The tricky thing here is Spark scheduler is not aware of the unavailability of external shuffle service, and will reschedule the map tasks on the executor where NM is failed, and again reduce stage will be failed with “Fetch failure”, and after 4 retries, the job is failed. This could also apply to other cluster manager with external shuffle service.

So here the main problem is that we should avoid assigning tasks to those bad executors (where shuffle service is unavailable). Current Spark's blacklist mechanism could blacklist executors/nodes by failure tasks, but it doesn't handle this specific fetch failure scenario. So here propose to improve the current application blacklist mechanism to handle fetch failure issue (especially with external shuffle service unavailable issue), to blacklist the executors/nodes where shuffle fetch is unavailable.

## How was this patch tested?

Unit test and small cluster verification.

Author: jerryshao <sshao@hortonworks.com>

Closes #17113 from jerryshao/SPARK-13669.
2017-06-26 11:14:03 -05:00
Yuming Wang 987eb8fadd [MINOR][DOCS] Add lost <tr> tag for configuration.md
## What changes were proposed in this pull request?

Add lost `<tr>` tag for `configuration.md`.

## How was this patch tested?
N/A

Author: Yuming Wang <wgyumg@gmail.com>

Closes #18372 from wangyum/docs-missing-tr.
2017-06-21 15:30:31 +01:00
Li Yichao d107b3b910 [SPARK-20640][CORE] Make rpc timeout and retry for shuffle registration configurable.
## What changes were proposed in this pull request?

Currently the shuffle service registration timeout and retry has been hardcoded. This works well for small workloads but under heavy workload when the shuffle service is busy transferring large amount of data we see significant delay in responding to the registration request, as a result we often see the executors fail to register with the shuffle service, eventually failing the job. We need to make these two parameters configurable.

## How was this patch tested?

* Updated `BlockManagerSuite` to test registration timeout and max attempts configuration actually works.

cc sitalkedia

Author: Li Yichao <lyc@zhihu.com>

Closes #18092 from liyichao/SPARK-20640.
2017-06-21 21:54:29 +08:00
liuzhaokun 0d8604bb84 [SPARK-21126] The configuration which named "spark.core.connection.auth.wait.timeout" hasn't been used in spark
[https://issues.apache.org/jira/browse/SPARK-21126](https://issues.apache.org/jira/browse/SPARK-21126)
The configuration which named "spark.core.connection.auth.wait.timeout" hasn't been used in spark,so I think it should be removed from configuration.md.

Author: liuzhaokun <liu.zhaokun@zte.com.cn>

Closes #18333 from liu-zhaokun/new3.
2017-06-18 08:32:29 +01:00
jerryshao 06c0544113 [SPARK-20981][SPARKSUBMIT] Add new configuration spark.jars.repositories as equivalence of --repositories
## What changes were proposed in this pull request?

In our use case of launching Spark applications via REST APIs (Livy), there's no way for user to specify command line arguments, all Spark configurations are set through configurations map. For "--repositories" because there's no equivalent Spark configuration, so we cannot specify the custom repository through configuration.

So here propose to add "--repositories" equivalent configuration in Spark.

## How was this patch tested?

New UT added.

Author: jerryshao <sshao@hortonworks.com>

Closes #18201 from jerryshao/SPARK-20981.
2017-06-05 11:06:50 -07:00
jinxing 3f94e64aa8 [SPARK-19659] Fetch big blocks to disk when shuffle-read.
## What changes were proposed in this pull request?

Currently the whole block is fetched into memory(off heap by default) when shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can be large when skew situations. If OOM happens during shuffle read, job will be killed and users will be notified to "Consider boosting spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more memory can resolve the OOM. However the approach is not perfectly suitable for production environment, especially for data warehouse.
Using Spark SQL as data engine in warehouse, users hope to have a unified parameter(e.g. memory) but less resource wasted(resource is allocated but not used). The hope is strong especially when migrating data engine to Spark from another one(e.g. Hive). Tuning the parameter for thousands of SQLs one by one is very time consuming.
It's not always easy to predict skew situations, when happen, it make sense to fetch remote blocks to disk for shuffle-read, rather than kill the job because of OOM.

In this pr, I propose to fetch big blocks to disk(which is also mentioned in SPARK-3019):

1. Track average size and also the outliers(which are larger than 2*avgSize) in MapStatus;
2. Request memory from `MemoryManager` before fetch blocks and release the memory to `MemoryManager` when `ManagedBuffer` is released.
3. Fetch remote blocks to disk when failing acquiring memory from `MemoryManager`, otherwise fetch to memory.

This is an improvement for memory control when shuffle blocks and help to avoid OOM in scenarios like below:
1. Single huge block;
2. Sizes of many blocks are underestimated in `MapStatus` and the actual footprint of blocks is much larger than the estimated.

## How was this patch tested?
Added unit test in `MapStatusSuite` and `ShuffleBlockFetcherIteratorSuite`.

Author: jinxing <jinxing6042@126.com>

Closes #16989 from jinxing64/SPARK-19659.
2017-05-25 16:11:30 +08:00
jinxing 2597674bcc [SPARK-20801] Record accurate size of blocks in MapStatus when it's above threshold.
## What changes were proposed in this pull request?

Currently, when number of reduces is above 2000, HighlyCompressedMapStatus is used to store size of blocks. in HighlyCompressedMapStatus, only average size is stored for non empty blocks. Which is not good for memory control when we shuffle blocks. It makes sense to store the accurate size of block when it's above threshold.

## How was this patch tested?

Added test in MapStatusSuite.

Author: jinxing <jinxing6042@126.com>

Closes #18031 from jinxing64/SPARK-20801.
2017-05-22 22:09:49 +08:00
Mark Grover 66636ef0b0 [SPARK-20435][CORE] More thorough redaction of sensitive information
This change does a more thorough redaction of sensitive information from logs and UI
Add unit tests that ensure that no regressions happen that leak sensitive information to the logs.

The motivation for this change was appearance of password like so in `SparkListenerEnvironmentUpdate` in event logs under some JVM configurations:
`"sun.java.command":"org.apache.spark.deploy.SparkSubmit ... --conf spark.executorEnv.HADOOP_CREDSTORE_PASSWORD=secret_password ..."
`
Previously redaction logic was only checking if the key matched the secret regex pattern, it'd redact it's value. That worked for most cases. However, in the above case, the key (sun.java.command) doesn't tell much, so the value needs to be searched. This PR expands the check to check for values as well.

## How was this patch tested?

New unit tests added that ensure that no sensitive information is present in the event logs or the yarn logs. Old unit test in UtilsSuite was modified because the test was asserting that a non-sensitive property's value won't be redacted. However, the non-sensitive value had the literal "secret" in it which was causing it to redact. Simply updating the non-sensitive property's value to another arbitrary value (that didn't have "secret" in it) fixed it.

Author: Mark Grover <mark@apache.org>

Closes #17725 from markgrover/spark-20435.
2017-04-26 17:06:21 -07:00
anabranch 7a365257e9 [SPARK-20400][DOCS] Remove References to 3rd Party Vendor Tools
## What changes were proposed in this pull request?

Simple documentation change to remove explicit vendor references.

## How was this patch tested?

NA

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: anabranch <bill@databricks.com>

Closes #17695 from anabranch/remove-vendor.
2017-04-26 09:49:05 +01:00
ding 0a7f5f2798 [SPARK-5484][GRAPHX] Periodically do checkpoint in Pregel
## What changes were proposed in this pull request?

Pregel-based iterative algorithms with more than ~50 iterations begin to slow down and eventually fail with a StackOverflowError due to Spark's lack of support for long lineage chains.

This PR causes Pregel to checkpoint the graph periodically if the checkpoint directory is set.
This PR moves PeriodicGraphCheckpointer.scala from mllib to graphx, moves PeriodicRDDCheckpointer.scala, PeriodicCheckpointer.scala from mllib to core
## How was this patch tested?

unit tests, manual tests
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Author: ding <ding@localhost.localdomain>
Author: dding3 <ding.ding@intel.com>
Author: Michael Allman <michael@videoamp.com>

Closes #15125 from dding3/cp2_pregel.
2017-04-25 11:20:32 -07:00
郭小龙 10207633 ad290402aa [SPARK-20401][DOC] In the spark official configuration document, the 'spark.driver.supervise' configuration parameter specification and default values are necessary.
## What changes were proposed in this pull request?
Use the REST interface submits the spark job.
e.g.
curl -X  POST http://10.43.183.120:6066/v1/submissions/create --header "Content-Type:application/json;charset=UTF-8" --data'{
    "action": "CreateSubmissionRequest",
    "appArgs": [
        "myAppArgument"
    ],
    "appResource": "/home/mr/gxl/test.jar",
    "clientSparkVersion": "2.2.0",
    "environmentVariables": {
        "SPARK_ENV_LOADED": "1"
    },
    "mainClass": "cn.zte.HdfsTest",
    "sparkProperties": {
        "spark.jars": "/home/mr/gxl/test.jar",
        **"spark.driver.supervise": "true",**
        "spark.app.name": "HdfsTest",
        "spark.eventLog.enabled": "false",
        "spark.submit.deployMode": "cluster",
        "spark.master": "spark://10.43.183.120:6066"
    }
}'

**I hope that make sure that the driver is automatically restarted if it fails with non-zero exit code.
But I can not find the 'spark.driver.supervise' configuration parameter specification and default values from the spark official document.**
## How was this patch tested?

manual tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: 郭小龙 10207633 <guo.xiaolong1@zte.com.cn>
Author: guoxiaolong <guo.xiaolong1@zte.com.cn>
Author: guoxiaolongzte <guo.xiaolong1@zte.com.cn>

Closes #17696 from guoxiaolongzte/SPARK-20401.
2017-04-21 20:08:26 +01:00
郭小龙 10207633 cf5963c961 [SPARK-20177] Document about compression way has some little detail ch…
…anges.

## What changes were proposed in this pull request?

Document compression way little detail changes.
1.spark.eventLog.compress add 'Compression will use spark.io.compression.codec.'
2.spark.broadcast.compress add 'Compression will use spark.io.compression.codec.'
3,spark.rdd.compress add 'Compression will use spark.io.compression.codec.'
4.spark.io.compression.codec add 'event log describe'.

eg
Through the documents, I don't know  what is compression mode about 'event log'.

## How was this patch tested?

manual tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: 郭小龙 10207633 <guo.xiaolong1@zte.com.cn>

Closes #17498 from guoxiaolongzte/SPARK-20177.
2017-04-01 11:48:58 +01:00
Yuming Wang edc87d76ef [SPARK-20107][DOC] Add spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version option to configuration.md
## What changes were proposed in this pull request?

Add `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version` option to `configuration.md`.
Set `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2` can speed up [HadoopMapReduceCommitProtocol.commitJob](https://github.com/apache/spark/blob/v2.1.0/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala#L121) for many output files.

All cloudera's hadoop 2.6.0-cdh5.4.0 or higher versions(see: 1c12361823 and 16b2de2732) and apache's hadoop 2.7.0 or higher versions support this improvement.

More see:

1. [MAPREDUCE-4815](https://issues.apache.org/jira/browse/MAPREDUCE-4815): Speed up FileOutputCommitter#commitJob for many output files.
2. [MAPREDUCE-6406](https://issues.apache.org/jira/browse/MAPREDUCE-6406): Update the default version for the property mapreduce.fileoutputcommitter.algorithm.version to 2.

## How was this patch tested?

Manual test and exist tests.

Author: Yuming Wang <wgyumg@gmail.com>

Closes #17442 from wangyum/SPARK-20107.
2017-03-30 10:39:57 +01:00
Sital Kedia 7b5d873aef [SPARK-13369] Add config for number of consecutive fetch failures
The previously hardcoded max 4 retries per stage is not suitable for all cluster configurations. Since spark retries a stage at the sign of the first fetch failure, you can easily end up with many stage retries to discover all the failures. In particular, two scenarios this value should change are (1) if there are more than 4 executors per node; in that case, it may take 4 retries to discover the problem with each executor on the node and (2) during cluster maintenance on large clusters, where multiple machines are serviced at once, but you also cannot afford total cluster downtime. By making this value configurable, cluster managers can tune this value to something more appropriate to their cluster configuration.

Unit tests

Author: Sital Kedia <skedia@fb.com>

Closes #17307 from sitalkedia/SPARK-13369.
2017-03-17 09:33:58 -05:00
Shubham Chopra fa7c582e94 [SPARK-15355][CORE] Proactive block replication
## What changes were proposed in this pull request?

We are proposing addition of pro-active block replication in case of executor failures. BlockManagerMasterEndpoint does all the book-keeping to keep a track of all the executors and the blocks they hold. It also keeps a track of which executors are alive through heartbeats. When an executor is removed, all this book-keeping state is updated to reflect the lost executor. This step can be used to identify executors that are still in possession of a copy of the cached data and a message could be sent to them to use the existing "replicate" function to find and place new replicas on other suitable hosts. Blocks replicated this way will let the master know of their existence.

This can happen when an executor is lost, and would that way be pro-active as opposed be being done at query time.
## How was this patch tested?

This patch was tested with existing unit tests along with new unit tests added to test the functionality.

Author: Shubham Chopra <schopra31@bloomberg.net>

Closes #14412 from shubhamchopra/ProactiveBlockReplication.
2017-02-24 15:40:01 -08:00
José Hiram Soltren 6287c94f08 [SPARK-16554][CORE] Automatically Kill Executors and Nodes when they are Blacklisted
## What changes were proposed in this pull request?

In SPARK-8425, we introduced a mechanism for blacklisting executors and nodes (hosts). After a certain number of failures, these resources would be "blacklisted" and no further work would be assigned to them for some period of time.

In some scenarios, it is better to fail fast, and to simply kill these unreliable resources. This changes proposes to do so by having the BlacklistTracker kill unreliable resources when they would otherwise be "blacklisted".

In order to be thread safe, this code depends on the CoarseGrainedSchedulerBackend sending a message to the driver backend in order to do the actual killing. This also helps to prevent a race which would permit work to begin on a resource (executor or node), between the time the resource is marked for killing and the time at which it is finally killed.

## How was this patch tested?

./dev/run-tests
Ran https://github.com/jsoltren/jose-utils/blob/master/blacklist/test-blacklist.sh, and checked logs to see executors and nodes being killed.

Testing can likely be improved here; suggestions welcome.

Author: José Hiram Soltren <jose@cloudera.com>

Closes #16650 from jsoltren/SPARK-16554-submit.
2017-02-09 12:49:31 -06:00
Marcelo Vanzin 3fc8e8caf8 [SPARK-17874][CORE] Add SSL port configuration.
Make the SSL port configuration explicit, instead of deriving it
from the non-SSL port, but retain the existing functionality in
case anyone depends on it.

The change starts the HTTPS and HTTP connectors separately, so
that it's possible to use independent ports for each. For that to
work, the initialization of the server needs to be shuffled around
a bit. The change also makes it so the initialization of both
connectors is similar, and end up using the same Scheduler - previously
only the HTTP connector would use the correct one.

Also fixed some outdated documentation about a couple of services
that were removed long ago.

Tested with unit tests and by running spark-shell with SSL configs.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16625 from vanzin/SPARK-17874.
2017-02-09 22:06:46 +09:00
Marcelo Vanzin 8f3f73abc1 [SPARK-19139][CORE] New auth mechanism for transport library.
This change introduces a new auth mechanism to the transport library,
to be used when users enable strong encryption. This auth mechanism
has better security than the currently used DIGEST-MD5.

The new protocol uses symmetric key encryption to mutually authenticate
the endpoints, and is very loosely based on ISO/IEC 9798.

The new protocol falls back to SASL when it thinks the remote end is old.
Because SASL does not support asking the server for multiple auth protocols,
which would mean we could re-use the existing SASL code by just adding a
new SASL provider, the protocol is implemented outside of the SASL API
to avoid the boilerplate of adding a new provider.

Details of the auth protocol are discussed in the included README.md
file.

This change partly undos the changes added in SPARK-13331; AES encryption
is now decoupled from SASL authentication. The encryption code itself,
though, has been re-used as part of this change.

## How was this patch tested?

- Unit tests
- Tested Spark 2.2 against Spark 1.6 shuffle service with SASL enabled
- Tested Spark 2.2 against Spark 2.2 shuffle service with SASL fallback disabled

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16521 from vanzin/SPARK-19139.
2017-01-24 10:44:04 -08:00
uncleGen 7c61c2a1c4
[DOCS] Fix typo in docs
## What changes were proposed in this pull request?

Fix typo in docs

## How was this patch tested?

Author: uncleGen <hustyugm@gmail.com>

Closes #16658 from uncleGen/typo-issue.
2017-01-24 11:32:11 +00:00
Yuming Wang c99492141b
[SPARK-19146][CORE] Drop more elements when stageData.taskData.size > retainedTasks
## What changes were proposed in this pull request?

Drop more elements when `stageData.taskData.size > retainedTasks` to reduce the number of times on call drop function.

## How was this patch tested?

Jenkins

Author: Yuming Wang <wgyumg@gmail.com>

Closes #16527 from wangyum/SPARK-19146.
2017-01-23 11:02:22 +00:00
Bryan Cutler 3bc2eff888 [SPARK-17568][CORE][DEPLOY] Add spark-submit option to override ivy settings used to resolve packages/artifacts
## What changes were proposed in this pull request?

Adding option in spark-submit to allow overriding the default IvySettings used to resolve artifacts as part of the Spark Packages functionality.  This will allow all artifact resolution to go through a central managed repository, such as Nexus or Artifactory, where site admins can better approve and control what is used with Spark apps.

This change restructures the creation of the IvySettings object in two distinct ways.  First, if the `spark.ivy.settings` option is not defined then `buildIvySettings` will create a default settings instance, as before, with defined repositories (Maven Central) included.  Second, if the option is defined, the ivy settings file will be loaded from the given path and only repositories defined within will be used for artifact resolution.
## How was this patch tested?

Existing tests for default behaviour, Manual tests that load a ivysettings.xml file with local and Nexus repositories defined.  Added new test to load a simple Ivy settings file with a local filesystem resolver.

Author: Bryan Cutler <cutlerb@gmail.com>
Author: Ian Hummel <ian@themodernlife.net>

Closes #15119 from BryanCutler/spark-custom-IvySettings.
2017-01-11 11:57:38 -08:00
Sean Owen 54138f6e89
[SPARK-19106][DOCS] Styling for the configuration docs is broken
## What changes were proposed in this pull request?

configuration.html section headings were not specified correctly in markdown and weren't rendering, being recognized correctly. Removed extra p tags and pulled level 4 titles up to level 3, since level 3 had been skipped. This improves the TOC.

## How was this patch tested?

Doc build, manual check.

Author: Sean Owen <sowen@cloudera.com>

Closes #16490 from srowen/SPARK-19106.
2017-01-07 19:15:51 +00:00
Yuexin Zhang 28ab0ec49f
[SPARK-19006][DOCS] mention spark.kryoserializer.buffer.max must be less than 2048m in doc
## What changes were proposed in this pull request?

On configuration doc page:https://spark.apache.org/docs/latest/configuration.html
We mentioned spark.kryoserializer.buffer.max : Maximum allowable size of Kryo serialization buffer. This must be larger than any object you attempt to serialize. Increase this if you get a "buffer limit exceeded" exception inside Kryo.
from source code, it has hard coded upper limit :
```
val maxBufferSizeMb = conf.getSizeAsMb("spark.kryoserializer.buffer.max", "64m").toInt
if (maxBufferSizeMb >= ByteUnit.GiB.toMiB(2))
{ throw new IllegalArgumentException("spark.kryoserializer.buffer.max must be less than " + s"2048 mb, got: + $maxBufferSizeMb mb.") }
```
We should mention "this value must be less than 2048 mb" on the configuration doc page as well.

## How was this patch tested?

None. Since it's minor doc change.

Author: Yuexin Zhang <yxzhang@cloudera.com>

Closes #16412 from cnZach/SPARK-19006.
2016-12-27 20:29:45 +00:00
Josh Rosen fa829ce21f [SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors
## What changes were proposed in this pull request?

Spark's current task cancellation / task killing mechanism is "best effort" because some tasks may not be interruptible or may not respond to their "killed" flags being set. If a significant fraction of a cluster's task slots are occupied by tasks that have been marked as killed but remain running then this can lead to a situation where new jobs and tasks are starved of resources that are being used by these zombie tasks.

This patch aims to address this problem by adding a "task reaper" mechanism to executors. At a high-level, task killing now launches a new thread which attempts to kill the task and then watches the task and periodically checks whether it has been killed. The TaskReaper will periodically re-attempt to call `TaskRunner.kill()` and will log warnings if the task keeps running. I modified TaskRunner to rename its thread at the start of the task, allowing TaskReaper to take a thread dump and filter it in order to log stacktraces from the exact task thread that we are waiting to finish. If the task has not stopped after a configurable timeout then the TaskReaper will throw an exception to trigger executor JVM death, thereby forcibly freeing any resources consumed by the zombie tasks.

This feature is flagged off by default and is controlled by four new configurations under the `spark.task.reaper.*` namespace. See the updated `configuration.md` doc for details.

## How was this patch tested?

Tested via a new test case in `JobCancellationSuite`, plus manual testing.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16189 from JoshRosen/cancellation.
2016-12-19 18:43:59 -08:00
gatorsmile c0c9e1d27a
[SPARK-18918][DOC] Missing </td> in Configuration page
### What changes were proposed in this pull request?
The configuration page looks messy now, as shown in the nightly build:
https://people.apache.org/~pwendell/spark-nightly/spark-master-docs/latest/configuration.html

Starting from the following location:

![screenshot 2016-12-18 00 26 33](https://cloud.githubusercontent.com/assets/11567269/21292396/ace4719c-c4b8-11e6-8dfd-d9ab95be43d5.png)

### How was this patch tested?
Attached is the screenshot generated in my local computer after the fix.
[Configuration - Spark 2.2.0 Documentation.pdf](https://github.com/apache/spark/files/659315/Configuration.-.Spark.2.2.0.Documentation.pdf)

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16327 from gatorsmile/docFix.
2016-12-18 09:02:04 +00:00
Imran Rashid 93cdb8a7d0 [SPARK-8425][CORE] Application Level Blacklisting
## What changes were proposed in this pull request?

This builds upon the blacklisting introduced in SPARK-17675 to add blacklisting of executors and nodes for an entire Spark application.  Resources are blacklisted based on tasks that fail, in tasksets that eventually complete successfully; they are automatically returned to the pool of active resources based on a timeout.  Full details are available in a design doc attached to the jira.
## How was this patch tested?

Added unit tests, ran them via Jenkins, also ran a handful of them in a loop to check for flakiness.

The added tests include:
- verifying BlacklistTracker works correctly
- verifying TaskSchedulerImpl interacts with BlacklistTracker correctly (via a mock BlacklistTracker)
- an integration test for the entire scheduler with blacklisting in a few different scenarios

Author: Imran Rashid <irashid@cloudera.com>
Author: mwws <wei.mao@intel.com>

Closes #14079 from squito/blacklist-SPARK-8425.
2016-12-15 08:29:56 -06:00
Marcelo Vanzin bc59951bab [SPARK-18773][CORE] Make commons-crypto config translation consistent.
This change moves the logic that translates Spark configuration to
commons-crypto configuration to the network-common module. It also
extends TransportConf and ConfigProvider to provide the necessary
interfaces for the translation to work.

As part of the change, I removed SystemPropertyConfigProvider, which
was mostly used as an "empty config" in unit tests, and adjusted the
very few tests that required a specific config.

I also changed the config keys for AES encryption to live under the
"spark.network." namespace, which is more correct than their previous
names under "spark.authenticate.".

Tested via existing unit test.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16200 from vanzin/SPARK-18773.
2016-12-12 16:27:04 -08:00
Nicholas Chammas 18eaabb71e [SPARK-18719] Add spark.ui.showConsoleProgress to configuration docs
This PR adds `spark.ui.showConsoleProgress` to the configuration docs.

I tested this PR by building the docs locally and confirming that this change shows up as expected.

Relates to #3029.

Author: Nicholas Chammas <nicholas.chammas@gmail.com>

Closes #16151 from nchammas/ui-progressbar-doc.
2016-12-05 14:40:50 -08:00
Marcelo Vanzin 8b325b17ec [SPARK-18547][CORE] Propagate I/O encryption key when executors register.
This change modifies the method used to propagate encryption keys used during
shuffle. Instead of relying on YARN's UserGroupInformation credential propagation,
this change explicitly distributes the key using the messages exchanged between
driver and executor during registration. When RPC encryption is enabled, this means
key propagation is also secure.

This allows shuffle encryption to work in non-YARN mode, which means that it's
easier to write unit tests for areas of the code that are affected by the feature.

The key is stored in the SecurityManager; because there are many instances of
that class used in the code, the key is only guaranteed to exist in the instance
managed by the SparkEnv. This path was chosen to avoid storing the key in the
SparkConf, which would risk having the key being written to disk as part of the
configuration (as, for example, is done when starting YARN applications).

Tested by new and existing unit tests (which were moved from the YARN module to
core), and by running apps with shuffle encryption enabled.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #15981 from vanzin/SPARK-18547.
2016-11-28 21:10:57 -08:00
Mark Grover 237c3b9642 [SPARK-18535][UI][YARN] Redact sensitive information from Spark logs and UI
## What changes were proposed in this pull request?

This patch adds a new property called `spark.secret.redactionPattern` that
allows users to specify a scala regex to decide which Spark configuration
properties and environment variables in driver and executor environments
contain sensitive information. When this regex matches the property or
environment variable name, its value is redacted from the environment UI and
various logs like YARN and event logs.

This change uses this property to redact information from event logs and YARN
logs. It also, updates the UI code to adhere to this property instead of
hardcoding the logic to decipher which properties are sensitive.

Here's an image of the UI post-redaction:
![image](https://cloud.githubusercontent.com/assets/1709451/20506215/4cc30654-b007-11e6-8aee-4cde253fba2f.png)

Here's the text in the YARN logs, post-redaction:
``HADOOP_CREDSTORE_PASSWORD -> *********(redacted)``

Here's the text in the event logs, post-redaction:
``...,"spark.executorEnv.HADOOP_CREDSTORE_PASSWORD":"*********(redacted)","spark.yarn.appMasterEnv.HADOOP_CREDSTORE_PASSWORD":"*********(redacted)",...``

## How was this patch tested?
1. Unit tests are added to ensure that redaction works.
2. A YARN job reading data off of S3 with confidential information
(hadoop credential provider password) being provided in the environment
variables of driver and executor. And, afterwards, logs were grepped to make
sure that no mention of secret password was present. It was also ensure that
the job was able to read the data off of S3 correctly, thereby ensuring that
the sensitive information was being trickled down to the right places to read
the data.
3. The event logs were checked to make sure no mention of secret password was
present.
4. UI environment tab was checked to make sure there was no secret information
being displayed.

Author: Mark Grover <mark@apache.org>

Closes #15971 from markgrover/master_redaction.
2016-11-28 08:59:47 -08:00
Sean Owen 8b1e1088eb
[SPARK-18353][CORE] spark.rpc.askTimeout defalut value is not 120s
## What changes were proposed in this pull request?

Avoid hard-coding spark.rpc.askTimeout to non-default in Client; fix doc about spark.rpc.askTimeout default

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #15833 from srowen/SPARK-18353.
2016-11-19 11:28:25 +00:00
Weiqing Yang 241e04bc03
[MINOR][DOC] Fix typos in the 'configuration', 'monitoring' and 'sql-programming-guide' documentation
## What changes were proposed in this pull request?

Fix typos in the 'configuration', 'monitoring' and 'sql-programming-guide' documentation.

## How was this patch tested?
Manually.

Author: Weiqing Yang <yangweiqing001@gmail.com>

Closes #15886 from weiqingy/fixTypo.
2016-11-16 10:34:56 +00:00
Weiqing Yang 3af894511b [SPARK-16759][CORE] Add a configuration property to pass caller contexts of upstream applications into Spark
## What changes were proposed in this pull request?

Many applications take Spark as a computing engine and run on it. This PR adds a configuration property `spark.log.callerContext` that can be used by Spark's upstream applications (e.g. Oozie) to set up their caller contexts into Spark. In the end, Spark will combine its own caller context with the caller contexts of its upstream applications, and write them into Yarn RM log and HDFS audit log.

The audit log has a config to truncate the caller contexts passed in (default 128). The caller contexts will be sent over rpc, so it should be concise. The call context written into HDFS log and Yarn log consists of two parts: the information `A` specified by Spark itself and the value `B` of `spark.log.callerContext` property.  Currently `A` typically takes 64 to 74 characters,  so `B` can have up to 50 characters (mentioned in the doc `running-on-yarn.md`)
## How was this patch tested?

Manual tests. I have run some Spark applications with `spark.log.callerContext` configuration in Yarn client/cluster mode, and verified that the caller contexts were written into Yarn RM log and HDFS audit log correctly.

The ways to configure `spark.log.callerContext` property:
- In spark-defaults.conf:

```
spark.log.callerContext  infoSpecifiedByUpstreamApp
```
- In app's source code:

```
val spark = SparkSession
      .builder
      .appName("SparkKMeans")
      .config("spark.log.callerContext", "infoSpecifiedByUpstreamApp")
      .getOrCreate()
```

When running on Spark Yarn cluster mode, the driver is unable to pass 'spark.log.callerContext' to Yarn client and AM since Yarn client and AM have already started before the driver performs `.config("spark.log.callerContext", "infoSpecifiedByUpstreamApp")`.

The following  example shows the command line used to submit a SparkKMeans application and the corresponding records in Yarn RM log and HDFS audit log.

Command:

```
./bin/spark-submit --verbose --executor-cores 3 --num-executors 1 --master yarn --deploy-mode client --class org.apache.spark.examples.SparkKMeans examples/target/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar hdfs://localhost:9000/lr_big.txt 2 5
```

Yarn RM log:

<img width="1440" alt="screen shot 2016-10-19 at 9 12 03 pm" src="https://cloud.githubusercontent.com/assets/8546874/19547050/7d2f278c-9649-11e6-9df8-8d5ff12609f0.png">

HDFS audit log:

<img width="1400" alt="screen shot 2016-10-19 at 10 18 14 pm" src="https://cloud.githubusercontent.com/assets/8546874/19547102/096060ae-964a-11e6-981a-cb28efd5a058.png">

Author: Weiqing Yang <yangweiqing001@gmail.com>

Closes #15563 from weiqingy/SPARK-16759.
2016-11-11 18:36:23 -08:00
Junjie Chen 4f15d94cfe [SPARK-13331] AES support for over-the-wire encryption
## What changes were proposed in this pull request?

DIGEST-MD5 mechanism is used for SASL authentication and secure communication. DIGEST-MD5 mechanism supports 3DES, DES, and RC4 ciphers. However, 3DES, DES and RC4 are slow relatively.

AES provide better performance and security by design and is a replacement for 3DES according to NIST. Apache Common Crypto is a cryptographic library optimized with AES-NI, this patch employ Apache Common Crypto as enc/dec backend for SASL authentication and secure channel to improve spark RPC.
## How was this patch tested?

Unit tests and Integration test.

Author: Junjie Chen <junjie.j.chen@intel.com>

Closes #15172 from cjjnjust/shuffle_rpc_encrypt.
2016-11-11 10:37:58 -08:00
fidato 6f3697136a [SPARK-16575][CORE] partition calculation mismatch with sc.binaryFiles
## What changes were proposed in this pull request?

This Pull request comprises of the critical bug SPARK-16575 changes. This change rectifies the issue with BinaryFileRDD partition calculations as  upon creating an RDD with sc.binaryFiles, the resulting RDD always just consisted of two partitions only.
## How was this patch tested?

The original issue ie. getNumPartitions on binary Files RDD (always having two partitions) was first replicated and then tested upon the changes. Also the unit tests have been checked and passed.

This contribution is my original work and I licence the work to the project under the project's open source license

srowen hvanhovell rxin vanzin skyluc kmader zsxwing datafarmer Please have a look .

Author: fidato <fidato.july13@gmail.com>

Closes #15327 from fidato13/SPARK-16575.
2016-11-07 18:41:17 -08:00
Josh Rosen 6e6298154a [SPARK-17350][SQL] Disable default use of KryoSerializer in Thrift Server
In SPARK-4761 / #3621 (December 2014) we enabled Kryo serialization by default in the Spark Thrift Server. However, I don't think that the original rationale for doing this still holds now that most Spark SQL serialization is now performed via encoders and our UnsafeRow format.

In addition, the use of Kryo as the default serializer can introduce performance problems because the creation of new KryoSerializer instances is expensive and we haven't performed instance-reuse optimizations in several code paths (including DirectTaskResult deserialization).

Given all of this, I propose to revert back to using JavaSerializer as the default serializer in the Thrift Server.

/cc liancheng

Author: Josh Rosen <joshrosen@databricks.com>

Closes #14906 from JoshRosen/disable-kryo-in-thriftserver.
2016-11-01 16:23:47 -07:00