Currently, when max number of executor failures reached the `maxNumExecutorFailures`, `ApplicationMaster` will be killed and re-register another one.This time, `YarnAllocator` will be created a new instance.
But, the value of property `executorIdCounter` in `YarnAllocator` will reset to `0`. Then the Id of new executor will starting from `1`. This will confuse with the executor has already created before, which will cause FetchFailedException.
This situation is just in yarn client mode, so this is an issue in yarn client mode. For more details, [link to jira issues SPARK-12864](https://issues.apache.org/jira/browse/SPARK-12864)
This PR introduce a mechanism to initialize `executorIdCounter` after `ApplicationMaster` killed.
Author: zhonghaihua <793507405@qq.com>
Closes#10794 from zhonghaihua/initExecutorIdCounterAfterAMKilled.
JIRA: https://issues.apache.org/jira/browse/SPARK-13674
## What changes were proposed in this pull request?
Sample operator doesn't support wholestage codegen now. This pr is to add support to it.
## How was this patch tested?
A test is added into `BenchmarkWholeStageCodegen`. Besides, all tests should be passed.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#11517 from viirya/add-wholestage-sample.
## What changes were proposed in this pull request?
Currently in Spark on YARN, configurations can be passed through SparkConf, env and command arguments, some parts are duplicated, like client argument and SparkConf. So here propose to simplify the command arguments.
## How was this patch tested?
This patch is tested manually with unit test.
CC vanzin tgravescs , please help to suggest this proposal. The original purpose of this JIRA is to remove `ClientArguments`, through refactoring some arguments like `--class`, `--arg` are not so easy to replace, so here I remove the most part of command line arguments, only keep the minimal set.
Author: jerryshao <sshao@hortonworks.com>
Closes#11603 from jerryshao/SPARK-12343.
## What changes were proposed in this pull request?
This PR support multiple Python UDFs within single batch, also improve the performance.
```python
>>> from pyspark.sql.types import IntegerType
>>> sqlContext.registerFunction("double", lambda x: x * 2, IntegerType())
>>> sqlContext.registerFunction("add", lambda x, y: x + y, IntegerType())
>>> sqlContext.sql("SELECT double(add(1, 2)), add(double(2), 1)").explain(True)
== Parsed Logical Plan ==
'Project [unresolvedalias('double('add(1, 2)), None),unresolvedalias('add('double(2), 1), None)]
+- OneRowRelation$
== Analyzed Logical Plan ==
double(add(1, 2)): int, add(double(2), 1): int
Project [double(add(1, 2))#14,add(double(2), 1)#15]
+- Project [double(add(1, 2))#14,add(double(2), 1)#15]
+- Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
+- EvaluatePython [add(pythonUDF1#17, 1)], [pythonUDF0#18]
+- EvaluatePython [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
+- OneRowRelation$
== Optimized Logical Plan ==
Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
+- EvaluatePython [add(pythonUDF1#17, 1)], [pythonUDF0#18]
+- EvaluatePython [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
+- OneRowRelation$
== Physical Plan ==
WholeStageCodegen
: +- Project [pythonUDF0#16 AS double(add(1, 2))#14,pythonUDF0#18 AS add(double(2), 1)#15]
: +- INPUT
+- !BatchPythonEvaluation [add(pythonUDF1#17, 1)], [pythonUDF0#16,pythonUDF1#17,pythonUDF0#18]
+- !BatchPythonEvaluation [double(add(1, 2)),double(2)], [pythonUDF0#16,pythonUDF1#17]
+- Scan OneRowRelation[]
```
## How was this patch tested?
Added new tests.
Using the following script to benchmark 1, 2 and 3 udfs,
```
df = sqlContext.range(1, 1 << 23, 1, 4)
double = F.udf(lambda x: x * 2, LongType())
print df.select(double(df.id)).count()
print df.select(double(df.id), double(df.id + 1)).count()
print df.select(double(df.id), double(df.id + 1), double(df.id + 2)).count()
```
Here is the results:
N | Before | After | speed up
---- |------------ | -------------|------
1 | 22 s | 7 s | 3.1X
2 | 38 s | 13 s | 2.9X
3 | 58 s | 16 s | 3.6X
This benchmark ran locally with 4 CPUs. For 3 UDFs, it launched 12 Python before before this patch, 4 process after this patch. After this patch, it will use less memory for multiple UDFs than before (less buffering).
Author: Davies Liu <davies@databricks.com>
Closes#12057 from davies/multi_udfs.
## What changes were proposed in this pull request?
Track executor information like host and port, cache size, running tasks.
TODO: tests
## How was this patch tested?
N/A
Author: Wenchen Fan <wenchen@databricks.com>
Closes#11888 from cloud-fan/status-tracker.
## What changes were proposed in this pull request?
This PR try to use `incUpdatedBlockStatuses ` to update the `updatedBlockStatuses ` when removing blocks, making sure `BlockManager` correctly updates `updatedBlockStatuses`
## How was this patch tested?
test("updated block statuses") in BlockManagerSuite.scala
Author: jeanlyn <jeanlyn92@gmail.com>
Closes#12091 from jeanlyn/updateBlock.
## What changes were proposed in this pull request?
Redirect error message to logWarning
## How was this patch tested?
Unit tests, manual tests
JoshRosen
Author: Nishkam Ravi <nishkamravi@gmail.com>
Closes#12052 from nishkamravi2/master_warning.
## What changes were proposed in this pull request?
For MemoryMode.OFF_HEAP, Unsafe.getInt etc. are used with no restriction.
However, the Oracle implementation uses these methods only if the class variable unaligned (commented as "Cached unaligned-access capability") is true, which seems to be calculated whether the architecture is i386, x86, amd64, or x86_64.
I think we should perform similar check for the use of Unsafe.
Reference: https://github.com/netty/netty/blob/4.1/common/src/main/java/io/netty/util/internal/PlatformDependent0.java#L112
## How was this patch tested?
Unit test suite
Author: tedyu <yuzhihong@gmail.com>
Closes#11943 from tedyu/master.
## What changes were proposed in this pull request?
This PR brings the support for chained Python UDFs, for example
```sql
select udf1(udf2(a))
select udf1(udf2(a) + 3)
select udf1(udf2(a) + udf3(b))
```
Also directly chained unary Python UDFs are put in single batch of Python UDFs, others may require multiple batches.
For example,
```python
>>> sqlContext.sql("select double(double(1))").explain()
== Physical Plan ==
WholeStageCodegen
: +- Project [pythonUDF#10 AS double(double(1))#9]
: +- INPUT
+- !BatchPythonEvaluation double(double(1)), [pythonUDF#10]
+- Scan OneRowRelation[]
>>> sqlContext.sql("select double(double(1) + double(2))").explain()
== Physical Plan ==
WholeStageCodegen
: +- Project [pythonUDF#19 AS double((double(1) + double(2)))#16]
: +- INPUT
+- !BatchPythonEvaluation double((pythonUDF#17 + pythonUDF#18)), [pythonUDF#17,pythonUDF#18,pythonUDF#19]
+- !BatchPythonEvaluation double(2), [pythonUDF#17,pythonUDF#18]
+- !BatchPythonEvaluation double(1), [pythonUDF#17]
+- Scan OneRowRelation[]
```
TODO: will support multiple unrelated Python UDFs in one batch (another PR).
## How was this patch tested?
Added new unit tests for chained UDFs.
Author: Davies Liu <davies@databricks.com>
Closes#12014 from davies/py_udfs.
Add a new api endpoint `/api/v1/version` to retrieve various version info. This PR only adds support for finding the current spark version, however other version info such as jvm or scala versions can easily be added.
Author: Jakob Odersky <jodersky@gmail.com>
Closes#10760 from jodersky/SPARK-10570.
## What changes were proposed in this pull request?
The event timeline doesn't show on job page if an executor is removed with a multiple line reason. This PR replaces all new line characters in the reason string with spaces.
![timelineerror](https://cloud.githubusercontent.com/assets/9278199/14100211/5fd4cd30-f5be-11e5-9cea-f32651a4cd62.jpg)
## How was this patch tested?
Verified on the Web UI.
Author: Carson Wang <carson.wang@intel.com>
Closes#12029 from carsonwang/eventTimeline.
## What changes were proposed in this pull request?
Refactor RRDD by separating the common logic interacting with the R worker to a new class RRunner, which can be used to evaluate R UDFs.
Now RRDD relies on RRuner for RDD computation and RRDD could be reomved if we want to remove RDD API in SparkR later.
## How was this patch tested?
dev/lint-r
SparkR unit tests
Author: Sun Rui <rui.sun@intel.com>
Closes#12024 from sun-rui/SPARK-12792_new.
## What changes were proposed in this pull request?
This is a follow-up fix of #9963, in #9963 we handle this stale states clean-up work only for dynamic allocation enabled scenario. Here we should also clean the states in `CoarseGrainedSchedulerBackend` for dynamic allocation disabled scenario.
Please review, CC andrewor14 lianhuiwang , thanks a lot.
## How was this patch tested?
Run the unit test locally, also with integration test manually.
Author: jerryshao <sshao@hortonworks.com>
Closes#11366 from jerryshao/SPARK-13447.
## What changes were proposed in this pull request?
We have a streaming job using `FlumePollInputStream` always driver OOM after few days, here is some driver heap dump before OOM
```
num #instances #bytes class name
----------------------------------------------
1: 13845916 553836640 org.apache.spark.storage.BlockStatus
2: 14020324 336487776 org.apache.spark.storage.StreamBlockId
3: 13883881 333213144 scala.collection.mutable.DefaultEntry
4: 8907 89043952 [Lscala.collection.mutable.HashEntry;
5: 62360 65107352 [B
6: 163368 24453904 [Ljava.lang.Object;
7: 293651 20342664 [C
...
```
`BlockStatus` and `StreamBlockId` keep on growing, and the driver OOM in the end.
After investigated, i found the `executorIdToStorageStatus` in `StorageStatusListener` seems never remove the blocks from `StorageStatus`.
In order to fix the issue, i try to use `onBlockUpdated` replace `onTaskEnd ` , so we can update the block informations(add blocks, drop the block from memory to disk and delete the blocks) in time.
## How was this patch tested?
Existing unit tests and manual tests
Author: jeanlyn <jeanlyn92@gmail.com>
Closes#11779 from jeanlyn/fix_driver_oom.
## What changes were proposed in this pull request?
Extract the workaround for HADOOP-10622 introduced by #11940 into UninterruptibleThread so that we can test and reuse it.
## How was this patch tested?
Unit tests
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#11971 from zsxwing/uninterrupt.
## What changes were proposed in this pull request?
Call `executor.stop` in a new thread to eliminate deadlock.
## How was this patch tested?
Existing unit tests
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#12012 from zsxwing/SPARK-14180.
## What changes were proposed in this pull request?
Currently, for the key that can not fit within a long, we build a hash map for UnsafeHashedRelation, it's converted to BytesToBytesMap after serialization and deserialization. We should build a BytesToBytesMap directly to have better memory efficiency.
In order to do that, BytesToBytesMap should support multiple (K,V) pair with the same K, Location.putNewKey() is renamed to Location.append(), which could append multiple values for the same key (same Location). `Location.newValue()` is added to find the next value for the same key.
## How was this patch tested?
Existing tests. Added benchmark for broadcast hash join with duplicated keys.
Author: Davies Liu <davies@databricks.com>
Closes#11870 from davies/map2.
Refactor RRDD by separating the common logic interacting with the R worker to a new class RRunner, which can be used to evaluate R UDFs.
Now RRDD relies on RRuner for RDD computation and RRDD could be reomved if we want to remove RDD API in SparkR later.
Author: Sun Rui <rui.sun@intel.com>
Closes#10947 from sun-rui/SPARK-12792.
JIRA: https://issues.apache.org/jira/browse/SPARK-13742
## What changes were proposed in this pull request?
`RandomSampler.sample` currently accepts iterator as input and output another iterator. This makes it inappropriate to use in wholestage codegen of `Sampler` operator #11517. This change is to add non-iterator interface to `RandomSampler`.
This change adds a new method `def sample(): Int` to the trait `RandomSampler`. As we don't need to know the actual values of the sampling items, so this new method takes no arguments.
This method will decide whether to sample the next item or not. It returns how many times the next item will be sampled.
For `BernoulliSampler` and `BernoulliCellSampler`, the returned sampling times can only be 0 or 1. It simply means whether to sample the next item or not.
For `PoissonSampler`, the returned value can be more than 1, meaning the next item will be sampled multiple times.
## How was this patch tested?
Tests are added into `RandomSamplerSuite`.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Author: Liang-Chi Hsieh <viirya@appier.com>
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#11578 from viirya/random-sampler-no-iterator.
This patch extends Spark's `UnifiedMemoryManager` to add bookkeeping support for off-heap storage memory, an requirement for enabling off-heap caching (which will be done by #11805). The `MemoryManager`'s `storageMemoryPool` has been split into separate on- and off-heap pools and the storage and unroll memory allocation methods have been updated to accept a `memoryMode` parameter to specify whether allocations should be performed on- or off-heap.
In order to reduce the testing surface, the `StaticMemoryManager` does not support off-heap caching (we plan to eventually remove the `StaticMemoryManager`, so this isn't a significant limitation).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11942 from JoshRosen/off-heap-storage-memory-bookkeeping.
## What changes were proposed in this pull request?
Removed methods that has been deprecated since 1.1, 1.2, 1.3, 1.4, and 1.5.
## How was this patch tested?
- manully checked that no codes in Spark call these methods any more
- existing test suits
Author: Liwei Lin <lwlin7@gmail.com>
Author: proflin <proflin.me@gmail.com>
Closes#11910 from lw-lin/remove-deprecates.
## What changes were proposed in this pull request?
This PR fixes some newly added java-lint errors(unused-imports, line-lengsth).
## How was this patch tested?
Pass the Jenkins tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#11968 from dongjoon-hyun/SPARK-14167.
Currently SparkContext.getCallSite() makes a call to Utils.getCallSite().
```
private[spark] def getCallSite(): CallSite = {
val callSite = Utils.getCallSite()
CallSite(
Option(getLocalProperty(CallSite.SHORT_FORM)).getOrElse(callSite.shortForm),
Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse(callSite.longForm)
)
}
```
However, in some places utils.withDummyCallSite(sc) is invoked to avoid expensive threaddumps within getCallSite(). But Utils.getCallSite() is evaluated earlier causing threaddumps to be computed.
This can have severe impact on smaller queries (that finish in 10-20 seconds) having large number of RDDs.
Creating this patch for lazy evaluation of getCallSite.
No new test cases are added. Following standalone test was tried out manually. Also, built entire spark binary and tried with few SQL queries in TPC-DS and TPC-H in multi node cluster
```
def run(): Unit = {
val conf = new SparkConf()
val sc = new SparkContext("local[1]", "test-context", conf)
val start: Long = System.currentTimeMillis();
val confBroadcast = sc.broadcast(new SerializableConfiguration(new Configuration()))
Utils.withDummyCallSite(sc) {
//Large tables end up creating 5500 RDDs
for(i <- 1 to 5000) {
//ignore nulls in RDD as its mainly for testing callSite
val testRDD = new HadoopRDD(sc, confBroadcast, None, null,
classOf[NullWritable], classOf[Writable], 10)
}
}
val end: Long = System.currentTimeMillis();
println("Time taken : " + (end - start))
}
def main(args: Array[String]): Unit = {
run
}
```
Author: Rajesh Balamohan <rbalamohan@apache.org>
Closes#11911 from rajeshbalamohan/SPARK-14091.
## What changes were proposed in this pull request?
We ran into a problem today debugging some class loading problem during deserialization, and JVM was masking the underlying exception which made it very difficult to debug. We can however log the exceptions using try/catch ourselves in serialization/deserialization. The good thing is that all these methods are already using Utils.tryOrIOException, so we can just put the try catch and logging in a single place.
## How was this patch tested?
A logging change with a manual test.
Author: Reynold Xin <rxin@databricks.com>
Closes#11951 from rxin/SPARK-14149.
When a block is persisted in the MemoryStore at a serialized storage level, the current MemoryStore.putIterator() code will unroll the entire iterator as Java objects in memory, then will turn around and serialize an iterator obtained from the unrolled array. This is inefficient and doubles our peak memory requirements.
Instead, I think that we should incrementally serialize blocks while unrolling them.
A downside to incremental serialization is the fact that we will need to deserialize the partially-unrolled data in case there is not enough space to unroll the block and the block cannot be dropped to disk. However, I'm hoping that the memory efficiency improvements will outweigh any performance losses as a result of extra serialization in that hopefully-rare case.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11791 from JoshRosen/serialize-incrementally.
## What changes were proposed in this pull request?
A fix for local metrics tests that can fail on fast machines.
This is probably what is suggested here #3380 by aarondav?
## How was this patch tested?
CI Tests
Cheers
Author: Joan <joan@goyeau.com>
Closes#11747 from joan38/SPARK-2208-Local-metrics-tests.
## What changes were proposed in this pull request?
In case of failure in subprocess launched in PipedRDD, the failure exception reads “Subprocess exited with status XXX”. Debugging this is not easy for users especially if there are multiple pipe() operations in the Spark application.
Changes done:
- Changed the exception message when non-zero exit code is seen
- If the reader and writer threads see exception, simply logging the command ran. The current model is to propagate the exception "as is" so that upstream Spark logic will take the right action based on what the exception was (eg. for fetch failure, it needs to retry; but for some fatal exception, it will decide to fail the stage / job). So wrapping the exception with a generic exception will not work. Altering the exception message will keep that guarantee but that is ugly (plus not all exceptions might have a constructor for a string message)
## How was this patch tested?
- Added a new test case
- Ran all existing tests for PipedRDD
Author: Tejas Patil <tejasp@fb.com>
Closes#11927 from tejasapatil/SPARK-14110-piperdd-failure.
## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-14055
## How was this patch tested?
manual tests by running LiveJournalPageRank on a large dataset ( the dataset must larger enough to incure RDD partition eviction).
Author: Ernest <earneyzxl@gmail.com>
Closes#11875 from Earne/issue-14055.
This patch refactors the `MemoryStore` so that it can be tested without needing to construct / mock an entire `BlockManager`.
- The block manager's serialization- and compression-related methods have been moved from `BlockManager` to `SerializerManager`.
- `BlockInfoManager `is now passed directly to classes that need it, rather than being passed via the `BlockManager`.
- The `MemoryStore` now calls `dropFromMemory` via a new `BlockEvictionHandler` interface rather than directly calling the `BlockManager`. This change helps to enforce a narrow interface between the `MemoryStore` and `BlockManager` functionality and makes this interface easier to mock in tests.
- Several of the block unrolling tests have been moved from `BlockManagerSuite` into a new `MemoryStoreSuite`.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11899 from JoshRosen/reduce-memorystore-blockmanager-coupling.
## What changes were proposed in this pull request?
This PR allows us to identify what JVM is used when someone ran a benchmark program. In some cases, a JVM version may affect performance result. Thus, it would be good to show processor information and JVM version information.
```
model name : Intel(R) Xeon(R) CPU E5-2697 v2 2.70GHz
JVM information : OpenJDK 64-Bit Server VM, 1.7.0_65-mockbuild_2014_07_14_06_19-b00
Int and String Scan: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized 981 / 994 10.7 93.5 1.0X
SQL Parquet MR 2518 / 2542 4.2 240.1 0.4X
```
```
model name : Intel(R) Xeon(R) CPU E5-2697 v2 2.70GHz
JVM information : IBM J9 VM, pxa6480sr2-20151023_01 (SR2)
String Dictionary: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
SQL Parquet Vectorized 693 / 740 15.1 66.1 1.0X
SQL Parquet MR 2501 / 2562 4.2 238.5 0.3X
```
## How was this patch tested?
Tested by using existing benchmark programs
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#11893 from kiszk/SPARK-14072.
Building on the `SerializerManager` introduced in SPARK-13926/ #11755, this patch Spark modifies Spark's BlockManager to use RDD's ClassTags in order to select the best serializer to use when caching RDD blocks.
When storing a local block, the BlockManager `put()` methods use implicits to record ClassTags and stores those tags in the blocks' BlockInfo records. When reading a local block, the stored ClassTag is used to pick the appropriate serializer. When a block is stored with replication, the class tag is written into the block transfer metadata and will also be stored in the remote BlockManager.
There are two or three places where we don't properly pass ClassTags, including TorrentBroadcast and BlockRDD. I think this happens to work because the missing ClassTag always happens to be `ClassTag.Any`, but it might be worth looking more carefully at those places to see whether we should be more explicit.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11801 from JoshRosen/pick-best-serializer-for-caching.
## What changes were proposed in this pull request?
This PR try acquire the memory for hash map in shuffled hash join, fail the task if there is no enough memory (otherwise it could OOM the executor).
It also removed unused HashedRelation.
## How was this patch tested?
Existing unit tests. Manual tests with TPCDS Q78.
Author: Davies Liu <davies@databricks.com>
Closes#11826 from davies/cleanup_hash2.
## What changes were proposed in this pull request?
Spark uses `DeveloperApi` annotation, but sometimes it seems to conflict with visibility. This PR tries to fix those conflict by removing annotations for non-publics. The following is the example.
**JobResult.scala**
```scala
DeveloperApi
sealed trait JobResult
DeveloperApi
case object JobSucceeded extends JobResult
-DeveloperApi
private[spark] case class JobFailed(exception: Exception) extends JobResult
```
## How was this patch tested?
Pass the existing Jenkins test.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#11797 from dongjoon-hyun/SPARK-13986.
## What changes were proposed in this pull request?
This PR adds some proper periods and spaces to Spark CLI help messages and SQL/YARN conf docs for consistency.
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#11848 from dongjoon-hyun/add_proper_period_and_space.
## What changes were proposed in this pull request?
[Spark Coding Style Guide](https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide) has 100-character limit on lines, but it's disabled for Java since 11/09/15. This PR enables **LineLength** checkstyle again. To help that, this also introduces **RedundantImport** and **RedundantModifier**, too. The following is the diff on `checkstyle.xml`.
```xml
- <!-- TODO: 11/09/15 disabled - the lengths are currently > 100 in many places -->
- <!--
<module name="LineLength">
<property name="max" value="100"/>
<property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://"/>
</module>
- -->
<module name="NoLineWrap"/>
<module name="EmptyBlock">
<property name="option" value="TEXT"/>
-167,5 +164,7
</module>
<module name="CommentsIndentation"/>
<module name="UnusedImports"/>
+ <module name="RedundantImport"/>
+ <module name="RedundantModifier"/>
```
## How was this patch tested?
Currently, `lint-java` is disabled in Jenkins. It needs a manual test.
After passing the Jenkins tests, `dev/lint-java` should passes locally.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#11831 from dongjoon-hyun/SPARK-14011.
## What changes were proposed in this pull request?
This change fixes the executor OOM which was recently introduced in PR apache/spark#11095
(Please fill in changes proposed in this fix)
## How was this patch tested?
Tested by running a spark job on the cluster.
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
… Sorter
Author: Sital Kedia <skedia@fb.com>
Closes#11794 from sitalkedia/SPARK-13958.
## What changes were proposed in this pull request?
This regression is introduced in #9182, previously attempt id is simply as counter "1" or "2". With the change of #9182, it is changed to full name as "appattemtp-xxx-00001", this will affect all the parts which uses this attempt id, like event log file name, history server app url link. So here change it back to the counter to keep consistent with previous code.
Also revert back this patch #11518, this patch fix the url link of history log according to the new way of attempt id, since here we change back to the previous way, so this patch is not necessary, here to revert it.
Also clean "spark.yarn.app.id" and "spark.yarn.app.attemptId", since it is useless now.
## How was this patch tested?
Test it with unit test and manually test different scenario:
1. application running in yarn-client mode.
2. application running in yarn-cluster mode.
3. application running in yarn-cluster mode with multiple attempts.
Checked both the event log file name and url link.
CC vanzin tgravescs , please help to review, thanks a lot.
Author: jerryshao <sshao@hortonworks.com>
Closes#11721 from jerryshao/SPARK-13885.
This patch modifies the BlockManager, MemoryStore, and several other storage components so that serialized cached blocks are stored as multiple small chunks rather than as a single contiguous ByteBuffer.
This change will help to improve the efficiency of memory allocation and the accuracy of memory accounting when serializing blocks. Our current serialization code uses a ByteBufferOutputStream, which doubles and re-allocates its backing byte array; this increases the peak memory requirements during serialization (since we need to hold extra memory while expanding the array). In addition, we currently don't account for the extra wasted space at the end of the ByteBuffer's backing array, so a 129 megabyte serialized block may actually consume 256 megabytes of memory. After switching to storing blocks in multiple chunks, we'll be able to efficiently trim the backing buffers so that no space is wasted.
This change is also a prerequisite to being able to cache blocks which are larger than 2GB (although full support for that depends on several other changes which have not bee implemented yet).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11748 from JoshRosen/chunked-block-serialization.
## What changes were proposed in this pull request?
As each acceptor/selector in Jetty will use one thread, the number of threads should at least be the number of acceptors and selectors plus 1. Otherwise, the thread pool of Jetty server may be exhausted by acceptors/selectors and not be able to response any request.
To avoid wasting threads, the PR limits the max number of acceptors and selectors and also updates the max thread number if necessary.
## How was this patch tested?
Just make sure we don't break any existing tests
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#11615 from zsxwing/SPARK-13776.
## What changes were proposed in this pull request?
Logging was made private in Spark 2.0. If we move it, then users would be able to create a Logging trait themselves to avoid changing their own code.
## How was this patch tested?
existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#11764 from cloud-fan/logger.
JIRA Issue:https://issues.apache.org/jira/browse/SPARK-13901
In getAllowedLocalityLevel method of TaskSetManager,we get wrong logDebug information when jump to the next locality level.So we should fix it.
Author: trueyao <501663994@qq.com>
Closes#11719 from trueyao/logDebug-localityWait.
Because ClassTags are available when constructing ShuffledRDD we can use them to automatically use Kryo for shuffle serialization when the RDD's types are known to be compatible with Kryo.
This patch introduces `SerializerManager`, a component which picks the "best" serializer for a shuffle given the elements' ClassTags. It will automatically pick a Kryo serializer for ShuffledRDDs whose key, value, and/or combiner types are primitives, arrays of primitives, or strings. In the future we can use this class as a narrow extension point to integrate specialized serializers for other types, such as ByteBuffers.
In a planned followup patch, I will extend the BlockManager APIs so that we're able to use similar automatic serializer selection when caching RDDs (this is a little trickier because the ClassTags need to be threaded through many more places).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11755 from JoshRosen/automatically-pick-best-serializer.
## What changes were proposed in this pull request?
In SparkContext, throw Illegalargumentexception when trying to broadcast rdd directly, instead of logging the warning.
## How was this patch tested?
mvn clean install
Add UT in BroadcastSuite
Author: Wesley Tang <tangmingjun@mininglamp.com>
Closes#11735 from breakdawn/master.
## What changes were proposed in this pull request?
PipedRDD creates a child thread to read output of the parent stage and feed it to the pipe process. Used a variable to save the exception thrown in the child thread and then propagating the exception in the main thread if the variable was set.
## How was this patch tested?
- Added a unit test
- Ran all the existing tests in PipedRDDSuite and they all pass with the change
- Tested the patch with a real pipe() job, bounced the executor node which ran the parent stage to simulate a fetch failure and observed that the parent stage was re-ran.
Author: Tejas Patil <tejasp@fb.com>
Closes#11628 from tejasapatil/pipe_rdd.
JIRA: https://issues.apache.org/jira/browse/SPARK-13396
Stop using our internal deprecated .metrics on ExceptionFailure instead use accumUpdates
Author: GayathriMurali <gayathri.m.softie@gmail.com>
Closes#11544 from GayathriMurali/SPARK-13396.
## What changes were proposed in this pull request?
Follow up to https://github.com/apache/spark/pull/11657
- Also update `String.getBytes("UTF-8")` to use `StandardCharsets.UTF_8`
- And fix one last new Coverity warning that turned up (use of unguarded `wait()` replaced by simpler/more robust `java.util.concurrent` classes in tests)
- And while we're here cleaning up Coverity warnings, just fix about 15 more build warnings
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes#11725 from srowen/SPARK-13823.2.
## What changes were proposed in this pull request?
Force at least two dispatcher-event-loop threads. Since SparkDeploySchedulerBackend (in AppClient) calls askWithRetry to CoarseGrainedScheduler in the same process, there the driver needs at least two dispatcher threads to prevent the dispatcher thread from hanging.
## How was this patch tested?
Manual.
Author: Yonathan Randolph <yonathangmail.com>
Author: Yonathan Randolph <yonathan@liftigniter.com>
Closes#11728 from yonran/SPARK-13906.
## What changes were proposed in this pull request?
a minor fix for the comments of a method in RPC Dispatcher
## How was this patch tested?
existing unit tests
Author: CodingCat <zhunansjtu@gmail.com>
Closes#11738 from CodingCat/minor_rpc.
## What changes were proposed in this pull request?
This patch contains the functionality to balance the load of the cluster-mode drivers among workers
This patch restores the changes in https://github.com/apache/spark/pull/1106 which was erased due to the merging of https://github.com/apache/spark/pull/731
## How was this patch tested?
test with existing test cases
Author: CodingCat <zhunansjtu@gmail.com>
Closes#11702 from CodingCat/SPARK-13803.
Three different things were needed to get rid of spurious warnings:
- silence deprecation warnings when cloning configuration
- change the way SparkHadoopUtil instantiates SparkConf to silence
warnings
- avoid creating new SparkConf instances where it's not needed.
On top of that, I changed the way that Logging.scala detects the repl;
now it uses a method that is overridden in the repl's Main class, and
the hack in Utils.scala is not needed anymore. This makes the 2.11 repl
behave like the 2.10 one and set the default log level to WARN, which
is a lot better. Previously, this wasn't working because the 2.11 repl
triggers log initialization earlier than the 2.10 one.
I also removed and simplified some other code in the 2.11 repl's Main
to avoid replicating logic that already exists elsewhere in Spark.
Tested the 2.11 repl in local and yarn modes.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#11510 from vanzin/SPARK-13626.
This patch refactors the MemoryStore to remove the concept of `pendingUnrollMemory`. It also fixes fixes SPARK-6157: "Unrolling with MEMORY_AND_DISK should always release memory".
Key changes:
- Inline `MemoryStore.tryToPut` at its three call sites in the `MemoryStore`.
- Inline `Memory.unrollSafely` at its only call site (in `MemoryStore.putIterator`).
- Inline `MemoryManager.acquireStorageMemory` at its call sites.
- Simplify the code as a result of this inlining (some parameters have fixed values after inlining, so lots of branches can be removed).
- Remove the `pendingUnrollMemory` map by returning the amount of unrollMemory allocated when returning an iterator after a failed `putIterator` call.
- Change `putIterator` to return an instance of `PartiallyUnrolledIterator`, a special iterator subclass which will automatically free the unroll memory of its partially-unrolled elements when the iterator is consumed. To handle cases where the iterator is not consumed (e.g. when a MEMORY_ONLY put fails), `PartiallyUnrolledIterator` exposes a `close()` method which may be called to discard the unrolled values and free their memory.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11613 from JoshRosen/cleanup-unroll-memory.
I am using dynamic container allocation and speculation and am seeing issues with the active task accounting. The Executor UI still shows active tasks on the an executor but the job/stage is all completed. I think its also affecting the dynamic allocation being able to release containers because it thinks there are still tasks.
There are multiple issues with this:
- If the task end for tasks (in this case probably because of speculation) comes in after the stage is finished, then the DAGScheduler.handleTaskCompletion will skip the task completion event
Author: Thomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com>
Author: Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com>
Author: Tom Graves <tgraves@yahoo-inc.com>
Closes#10951 from tgravescs/SPARK-11701.
## Problem description:
Mesos shuffle service is completely unusable since Spark 1.6.0 . The problem seems to occur since the move from akka to netty in the networking layer. Until now, a connection from the driver to each shuffle service was used as a signal for the shuffle service to determine, whether the driver is still running. Since 1.6.0, this connection is closed after spark.shuffle.io.connectionTimeout (or spark.network.timeout if the former is not set) due to it being idle. The shuffle service interprets this as a signal that the driver has stopped, despite the driver still being alive. Thus, shuffle files are deleted before the application has stopped.
### Context and analysis:
spark shuffle fails with mesos after 2mins: https://issues.apache.org/jira/browse/SPARK-12583
External shuffle service broken w/ Mesos: https://issues.apache.org/jira/browse/SPARK-13159
This is a follow up on #11207 .
## What changes were proposed in this pull request?
This PR adds a heartbeat signal from the Driver (in MesosExternalShuffleClient) to all registered external mesos shuffle service instances. In MesosExternalShuffleBlockHandler, a thread periodically checks whether a driver has timed out and cleans an application's shuffle files if this is the case.
## How was the this patch tested?
This patch has been tested on a small mesos test cluster using the spark-shell. Log output from mesos shuffle service:
```
16/02/19 15:13:45 INFO mesos.MesosExternalShuffleBlockHandler: Received registration request from app 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 (remote address /xxx.xxx.xxx.xxx:52391, heartbeat timeout 120000 ms).
16/02/19 15:13:47 INFO shuffle.ExternalShuffleBlockResolver: Registered executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=3} with ExecutorShuffleInfo{localDirs=[/foo/blockmgr-c84c0697-a3f9-4f61-9c64-4d3ee227c047], subDirsPerLocalDir=64, shuffleManager=sort}
16/02/19 15:13:47 INFO shuffle.ExternalShuffleBlockResolver: Registered executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=7} with ExecutorShuffleInfo{localDirs=[/foo/blockmgr-bf46497a-de80-47b9-88f9-563123b59e03], subDirsPerLocalDir=64, shuffleManager=sort}
16/02/19 15:16:02 INFO mesos.MesosExternalShuffleBlockHandler: Application 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 timed out. Removing shuffle files.
16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Application 294def07-3249-4e0f-8d71-bf8c83c58a50-0018 removed, cleanupLocalDirs = true
16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=3}'s 1 local dirs
16/02/19 15:16:02 INFO shuffle.ExternalShuffleBlockResolver: Cleaning up executor AppExecId{appId=294def07-3249-4e0f-8d71-bf8c83c58a50-0018, execId=7}'s 1 local dirs
```
Note: there are 2 executors running on this slave.
Author: Bertrand Bossy <bertrand.bossy@teralytics.net>
Closes#11272 from bbossy/SPARK-12583-mesos-shuffle-service-heartbeat.
This patch upgrades Py4J from 0.9.1 to 0.9.2 in order to include a patch which modifies Py4J to use the current thread's ContextClassLoader when performing reflection / class loading. This is necessary in order to fix [SPARK-5185](https://issues.apache.org/jira/browse/SPARK-5185), a longstanding issue affecting the use of `--jars` and `--packages` in PySpark.
In order to demonstrate that the fix works, I removed the workarounds which were added as part of [SPARK-6027](https://issues.apache.org/jira/browse/SPARK-6027) / #4779 and other patches.
Py4J diff: https://github.com/bartdag/py4j/compare/0.9.1...0.9.2
/cc zsxwing tdas davies brkyvz
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11687 from JoshRosen/py4j-0.9.2.
When reading data from the DiskStore and attempting to cache it back into the memory store, we should guard against race conditions where multiple readers are attempting to re-cache the same block in memory.
This patch accomplishes this by synchronizing on the block's `BlockInfo` object while trying to re-cache a block.
(Will file JIRA as soon as ASF JIRA stops being down / laggy).
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11660 from JoshRosen/concurrent-recaching-fixes.
trait SynchronizedSet in package mutable is deprecated
Author: Wilson Wu <wilson888888888@gmail.com>
Closes#11580 from wilson888888888/spark-synchronizedset.
## What changes were proposed in this pull request?
This PR fixes 135 typos over 107 files:
* 121 typos in comments
* 11 typos in testcase name
* 3 typos in log messages
## How was this patch tested?
Manual.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#11689 from dongjoon-hyun/fix_more_typos.
## What changes were proposed in this pull request?
- Fixes calls to `new String(byte[])` or `String.getBytes()` that rely on platform default encoding, to use UTF-8
- Same for `InputStreamReader` and `OutputStreamWriter` constructors
- Standardizes on UTF-8 everywhere
- Standardizes specifying the encoding with `StandardCharsets.UTF-8`, not the Guava constant or "UTF-8" (which means handling `UnuspportedEncodingException`)
- (also addresses the other remaining Coverity scan issues, which are pretty trivial; these are separated into commit 1deecd8d9c )
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes#11657 from srowen/SPARK-13823.
## What changes were proposed in this pull request?
Currently, when a java.net.BindException is thrown, it displays the following message:
java.net.BindException: Address already in use: Service '$serviceName' failed after 16 retries!
This change adds port configuration suggestions to the BindException, for example, for the UI, it now displays
java.net.BindException: Address already in use: Service 'SparkUI' failed after 16 retries! Consider explicitly setting the appropriate port for 'SparkUI' (for example spark.ui.port for SparkUI) to an available port or increasing spark.port.maxRetries.
## How was this patch tested?
Manual tests
Author: Bjorn Jonsson <bjornjon@gmail.com>
Closes#11644 from bjornjon/master.
## What changes were proposed in this pull request?
The current RPC can't handle large blocks very well, it's very slow to fetch 100M block (about 1 minute). Once switch to block manager to fetch that, it took about 10 seconds (still could be improved).
## How was this patch tested?
existing unit tests.
Author: Davies Liu <davies@databricks.com>
Closes#11659 from davies/direct_result.
When dynamic resource allocation is enabled fetching broadcast variables from removed executors were causing job failures and SPARK-9591 fixed this problem by trying all locations of a block before giving up. However, the locations of a block is retrieved only once from the driver in this process and the locations in this list can be stale due to dynamic resource allocation. This situation gets worse when running on a large cluster as the size of this location list can be in the order of several hundreds out of which there may be tens of stale entries. What we have observed is with the default settings of 3 max retries and 5s between retries (that's 15s per location) the time it takes to read a broadcast variable can be as high as ~17m (70 failed attempts * 15s/attempt)
Author: Nezih Yigitbasi <nyigitbasi@netflix.com>
Closes#11241 from nezihyigitbasi/SPARK-13328.
In preparation for the demise of assemblies, this change allows the
YARN backend to use multiple jars and globs as the "Spark jar". The
config option has been renamed to "spark.yarn.jars" to reflect that.
A second option "spark.yarn.archive" was also added; if set, this
takes precedence and uploads an archive expected to contain the jar
files with the Spark code and its dependencies.
Existing deployments should keep working, mostly. This change drops
support for the "SPARK_JAR" environment variable, and also does not
fall back to using "jarOfClass" if no configuration is set, falling
back to finding files under SPARK_HOME instead. This should be fine
since "jarOfClass" probably wouldn't work unless you were using
spark-submit anyway.
Tested with the unit tests, and trying the different config options
on a YARN cluster.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#11500 from vanzin/SPARK-13577.
## What changes were proposed in this pull request?
Here lists all cases that Master cannot talk with Worker for a while and then network is back.
1. Master doesn't know the network issue (not yet timeout)
a. Worker doesn't know the network issue (onDisconnected is not called)
- Worker keeps sending Heartbeat. Both Worker and Master don't know the network issue. Nothing to do. (Finally, Master will notice the heartbeat timeout if network is not recovered)
b. Worker knows the network issue (onDisconnected is called)
- Worker stops sending Heartbeat and sends `RegisterWorker` to master. Master will reply `RegisterWorkerFailed("Duplicate worker ID")`. Worker calls "System.exit(1)" (Finally, Master will notice the heartbeat timeout if network is not recovered) (May leak driver processes. See [SPARK-13602](https://issues.apache.org/jira/browse/SPARK-13602))
2. Worker timeout (Master knows the network issue). In such case, master removes Worker and its executors and drivers.
a. Worker doesn't know the network issue (onDisconnected is not called)
- Worker keeps sending Heartbeat.
- If the network is back, say Master receives Heartbeat, Master sends `ReconnectWorker` to Worker
- Worker send `RegisterWorker` to master.
- Master accepts `RegisterWorker` but doesn't know executors and drivers in Worker. (may leak executors)
b. Worker knows the network issue (onDisconnected is called)
- Worker stop sending `Heartbeat`. Worker will send "RegisterWorker" to master.
- Master accepts `RegisterWorker` but doesn't know executors and drivers in Worker. (may leak executors)
This PR fixes executors and drivers leak in 2.a and 2.b when Worker reregisters with Master. The approach is making Worker send `WorkerLatestState` to sync the state after registering with master successfully. Then Master will ask Worker to kill unknown executors and drivers.
Note: Worker cannot just kill executors after registering with master because in the worker, `LaunchExecutor` and `RegisteredWorker` are processed in two threads. If `LaunchExecutor` happens before `RegisteredWorker`, Worker's executor list will contain new executors after Master accepts `RegisterWorker`. We should not kill these executors. So sending the list to Master and let Master tell Worker which executors should be killed.
## How was this patch tested?
test("SPARK-13604: Master should ask Worker kill unknown executors and drivers")
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#11455 from zsxwing/orphan-executors.
## What changes were proposed in this pull request?
Since the opening curly brace, '{', has many usages as discussed in [SPARK-3854](https://issues.apache.org/jira/browse/SPARK-3854), this PR adds a ScalaStyle rule to prevent '){' pattern for the following majority pattern and fixes the code accordingly. If we enforce this in ScalaStyle from now, it will improve the Scala code quality and reduce review time.
```
// Correct:
if (true) {
println("Wow!")
}
// Incorrect:
if (true){
println("Wow!")
}
```
IntelliJ also shows new warnings based on this.
## How was this patch tested?
Pass the Jenkins ScalaStyle test.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#11637 from dongjoon-hyun/SPARK-3854.
Today, both the MemoryStore and DiskStore implement a common `BlockStore` API, but I feel that this API is inappropriate because it abstracts away important distinctions between the behavior of these two stores.
For instance, the disk store doesn't have a notion of storing deserialized objects, so it's confusing for it to expose object-based APIs like putIterator() and getValues() instead of only exposing binary APIs and pushing the responsibilities of serialization and deserialization to the client. Similarly, the DiskStore put() methods accepted a `StorageLevel` parameter even though the disk store can only store blocks in one form.
As part of a larger BlockManager interface cleanup, this patch remove the BlockStore interface and refines the MemoryStore and DiskStore interfaces to reflect more narrow sets of responsibilities for those components. Some of the benefits of this interface cleanup are reflected in simplifications to several unit tests to eliminate now-unnecessary mocking, significant simplification of the BlockManager's `getLocal()` and `doPut()` methods, and a narrower API between the MemoryStore and DiskStore.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11534 from JoshRosen/remove-blockstore-interface.
The contains() method does not return consistently with get() if the key is deprecated. For example,
import org.apache.spark.SparkConf
val conf = new SparkConf()
conf.set("spark.io.compression.lz4.block.size", "12345") # display some deprecated warning message
conf.get("spark.io.compression.lz4.block.size") # return 12345
conf.get("spark.io.compression.lz4.blockSize") # return 12345
conf.contains("spark.io.compression.lz4.block.size") # return true
conf.contains("spark.io.compression.lz4.blockSize") # return false
The fix will make the contains() and get() more consistent.
I've added a test case for this.
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
Unit tests should be sufficient.
Author: bomeng <bmeng@us.ibm.com>
Closes#11568 from bomeng/SPARK-13727.
We have a recoverable Spark streaming job with checkpoint enabled, it could be executed correctly at first time, but throw following exception when restarted and recovered from checkpoint.
```
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:352)
at org.apache.spark.rdd.RDD.union(RDD.scala:565)
at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23)
at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
```
According to exception, it shows I invoked transformations and actions in other transformations, but I did not. The real reason is that I used external RDD in DStream operation. External RDD data is not stored in checkpoint, so that during recovering, the initial value of _sc in this RDD is assigned to null and hit above exception. But you can find the error message is misleading, it indicates nothing about the real issue
Here is the code to reproduce it.
```scala
object Repo {
def createContext(ip: String, port: Int, checkpointDirectory: String):StreamingContext = {
println("Creating new context")
val sparkConf = new SparkConf().setAppName("Repo").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint(checkpointDirectory)
var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))
val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
words.foreachRDD((rdd: RDD[String]) => {
val res = rdd.map(word => (word, word.length)).collect()
println("words: " + res.mkString(", "))
cached = cached.union(rdd)
cached.checkpoint()
println("cached words: " + cached.collect.mkString(", "))
})
ssc
}
def main(args: Array[String]) {
val ip = "localhost"
val port = 9999
val dir = "/home/maowei/tmp"
val ssc = StreamingContext.getOrCreate(dir,
() => {
createContext(ip, port, dir)
})
ssc.start()
ssc.awaitTermination()
}
}
```
Author: mwws <wei.mao@intel.com>
Closes#11595 from mwws/SPARK-MissleadingLog.
## What changes were proposed in this pull request?
Previously the Mesos framework webui URL was being derived only from the Spark UI address leaving no possibility to configure it. This commit makes it configurable. If unset it falls back to the previous behavior.
Motivation:
This change is necessary in order to be able to install Spark on DCOS and to be able to give it a custom service link. The configured `webui_url` is configured to point to a reverse proxy in the DCOS environment.
## How was this patch tested?
Locally, using unit tests and on DCOS testing and stable revision.
Author: Sergiusz Urbaniak <sur@mesosphere.io>
Closes#11369 from s-urbaniak/sur-webui-url.
## What changes were proposed in this pull request?
Originally the page is sorted by AppID by default.
After tests with users' feedback, we think it might be best to sort by completed time (desc).
## How was this patch tested?
Manually test, with screenshot as follows.
![sorted-by-complete-time-desc](https://cloud.githubusercontent.com/assets/11683054/13647686/d6dea924-e5fa-11e5-8fc5-68e039b74b6f.png)
Author: zhuol <zhuol@yahoo-inc.com>
Closes#11608 from zhuoliu/13775.
## What changes were proposed in this pull request?
When a worker is lost, the executors on this worker are also lost. But Master's ApplicationPage still displays their states as running.
This patch just sets the executor state to `LOST` when a worker is lost.
## How was this patch tested?
manual tests
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#11609 from zsxwing/SPARK-13778.
## What changes were proposed in this pull request?
Fix this use case, which was already fixed in SPARK-10548 in 1.6 but was broken in master due to #9264:
```
(1 to 100).par.foreach { _ => sc.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count() }
```
This threw `IllegalArgumentException` consistently before this patch. For more detail, see the JIRA.
## How was this patch tested?
New test in `SQLExecutionSuite`.
Author: Andrew Or <andrew@databricks.com>
Closes#11586 from andrewor14/fix-concurrent-sql.
## What changes were proposed in this pull request?
In order to make `docs/examples` (and other related code) more simple/readable/user-friendly, this PR replaces existing codes like the followings by using `diamond` operator.
```
- final ArrayList<Product2<Object, Object>> dataToWrite =
- new ArrayList<Product2<Object, Object>>();
+ final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
```
Java 7 or higher supports **diamond** operator which replaces the type arguments required to invoke the constructor of a generic class with an empty set of type parameters (<>). Currently, Spark Java code use mixed usage of this.
## How was this patch tested?
Manual.
Pass the existing tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#11541 from dongjoon-hyun/SPARK-13702.
## What changes were proposed in this pull request?
If a job is being scheduled in one thread which has a dependency on an
RDD currently executing a shuffle in another thread, Spark would throw a
NullPointerException. This patch synchronizes access to `mapStatuses` and
skips null status entries (which are in-progress shuffle tasks).
## How was this patch tested?
Our client code unit test suite, which was reliably reproducing the race
condition with 10 threads, shows that this fixes it. I have not found a minimal
test case to add to Spark, but I will attempt to do so if desired.
The same test case was tripping up on SPARK-4454, which was fixed by
making other DAGScheduler code thread-safe.
shivaram srowen
Author: Andy Sloane <asloane@tetrationanalytics.com>
Closes#11505 from a1k0n/SPARK-13631.
## What changes were proposed in this pull request?
This issue fixes the following potential bugs and Java coding style detected by Coverity and Checkstyle.
- Implement both null and type checking in equals functions.
- Fix wrong type casting logic in SimpleJavaBean2.equals.
- Add `implement Cloneable` to `UTF8String` and `SortedIterator`.
- Remove dereferencing before null check in `AbstractBytesToBytesMapSuite`.
- Fix coding style: Add '{}' to single `for` statement in mllib examples.
- Remove unused imports in `ColumnarBatch` and `JavaKinesisStreamSuite`.
- Remove unused fields in `ChunkFetchIntegrationSuite`.
- Add `stop()` to prevent resource leak.
Please note that the last two checkstyle errors exist on newly added commits after [SPARK-13583](https://issues.apache.org/jira/browse/SPARK-13583).
## How was this patch tested?
manual via `./dev/lint-java` and Coverity site.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#11530 from dongjoon-hyun/SPARK-13692.
When a cached block is spilled to disk and read back in serialized form (i.e. as bytes), the current BlockManager implementation will attempt to re-insert the serialized block into the MemoryStore even if the block's storage level requests deserialized caching.
This behavior adds some complexity to the MemoryStore but I don't think it offers many performance benefits and I'd like to remove it in order to simplify a larger refactoring patch. Therefore, this patch changes the behavior so that disk store reads will only cache bytes in the memory store for blocks with serialized storage levels.
There are two places where we request serialized bytes from the BlockStore:
1. getLocalBytes(), which is only called when reading local copies of TorrentBroadcast pieces. Broadcast pieces are always cached using a serialized storage level, so this won't lead to a mismatch in serialization forms if spilled bytes read from disk are cached as bytes in the memory store.
2. the non-shuffle-block branch in getBlockData(), which is only called by the NettyBlockRpcServer when responding to requests to read remote blocks. Caching the serialized bytes in memory will only benefit us if those cached bytes are read before they're evicted and the likelihood of that happening seems low since the frequency of remote reads of non-broadcast cached blocks seems very low. Caching these bytes when they have a low probability of being read is bad if it risks the eviction of blocks which are cached in their expected serialized/deserialized forms, since those blocks seem more likely to be read in local computation.
Given the argument above, I think this change is unlikely to cause performance regressions.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11533 from JoshRosen/remove-memorystore-level-mismatch.
## What changes were proposed in this pull request?
In WebUI, now Jetty Server starts with SPARK_LOCAL_IP config value if it
is configured otherwise it starts with default value as '0.0.0.0'.
It is continuation as per the closed PR https://github.com/apache/spark/pull/11133 for the JIRA SPARK-13117 and discussion in SPARK-13117.
## How was this patch tested?
This has been verified using the command 'netstat -tnlp | grep <PID>' to check on which IP/hostname is binding with the below steps.
In the below results, mentioned PID in the command is the corresponding process id.
#### Without the patch changes,
Web UI(Jetty Server) is not taking the value configured for SPARK_LOCAL_IP and it is listening to all the interfaces.
###### Master
```
[devarajstobdtserver2 sbin]$ netstat -tnlp | grep 3930
tcp6 0 0 :::8080 :::* LISTEN 3930/java
```
###### Worker
```
[devarajstobdtserver2 sbin]$ netstat -tnlp | grep 4090
tcp6 0 0 :::8081 :::* LISTEN 4090/java
```
###### History Server Process,
```
[devarajstobdtserver2 sbin]$ netstat -tnlp | grep 2471
tcp6 0 0 :::18080 :::* LISTEN 2471/java
```
###### Driver
```
[devarajstobdtserver2 spark-master]$ netstat -tnlp | grep 6556
tcp6 0 0 :::4040 :::* LISTEN 6556/java
```
#### With the patch changes
##### i. With SPARK_LOCAL_IP configured
If the SPARK_LOCAL_IP is configured then all the processes Web UI(Jetty Server) is getting bind to the configured value.
###### Master
```
[devarajstobdtserver2 sbin]$ netstat -tnlp | grep 1561
tcp6 0 0 x.x.x.x:8080 :::* LISTEN 1561/java
```
###### Worker
```
[devarajstobdtserver2 sbin]$ netstat -tnlp | grep 2229
tcp6 0 0 x.x.x.x:8081 :::* LISTEN 2229/java
```
###### History Server
```
[devarajstobdtserver2 sbin]$ netstat -tnlp | grep 3747
tcp6 0 0 x.x.x.x:18080 :::* LISTEN 3747/java
```
###### Driver
```
[devarajstobdtserver2 spark-master]$ netstat -tnlp | grep 6013
tcp6 0 0 x.x.x.x:4040 :::* LISTEN 6013/java
```
##### ii. Without SPARK_LOCAL_IP configured
If the SPARK_LOCAL_IP is not configured then all the processes Web UI(Jetty Server) will start with the '0.0.0.0' as default value.
###### Master
```
[devarajstobdtserver2 sbin]$ netstat -tnlp | grep 4573
tcp6 0 0 :::8080 :::* LISTEN 4573/java
```
###### Worker
```
[devarajstobdtserver2 sbin]$ netstat -tnlp | grep 4703
tcp6 0 0 :::8081 :::* LISTEN 4703/java
```
###### History Server
```
[devarajstobdtserver2 sbin]$ netstat -tnlp | grep 4846
tcp6 0 0 :::18080 :::* LISTEN 4846/java
```
###### Driver
```
[devarajstobdtserver2 sbin]$ netstat -tnlp | grep 5437
tcp6 0 0 :::4040 :::* LISTEN 5437/java
```
Author: Devaraj K <devaraj@apache.org>
Closes#11490 from devaraj-kavali/SPARK-13117-v1.
In preparation for larger refactoring, this patch removes the confusing `returnValues` option from the BlockStore put() APIs: returning the value is only useful in one place (caching) and in other situations, such as block replication, it's simpler to put() and then get().
As part of this change, I needed to refactor `BlockManager.doPut()`'s block replication code. I also changed `doPut()` to access the memory and disk stores directly rather than calling them through the BlockStore interface; this is in anticipation of a followup patch to remove the BlockStore interface so that the disk store can expose a binary-data-oriented API which is not concerned with Java objects or serialization.
These changes should be covered by the existing storage unit tests. The best way to review this patch is probably to look at the individual commits, all of which are small and have useful descriptions to guide the review.
/cc davies for review.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11502 from JoshRosen/remove-returnvalues.
## What changes were proposed in this pull request?
AppClient runs in the driver side. It should not call `Utils.tryOrExit` as it will send exception to SparkUncaughtExceptionHandler and call `System.exit`. This PR just removed `Utils.tryOrExit`.
## How was this patch tested?
manual tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#11566 from zsxwing/SPARK-13711.
`HadoopFsRelation` is used for reading most files into Spark SQL. However today this class mixes the concerns of file management, schema reconciliation, scan building, bucketing, partitioning, and writing data. As a result, many data sources are forced to reimplement the same functionality and the various layers have accumulated a fair bit of inefficiency. This PR is a first cut at separating this into several components / interfaces that are each described below. Additionally, all implementations inside of Spark (parquet, csv, json, text, orc, svmlib) have been ported to the new API `FileFormat`. External libraries, such as spark-avro will also need to be ported to work with Spark 2.0.
### HadoopFsRelation
A simple `case class` that acts as a container for all of the metadata required to read from a datasource. All discovery, resolution and merging logic for schemas and partitions has been removed. This an internal representation that no longer needs to be exposed to developers.
```scala
case class HadoopFsRelation(
sqlContext: SQLContext,
location: FileCatalog,
partitionSchema: StructType,
dataSchema: StructType,
bucketSpec: Option[BucketSpec],
fileFormat: FileFormat,
options: Map[String, String]) extends BaseRelation
```
### FileFormat
The primary interface that will be implemented by each different format including external libraries. Implementors are responsible for reading a given format and converting it into `InternalRow` as well as writing out an `InternalRow`. A format can optionally return a schema that is inferred from a set of files.
```scala
trait FileFormat {
def inferSchema(
sqlContext: SQLContext,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType]
def prepareWrite(
sqlContext: SQLContext,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory
def buildInternalScan(
sqlContext: SQLContext,
dataSchema: StructType,
requiredColumns: Array[String],
filters: Array[Filter],
bucketSet: Option[BitSet],
inputFiles: Array[FileStatus],
broadcastedConf: Broadcast[SerializableConfiguration],
options: Map[String, String]): RDD[InternalRow]
}
```
The current interface is based on what was required to get all the tests passing again, but still mixes a couple of concerns (i.e. `bucketSet` is passed down to the scan instead of being resolved by the planner). Additionally, scans are still returning `RDD`s instead of iterators for single files. In a future PR, bucketing should be removed from this interface and the scan should be isolated to a single file.
### FileCatalog
This interface is used to list the files that make up a given relation, as well as handle directory based partitioning.
```scala
trait FileCatalog {
def paths: Seq[Path]
def partitionSpec(schema: Option[StructType]): PartitionSpec
def allFiles(): Seq[FileStatus]
def getStatus(path: Path): Array[FileStatus]
def refresh(): Unit
}
```
Currently there are two implementations:
- `HDFSFileCatalog` - based on code from the old `HadoopFsRelation`. Infers partitioning by recursive listing and caches this data for performance
- `HiveFileCatalog` - based on the above, but it uses the partition spec from the Hive Metastore.
### ResolvedDataSource
Produces a logical plan given the following description of a Data Source (which can come from DataFrameReader or a metastore):
- `paths: Seq[String] = Nil`
- `userSpecifiedSchema: Option[StructType] = None`
- `partitionColumns: Array[String] = Array.empty`
- `bucketSpec: Option[BucketSpec] = None`
- `provider: String`
- `options: Map[String, String]`
This class is responsible for deciding which of the Data Source APIs a given provider is using (including the non-file based ones). All reconciliation of partitions, buckets, schema from metastores or inference is done here.
### DataSourceAnalysis / DataSourceStrategy
Responsible for analyzing and planning reading/writing of data using any of the Data Source APIs, including:
- pruning the files from partitions that will be read based on filters.
- appending partition columns*
- applying additional filters when a data source can not evaluate them internally.
- constructing an RDD that is bucketed correctly when required*
- sanity checking schema match-up and other analysis when writing.
*In the future we should do that following:
- Break out file handling into its own Strategy as its sufficiently complex / isolated.
- Push the appending of partition columns down in to `FileFormat` to avoid an extra copy / unvectorization.
- Use a custom RDD for scans instead of `SQLNewNewHadoopRDD2`
Author: Michael Armbrust <michael@databricks.com>
Author: Wenchen Fan <wenchen@databricks.com>
Closes#11509 from marmbrus/fileDataSource.
This is, in a way, the basics to enable SPARK-529 (which was closed as
won't fix but I think is still valuable). In fact, Spark SQL created
something for that, and this change basically factors out that code
and inserts it into SparkConf, with some extra bells and whistles.
To showcase the usage of this pattern, I modified the YARN backend
to use the new config keys (defined in the new `config` package object
under `o.a.s.deploy.yarn`). Most of the changes are mechanic, although
logic had to be slightly modified in a handful of places.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#10205 from vanzin/conf-opts.
## What changes were proposed in this pull request?
Now that dead executors are shown in the executors table (#10058) the totals table is updated to include the separate totals for alive and dead executors as well as the current total, as originally discussed in #10668
## How was this patch tested?
Manually verified by running the Standalone Web UI in the latest Safari and Firefox ESR
Author: Alex Bozarth <ajbozart@us.ibm.com>
Closes#11381 from ajbozarth/spark13459.
## What changes were proposed in this pull request?
Remove old deprecated ThreadPoolExecutor and replace with ExecutionContext using a ForkJoinPool. The downside of this is that scala's ForkJoinPool doesn't give us a way to specify the thread pool name (and is a wrapper of Java's in 2.12) except by providing a custom factory. Note that we can't use Java's ForkJoinPool directly in Scala 2.11 since it uses a ExecutionContext which reports system parallelism. One other implicit change that happens is the old ExecutionContext would have reported a different default parallelism since it used system parallelism rather than threadpool parallelism (this was likely not intended but also likely not a huge difference).
The previous version of this PR attempted to use an execution context constructed on the ThreadPool (but not the deprecated ThreadPoolExecutor class) so as to keep the ability to have human readable named threads but this reported system parallelism.
## How was this patch tested?
unit tests: streaming/testOnly org.apache.spark.streaming.util.*
Author: Holden Karau <holden@us.ibm.com>
Closes#11423 from holdenk/SPARK-13398-move-away-from-ThreadPoolTaskSupport-java-forkjoin.
## What changes were proposed in this pull request?
This PR fixes typos in comments and testcase name of code.
## How was this patch tested?
manual.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#11481 from dongjoon-hyun/minor_fix_typos_in_code.
## What changes were proposed in this pull request?
Fixes (another) compile problem due to inadvertent use of Option.contains, only in Scala 2.11
## How was this patch tested?
Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes#11496 from srowen/SPARK-13423.3.
## What changes were proposed in this pull request?
Fixes compile problem due to inadvertent use of `Option.contains`, only in Scala 2.11. The change should have been to replace `Option.exists(_ == x)` with `== Some(x)`. Replacing exists with contains only makes sense for collections. Replacing use of `Option.exists` still makes sense though as it's misleading.
## How was this patch tested?
Jenkins tests / compilation
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Author: Sean Owen <sowen@cloudera.com>
Closes#11493 from srowen/SPARK-13423.2.
## What changes were proposed in this pull request?
After SPARK-6990, `dev/lint-java` keeps Java code healthy and helps PR review by saving much time.
This issue aims remove unused imports from Java/Scala code and add `UnusedImports` checkstyle rule to help developers.
## How was this patch tested?
```
./dev/lint-java
./build/sbt compile
```
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#11438 from dongjoon-hyun/SPARK-13583.
## What changes were proposed in this pull request?
Make some cross-cutting code improvements according to static analysis. These are individually up for discussion since they exist in separate commits that can be reverted. The changes are broadly:
- Inner class should be static
- Mismatched hashCode/equals
- Overflow in compareTo
- Unchecked warnings
- Misuse of assert, vs junit.assert
- get(a) + getOrElse(b) -> getOrElse(a,b)
- Array/String .size -> .length (occasionally, -> .isEmpty / .nonEmpty) to avoid implicit conversions
- Dead code
- tailrec
- exists(_ == ) -> contains find + nonEmpty -> exists filter + size -> count
- reduce(_+_) -> sum map + flatten -> map
The most controversial may be .size -> .length simply because of its size. It is intended to avoid implicits that might be expensive in some places.
## How was the this patch tested?
Existing Jenkins unit tests.
Author: Sean Owen <sowen@cloudera.com>
Closes#11292 from srowen/SPARK-13423.
Moved TestExecutor.scala from src to test package and removed the unused file TestClient.scala.
Author: Devaraj K <devaraj@apache.org>
Closes#11474 from devaraj-kavali/SPARK-13621.
## What changes were proposed in this pull request?
In order to tell OutputStream that the task has failed or not, we should call the failure callbacks BEFORE calling writer.close().
## How was this patch tested?
Added new unit tests.
Author: Davies Liu <davies@databricks.com>
Closes#11450 from davies/callback.
CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for handling graceful fallback to disk when cached data does not fit in memory. However, this logic also exists inside of the MemoryStore itself, so this appears to be unnecessary duplication.
Thanks to the addition of block-level read/write locks in #10705, we can refactor the code to remove the CacheManager and replace it with an atomic `BlockManager.getOrElseUpdate()` method.
This pull request replaces / subsumes #10748.
/cc andrewor14 and nongli for review. Note that this changes the locking semantics of a couple of internal BlockManager methods (`doPut()` and `lockNewBlockForWriting`), so please pay attention to the Scaladoc changes and new test cases for those methods.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11436 from JoshRosen/remove-cachemanager.
The Hive client library is not smart enough to notice that the current
user is a proxy user; so when using a proxy user, it fails to fetch
delegation tokens from the metastore because of a missing kerberos
TGT for the current user.
To fix it, just run the code that fetches the delegation token as the
real logged in user.
Tested on a kerberos cluster both submitting normally and with a proxy
user; Hive and HBase tokens are retrieved correctly in both cases.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#11358 from vanzin/SPARK-13478.
## What changes were proposed in this pull request?
Just fixed the log place introduced by #11401
## How was this patch tested?
unit tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#11432 from zsxwing/SPARK-13522-follow-up.
## What changes were proposed in this pull request?
Sometimes, network disconnection event won't be triggered for other potential race conditions that we may not have thought of, then the executor will keep sending heartbeats to driver and won't exit.
This PR adds a new configuration `spark.executor.heartbeat.maxFailures` to kill Executor when it's unable to heartbeat to the driver more than `spark.executor.heartbeat.maxFailures` times.
## How was this patch tested?
unit tests
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#11401 from zsxwing/SPARK-13522.
## What changes were proposed in this pull request?
Now by default, it shows as ascending order of appId. We might prefer to display as descending order by default, which will show the latest application at the top.
## How was this patch tested?
Manual tested. See screenshot below:
![desc-sort](https://cloud.githubusercontent.com/assets/11683054/13307473/102f4cf8-db31-11e5-8dd5-391edbf32f0d.png)
Author: zhuol <zhuol@yahoo-inc.com>
Closes#11357 from zhuoliu/13481.
## What changes were proposed in this pull request?
When the driver removes an executor's state, the connection between the driver and the executor may be still alive so that the executor cannot exit automatically (E.g., Master will send RemoveExecutor when a work is lost but the executor is still alive), so the driver should try to tell the executor to stop itself. Otherwise, we will leak an executor.
This PR modified the driver to send `StopExecutor` to the executor when it's removed.
## How was this patch tested?
manual test: increase the worker heartbeat interval to force it's always timeout and the leak executors are gone.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#11399 from zsxwing/SPARK-13519.
## What changes were proposed in this pull request?
TaskContext supports task completion callback, which gets called regardless of task failures. However, there is no way for the listener to know if there is an error. This patch adds a new listener that gets called when a task fails.
## How was the this patch tested?
New unit test case and integration test case covering the code path
Author: Reynold Xin <rxin@databricks.com>
Closes#11340 from rxin/SPARK-13465.
## Motivation
As a pre-requisite to off-heap caching of blocks, we need a mechanism to prevent pages / blocks from being evicted while they are being read. With on-heap objects, evicting a block while it is being read merely leads to memory-accounting problems (because we assume that an evicted block is a candidate for garbage-collection, which will not be true during a read), but with off-heap memory this will lead to either data corruption or segmentation faults.
## Changes
### BlockInfoManager and reader/writer locks
This patch adds block-level read/write locks to the BlockManager. It introduces a new `BlockInfoManager` component, which is contained within the `BlockManager`, holds the `BlockInfo` objects that the `BlockManager` uses for tracking block metadata, and exposes APIs for locking blocks in either shared read or exclusive write modes.
`BlockManager`'s `get*()` and `put*()` methods now implicitly acquire the necessary locks. After a `get()` call successfully retrieves a block, that block is locked in a shared read mode. A `put()` call will block until it acquires an exclusive write lock. If the write succeeds, the write lock will be downgraded to a shared read lock before returning to the caller. This `put()` locking behavior allows us store a block and then immediately turn around and read it without having to worry about it having been evicted between the write and the read, which will allow us to significantly simplify `CacheManager` in the future (see #10748).
See `BlockInfoManagerSuite`'s test cases for a more detailed specification of the locking semantics.
### Auto-release of locks at the end of tasks
Our locking APIs support explicit release of locks (by calling `unlock()`), but it's not always possible to guarantee that locks will be released prior to the end of the task. One reason for this is our iterator interface: since our iterators don't support an explicit `close()` operator to signal that no more records will be consumed, operations like `take()` or `limit()` don't have a good means to release locks on their input iterators' blocks. Another example is broadcast variables, whose block locks can only be released at the end of the task.
To address this, `BlockInfoManager` uses a pair of maps to track the set of locks acquired by each task. Lock acquisitions automatically record the current task attempt id by obtaining it from `TaskContext`. When a task finishes, code in `Executor` calls `BlockInfoManager.unlockAllLocksForTask(taskAttemptId)` to free locks.
### Locking and the MemoryStore
In order to prevent in-memory blocks from being evicted while they are being read, the `MemoryStore`'s `evictBlocksToFreeSpace()` method acquires write locks on blocks which it is considering as candidates for eviction. These lock acquisitions are non-blocking, so a block which is being read will not be evicted. By holding write locks until the eviction is performed or skipped (in case evicting the blocks would not free enough memory), we avoid a race where a new reader starts to read a block after the block has been marked as an eviction candidate but before it has been removed.
### Locking and remote block transfer
This patch makes small changes to to block transfer and network layer code so that locks acquired by the BlockTransferService are released as soon as block transfer messages are consumed and released by Netty. This builds on top of #11193, a bug fix related to freeing of network layer ManagedBuffers.
## FAQ
- **Why not use Java's built-in [`ReadWriteLock`](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReadWriteLock.html)?**
Our locks operate on a per-task rather than per-thread level. Under certain circumstances a task may consist of multiple threads, so using `ReadWriteLock` would mean that we might call `unlock()` from a thread which didn't hold the lock in question, an operation which has undefined semantics. If we could rely on Java 8 classes, we might be able to use [`StampedLock`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/StampedLock.html) to work around this issue.
- **Why not detect "leaked" locks in tests?**:
See above notes about `take()` and `limit`.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#10705 from JoshRosen/pin-pages.
Our nightly doc snapshot builds are failing due to some issue involving the Guava Stopwatch constructor:
```
[error] /home/jenkins/workspace/spark-master-docs/spark/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala:496: constructor Stopwatch in class Stopwatch cannot be accessed in class CoarseMesosSchedulerBackend
[error] val stopwatch = new Stopwatch()
[error] ^
```
This Stopwatch constructor was deprecated in newer versions of Guava (fd0cbc2c5c) and it's possible that some classpath issues affecting Unidoc could be causing this to trigger compilation failures.
In order to work around this issue, this patch removes this use of Stopwatch since we don't use it anywhere else in the Spark codebase.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11376 from JoshRosen/remove-stopwatch.
When uses clicks more than one time on any stage in the DAG graph on the *Job* web UI page, many new *Stage* web UI pages are opened, but only half of their DAG graphs are expanded.
After this PR's fix, every newly opened *Stage* page's DAG graph is expanded.
Before:
![](https://cloud.githubusercontent.com/assets/15843379/13279144/74808e86-db10-11e5-8514-cecf31af8908.png)
After:
![](https://cloud.githubusercontent.com/assets/15843379/13279145/77ca5dec-db10-11e5-9457-8e1985461328.png)
## What changes were proposed in this pull request?
- Removed the `expandDagViz` parameter for _Stage_ page and related codes
- Added a `onclick` function setting `expandDagVizArrowKey(false)` as `true`
## How was this patch tested?
Manual tests (with this fix) to verified this fix work:
- clicked many times on _Job_ Page's DAG Graph → each newly opened Stage page's DAG graph is expanded
Manual tests (with this fix) to verified this fix do not break features we already had:
- refreshed many times for a same _Stage_ page (whose DAG already expanded) → DAG remained expanded upon every refresh
- refreshed many times for a same _Stage_ page (whose DAG unexpanded) → DAG remained unexpanded upon every refresh
- refreshed many times for a same _Job_ page (whose DAG already expanded) → DAG remained expanded upon every refresh
- refreshed many times for a same _Job_ page (whose DAG unexpanded) → DAG remained unexpanded upon every refresh
Author: Liwei Lin <proflin.me@gmail.com>
Closes#11368 from proflin/SPARK-13468.
Fixed the HTTP Server Host Name/IP issue i.e. HTTP Server to take the
configured host name/IP and not '0.0.0.0' always.
Author: Devaraj K <devaraj@apache.org>
Closes#11133 from devaraj-kavali/SPARK-13117.
## What changes were proposed in this pull request?
When we pass a Python function to JVM side, we also need to send its context, e.g. `envVars`, `pythonIncludes`, `pythonExec`, etc. However, it's annoying to pass around so many parameters at many places. This PR abstract python function along with its context, to simplify some pyspark code and make the logic more clear.
## How was the this patch tested?
by existing unit tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#11342 from cloud-fan/python-clean.
Added an exception to be thrown in UnifiedMemoryManager.scala if the configuration given for executor memory is too low. Also modified the exception message thrown when driver memory is too low.
This patch was tested manually by passing in config options to Spark shell. I also added a test in UnifiedMemoryManagerSuite.scala
Author: Daniel Jalova <djalova@us.ibm.com>
Closes#11255 from djalova/SPARK-12759.
## What changes were proposed in this pull request?
Generates code for SortMergeJoin.
## How was the this patch tested?
Unit tests and manually tested with TPCDS Q72, which showed 70% performance improvements (from 42s to 25s), but micro benchmark only show minor improvements, it may depends the distribution of data and number of columns.
Author: Davies Liu <davies@databricks.com>
Closes#11248 from davies/gen_smj.
## What changes were proposed in this pull request?
History page now sorts the appID as a string, which can lead to unexpected order for the case "application_11111_9" and "application_11111_20".
Add a new sort type called appId-numeric can fix it.
## How was the this patch tested?
This patch was manually tested with UI. See the screenshot below:
![sortappidbetter](https://cloud.githubusercontent.com/assets/11683054/13185564/7f941a16-d707-11e5-8fb7-0316368d3030.png)
Author: zhuol <zhuol@yahoo-inc.com>
Closes#11259 from zhuoliu/13364.
JIRA: https://issues.apache.org/jira/browse/SPARK-13358
When trying to run a benchmark, I found that on my Ubuntu linux grep is not in /usr/bin/ but /bin/. So wondering if it is better to use which to retrieve grep path.
cc davies
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#11231 from viirya/benchmark-grep-path.
## What changes were proposed in this pull request?
When there are some special characters (e.g., `"`, `\`) in `label`, DAG will be broken. This patch just escapes `label` to avoid DAG being broken by some special characters
## How was the this patch tested?
Jenkins tests
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#11309 from zsxwing/SPARK-13298.
## What changes were proposed in this pull request?
This patch removes SparkContext.metricsSystem. SparkContext.metricsSystem returns MetricsSystem, which is a private class. I think it was added by accident.
In addition, I also removed an unused private[spark] method schedulerBackend setter.
## How was the this patch tested?
N/A.
Author: Reynold Xin <rxin@databricks.com>
This patch had conflicts when merged, resolved by
Committer: Josh Rosen <joshrosen@databricks.com>
Closes#11282 from rxin/SPARK-13413.
Currently the Mesos cluster dispatcher is not using offers from multiple roles correctly, as it simply aggregates all the offers resource values into one, but doesn't apply them correctly before calling the driver as Mesos needs the resources from the offers to be specified which role it originally belongs to. Multiple roles is already supported with fine/coarse grain scheduler, so porting that logic here to the cluster scheduler.
https://issues.apache.org/jira/browse/SPARK-10749
Author: Timothy Chen <tnachen@gmail.com>
Closes#8872 from tnachen/cluster_multi_roles.
## What changes were proposed in this pull request?
This PR tries to fix all typos in all markdown files under `docs` module,
and fixes similar typos in other comments, too.
## How was the this patch tested?
manual tests.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#11300 from dongjoon-hyun/minor_fix_typos.
## What changes were proposed in this pull request?
This PR removes the support of SIMR, since SIMR is not actively used and maintained for a long time, also is not supported from `SparkSubmit`, so here propose to remove it.
## How was the this patch tested?
This patch is tested locally by running unit tests.
Author: jerryshao <sshao@hortonworks.com>
Closes#11296 from jerryshao/SPARK-13426.
## What changes were proposed in this pull request?
`JobWaiter.taskSucceeded` will be called for each task. When `resultHandler` throws an exception, `taskSucceeded` will also throw it for each task. DAGScheduler just catches it and reports it like this:
```Scala
try {
job.listener.taskSucceeded(rt.outputId, event.result)
} catch {
case e: Exception =>
// TODO: Perhaps we want to mark the resultStage as failed?
job.listener.jobFailed(new SparkDriverExecutionException(e))
}
```
Therefore `JobWaiter.jobFailed` may be called multiple times.
So `JobWaiter.jobFailed` should use `Promise.tryFailure` instead of `Promise.failure` because the latter one doesn't support calling multiple times.
## How was the this patch tested?
Jenkins tests.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#11280 from zsxwing/SPARK-13408.
`TaskMetrics.fromAccumulatorUpdates()` can fail if accumulators have been garbage-collected on the driver. To guard against this, this patch introduces `ListenerTaskMetrics`, a subclass of `TaskMetrics` which is used only in `TaskMetrics.fromAccumulatorUpdates()` and which eliminates the need to access the original accumulators on the driver.
Author: Josh Rosen <joshrosen@databricks.com>
Closes#11276 from JoshRosen/accum-updates-fix.
Clarify that reduce functions need to be commutative, and fold functions do not
See https://github.com/apache/spark/pull/11091
Author: Sean Owen <sowen@cloudera.com>
Closes#11217 from srowen/SPARK-13339.
## What changes were proposed in this pull request?
Fix some comparisons between unequal types that cause IJ warnings and in at least one case a likely bug (TaskSetManager)
## How was the this patch tested?
Running Jenkins tests
Author: Sean Owen <sowen@cloudera.com>
Closes#11253 from srowen/SPARK-13371.
This commit removes an unnecessary duplicate check in addPendingTask that meant
that scheduling a task set took time proportional to (# tasks)^2.
Author: Sital Kedia <skedia@fb.com>
Closes#11175 from sitalkedia/fix_stuck_driver.
See http://openjdk.java.net/jeps/223 for more information about the JDK 9 version string scheme.
Author: Claes Redestad <claes.redestad@gmail.com>
Closes#11160 from cl4es/master.
Due to being on a Windows platform I have been unable to run the tests as described in the "Contributing to Spark" instructions. As the change is only to two lines of code in the Web UI, which I have manually built and tested, I am submitting this pull request anyway. I hope this is OK.
Is it worth considering also including this fix in any future 1.5.x releases (if any)?
I confirm this is my own original work and license it to the Spark project under its open source license.
Author: markpavey <mark.pavey@thefilter.com>
Closes#11135 from markpavey/JIRA_SPARK-13142_WindowsWebUILogFix.
Overrode the start() method, which was previously starting a thread causing a race condition. I believe this should fix the flaky test.
Author: Michael Gummelt <mgummelt@mesosphere.io>
Closes#11164 from mgummelt/fix_mesos_tests.
This JIRA is related to
https://github.com/apache/spark/pull/5852
Had to do some minor rework and test to make sure it
works with current version of spark.
Author: Sanket <schintap@untilservice-lm>
Closes#10838 from redsanket/limit-outbound-connections.
When the HistoryServer is showing an incomplete app, it needs to check if there is a newer version of the app available. It does this by checking if a version of the app has been loaded with a larger *filesize*. If so, it detaches the current UI, attaches the new one, and redirects back to the same URL to show the new UI.
https://issues.apache.org/jira/browse/SPARK-7889
Author: Steve Loughran <stevel@hortonworks.com>
Author: Imran Rashid <irashid@cloudera.com>
Closes#11118 from squito/SPARK-7889-alternate.
This commit removes an unnecessary duplicate check in addPendingTask that meant
that scheduling a task set took time proportional to (# tasks)^2.
Author: Sital Kedia <skedia@fb.com>
Closes#11167 from sitalkedia/fix_stuck_driver and squashes the following commits:
3fe1af8 [Sital Kedia] [SPARK-13279] Remove unnecessary duplicate check in addPendingTask function
Made sure the old tables continue to use the old css and the new DataTables use the new css. Also fixed it so the Safari Web Inspector doesn't throw errors when on the new DataTables pages.
Author: Alex Bozarth <ajbozart@us.ibm.com>
Closes#11038 from ajbozarth/spark13124.
The "getPersistentRDDs()" is a useful API of SparkContext to get cached RDDs. However, the JavaSparkContext does not have this API.
Add a simple getPersistentRDDs() to get java.util.Map<Integer, JavaRDD> for Java users.
Author: Junyang <fly.shenjy@gmail.com>
Closes#10978 from flyjy/master.
Remove spark.closure.serializer option and use JavaSerializer always
CC andrewor14 rxin I see there's a discussion in the JIRA but just thought I'd offer this for a look at what the change would be.
Author: Sean Owen <sowen@cloudera.com>
Closes#11150 from srowen/SPARK-12414.
The right margin of the history page is little bit off. A simple fix for that issue.
Author: zhuol <zhuol@yahoo-inc.com>
Closes#11029 from zhuoliu/13126.
The column width for the new DataTables now adjusts for the current page rather than being hard-coded for the entire table's data.
Author: Alex Bozarth <ajbozart@us.ibm.com>
Closes#11057 from ajbozarth/spark13163.
This is the next iteration of tnachen's previous PR: https://github.com/apache/spark/pull/4027
In that PR, we resolved with andrewor14 and pwendell to implement the Mesos scheduler's support of `spark.executor.cores` to be consistent with YARN and Standalone. This PR implements that resolution.
This PR implements two high-level features. These two features are co-dependent, so they're implemented both here:
- Mesos support for spark.executor.cores
- Multiple executors per slave
We at Mesosphere have been working with Typesafe on a Spark/Mesos integration test suite: https://github.com/typesafehub/mesos-spark-integration-tests, which passes for this PR.
The contribution is my original work and I license the work to the project under the project's open source license.
Author: Michael Gummelt <mgummelt@mesosphere.io>
Closes#10993 from mgummelt/executor_sizing.
This PR improve the lookup of BytesToBytesMap by:
1. Generate code for calculate the hash code of grouping keys.
2. Do not use MemoryLocation, fetch the baseObject and offset for key and value directly (remove the indirection).
Author: Davies Liu <davies@databricks.com>
Closes#11010 from davies/gen_map.
Call shuffleMetrics's incRemoteBytesRead and incRemoteBlocksFetched when polling FetchResult from `results` so as to always use shuffleMetrics in one thread.
Also fix a race condition that could cause memory leak.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes#11138 from zsxwing/SPARK-13245.
Adds the benchmark results as comments.
The codegen version is slower than the interpreted version for `simple` case becasue of 3 reasons:
1. codegen version use a more complex hash algorithm than interpreted version, i.e. `Murmur3_x86_32.hashInt` vs [simple multiplication and addition](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala#L153).
2. codegen version will write the hash value to a row first and then read it out. I tried to create a `GenerateHasher` that can generate code to return hash value directly and got about 60% speed up for the `simple` case, does it worth?
3. the row in `simple` case only has one int field, so the runtime reflection may be removed because of branch prediction, which makes the interpreted version faster.
The `array` case is also slow for similar reasons, e.g. array elements are of same type, so interpreted version can probably get rid of runtime reflection by branch prediction.
Author: Wenchen Fan <wenchen@databricks.com>
Closes#10917 from cloud-fan/hash-benchmark.
Since Spark requires at least JRE 1.7, it is safe to use built-in java.nio.Files.
Author: Jakob Odersky <jakob@odersky.com>
Closes#11098 from jodersky/SPARK-13176.
Additional changes to #10835, mainly related to style and visibility. This patch also adds back a few deprecated methods for backward compatibility.
Author: Andrew Or <andrew@databricks.com>
Closes#10958 from andrewor14/task-metrics-to-accums-followups.
There is a bug when we try to grow the buffer, OOM is ignore wrongly (the assert also skipped by JVM), then we try grow the array again, this one will trigger spilling free the current page, the current record we inserted will be invalid.
The root cause is that JVM has less free memory than MemoryManager thought, it will OOM when allocate a page without trigger spilling. We should catch the OOM, and acquire memory again to trigger spilling.
And also, we could not grow the array in `insertRecord` of `InMemorySorter` (it was there just for easy testing).
Author: Davies Liu <davies@databricks.com>
Closes#11095 from davies/fix_expand.