## What changes were proposed in this pull request?
With the new History Server, the summary page loads the application list via the REST API, which makes it very slow, or impossible, to load with a large (10K+) application history. This PR fixes this by adding the `spark.history.ui.maxApplications` conf to limit the number of applications the History Server displays. This is accomplished using a new optional `limit` param for the `applications` API. (Note this only applies to what the summary page displays; all the Application UIs are still accessible if the user knows the App ID and goes to the Application UI directly.)
I've also added a new test for the `limit` param in `HistoryServerSuite.scala`
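The effect of the optional `limit` param can be sketched with a minimal Python stand-in (illustrative only, not the actual Scala implementation; the function name is made up):

```python
def list_applications(apps, limit=None):
    """Illustrative sketch of the new `limit` handling: return at most
    `limit` entries for the summary page; a missing or negative limit
    returns the full list, and every Application UI stays reachable
    directly by App ID regardless of the limit."""
    if limit is None or limit < 0:
        return apps
    return apps[:limit]
```

The key point is that the cap applies only to what the listing endpoint returns, not to which applications the server serves.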
## How was this patch tested?
Manual testing and dev/run-tests
Author: Alex Bozarth <ajbozart@us.ibm.com>
Closes #14835 from ajbozarth/spark17243.
This patch addresses a minor scheduler performance issue that was introduced in #13603. If you run
```
sc.parallelize(1 to 100000, 100000).map(identity).count()
```
then most of the time ends up being spent in `TaskSetManager.abortIfCompletelyBlacklisted()`:
![image](https://cloud.githubusercontent.com/assets/50748/18071032/428732b0-6e07-11e6-88b2-c9423cd61f53.png)
When processing resource offers, the scheduler uses a nested loop which considers every task set at multiple locality levels:
```scala
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
      taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}
}
```
In order to prevent jobs with globally blacklisted tasks from hanging, #13603 added a `taskSet.abortIfCompletelyBlacklisted` call inside of `resourceOfferSingleTaskSet`; if a call to `resourceOfferSingleTaskSet` fails to schedule any tasks, then `abortIfCompletelyBlacklisted` checks whether the tasks are completely blacklisted in order to figure out whether they will ever be schedulable. The problem with this placement of the call is that the last call to `resourceOfferSingleTaskSet` in the `while` loop will return `false`, implying that `resourceOfferSingleTaskSet` will call `abortIfCompletelyBlacklisted`, so almost every call to `resourceOffers` will trigger the `abortIfCompletelyBlacklisted` check for every task set.
Instead, I think that this call should be moved out of the innermost loop and should be called _at most_ once per task set in case none of the task set's tasks can be scheduled at any locality level.
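That restructuring can be sketched in Python with toy stand-ins for the scheduler types (names and the `TaskSet` shape are illustrative, not Spark's actual classes):

```python
class TaskSet:
    """Toy task set: can launch up to `runnable` tasks, one per offer."""
    def __init__(self, locality_levels, runnable):
        self.my_locality_levels = locality_levels
        self.runnable = runnable
        self.abort_checks = 0  # how often the expensive check ran

    def resource_offer_single_task_set(self, max_locality):
        if self.runnable > 0:
            self.runnable -= 1
            return True   # launched a task on this round of offers
        return False      # nothing launched at this locality level

    def abort_if_completely_blacklisted(self):
        self.abort_checks += 1  # stand-in for the expensive check

def resource_offers(sorted_task_sets):
    # Sketch of the fix: instead of running the blacklist check after
    # every failed inner-loop iteration, run it at most once per task
    # set, and only when no task was scheduled at ANY locality level.
    for task_set in sorted_task_sets:
        launched_any = False
        for max_locality in task_set.my_locality_levels:
            launched = True
            while launched:
                launched = task_set.resource_offer_single_task_set(max_locality)
                launched_any = launched_any or launched
        if not launched_any:
            task_set.abort_if_completely_blacklisted()
```

A task set that schedules anything never pays for the check; only a fully unschedulable task set triggers it, once.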
Before this patch's changes, the microbenchmark example that I posted above took 35 seconds to run, but it now only takes 15 seconds after this change.
/cc squito and kayousterhout for review.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #14871 from JoshRosen/bail-early-if-no-cpus.
This patch uses the Apache Commons Crypto library to enable shuffle encryption support.
Author: Ferdinand Xu <cheng.a.xu@intel.com>
Author: kellyzly <kellyzly@126.com>
Closes #8880 from winningsix/SPARK-10771.
## What changes were proposed in this pull request?
Clean up unused variables and unused import statements, unnecessary `return` and `toArray` calls, and make some more style improvements noticed while walking through the code examples.
## How was this patch tested?
Tested manually on local laptop.
Author: Xin Ren <iamshrek@126.com>
Closes #14836 from keypointt/codeWalkThroughML.
## What changes were proposed in this pull request?
Move Mesos code into a mvn module
## How was this patch tested?
unit tests
manually submitting a client mode and cluster mode job
spark/mesos integration test suite
Author: Michael Gummelt <mgummelt@mesosphere.io>
Closes #14637 from mgummelt/mesos-module.
Make the config reader transient, and initialize it lazily so that
serialization works with both java and kryo (and hopefully any other
custom serializer).
Added unit test to make sure SparkConf remains serializable and the
reader works with both built-in serializers.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #14813 from vanzin/SPARK-17240.
## What changes were proposed in this pull request?
Handle null from Hadoop getLocationInfo directly instead of catching (and logging) exception
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes #14760 from srowen/SPARK-17193.
## What changes were proposed in this pull request?
Based on #12990 by tankkyo
Since the History Server currently loads all applications' data, it can OOM if too many applications have a significant task count. `spark.ui.trimTasks` (default: false) can be set to true to trim tasks down to `spark.ui.retainedTasks` (default: 10000).
(This is a "quick fix" to help those running into the problem until an update of how the History Server loads app data can be done.)
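The trimming behavior can be sketched in Python (an illustrative stand-in for the listener's per-stage bookkeeping, not Spark's actual classes):

```python
from collections import deque

class StageTaskData:
    """Sketch of a spark.ui.retainedTasks-style cap: keep at most
    `retained_tasks` task entries per stage, dropping the oldest,
    so stages with huge task counts cannot grow memory unboundedly."""
    def __init__(self, retained_tasks=10000):
        self.tasks = deque()
        self.retained_tasks = retained_tasks

    def on_task_end(self, task_info):
        self.tasks.append(task_info)
        # Evict oldest entries once past the configured cap.
        while len(self.tasks) > self.retained_tasks:
            self.tasks.popleft()
```

The trade-off is the one stated above: older task rows disappear from the UI, in exchange for bounded memory.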
## How was this patch tested?
Manual testing and dev/run-tests
![spark-15083](https://cloud.githubusercontent.com/assets/13952758/17713694/fe82d246-63b0-11e6-9697-b87ea75ff4ef.png)
Author: Alex Bozarth <ajbozart@us.ibm.com>
Closes #14673 from ajbozarth/spark15083.
## What changes were proposed in this pull request?
Update to py4j 0.10.3 to enable JAVA_HOME support
## How was this patch tested?
Pyspark tests
Author: Sean Owen <sowen@cloudera.com>
Closes #14748 from srowen/SPARK-16781.
## What changes were proposed in this pull request?
As Spark 2.0.1 will be released soon (mentioned in the spark dev mailing list), besides the critical bugs, it's better to fix the code style errors before the release.
Before:
```
./dev/lint-java
Checkstyle checks failed at following occurrences:
[ERROR] src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java:[525] (sizes) LineLength: Line is longer than 100 characters (found 119).
[ERROR] src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java:[64] (sizes) LineLength: Line is longer than 100 characters (found 103).
```
After:
```
./dev/lint-java
Using `mvn` from path: /usr/local/bin/mvn
Checkstyle checks passed.
```
## How was this patch tested?
Manual.
Author: Weiqing Yang <yangweiqing001@gmail.com>
Closes #14768 from Sherry302/fixjavastyle.
## What changes were proposed in this pull request?
Jira: https://issues.apache.org/jira/browse/SPARK-16862
The `BufferedInputStream` used in `UnsafeSorterSpillReader` uses the default 8k buffer to read data off disk. This PR makes the buffer size configurable to improve on-disk reads. I have made the default value 1 MB, as I observed improved performance with that value.
## How was this patch tested?
I am relying on the existing unit tests.
## Performance
After deploying this change to prod and setting the config to 1 MB, there was a 12% reduction in CPU time and a 19.5% reduction in CPU reservation time.
Author: Tejas Patil <tejasp@fb.com>
Closes #14726 from tejasapatil/spill_buffer_2.
## What changes were proposed in this pull request?
This is a straightforward clone of JoshRosen's original patch. I have follow-up changes to fix block replication for repl-defined classes as well, but those appear to be causing test flakiness, so I'm going to leave that for SPARK-17042.
## How was this patch tested?
End-to-end test in ReplSuite (also more tests in DistributedSuite from the original patch).
Author: Eric Liang <ekl@databricks.com>
Closes #14311 from ericl/spark-16550.
## What changes were proposed in this pull request?
With `spark.ssl.enabled=true`, failing to set `spark.ssl.protocol` will fail and throw a meaningless exception; `spark.ssl.protocol` is required when `spark.ssl.enabled` is true.
Improvement: require `spark.ssl.protocol` when initializing the SSLContext, and otherwise throw an exception that indicates this.
Remove the `OrElse("default")` fallback.
Document this requirement in configure.md
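The fail-fast check can be sketched in Python (an illustrative stand-in for the Scala `require`, with made-up names):

```python
def build_ssl_options(conf):
    """Sketch of the fix: when SSL is enabled, demand an explicit
    protocol up front instead of silently falling back to "default"
    and failing later with a meaningless exception."""
    if conf.get("spark.ssl.enabled") == "true":
        protocol = conf.get("spark.ssl.protocol")
        if protocol is None:
            raise ValueError(
                "requirement failed: spark.ssl.protocol is required "
                "when enabling SSL connections.")
        return {"enabled": True, "protocol": protocol}
    return {"enabled": False}
```

Failing at configuration-parse time with a named requirement is what turns the opaque startup crash below into an actionable message.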
## How was this patch tested?
Manual tests:
Build document and check document
Configure `spark.ssl.enabled` only; it throws the exception below:
```
6/08/16 16:04:37 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(mwang); groups with view permissions: Set(); users with modify permissions: Set(mwang); groups with modify permissions: Set()
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: spark.ssl.protocol is required when enabling SSL connections.
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.SecurityManager.<init>(SecurityManager.scala:285)
    at org.apache.spark.deploy.master.Master$.startRpcEnvAndEndpoint(Master.scala:1026)
    at org.apache.spark.deploy.master.Master$.main(Master.scala:1011)
    at org.apache.spark.deploy.master.Master.main(Master.scala)
```
Configure both `spark.ssl.enabled` and `spark.ssl.protocol`:
It works fine.
Author: wm624@hotmail.com <wm624@hotmail.com>
Closes #14674 from wangmiao1981/ssl.
## What changes were proposed in this pull request?
Adding a "(runtime)" to the dependency configuration will set a fallback configuration to be used if the requested one is not found. E.g. with the setting "default(runtime)", Ivy will look for the conf "default" in the module ivy file and if not found will look for the conf "runtime". This can help with the case when using "sbt publishLocal" which does not write a "default" conf in the published ivy.xml file.
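The resolution rule can be sketched in Python (a toy model of Ivy's fallback-conf notation, not Ivy's actual resolver):

```python
import re

def resolve_conf(requested, available_confs):
    """Sketch of Ivy's "conf(fallback)" notation: look for the primary
    conf in the module's ivy.xml confs; if it is absent, fall back to
    the conf named in parentheses."""
    m = re.fullmatch(r"(\w+)\((\w+)\)", requested)
    if m:
        primary, fallback = m.groups()
        return primary if primary in available_confs else fallback
    return requested
```

So "default(runtime)" uses "default" when the published ivy.xml declares it, and degrades to "runtime" for artifacts (such as those from `sbt publishLocal`) that never wrote a "default" conf.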
## How was this patch tested?
used spark-submit with --packages option for a package published locally with no default conf, and a package resolved from Maven central.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes #13428 from BryanCutler/fallback-package-conf-SPARK-12666.
## What changes were proposed in this pull request?
This PR fixes an executor OOM in off-heap mode due to a bug in Cooperative Memory Management for `UnsafeExternalSorter`. `UnsafeExternalSorter` was checking whether a memory page is being used by upstream by comparing the base object address of the current page with the base object address of upstream. However, with off-heap memory allocation the base object addresses are always null, so no spilling was happening and eventually the operator would OOM.
Following is the stack trace this issue addresses:
```
java.lang.OutOfMemoryError: Unable to acquire 1220 bytes of memory, got 0
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:341)
    at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:362)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170)
```
## How was this patch tested?
Tested by running the failing job.
Author: Sital Kedia <skedia@fb.com>
Closes #14693 from sitalkedia/fix_offheap_oom.
## What changes were proposed in this pull request?
If the following conditions are satisfied, executors don't load properties in `hdfs-site.xml` and an UnknownHostException can be thrown.
(1) NameNode HA is enabled
(2) spark.eventLogging is disabled or the logging path is NOT on HDFS
(3) Using Standalone or Mesos for the cluster manager
(4) There is no code that loads the `HdfsConfiguration` class in the driver, either directly or indirectly
(5) The tasks access HDFS
(There might be some more conditions...)
For example, the following code causes an UnknownHostException when the conditions above are satisfied.
```
sc.textFile("<path on HDFS>").collect
```
```
java.lang.IllegalArgumentException: java.net.UnknownHostException: hacluster
at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:170)
at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:656)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:438)
at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:411)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:986)
at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:986)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:177)
at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:177)
at scala.Option.map(Option.scala:146)
at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:177)
at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:213)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.UnknownHostException: hacluster
```
But the following code doesn't cause the exception, because the `textFile` method loads `HdfsConfiguration` indirectly.
```
sc.textFile("<path on HDFS>").collect
```
When a job includes operations that access HDFS, the `org.apache.hadoop.Configuration` object is wrapped in a `SerializableConfiguration`, serialized, and broadcast from the driver to the executors. Each executor deserializes the object with `loadDefaults` set to false, so HDFS-related properties must be set before the broadcast.
## How was this patch tested?
Tested manually on my standalone cluster.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes #13738 from sarutak/SPARK-11227.
JIRA issue link:
https://issues.apache.org/jira/browse/SPARK-16961
Changed one line of Utils.randomizeInPlace to allow elements to stay in place.
Created a unit test that runs a Pearson's chi squared test to determine whether the output diverges significantly from a uniform distribution.
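The corrected shuffle can be sketched in Python (an illustrative Fisher-Yates implementation, not Spark's Scala code):

```python
import random

def randomize_in_place(arr, rand=random):
    """Fisher-Yates shuffle. The one-line fix this PR describes amounts
    to drawing the swap index from range(0, i) INCLUSIVE of i, so an
    element may stay in place; excluding i biases the output away from
    permutations with fixed points and breaks uniformity."""
    for i in range(len(arr) - 1, 0, -1):
        j = rand.randint(0, i)   # inclusive upper bound: may pick j == i
        arr[i], arr[j] = arr[j], arr[i]
    return arr
```

A chi-squared test over many shuffles, as the unit test above does, is the standard way to check the output against a uniform distribution over permutations.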
Author: Nick Lavers <nick.lavers@videoamp.com>
Closes #14551 from nicklavers/SPARK-16961-randomizeInPlace.
A review of the code, working back from Hadoop's `FileSystem.exists()` and `FileSystem.isDirectory()` code, then removing uses of the calls when superfluous.
1. delete is harmless if called on a nonexistent path, so don't do any checks before deletes
1. any `FileSystem.exists()` check before `getFileStatus()` or `open()` is superfluous, as the operation itself does the check. Instead, the `FileNotFoundException` is caught and triggers the downgraded path. Where a `FileNotFoundException` was thrown before, the code still creates a new FNFE with the same error messages, though now the inner exceptions are nested for easier diagnostics.
Initially, relying on Jenkins test runs.
One troublespot here is that some of the codepaths are clearly error situations; it's not clear that they have coverage anyway. Trying to create the failure conditions in tests would be ideal, but it will also be hard.
Author: Steve Loughran <stevel@apache.org>
Closes #14371 from steveloughran/cloud/SPARK-16736-superfluous-fs-calls.
Both core and sql have slightly different code that does variable substitution
of config values. This change refactors that code and encapsulates the logic
of reading config values and expanding variables in a new helper class, which
can be configured so that both core and sql can use it without losing existing
functionality, and allows for easier testing and makes it easier to add more
features in the future.
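The core of such a helper is a small recursive expander; a Python sketch under assumed `${key}` syntax (illustrative, not the actual helper class):

```python
import re

def substitute(value, conf, depth=0):
    """Sketch of config variable substitution: replace ${key} references
    with values looked up in `conf`, recursively, with a depth cap so a
    self-referencing value cannot loop forever. Unknown keys are left
    as-is rather than erased."""
    if depth > 10:          # cycle guard
        return value
    def repl(match):
        key = match.group(1)
        return substitute(conf.get(key, match.group(0)), conf, depth + 1)
    return re.sub(r"\$\{([^}]+)\}", repl, value)
```

Centralizing this in one class is what lets core and sql share behavior (and tests) instead of maintaining two diverging copies.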
Tested with existing and new unit tests, and by running spark-shell with
some configs referencing variables and making sure it behaved as expected.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #14468 from vanzin/SPARK-16671.
- Make mesos coarse grained scheduler accept port offers and pre-assign ports
Previous attempt was for fine grained: https://github.com/apache/spark/pull/10808
Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>
Author: Stavros Kontopoulos <stavros.kontopoulos@typesafe.com>
Closes #11157 from skonto/honour_ports_coarse.
## What changes were proposed in this pull request?
* Fixed one typo `"overriden"` as `"overridden"`, also make sure no other same typo.
* Fixed one typo `"lowcase"` as `"lowercase"`, also make sure no other same typo.
## How was this patch tested?
Since the change is very tiny, I just made sure compilation is successful.
I am new to the Spark community, so please feel free to let me know of any other necessary steps.
Thanks in advance!
----
Updated: Found another instance of the typo `lowcase` later and fixed it in the same patch
Author: Zhenglai Zhang <zhenglaizhang@hotmail.com>
Closes #14622 from zhenglaizhang/fixtypo.
## What changes were proposed in this pull request?
Fixed warnings below after scanning through warnings during build:
```
[warn] /home/jenkins/workspace/SparkPullRequestBuilder/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala:34: imported `Utils' is permanently hidden by definition of object Utils in package mesos
[warn] import org.apache.spark.scheduler.cluster.mesos.Utils
[warn] ^
```
and
```
[warn] /home/jenkins/workspace/SparkPullRequestBuilder/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala:113: method shuffleBytesWritten in class ShuffleWriteMetrics is deprecated: use bytesWritten instead
[warn] assert(writeMetrics.shuffleBytesWritten === file.length())
[warn] ^
[warn] /home/jenkins/workspace/SparkPullRequestBuilder/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala:119: method shuffleBytesWritten in class ShuffleWriteMetrics is deprecated: use bytesWritten instead
[warn] assert(writeMetrics.shuffleBytesWritten === file.length())
[warn] ^
[warn] /home/jenkins/workspace/SparkPullRequestBuilder/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala:131: method shuffleBytesWritten in class ShuffleWriteMetrics is deprecated: use bytesWritten instead
[warn] assert(writeMetrics.shuffleBytesWritten === file.length())
[warn] ^
[warn] /home/jenkins/workspace/SparkPullRequestBuilder/core/src/test/scala/org/apache/spark/storage/DiskBlockObjectWriterSuite.scala:135: method shuffleBytesWritten in class ShuffleWriteMetrics is deprecated: use bytesWritten instead
[warn] assert(writeMetrics.shuffleBytesWritten === file.length())
[warn] ^
```
## How was this patch tested?
Tested manually on local laptop.
Author: Xin Ren <iamshrek@126.com>
Closes #14609 from keypointt/suiteWarnings.
## What changes were proposed in this pull request?
In our cluster, the SQL output may sometimes be overwritten. I submitted several SQL jobs, all inserting into the same table, each taking less than one minute. Here are the details:
1. sql1, 11:03, insert into table.
2. sql2, 11:04:11, insert into table.
3. sql3, 11:04:48, insert into table.
4. sql4, 11:05, insert into table.
5. sql5, 11:06, insert into table.
sql3's output file overwrote sql2's output file. Here is the log:
```
16/05/04 11:04:11 INFO hive.SparkHiveHadoopWriter: XXfinalPath=hdfs://tl-sng-gdt-nn-tdw.tencent-distribute.com:54310/tmp/assorz/tdw-tdwadmin/20160504/04559505496526517_-1_1204544348/10000/_tmp.p_20160428/attempt_201605041104_0001_m_000000_1
16/05/04 11:04:48 INFO hive.SparkHiveHadoopWriter: XXfinalPath=hdfs://tl-sng-gdt-nn-tdw.tencent-distribute.com:54310/tmp/assorz/tdw-tdwadmin/20160504/04559505496526517_-1_212180468/10000/_tmp.p_20160428/attempt_201605041104_0001_m_000000_1
```
The reason is that the output file path uses SimpleDateFormat("yyyyMMddHHmm"), so if two SQL jobs insert into the same table within the same minute, the output will be overwritten. I think we should change the dateFormat to "yyyyMMddHHmmss"; in our cluster, we can't finish a SQL job in one second.
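The collision is easy to demonstrate; a Python sketch using the two timestamps from the log above (strftime codes stand in for the Java SimpleDateFormat patterns):

```python
from datetime import datetime

def output_stamp(ts, fmt="%Y%m%d%H%M%S"):
    """Sketch: second-granularity "yyyyMMddHHmmss" keeps two jobs that
    commit in the same minute on distinct paths, where the old
    minute-granularity "yyyyMMddHHmm" makes them collide."""
    return ts.strftime(fmt)

# The two inserts from the log: 11:04:11 and 11:04:48 on the same day.
sql2_commit = datetime(2016, 5, 4, 11, 4, 11)
sql3_commit = datetime(2016, 5, 4, 11, 4, 48)
```

With the old format both commits stamp to the same minute; with the proposed one they differ.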
## How was this patch tested?
Manual tests.
Author: hongshen <shenh062326@126.com>
Closes #14574 from shenh062326/SPARK-16985.
Before this PR, users had to export an environment variable to specify the Python executable for the driver & executors, which is not so convenient. This PR allows users to specify Python through the configurations "--pyspark-driver-python" & "--pyspark-executor-python".
Manually test in local & yarn mode for pyspark-shell and pyspark batch mode.
Author: Jeff Zhang <zjffdu@apache.org>
Closes #13146 from zjffdu/SPARK-13081.
## What changes were proposed in this pull request?
Added shutdown hook to DriverRunner to kill the driver process in case the Worker JVM exits suddenly and the `WorkerWatcher` was unable to properly catch this. Did some cleanup to consolidate driver state management and setting of finalized vars within the running thread.
## How was this patch tested?
Added unit tests to verify that the final state and exception variables are set accordingly for successful, failed, and errored driver processes. Retrofitted an existing test to verify that killing the mocked process ends with the correct state and stops properly.
Manually tested (with deploy-mode=cluster) that the shutdown hook is called by forcibly exiting the `Worker` at various points in the code, with the `WorkerWatcher` both disabled and enabled. Also manually killed the driver through the UI and verified that the `DriverRunner` interrupted, killed the process, and exited properly.
Author: Bryan Cutler <cutlerb@gmail.com>
Closes #11746 from BryanCutler/DriverRunner-shutdown-hook-SPARK-13602.
## What changes were proposed in this pull request?
remove requirement to set spark.mesos.executor.home when spark.executor.uri is used
## How was this patch tested?
unit tests
Author: Michael Gummelt <mgummelt@mesosphere.io>
Closes #14552 from mgummelt/fix-spark-home.
## What changes were proposed in this pull request?
Add a configurable token manager for Spark running on YARN.
### Current Problems ###
1. The supported token providers are hard-coded; currently only HDFS, HBase and Hive are supported, and it is impossible for users to add a new token provider without code changes.
2. The same problem exists in the timely token renewer and updater.
### Changes In This Proposal ###
In this proposal, to address the problems mentioned above and make the current code cleaner and easier to understand, there are mainly 3 changes:
1. Abstract a `ServiceTokenProvider` interface, as well as a `ServiceTokenRenewable` interface, for token providers. Each service that wants to communicate with Spark via tokens needs to implement this interface.
2. Provide a `ConfigurableTokenManager` to manage all the registered token providers, as well as the token renewer and updater. This class also offers an API for other modules to obtain tokens, get renewal intervals, and so on.
3. Implement 3 built-in token providers, `HDFSTokenProvider`, `HiveTokenProvider` and `HBaseTokenProvider`, to keep the same semantics as supported today. Whether to load these built-in token providers is controlled by the configuration "spark.yarn.security.tokens.${service}.enabled"; by default all the built-in token providers are loaded.
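The registration scheme above can be sketched in Python (a toy model of the proposed manager; class and key names follow the proposal, the rest is illustrative):

```python
class ConfigurableTokenManager:
    """Sketch of the proposed plug-in scheme: providers are registered
    by name and loaded only when the per-service flag
    spark.yarn.security.tokens.<name>.enabled is true; built-ins
    default to enabled, matching today's semantics."""
    def __init__(self, conf, providers):
        self.providers = {
            name: provider
            for name, provider in providers.items()
            if conf.get(f"spark.yarn.security.tokens.{name}.enabled",
                        "true") == "true"
        }

    def obtain_tokens(self):
        # Ask each enabled provider for its token (a callable here).
        return {name: p() for name, p in self.providers.items()}
```

A user-supplied provider would plug into the same map via the `.class` configuration, without any Spark code change.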
### Behavior Changes ###
For the end user there's no behavior change, we still use the same configuration `spark.yarn.security.tokens.${service}.enabled` to decide which token provider is enabled (hbase or hive).
A user-implemented token provider (assume the provider is named "test") that needs to be added to this class should have two configurations:
1. `spark.yarn.security.tokens.test.enabled` set to true
2. `spark.yarn.security.tokens.test.class` set to the fully qualified class name.
So we keep the same semantics as the current code while adding one new configuration.
### Current Status ###
- [x] token provider interface and management framework.
- [x] implement built-in token providers (hdfs, hbase, hive).
- [x] Coverage of unit test.
- [x] Integrated test with security cluster.
## How was this patch tested?
Unit test and integrated test.
Please suggest and review, any comment is greatly appreciated.
Author: jerryshao <sshao@hortonworks.com>
Closes #14065 from jerryshao/SPARK-16342.
When a large number of jobs are run concurrently with the Spark thrift server, the thrift server starts running at high CPU due to GC pressure. Job UI retention causes memory pressure with large jobs. https://issues.apache.org/jira/secure/attachment/12783302/SPARK-12920.profiler_job_progress_listner.png has the profiler snapshot. This PR honors `spark.ui.retainedStages` strictly to reduce memory pressure.
Manual and unit tests
Author: Rajesh Balamohan <rbalamohan@apache.org>
Closes #10846 from rajeshbalamohan/SPARK-12920.
## What changes were proposed in this pull request?
The base class `SpecificParquetRecordReaderBase`, used by the vectorized Parquet reader, will try to get pushed-down filters from the given configuration. These pushed-down filters are used for row-group-level filtering. However, we don't set the filters to push down into the configuration; in other words, the filters are never actually pushed down to do row-group-level filtering. This patch fixes this by setting up the filters for pushdown in the configuration for the reader.
A benchmark that excludes the time of writing the Parquet file:
```scala
test("Benchmark for Parquet") {
  val N = 500 << 12
  withParquetTable((0 until N).map(i => (101, i)), "t") {
    val benchmark = new Benchmark("Parquet reader", N)
    benchmark.addCase("reading Parquet file", 10) { iter =>
      sql("SELECT _1 FROM t where t._1 < 100").collect()
    }
    benchmark.run()
  }
}
```
By default, `withParquetTable` runs tests for both the vectorized and non-vectorized readers. I only let it run the vectorized reader.
We set the Parquet block size to 1024 to have multiple row groups. The benchmark is:
Before this patch:

The retrieved row groups: 8063
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                            825 / 1233          2.5         402.6       1.0X
```
After this patch:

The retrieved row groups: 0
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                             306 / 503          6.7         149.6       1.0X
```
Next, I run the benchmark for non-pushdown case using the same benchmark code but with disabled pushdown configuration. This time the parquet block size is default value.
Before this patch:
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                             136 / 238         15.0          66.5       1.0X
```
After this patch:
```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_71-b15 on Linux 3.19.0-25-generic
Intel(R) Core(TM) i7-5557U CPU 3.10GHz
Parquet reader:                          Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
reading Parquet file                             124 / 193         16.5          60.7       1.0X
```
For non-pushdown case, from the results, I think this patch doesn't affect normal code path.
I've manually output the `totalRowCount` in `SpecificParquetRecordReaderBase` to see whether this patch actually filters the row groups. When running the above benchmark:
After this patch:
`totalRowCount = 0`
Before this patch:
`totalRowCount = 1024000`
## How was this patch tested?
Existing tests should be passed.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes #13701 from viirya/vectorized-reader-push-down-filter2.
## What changes were proposed in this pull request?
- enable setting default properties for all jobs submitted through the dispatcher [SPARK-16927]
- remove duplication of conf vars on cluster submitted jobs [SPARK-16923] (this is a small fix, so I'm including in the same PR)
## How was this patch tested?
mesos/spark integration test suite
manual testing
Author: Timothy Chen <tnachen@gmail.com>
Closes #14511 from mgummelt/override-props.
In many terminals double-clicking and dragging also includes the trailing period. Simply remove this to make the value more easily copy/pasteable.
Example value:
`hdfs://mybox-123.net.example.com:8020/spark-events.`
Author: Andrew Ash <andrew@andrewash.com>
Closes #14566 from ash211/patch-9.
## What changes were proposed in this pull request?
This patch introduces a new configuration, `spark.deploy.maxExecutorRetries`, to let users configure an obscure behavior in the standalone master where the master will kill Spark applications which have experienced too many back-to-back executor failures. The current setting is a hardcoded constant (10); this patch replaces that with a new cluster-wide configuration.
**Background:** This application-killing was added in 6b5980da79 (from September 2012) and I believe that it was designed to prevent a faulty application whose executors could never launch from DOS'ing the Spark cluster via an infinite series of executor launch attempts. In a subsequent patch (#1360), this feature was refined to prevent applications which have running executors from being killed by this code path.
**Motivation for making this configurable:** Previously, if a Spark Standalone application experienced more than `ApplicationState.MAX_NUM_RETRY` executor failures and was left with no executors running then the Spark master would kill that application, but this behavior is problematic in environments where the Spark executors run on unstable infrastructure and can all simultaneously die. For instance, if your Spark driver runs on an on-demand EC2 instance while all workers run on ephemeral spot instances then it's possible for all executors to die at the same time while the driver stays alive. In this case, it may be desirable to keep the Spark application alive so that it can recover once new workers and executors are available. In order to accommodate this use-case, this patch modifies the Master to never kill faulty applications if `spark.deploy.maxExecutorRetries` is negative.
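The resulting decision rule is small; a Python sketch of the Master's check (illustrative function shape, not Spark's actual code):

```python
def should_kill_application(executor_retry_count, has_running_executors,
                            max_executor_retries=10):
    """Sketch of the configurable kill decision: an application is killed
    only after too many back-to-back executor failures while no executor
    is left running, and never when the configured limit is negative,
    which disables the behavior entirely."""
    if max_executor_retries < 0:
        return False   # negative limit: never kill faulty applications
    return (not has_running_executors
            and executor_retry_count >= max_executor_retries)
```

Keeping 10 as the default preserves today's behavior; the spot-instance scenario above simply sets the limit to a negative value.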
I'd like to merge this patch into master, branch-2.0, and branch-1.6.
## How was this patch tested?
I tested this manually using `spark-shell` and `local-cluster` mode. This is a tricky feature to unit test and historically this code has not changed very often, so I'd prefer to skip the additional effort of adding a testing framework and would rather rely on manual tests and review for now.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #14544 from JoshRosen/add-setting-for-max-executor-failures.
## What changes were proposed in this pull request?
Links the Spark Mesos Dispatcher UI to the history server UI
- adds spark.mesos.dispatcher.historyServer.url
- explicitly generates frameworkIDs for the launched drivers, so the dispatcher knows how to correlate drivers and frameworkIDs
## How was this patch tested?
manual testing
Author: Michael Gummelt <mgummelt@mesosphere.io>
Author: Sergiusz Urbaniak <sur@mesosphere.io>
Closes #14414 from mgummelt/history-server.
## What changes were proposed in this pull request?
Spark applications running on Mesos throw exception upon exit. For details, refer to https://issues.apache.org/jira/browse/SPARK-16522.
I am not sure if there is any better fix, so I will wait for review comments.
## How was this patch tested?
Manual test. Observed that the exception is gone upon application exit.
Author: Sun Rui <sunrui2016@gmail.com>
Closes #14175 from sun-rui/SPARK-16522.
## What changes were proposed in this pull request?
SparkContext.getOrCreate shouldn't warn about ignored config if
- it wasn't ignored because a new context is created with it or
- no config was actually provided
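The warning condition above can be modeled as a minimal sketch; the names here are hypothetical, the real logic lives inside `SparkContext.getOrCreate`.

```scala
// Warn only when a config was actually supplied AND it was actually ignored,
// i.e. an existing context was reused instead of a new one being created with it.
def shouldWarnIgnoredConf(configProvided: Boolean, reusedExistingContext: Boolean): Boolean =
  configProvided && reusedExistingContext

assert(!shouldWarnIgnoredConf(configProvided = true, reusedExistingContext = false))  // new context used the config
assert(!shouldWarnIgnoredConf(configProvided = false, reusedExistingContext = true))  // nothing to ignore
assert(shouldWarnIgnoredConf(configProvided = true, reusedExistingContext = true))    // genuinely ignored
```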
## How was this patch tested?
Jenkins + existing tests.
Author: Sean Owen <sowen@cloudera.com>
Closes #14533 from srowen/SPARK-16606.
## What changes were proposed in this pull request?
Avoid using postfix operators for command execution in `SQLQuerySuite`, where they weren't whitelisted, and audit the existing whitelist entries, removing postfix operators from most places. Notable places where postfix operators remain are the XML parsing and time units (seconds, millis, etc.), where they arguably improve readability.
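For illustration, the postfix time-unit style requires a language import, while the equivalent dotted style does not:

```scala
import scala.concurrent.duration._
import scala.language.postfixOps  // needed for the postfix form below

val postfixStyle = 10 seconds   // postfix operator: flagged without the import
val dottedStyle  = 10.seconds   // dotted style: no extra import required

assert(postfixStyle == dottedStyle)
assert(dottedStyle.toMillis == 10000L)
```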
## How was this patch tested?
Existing tests.
Author: Holden Karau <holden@us.ibm.com>
Closes #14407 from holdenk/SPARK-16779.
## What changes were proposed in this pull request?
`requestExecutors` and `killExecutor` are public developer APIs for managing the number of executors allocated to the `SparkContext`. For consistency, `requestTotalExecutors` should also be a public developer API, as it provides similar functionality. In fact, using `requestTotalExecutors` is more convenient than `requestExecutors`, as the former is idempotent and the latter is not.
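The idempotency difference can be shown with a toy allocator; this is a hypothetical model of the semantics, not Spark's implementation.

```scala
// Toy model: requestTotalExecutors sets an absolute target (idempotent),
// while requestExecutors adds to it (not idempotent).
class ToyAllocator {
  private var target = 0
  def requestTotalExecutors(n: Int): Unit = { target = n }
  def requestExecutors(n: Int): Unit = { target += n }
  def currentTarget: Int = target
}

val alloc = new ToyAllocator
alloc.requestTotalExecutors(5)
alloc.requestTotalExecutors(5)   // repeating the call is harmless
assert(alloc.currentTarget == 5)
alloc.requestExecutors(5)
alloc.requestExecutors(5)        // repeating the call requests more each time
assert(alloc.currentTarget == 15)
```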
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #14541 from tdas/SPARK-16953.
## What changes were proposed in this pull request?
Currently the update interval for the console progress bar is hardcoded. This PR makes it configurable for users.
## How was this patch tested?
Ran a long running job and with a high value of update interval, the updates were shown less frequently.
Author: Tejas Patil <tejasp@fb.com>
Closes #14507 from tejasapatil/SPARK-16919.
As per the PostgreSQL JDBC driver [implementation](ab2a6d8908/pgjdbc/src/main/java/org/postgresql/PGProperty.java (L99)), the default record fetch size is 0 (which means the driver caches all records).
This fix sets the default record fetch size to 10 to enable streaming of data.
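For readers who want a different batch size, it can still be set explicitly via the JDBC `fetchsize` option. This is a sketch, not runnable as-is; the URL, table name, and `spark` session are placeholders.

```scala
// Sketch: reading from PostgreSQL with an explicit fetch size.
// Connection details below are placeholders, not real endpoints.
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://localhost:5432/mydb")
  .option("dbtable", "my_table")
  .option("fetchsize", "10")   // stream rows in batches instead of caching everything
  .load()
```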
Author: Prince J Wesley <princejohnwesley@gmail.com>
Closes #14502 from princejwesley/spark-postgres.
## What changes were proposed in this pull request?
This patch fixes a bug in Spark's standalone Master which could cause applications to hang if tasks cause executors to exit with zero exit codes.
As an example of the bug, run
```
sc.parallelize(1 to 1, 1).foreachPartition { _ => System.exit(0) }
```
on a standalone cluster which has a single Spark application. This will cause all executors to die but those executors won't be replaced unless another Spark application or worker joins or leaves the cluster (or if an executor exits with a non-zero exit code). This behavior is caused by a bug in how the Master handles the `ExecutorStateChanged` event: the current implementation calls `schedule()` only if the executor exited with a non-zero exit code, so a task which causes a JVM to unexpectedly exit "cleanly" will skip the `schedule()` call.
This patch addresses this by modifying the `ExecutorStateChanged` handler to unconditionally call `schedule()`. This should be safe because it is always safe to call `schedule()`: extra `schedule()` calls can only affect performance and should not introduce correctness bugs.
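The shape of the change can be modeled as below; this is a simplified sketch of the handler's control flow, not the actual `Master.scala` code.

```scala
// Model of the fix: schedule() runs for every state change, not just
// for non-zero exit codes, so a "clean" JVM exit still triggers rescheduling.
var scheduleCalls = 0
def schedule(): Unit = { scheduleCalls += 1 }

def onExecutorStateChanged(exitCode: Int): Unit = {
  // ... bookkeeping for the exited executor would go here ...
  schedule()  // unconditional: exit code 0 no longer skips this call
}

onExecutorStateChanged(0)  // previously this path skipped schedule()
onExecutorStateChanged(1)
assert(scheduleCalls == 2)
```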
## How was this patch tested?
I added a regression test in `DistributedSuite`.
Author: Josh Rosen <joshrosen@databricks.com>
Closes #14510 from JoshRosen/SPARK-16925.
## What changes were proposed in this pull request?
Mask `spark.authenticate.secret` on Spark environment page (Web UI).
This is an addition to https://github.com/apache/spark/pull/14409
## How was this patch tested?
`./dev/run-tests`
```
[info] ScalaTest
[info] Run completed in 1 hour, 8 minutes, 38 seconds.
[info] Total number of tests run: 2166
[info] Suites: completed 65, aborted 0
[info] Tests: succeeded 2166, failed 0, canceled 0, ignored 590, pending 0
[info] All tests passed.
```
Author: Artur Sukhenko <artur.sukhenko@gmail.com>
Closes #14484 from Devian-ua/SPARK-16796.
## What changes were proposed in this pull request?
As reported by Bryan Cutler on the mailing list, `AccumulatorV2` does not have a `+=` method, yet the documentation still references it.
## How was this patch tested?
N/A
Author: petermaxlee <petermaxlee@gmail.com>
Closes #14466 from petermaxlee/accumulator.
## What changes were proposed in this pull request?
Add the missing argument checks for `randomSplit` and `sample`.
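A hedged sketch of the kind of checks being added is below; the actual conditions and error messages in the patch may differ.

```scala
// Illustrative argument validation for randomSplit-style weights and
// sample-style fractions. require throws IllegalArgumentException on failure.
def checkWeights(weights: Array[Double]): Unit = {
  require(weights.forall(_ >= 0.0),
    s"Weights must be nonnegative, but got ${weights.mkString("[", ",", "]")}")
  require(weights.sum > 0.0,
    s"Sum of weights must be positive, but got ${weights.mkString("[", ",", "]")}")
}

def checkFraction(fraction: Double, withReplacement: Boolean): Unit = {
  if (withReplacement)
    require(fraction >= 0.0, s"Fraction must be nonnegative, but got $fraction")
  else
    require(fraction >= 0.0 && fraction <= 1.0, s"Fraction must be in [0, 1], but got $fraction")
}

checkWeights(Array(1.0, 2.0))                 // ok
checkFraction(0.5, withReplacement = false)   // ok
```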
## How was this patch tested?
unit tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes #14478 from zhengruifeng/fix_randomSplit.
## What changes were proposed in this pull request?
`SpillReader` throws an NPE when the spill file has no data. See the following logs:
```
16/07/31 20:54:04 INFO collection.ExternalSorter: spill memory to file:/data4/yarnenv/local/usercache/tesla/appcache/application_1465785263942_56138/blockmgr-db5f46c3-d7a4-4f93-8b77-565e469696fb/09/temp_shuffle_ec3ece08-4569-4197-893a-4a5dfcbbf9fa, fileSize:0.0 B
16/07/31 20:54:04 WARN memory.TaskMemoryManager: leak 164.3 MB memory from org.apache.spark.util.collection.ExternalSorter@3db4b52d
16/07/31 20:54:04 ERROR executor.Executor: Managed memory leak detected; size = 190458101 bytes, TID = 23585
16/07/31 20:54:04 ERROR executor.Executor: Exception in task 1013.0 in stage 18.0 (TID 23585)
java.lang.NullPointerException
    at org.apache.spark.util.collection.ExternalSorter$SpillReader.cleanup(ExternalSorter.scala:624)
    at org.apache.spark.util.collection.ExternalSorter$SpillReader.nextBatchStream(ExternalSorter.scala:539)
    at org.apache.spark.util.collection.ExternalSorter$SpillReader.<init>(ExternalSorter.scala:507)
    at org.apache.spark.util.collection.ExternalSorter$SpillableIterator.spill(ExternalSorter.scala:816)
    at org.apache.spark.util.collection.ExternalSorter.forceSpill(ExternalSorter.scala:251)
    at org.apache.spark.util.collection.Spillable.spill(Spillable.scala:109)
    at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:154)
    at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:249)
    at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:112)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.acquireNewPageIfNecessary(ShuffleExternalSorter.java:346)
    at org.apache.spark.shuffle.sort.ShuffleExternalSorter.insertRecord(ShuffleExternalSorter.java:367)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.insertRecordIntoSorter(UnsafeShuffleWriter.java:237)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:164)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)
16/07/31 20:54:30 INFO executor.Executor: Executor is trying to kill task 1090.1 in stage 18.0 (TID 23793)
16/07/31 20:54:30 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown
```
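A defensive null check is the kind of fix that resolves this; the sketch below is illustrative, and the actual field names and cleanup logic in `ExternalSorter.SpillReader` differ.

```scala
import java.io.InputStream

// Sketch: when the spill file is empty, the batch stream may never be opened,
// leaving the field null; cleanup must tolerate that instead of throwing an NPE.
var fileStream: InputStream = null

def cleanup(): Unit = {
  if (fileStream != null) {  // guard against a spill file with no data
    fileStream.close()
    fileStream = null
  }
}

cleanup()  // safe even though nothing was ever opened
```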
## How was this patch tested?
Manual test.
Author: sharkd <sharkd.tu@gmail.com>
Author: sharkdtu <sharkdtu@tencent.com>
Closes #14479 from sharkdtu/master.
## What changes were proposed in this pull request?
Mask `spark.ssl.keyPassword`, `spark.ssl.keyStorePassword`, and `spark.ssl.trustStorePassword` on the Web UI environment page.
(Their values are changed to `*****` on the environment page.)
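Such redaction can be sketched with a hypothetical helper like the one below; the real code lives in the UI's environment-page rendering and may match keys differently.

```scala
// Replace values of sensitive-looking keys with ***** before display.
// The key patterns here are illustrative.
def redact(key: String, value: String): String = {
  val sensitiveFragments = Seq("password", "secret")
  if (sensitiveFragments.exists(key.toLowerCase.contains)) "*****" else value
}

assert(redact("spark.ssl.keyPassword", "hunter2") == "*****")
assert(redact("spark.authenticate.secret", "s3cr3t") == "*****")
assert(redact("spark.master", "local[*]") == "local[*]")
```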
## How was this patch tested?
I've built Spark, run the Spark shell, and checked that these values have been masked with `*****`.
Also ran tests:
```
./dev/run-tests
[info] ScalaTest
[info] Run completed in 1 hour, 9 minutes, 5 seconds.
[info] Total number of tests run: 2166
[info] Suites: completed 65, aborted 0
[info] Tests: succeeded 2166, failed 0, canceled 0, ignored 590, pending 0
[info] All tests passed.
```
![mask](https://cloud.githubusercontent.com/assets/15244468/17262154/7641e132-55e2-11e6-8a6c-30ead77c7372.png)
Author: Artur Sukhenko <artur.sukhenko@gmail.com>
Closes #14409 from Devian-ua/maskpass.