External spilling - generalize batching logic
The existing implementation consists of a hack for Kryo specifically and only works for LZF compression. Introducing an intermediate batch-level stream takes care of pre-fetching and other arbitrary behavior of higher level streams in a more general way.
Author: Andrew Or <andrewor14@gmail.com>
== Merge branch commits ==
commit 3ddeb7ef89a0af2b685fb5d071aa0f71c975cc82
Author: Andrew Or <andrewor14@gmail.com>
Date: Wed Feb 5 12:09:32 2014 -0800
Also privatize fields
commit 090544a87a0767effd0c835a53952f72fc8d24f0
Author: Andrew Or <andrewor14@gmail.com>
Date: Wed Feb 5 10:58:23 2014 -0800
Privatize methods
commit 13920c918efe22e66a1760b14beceb17a61fd8cc
Author: Andrew Or <andrewor14@gmail.com>
Date: Tue Feb 4 16:34:15 2014 -0800
Update docs
commit bd5a1d7350467ed3dc19c2de9b2c9f531f0e6aa3
Author: Andrew Or <andrewor14@gmail.com>
Date: Tue Feb 4 13:44:24 2014 -0800
Typo: phyiscal -> physical
commit 287ef44e593ad72f7434b759be3170d9ee2723d2
Author: Andrew Or <andrewor14@gmail.com>
Date: Tue Feb 4 13:38:32 2014 -0800
Avoid reading the entire batch into memory; also simplify streaming logic
Additionally, address formatting comments.
commit 3df700509955f7074821e9aab1e74cb53c58b5a5
Merge: a531d2e 164489d
Author: Andrew Or <andrewor14@gmail.com>
Date: Mon Feb 3 18:27:49 2014 -0800
Merge branch 'master' of github.com:andrewor14/incubator-spark
commit a531d2e347acdcecf2d0ab72cd4f965ab5e145d8
Author: Andrew Or <andrewor14@gmail.com>
Date: Mon Feb 3 18:18:04 2014 -0800
Relax assumptions on compressors and serializers when batching
This commit introduces an intermediate layer of an input stream on the batch level.
This guards against interference from higher level streams (i.e. compression and
deserialization streams), especially pre-fetching, without specifically targeting
particular libraries (Kryo) and forcing shuffle spill compression to use LZF.
commit 164489d6f176bdecfa9dabec2dfce5504d1ee8af
Author: Andrew Or <andrewor14@gmail.com>
Date: Mon Feb 3 18:18:04 2014 -0800
Relax assumptions on compressors and serializers when batching
This commit introduces an intermediate layer of an input stream on the batch level.
This guards against interference from higher level streams (i.e. compression and
deserialization streams), especially pre-fetching, without specifically targeting
particular libraries (Kryo) and forcing shuffle spill compression to use LZF.
Only run ResubmitFailedStages event after a fetch fails
Previously, the ResubmitFailedStages event was called every
200 milliseconds, leading to a lot of unnecessary event processing
and clogged DAGScheduler logs.
Author: Kay Ousterhout <kayousterhout@gmail.com>
== Merge branch commits ==
commit e603784b3a562980e6f1863845097effe2129d3b
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Feb 5 11:34:41 2014 -0800
Re-add check for empty set of failed stages
commit d258f0ef50caff4bbb19fb95a6b82186db1935bf
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Jan 15 23:35:41 2014 -0800
Only run ResubmitFailedStages event after a fetch fails
Previously, the ResubmitFailedStages event was called every
200 milliseconds, leading to a lot of unnecessary event processing
and clogged DAGScheduler logs.
Inform DAG scheduler about all started/finished tasks.
Previously, the DAG scheduler was not always informed
when tasks started and finished. The simplest example here
is for speculated tasks: the DAGScheduler was only told about
the first attempt of a task, meaning that SparkListeners were
also not told about multiple task attempts, so users can't see
what's going on with speculation in the UI. The DAGScheduler
also wasn't always told about finished tasks, so in the UI, some
tasks will never be shown as finished (this occurs, for example,
if a task set gets killed).
The other problem is that the fairness accounting was wrong
-- the number of running tasks in a pool was decreased when a
task set was considered done, even if all of its tasks hadn't
yet finished.
Author: Kay Ousterhout <kayousterhout@gmail.com>
== Merge branch commits ==
commit c8d547d0f7a17f5a193bef05f5872b9f475675c5
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Jan 15 16:47:33 2014 -0800
Addressed Reynold's review comments.
Always use a TaskEndReason (remove the option), and explicitly
signal when we don't know the reason. Also, always tell
DAGScheduler (and associated listeners) about started tasks, even
when they're speculated.
commit 3fee1e2e3c06b975ff7f95d595448f38cce97a04
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Jan 8 22:58:13 2014 -0800
Fixed broken test and improved logging
commit ff12fcaa2567c5d02b75a1d5db35687225bcd46f
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Sun Dec 29 21:08:20 2013 -0800
Inform DAG scheduler about all finished tasks.
Previously, the DAG scheduler was not always informed
when tasks finished. For example, when a task set was
aborted, the DAG scheduler was never told when the tasks
in that task set finished. The DAG scheduler was also
never told about the completion of speculated tasks.
This led to confusion with SparkListeners because information
about the completion of those tasks was never passed on to
the listeners (so in the UI, for example, some tasks will never
be shown as finished).
The other problem is that the fairness accounting was wrong
-- the number of running tasks in a pool was decreased when a
task set was considered done, even if all of its tasks hadn't
yet finished.
SPARK-1056. Fix header comment in Executor to not imply that it's only u...
...sed for Mesos and Standalone.
Author: Sandy Ryza <sandy@cloudera.com>
== Merge branch commits ==
commit 1f2443d902a26365a5c23e4af9077e1539ed2eab
Author: Sandy Ryza <sandy@cloudera.com>
Date: Thu Feb 6 15:03:50 2014 -0800
SPARK-1056. Fix header comment in Executor to not imply that it's only used for Mesos and Standalone
remove actorToWorker in master.scala, which is actually not used
actorToWorker is actually not used in the code....just remove it
Author: CodingCat <zhunansjtu@gmail.com>
== Merge branch commits ==
commit 52656c2d4bbf9abcd8bef65d454badb9cb14a32c
Author: CodingCat <zhunansjtu@gmail.com>
Date: Thu Feb 6 00:28:26 2014 -0500
remove actorToWorker in master.scala, which is actually not used
Fixed warnings in test compilation.
This commit fixes two problems: a redundant import, and a
deprecated function.
Author: Kay Ousterhout <kayousterhout@gmail.com>
== Merge branch commits ==
commit da9d2e13ee4102bc58888df0559c65cb26232a82
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Feb 5 11:41:51 2014 -0800
Fixed warnings in test compilation.
This commit fixes two problems: a redundant import, and a
deprecated function.
Refactor RDD sampling and add randomSplit to RDD (update)
Replace SampledRDD by PartitionwiseSampledRDD, which accepts a RandomSampler instance as input. The current sample with/without replacement can be easily integrated via BernoulliSampler and PoissonSampler. The benefits are:
1) RDD.randomSplit is implemented in the same way, related to https://github.com/apache/incubator-spark/pull/513
2) Stratified sampling and importance sampling can be implemented in the same manner as well.
Unit tests are included for samplers and RDD.randomSplit.
This should performance better than my previous request where the BernoulliSampler creates many Iterator instances:
https://github.com/apache/incubator-spark/pull/513
Author: Xiangrui Meng <meng@databricks.com>
== Merge branch commits ==
commit e8ce957e5f0a600f2dec057924f4a2ca6adba373
Author: Xiangrui Meng <meng@databricks.com>
Date: Mon Feb 3 12:21:08 2014 -0800
more docs to PartitionwiseSampledRDD
commit fbb4586d0478ff638b24bce95f75ff06f713d43b
Author: Xiangrui Meng <meng@databricks.com>
Date: Mon Feb 3 00:44:23 2014 -0800
move XORShiftRandom to util.random and use it in BernoulliSampler
commit 987456b0ee8612fd4f73cb8c40967112dc3c4c2d
Author: Xiangrui Meng <meng@databricks.com>
Date: Sat Feb 1 11:06:59 2014 -0800
relax assertions in SortingSuite because the RangePartitioner has large variance in this case
commit 3690aae416b2dc9b2f9ba32efa465ba7948477f4
Author: Xiangrui Meng <meng@databricks.com>
Date: Sat Feb 1 09:56:28 2014 -0800
test split ratio of RDD.randomSplit
commit 8a410bc933a60c4d63852606f8bbc812e416d6ae
Author: Xiangrui Meng <meng@databricks.com>
Date: Sat Feb 1 09:25:22 2014 -0800
add a test to ensure seed distribution and minor style update
commit ce7e866f674c30ab48a9ceb09da846d5362ab4b6
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 31 18:06:22 2014 -0800
minor style change
commit 750912b4d77596ed807d361347bd2b7e3b9b7a74
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 31 18:04:54 2014 -0800
fix some long lines
commit c446a25c38d81db02821f7f194b0ce5ab4ed7ff5
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 31 17:59:59 2014 -0800
add complement to BernoulliSampler and minor style changes
commit dbe2bc2bd888a7bdccb127ee6595840274499403
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 31 17:45:08 2014 -0800
switch to partition-wise sampling for better performance
commit a1fca5232308feb369339eac67864c787455bb23
Merge: ac712e4 cf6128f
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 31 16:33:09 2014 -0800
Merge branch 'sample' of github.com:mengxr/incubator-spark into sample
commit cf6128fb672e8c589615adbd3eaa3cbdb72bd461
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 14:40:07 2014 -0800
set SampledRDD deprecated in 1.0
commit f430f847c3df91a3894687c513f23f823f77c255
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 14:38:59 2014 -0800
update code style
commit a8b5e2021a9204e318c80a44d00c5c495f1befb6
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 12:56:27 2014 -0800
move package random to util.random
commit ab0fa2c4965033737a9e3a9bf0a59cbb0df6a6f5
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 12:50:35 2014 -0800
add Apache headers and update code style
commit 985609fe1a55655ad11966e05a93c18c138a403d
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 11:49:25 2014 -0800
add new lines
commit b21bddf29850a2c006a868869b8f91960a029322
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 11:46:35 2014 -0800
move samplers to random.IndependentRandomSampler and add tests
commit c02dacb4a941618e434cefc129c002915db08be6
Author: Xiangrui Meng <meng@databricks.com>
Date: Sat Jan 25 15:20:24 2014 -0800
add RandomSampler
commit 8ff7ba3c5cf1fc338c29ae8b5fa06c222640e89c
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 24 13:23:22 2014 -0800
init impl of IndependentlySampledRDD
Remove explicit conversion to PairRDDFunctions in cogroup()
As SparkContext._ is already imported, using the implicit conversion appears to make the code much cleaner. Perhaps there was some sinister reason for doing the conversion explicitly, however.
Author: Aaron Davidson <aaron@databricks.com>
== Merge branch commits ==
commit aa4a63f1bfd5b5178fe67364dd7ce4d84c357996
Author: Aaron Davidson <aaron@databricks.com>
Date: Sun Feb 2 23:48:04 2014 -0800
Remove explicit conversion to PairRDDFunctions in cogroup()
As SparkContext._ is already imported, using the implicit conversion
appears to make the code much cleaner. Perhaps there was some sinister
reason for doing the converion explicitly, however.
Issue with failed worker registrations
I've been going through the spark source after having some odd issues with workers dying and not coming back. After some digging (I'm very new to scala and spark) I believe I've found a worker registration issue. It looks to me like a failed registration follows the same code path as a successful registration which end up with workers believing they are connected (since they received a `RegisteredWorker` event) even tho they are not registered on the Master.
This is a quick fix that I hope addresses this issue (assuming I didn't completely miss-read the code and I'm about to look like a silly person :P)
I'm opening this pr now to start a chat with you guys while I do some more testing on my side :)
Author: Erik Selin <erik.selin@jadedpixel.com>
== Merge branch commits ==
commit 973012f8a2dcf1ac1e68a69a2086a1b9a50f401b
Author: Erik Selin <erik.selin@jadedpixel.com>
Date: Tue Jan 28 23:36:12 2014 -0500
break logwarning into two lines to respect line character limit.
commit e3754dc5b94730f37e9806974340e6dd93400f85
Author: Erik Selin <erik.selin@jadedpixel.com>
Date: Tue Jan 28 21:16:21 2014 -0500
add log warning when worker registration fails due to attempt to re-register on same address.
commit 14baca241fa7823e1213cfc12a3ff2a9b865b1ed
Author: Erik Selin <erik.selin@jadedpixel.com>
Date: Wed Jan 22 21:23:26 2014 -0500
address code style comment
commit 71c0d7e6f59cd378d4e24994c21140ab893954ee
Author: Erik Selin <erik.selin@jadedpixel.com>
Date: Wed Jan 22 16:01:42 2014 -0500
Make a failed registration not persist, not send a `RegisteredWordker` event and not run `schedule` but rather send a `RegisterWorkerFailed` message to the worker attempting to register.
This fixes SPARK-1043, a bug introduced in 0.9.0
where PySpark couldn't serialize strings > 64kB.
This fix was written by @tyro89 and @bouk in #512.
This commit squashes and rebases their pull request
in order to fix some merge conflicts.
Allow files added through SparkContext.addFile() to be overwritten
This is useful for the cases when a file needs to be refreshed and downloaded by the executors periodically. For example, a possible use case is: the driver periodically renews a Hadoop delegation token and writes it to a token file. The token file needs to be downloaded by the executors whenever it gets renewed. However, the current implementation throws an exception when the target file exists and its contents do not match those of the new source. This PR adds an option to allow files to be overwritten to support use cases similar to the above.
Replace the check for None Option with isDefined and isEmpty in Scala code
Propose to replace the Scala check for Option "!= None" with Option.isDefined and "=== None" with Option.isEmpty.
I think this, using method call if possible then operator function plus argument, will make the Scala code easier to read and understand.
Pass compile and tests.
Fix PySpark hang when input files are deleted (SPARK-1025)
This pull request addresses [SPARK-1025](https://spark-project.atlassian.net/browse/SPARK-1025), an issue where PySpark could hang if its input files were deleted.
This fixes an issue where collectAsMap() could
fail when called on a JavaPairRDD that was derived
by transforming a non-JavaPairRDD.
The root problem was that we were creating the
JavaPairRDD's ClassTag by casting a
ClassTag[AnyRef] to a ClassTag[Tuple2[K2, V2]].
To fix this, I cast a ClassTag[Tuple2[_, _]]
instead, since this actually produces a ClassTag
of the appropriate type because ClassTags don't
capture type parameters:
scala> implicitly[ClassTag[Tuple2[_, _]]] == implicitly[ClassTag[Tuple2[Int, Int]]]
res8: Boolean = true
scala> implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[Int, Int]]] == implicitly[ClassTag[Tuple2[Int, Int]]]
res9: Boolean = false
Remove Hadoop object cloning and warn users making Hadoop RDD's.
The code introduced in #359 used Hadoop's WritableUtils.clone() to
duplicate objects when reading from Hadoop files. Some users have
reported exceptions when cloning data in various file formats,
including Avro and another custom format.
This patch removes that functionality to ensure stability for the
0.9 release. Instead, it puts a clear warning in the documentation
that copying may be necessary for Hadoop data sets.
Fix two bugs in PySpark cartesian(): SPARK-978 and SPARK-1034
This pull request fixes two bugs in PySpark's `cartesian()` method:
- [SPARK-978](https://spark-project.atlassian.net/browse/SPARK-978): PySpark's cartesian method throws ClassCastException exception
- [SPARK-1034](https://spark-project.atlassian.net/browse/SPARK-1034): Py4JException on PySpark Cartesian Result
The JIRAs have more details describing the fixes.
The code introduced in #359 used Hadoop's WritableUtils.clone() to
duplicate objects when reading from Hadoop files. Some users have
reported exceptions when cloning data in verious file formats,
including Avro and another custom format.
This patch removes that functionality to ensure stability for the
0.9 release. Instead, it puts a clear warning in the documentation
that copying may be necessary for Hadoop data sets.
Fix bug in worker clean-up in UI
Introduced in d5a96fec (/cc @aarondav).
This should be picked into 0.8 and 0.9 as well. The bug causes old (zombie) workers on a node to not disappear immediately from the UI when a new one registers.
fix for SPARK-1027
fix for SPARK-1027 (https://spark-project.atlassian.net/browse/SPARK-1027)
FIXES
1. change sparkhome from String to Option(String) in ApplicationDesc
2. remove sparkhome parameter in LaunchExecutor message
3. adjust involved files
This bug leads to a small performance hit because task
set managers will get offered each rejected resource
offer twice, but doesn't lead to any incorrect functionality.
Minor api usability changes
- Expose checkpoint directory - since it is autogenerated now
- null check for jars
- Expose SparkHadoopUtil : so that configuration creation is abstracted even from user code to avoid duplication of functionality already in spark.
Remove Typesafe Config usage and conf files to fix nested property names
With Typesafe Config we had the subtle problem of no longer allowing
nested property names, which are used for a few of our properties:
http://apache-spark-developers-list.1001551.n3.nabble.com/Config-properties-broken-in-master-td208.html
This PR is for branch 0.9 but should be added into master too.
(cherry picked from commit 34e911ce9a)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
This is useful for the cases when a file needs to be refreshed and downloaded
by the executors periodically.
Signed-off-by: Yinan Li <liyinan926@gmail.com>
Fail rather than hanging if a task crashes the JVM.
Prior to this commit, if a task crashes the JVM, the task (and
all other tasks running on that executor) is marked at KILLED rather
than FAILED. As a result, the TaskSetManager will retry the task
indefinitely rather than failing the job after maxFailures. Eventually,
this makes the job hang, because the Standalone Scheduler removes
the application after 10 works have failed, and then the app is left
in a state where it's disconnected from the master and waiting to reconnect.
This commit fixes that problem by marking tasks as FAILED rather than
killed when an executor is lost.
The downside of this commit is that if task A fails because another
task running on the same executor caused the VM to crash, the failure
will incorrectly be counted as a failure of task A. This should not
be an issue because we typically set maxFailures to 3, and it is
unlikely that a task will be co-located with a JVM-crashing task
multiple times.
Prior to this commit, if a task crashes the JVM, the task (and
all other tasks running on that executor) is marked at KILLED rather
than FAILED. As a result, the TaskSetManager will retry the task
indefiniteily rather than failing the job after maxFailures. This
commit fixes that problem by marking tasks as FAILED rather than
killed when an executor is lost.
The downside of this commit is that if task A fails because another
task running on the same executor caused the VM to crash, the failure
will incorrectly be counted as a failure of task A. This should not
be an issue because we typically set maxFailures to 3, and it is
unlikely that a task will be co-located with a JVM-crashing task
multiple times.
Workers should use working directory as spark home if it's not specified
If users don't set SPARK_HOME in their environment file when launching an application, the standalone cluster should default to the spark home of the worker.
Note that previously Broadcast class was accidentally marked as private[spark]. It needs to be public
for broadcast variables to work. Also exposing the broadcast varaible id.
GraphX: Unifying Graphs and Tables
GraphX extends Spark's distributed fault-tolerant collections API and interactive console with a new graph API which leverages recent advances in graph systems (e.g., [GraphLab](http://graphlab.org)) to enable users to easily and interactively build, transform, and reason about graph structured data at scale. See http://amplab.github.io/graphx/.
Thanks to @jegonzal, @rxin, @ankurdave, @dcrankshaw, @jianpingjwang, @amatsukawa, @kellrott, and @adamnovak.
Tasks left:
- [x] Graph-level uncache
- [x] Uncache previous iterations in Pregel
- [x] ~~Uncache previous iterations in GraphLab~~ (postponed to post-release)
- [x] - Describe GC issue with GraphLab
- [ ] Write `docs/graphx-programming-guide.md`
- [x] - Mention future Bagel support in docs
- [ ] - Section on caching/uncaching in docs: As with Spark, cache something that is used more than once. In an iterative algorithm, try to cache and force (i.e., materialize) something every iteration, then uncache the cached things that depended on the newly materialized RDD but that won't be referenced again.
- [x] Undo modifications to core collections and instead copy them to org.apache.spark.graphx
- [x] Make Graph serializable to work around capture in Spark shell
- [x] Rename graph -> graphx in package name and subproject
- [x] Remove standalone PageRank
- [x] ~~Fix amplab/graphx#52 by checking `iter.hasNext`~~
Improvements to external sorting
1. Adds the option of compressing outputs.
2. Adds batching to the serialization to prevent OOM on the read side.
3. Slight renaming of config options.
4. Use Spark's buffer size for reads in addition to writes.
Automatically unpersisting RDDs that have been cleaned up from DStreams
Earlier RDDs generated by DStreams were forgotten but not unpersisted. The system relied on the natural BlockManager LRU to drop the data. The cleaner.ttl was a hammer to clean up RDDs but it is something that needs to be set separately and need to be set very conservatively (at best, few minutes). This automatic unpersisting allows the system to handle this automatically, which reduces memory usage. As a side effect it will also improve GC performance as there are less number of objects stored in memory. In fact, for some workloads, it may allow RDDs to be cached as deserialized, which speeds up processing without too much GC overheads.
This is disabled by default. To enable it set configuration spark.streaming.unpersist to true. In future release, this will be set to true by default.
Also, reduced sleep time in TaskSchedulerImpl.stop() from 5 second to 1 second. From my conversation with Matei, there does not seem to be any good reason for the sleep for letting messages be sent out be so long.
1. Adds the option of compressing outputs.
2. Adds batching to the serialization to prevent OOM on the read side.
3. Slight renaming of config options.
4. Use Spark's buffer size for reads in addition to writes.
Remove now un-needed hostPort option
I noticed this was logging some scary error messages in various places. After I looked into it, this is no longer really used. I removed the option and re-wrote the one remaining use case (it was unnecessary there anyways).
Disable shuffle file consolidation by default
After running various performance tests for the 0.9 release, this still seems to have performance issues even on XFS. So let's keep this off-by-default for 0.9 and users can experiment with it depending on their disk configurations.
Remove simple redundant return statements for Scala methods/functions
Remove simple redundant return statements for Scala methods/functions:
-) Only change simple return statements at the end of method
-) Ignore the complex if-else check
-) Ignore the ones inside synchronized
-) Add small changes to making var to val if possible and remove () for simple get
This hopefully makes the review simpler =)
Pass compile and tests.
Setting load defaults to true in executor
This preserves the behavior in earlier releases. If properties are set for the executors via `spark-env.sh` on the slaves, then they should take precedence over spark defaults. This is useful for if system administrators are setting properties for a standalone cluster, such as shuffle locations.
/cc @andrewor14 who initially reported this issue.
Stop SparkListenerBus daemon thread when DAGScheduler is stopped.
Otherwise this leads to hundreds of SparkListenerBus daemon threads in our unit tests (and also problematic if user applications launches multiple SparkContext).
We clone hadoop key and values by default and reuse objects if asked to.
We try to clone for most common types of writables and we call WritableUtils.clone otherwise intention is to optimize, for example for NullWritable there is no need and for Long, int and String creating a new object with value set would be faster than doing copy on object hopefully.
There is another way to do this PR where we ask for both key and values whether to clone them or not, but could not think of a use case for it except either of them is actually a NullWritable for which I have already worked around. So thought that would be unnecessary.
API for automatic driver recovery for streaming programs and other bug fixes
1. Added Scala and Java API for automatically loading checkpoint if it exists in the provided checkpoint directory.
Scala API: `StreamingContext.getOrCreate(<checkpoint dir>, <function to create new StreamingContext>)` returns a StreamingContext
Java API: `JavaStreamingContext.getOrCreate(<checkpoint dir>, <factory obj of type JavaStreamingContextFactory>)`, return a JavaStreamingContext
See the RecoverableNetworkWordCount below as an example of how to use it.
2. Refactored streaming.Checkpoint*** code to fix bugs and make the DStream metadata checkpoint writing and reading more robust. Specifically, it fixes and improves the logic behind backing up and writing metadata checkpoint files. Also, it ensure that spark.driver.* and spark.hostPort is cleared from SparkConf before being written to checkpoint.
3. Fixed bug in cleaning up of checkpointed RDDs created by DStream. Specifically, this fix ensures that checkpointed RDD's files are not prematurely cleaned up, thus ensuring reliable recovery.
4. TimeStampedHashMap is upgraded to optionally update the timestamp on map.get(key). This allows clearing of data based on access time (i.e., clear records were last accessed before a threshold timestamp).
5. Added caching for file modification time in FileInputDStream using the updated TimeStampedHashMap. Without the caching, enumerating the mod times to find new files can take seconds if there are 1000s of files. This cache is automatically cleared.
This PR is not entirely final as I may make some minor additions - a Java examples, and adding StreamingContext.getOrCreate to unit test.
Edit: Java example to be added later, unit test added.
External Sorting for Aggregator and CoGroupedRDDs (Revisited)
(This pull request is re-opened from https://github.com/apache/incubator-spark/pull/303, which was closed because Jenkins / github was misbehaving)
The target issue for this patch is the out-of-memory exceptions triggered by aggregate operations such as reduce, groupBy, join, and cogroup. The existing AppendOnlyMap used by these operations resides purely in memory, and grows with the size of the input data until the amount of allocated memory is exceeded. Under large workloads, this problem is aggravated by the fact that OOM frequently occurs only after a very long (> 1 hour) map phase, in which case the entire job must be restarted.
The solution is to spill the contents of this map to disk once a certain memory threshold is exceeded. This functionality is provided by ExternalAppendOnlyMap, which additionally sorts this buffer before writing it out to disk, and later merges these buffers back in sorted order.
Under normal circumstances in which OOM is not triggered, ExternalAppendOnlyMap is simply a wrapper around AppendOnlyMap and incurs little overhead. Only when the memory usage is expected to exceed the given threshold does ExternalAppendOnlyMap spill to disk.
Aside from trivial formatting changes, use nulls instead of Options for
DiskMapIterator, and add documentation for spark.shuffle.externalSorting
and spark.shuffle.memoryFraction.
Also, set spark.shuffle.memoryFraction to 0.3, and spark.storage.memoryFraction = 0.6.
Yarn client addjar and misc fixes
Fix the addJar functionality in yarn-client mode, add support for the other options supported in yarn-standalone mode, set the application type on yarn in hadoop 2.X, add documentation, change heartbeat interval to be same code as the yarn-standalone so it doesn't take so long to get containers and exit.
Make DEBUG-level logs consummable.
Removes two things that caused issues with the debug logs:
(a) Internal polling in the DAGScheduler was polluting the logs.
(b) The Scala REPL logs were really noisy.
Removes two things that caused issues with the debug logs:
(a) Internal polling in the DAGScheduler was polluting the logs.
(b) The Scala REPL logs were really noisy.
Fix bug added when we changed AppDescription.maxCores to an Option
The Scala compiler warned about this -- we were comparing an Option against an integer now.
This is an alternative to the existing approach, which evenly distributes the
collective shuffle memory among all running tasks. In the new approach, each
thread requests a chunk of memory whenever its map is about to multiplicatively
grow. If there is sufficient memory in the global pool, the thread allocates it
and grows its map. Otherwise, it spills.
A danger with the previous approach is that a new task may quickly fill up its
map before old tasks finish spilling, potentially causing an OOM. This approach
prevents this scenario as it favors existing tasks over new tasks; any thread
that may step over the boundary of other threads defensively backs off and
starts spilling.
Testing through spark-perf reveals: (1) When no spills have occured, the
performance of external sorting using this memory management approach is
essentially the same as without external sorting. (2) When one or more spills
have occured, the performance of external sorting is a small multiple (3x) worse
Add some missing Java API methods
These are primarily for setting job groups, canceling jobs, and setting names on RDDs. Seemed like useful stuff to expose in Java.
Bug fixes for updating the RDD block's memory and disk usage information
Bug fixes for updating the RDD block's memory and disk usage information.
From the code context, we can find that the memSize and diskSize here are both always equal to the size of the block. Actually, they never be zero. Thus, the logic here is wrong for recording the block usage in BlockStatus, especially for the blocks which are dropped from memory to ensure space for the new input rdd blocks. I have tested it that this would cause the storage metrics shown in the Storage webpage wrong and misleading. With this patch, the metrics will be okay.
Finally, Merry Christmas, guys:)
SPARK-998: Support Launching Driver Inside of Standalone Mode
[NOTE: I need to bring the tests up to date with new changes, so for now they will fail]
This patch provides support for launching driver programs inside of a standalone cluster manager. It also supports monitoring and re-launching of driver programs which is useful for long running, recoverable applications such as Spark Streaming jobs. For those jobs, this patch allows a deployment mode which is resilient to the failure of any worker node, failure of a master node (provided a multi-master setup), and even failures of the applicaiton itself, provided they are recoverable on a restart. Driver information, such as the status and logs from a driver, is displayed in the UI
There are a few small TODO's here, but the code is generally feature-complete. They are:
- Bring tests up to date and add test coverage
- Restarting on failure should be optional and maybe off by default.
- See if we can re-use akka connections to facilitate clients behind a firewall
A sensible place to start for review would be to look at the `DriverClient` class which presents users the ability to launch their driver program. I've also added an example program (`DriverSubmissionTest`) that allows you to test this locally and play around with killing workers, etc. Most of the code is devoted to persisting driver state in the cluster manger, exposing it in the UI, and dealing correctly with various types of failures.
Instructions to test locally:
- `sbt/sbt assembly/assembly examples/assembly`
- start a local version of the standalone cluster manager
```
./spark-class org.apache.spark.deploy.client.DriverClient \
-j -Dspark.test.property=something \
-e SPARK_TEST_KEY=SOMEVALUE \
launch spark://10.99.1.14:7077 \
../path-to-examples-assembly-jar \
org.apache.spark.examples.DriverSubmissionTest 1000 some extra options --some-option-here -X 13
```
- Go in the UI and make sure it started correctly, look at the output etc
- Kill workers, the driver program, masters, etc.
Minor style cleanup. Mostly on indenting & line width changes.
Focused on the few important files since they are the files that new contributors usually read first.
Set boolean param name for call to SparkHadoopMapReduceUtil.newTaskAttemptID
Set boolean param name for call to SparkHadoopMapReduceUtil.newTaskAttemptID to make it clear which param being set.
Remove calls to deprecated mapred's OutputCommitter.cleanupJob
Since Hadoop 1.0.4 the mapred OutputCommitter.commitJob should do cleanup job via call to OutputCommitter.cleanupJob,
Remove SparkHadoopWriter.cleanup since it is used only by PairRDDFunctions.
In fact the implementation of mapred OutputCommitter.commitJob looks like this:
public void commitJob(JobContext jobContext) throws IOException {
cleanupJob(jobContext);
}
the mapred OutputCommitter.commitJob should do cleanup job.
In fact the implementation of mapred OutputCommitter.commitJob looks like this:
public void commitJob(JobContext jobContext) throws IOException {
cleanupJob(jobContext);
}
(The jobContext input argument is type of org.apache.hadoop.mapred.JobContext)
Get rid of `Either[ActorRef, ActorSelection]'
In this pull request, instead of returning an `Either[ActorRef, ActorSelection]`, `registerOrLookup` identifies the remote actor blockingly to obtain an `ActorRef`, or throws an exception if the remote actor doesn't exist or the lookup times out (configured by `spark.akka.lookupTimeout`). This function is only called when an `SparkEnv` is constructed (instantiating driver or executor), so the blocking call is considered acceptable. Executor side `ActorSelection`s/`ActorRef`s to driver side `MapOutputTrackerMasterActor` and `BlockManagerMasterActor` are affected by this pull request.
`ActorSelection` is dangerous and should be used with care. It's only absolutely safe to send messages via an `ActorSelection` when the remote actor is stateless, so that actor incarnation is irrelevant. But as pointed by @ScrapCodes in the comments below, executor exits immediately once the connection to the driver lost, `ActorSelection`s are not harmful in this scenario. So this pull request is mostly a code style patch.
Add way to limit default # of cores used by apps in standalone mode
Also documents the spark.deploy.spreadOut option, and fixes a config option that had a dash in its name.
Don't leave os.arch unset after BlockManagerSuite
Recent SparkConf changes meant that BlockManagerSuite was now leaving the os.arch System.property unset. That's a problem for any subsequent tests that rely upon having a valid os.arch. This is true for CompressionCodecSuite in the usual maven build test order, even though it isn't usually true for the sbt build.
To make this work I had to rename the defaults file. Otherwise
maven's pattern matching rules included it when trying to match
other log4j.properties files.
I also fixed a bug in the existing maven build where two
<transformers> tags were present in assembly/pom.xml
such that one overwrote the other.
Suggested small changes to Java code for slightly more standard style, encapsulation and in some cases performance
Sorry if this is too abrupt or not a welcome set of changes, but thought I'd see if I could contribute a little. I'm a Java developer and just getting seriously into Spark. So I thought I'd suggest a number of small changes to the couple Java parts of the code to make it a little tighter, more standard and even a bit faster.
Feel free to take all, some or none of this. Happy to explain any of it.
```
[error] /pod/home/anovak/build/graphx/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala:172: not enough arguments for constructor PrimitiveKeyOpenHashMap: (initialCapacity: Int)(implicit evidence$3: ClassManifest[Int], implicit evidence$4: ClassManifest[Int])org.apache.spark.util.collection.PrimitiveKeyOpenHashMap[Int,Int]
[error] private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
[error] ^
[info] No documentation generated with unsucessful compiler run
[error] one error found
[error] (core/compile:doc) Scaladoc generation failed
[error] Total time: 67 s, completed Jan 6, 2014 2:20:51 PM
```
In theory a no-argument constructor ought not to differ from one with a single argument that has a default value, but in practice there seems to be an issue.
Fix handling of empty SPARK_EXAMPLES_JAR
Currently if SPARK_EXAMPLES_JAR is left unset you get a null pointer exception when running the examples (atleast on spark on yarn). The null now gets turned into a string of "null" when its put into the SparkConf so addJar no longer properly ignores it. This fixes that so that it can be left unset.
Quiet ERROR-level Akka Logs
This fixes an issue I've seen where akka logs a bunch of things at ERROR level when connecting to a standalone cluster, even in the normal case. I noticed that even when lifecycle logging was disabled, the netty code inside of akka still logged away via akka's EndpointWriter class. There are also some other log streams that I think are new in akka 2.2.1 that I've disabled.
Finally, I added some better logging to the standalone client. This makes it more clear when a connection failure occurs what is going on. Previously it never explicitly said if a connection attempt had failed.
The commit messages here have some more detail.
Removing SPARK_EXAMPLES_JAR in the code
This re-writes all of the examples to use the `SparkContext.jarOfClass` mechanism for loading the examples jar. This necessary for environments like YARN and the Standalone mode where example programs will be submit from inside the cluster rather than at the client using `./spark-example`.
This still leaves SPARK_EXAMPLES_JAR in place in the shell scripts for setting up the classpath if `./spark-example` is run.
Although we can send messages via an ActorSelection, it would be better to identify the actor and obtain an ActorRef first, so that we can get informed earlier if the remote actor doesn't exist, and get rid of the annoying Either wrapper.
Without these it's a bit less clear what's going on for the user.
One thing I realize when doing this is that akka itself actually retries
the initial association. So the retry we currently have is redundant with
akka's.
I noticed when connecting to a standalone cluster Spark gives a bunch
of Akka ERROR logs that make it seem like something is failing.
This patch does two things:
1. Akka dead letter logging is turned on/off according to the existing
lifecycle spark property.
2. We explicitly silence akka's EndpointWriter log in log4j. This is necessary
because for some reason that log doesn't pick up on the lifecycle
logging settings. After a few hours of debugging this was the only solution
I found that worked.
Further, divide this threshold by the number of tasks running concurrently.
Note that this does not guard against the following scenario: a new task
quickly fills up its share of the memory before old tasks finish spilling
their contents, in which case the total memory used by such maps may exceed
what was specified. Currently, spark.shuffle.safetyFraction mitigates the
effect of this.
Remove erroneous FAILED state for killed tasks.
Currently, when tasks are killed, the Executor first sends a
status update for the task with a "KILLED" state, and then
sends a second status update with a "FAILED" state saying that
the task failed due to an exception. The second FAILED state is
misleading/unncessary, and occurs due to a NonLocalReturnControl
Exception that gets thrown due to the way we kill tasks. This
commit eliminates that problem.
I'm not at all sure that this is the best way to fix this problem,
so alternate suggestions welcome. @rxin guessing you're the right
person to look at this.
Improvements to DStream window ops and refactoring of Spark's CheckpointSuite
- Added a new RDD - PartitionerAwareUnionRDD. Using this RDD, one can take multiple RDDs partitioned by the same partitioner and unify them into a single RDD while preserving the partitioner. So m RDDs with p partitions each will be unified to a single RDD with p partitions and the same partitioner. The preferred location for each partition of the unified RDD will be the most common preferred location of the corresponding partitions of the parent RDDs. For example, location of partition 0 of the unified RDD will be where most of partition 0 of the parent RDDs are located.
- Improved the performance of DStream's reduceByKeyAndWindow and groupByKeyAndWindow. Both these operations work by doing per-batch reduceByKey/groupByKey and then using PartitionerAwareUnionRDD to union the RDDs across the window. This eliminates a shuffle related to the window operation, which can reduce batch processing time by 30-40% for simple workloads.
- Fixed bugs and simplified Spark's CheckpointSuite. Some of the tests were incorrect and unreliable. Added missing tests for ZippedRDD. I can go into greater detail if necessary.
- Added mapSideCombine option to combineByKeyAndWindow.
SPARK-991: Report information gleaned from a Python stacktrace in the UI
Scala:
- Added setCallSite/clearCallSite to SparkContext and JavaSparkContext.
These functions mutate a LocalProperty called "externalCallSite."
- Add a wrapper, getCallSite, that checks for an externalCallSite and, if
none is found, calls the usual Utils.formatSparkCallSite.
- Change everything that calls Utils.formatSparkCallSite to call
getCallSite instead. Except getCallSite.
- Add wrappers to setCallSite/clearCallSite wrappers to JavaSparkContext.
Python:
- Add a gruesome hack to rdd.py that inspects the traceback and guesses
what you want to see in the UI.
- Add a RAII wrapper around said gruesome hack that calls
setCallSite/clearCallSite as appropriate.
- Wire said RAII wrapper up around three calls into the Scala code.
I'm not sure that I hit all the spots with the RAII wrapper. I'm also
not sure that my gruesome hack does exactly what we want.
One could also approach this change by refactoring
runJob/submitJob/runApproximateJob to take a call site, then threading
that parameter through everything that needs to know it.
One might object to the pointless-looking wrappers in JavaSparkContext.
Unfortunately, I can't directly access the SparkContext from
Python---or, if I can, I don't know how---so I need to wrap everything
that matters in JavaSparkContext.
Conflicts:
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
Currently, when tasks are killed, the Executor first sends a
status update for the task with a "KILLED" state, and then
sends a second status update with a "FAILED" state saying that
the task failed due to an exception. The second FAILED state is
misleading/unncessary, and occurs due to a NonLocalReturnControl
Exception that gets thrown due to the way we kill tasks. This
commit eliminates that problem.
Also replaced SparkConf.getOrElse with just a "get" that takes a default
value, and added getInt, getLong, etc to make code that uses this
simpler later on.
Approximate distinct count
Added countApproxDistinct() to RDD and countApproxDistinctByKey() to PairRDDFunctions to approximately count distinct number of elements and distinct number of values per key, respectively. Both functions use HyperLogLog from stream-lib for counting. Both functions take a parameter that controls the trade-off between accuracy and memory consumption. Also added Scala docs and test suites for both methods.
Bug fixes for file input stream and checkpointing
- Fixed bugs in the file input stream that led the stream to fail due to transient HDFS errors (listing files when a background thread it deleting fails caused errors, etc.)
- Updated Spark's CheckpointRDD and Streaming's CheckpointWriter to use SparkContext.hadoopConfiguration, to allow checkpoints to be written to any HDFS compatible store requiring special configuration.
- Changed the API of SparkContext.setCheckpointDir() - eliminated the unnecessary 'useExisting' parameter. Now SparkContext will always create a unique subdirectory within the user specified checkpoint directory. This is to ensure that previous checkpoint files are not accidentally overwritten.
- Fixed bug where setting checkpoint directory as a relative local path caused the checkpointing to fail.
This gives us a couple advantages:
- Uses spark.local.dir and randomly selects a directory/disk.
- Ensure files are deleted on normal DiskBlockManager cleanup.
- Availability of same stats as usual DiskBlockObjectWriter (currenty unused).
Also enable basic cleanup when iterator is fully drained.
Still requires cleanup for operations that fail or don't go through all elements.
Changed naming of StageCompleted event to be consistent
The rest of the SparkListener events are named with "SparkListener"
as the prefix of the name; this commit renames the StageCompleted
event to SparkListenerStageCompleted for consistency.
1. Adds a default log4j file that gets loaded if users haven't specified a log4j file.
2. Isolates use of the tools assembly jar. I found this produced SLF4J warnings
after building with SBT (and I've seen similar warnings on the mailing list).
- Got rid of global SparkContext.globalConf
- Pass SparkConf to serializers and compression codecs
- Made SparkConf public instead of private[spark]
- Improved API of SparkContext and SparkConf
- Switched executor environment vars to be passed through SparkConf
- Fixed some places that were still using system properties
- Fixed some tests, though others are still failing
This still fails several tests in core, repl and streaming, likely due
to properties not being set or cleared correctly (some of the tests run
fine in isolation).
Removed unused OtherFailure TaskEndReason.
The OtherFailure TaskEndReason was added by @mateiz 3 years ago in this commit: 24a1e7f838
Unless I am missing something, it doesn't seem to have been used then, and is not used now, so seems safe for deletion.
The rest of the SparkListener events are named with "SparkListener"
as the prefix of the name; this commit renames the StageCompleted
event to SparkListenerStageCompleted for consistency.
Deduplicate Local and Cluster schedulers.
The code in LocalScheduler/LocalTaskSetManager was nearly identical
to the code in ClusterScheduler/ClusterTaskSetManager. The redundancy
made making updating the schedulers unnecessarily painful and error-
prone. This commit combines the two into a single TaskScheduler/
TaskSetManager.
Unfortunately the diff makes this change look much more invasive than it is -- TaskScheduler.scala is only superficially changed (names updated, overrides removed) from the old ClusterScheduler.scala, and the same with
TaskSetManager.scala.
Thanks @rxin for suggesting this change!
Clean up shuffle files once their metadata is gone
Previously, we would only clean the in-memory metadata for consolidated shuffle files.
Additionally, fixes a bug where the Metadata Cleaner was ignoring type-specific TTLs.
Refactored the streaming scheduler and added StreamingListener interface
- Refactored the streaming scheduler for cleaner code. Specifically, the JobManager was renamed to JobScheduler, as it does the actual scheduling of Spark jobs to the SparkContext. The earlier Scheduler was renamed to JobGenerator, as it actually generates the jobs from the DStreams. The JobScheduler starts the JobGenerator. Also, moved all the scheduler related code from spark.streaming to spark.streaming.scheduler package.
- Implemented the StreamingListener interface, similar to SparkListener. The streaming version of StatusReportListener prints the batch processing time statistics (for now). Added StreamingListernerSuite to test it.
- Refactored streaming TestSuiteBase for deduping code in the other streaming testsuites.
Track and report task result serialisation time.
- DirectTaskResult now has a ByteBuffer valueBytes instead of a T value.
- DirectTaskResult now has a member function T value() that deserialises valueBytes.
- Executor serialises value into a ByteBuffer and passes it to DTR's ctor.
- Executor tracks the time taken to do so and puts it in a new field in TaskMetrics.
- StagePage now reports serialisation time from TaskMetrics along with the other things it reported.
Previously, we would only clean the in-memory metadata for consolidated
shuffle files.
Additionally, fixes a bug where the Metadata Cleaner was ignoring type-
specific TTLs.
Add collectPartition to JavaRDD interface.
This interface is useful for implementing `take` from other language frontends where the data is serialized. Also remove `takePartition` from PythonRDD and use `collectPartition` in rdd.py.
Thanks @concretevitamin for the original change and tests.
Change the implementation to use runJob instead of PartitionPruningRDD.
Also update the unit tests and the python take implementation
to use the new interface.
despite having a low number of nodes and relatively small workload (16 nodes, <1.5 TB data).
This would cause an entire job to fail at the beginning of the reduce phase.
There is no particular reason for this value to be small as a timeout should only occur
in an exceptional situation.
Also centralized the reading of spark.akka.askTimeout to AkkaUtils (surely this can later
be cleaned up to use Typesafe).
Finally, deleted some lurking implicits. If anyone can think of a reason they should still
be there, please let me know.