...and make sure that DAGScheduler data structures are cleaned up on job completion.
Initial effort and discussion at https://github.com/mesos/spark/pull/842
This also includes:
-Change `isNewYarn` to `isNewHadoop`, since the protobuf-2.5 dependency is from Hadoop-2.2 itself.
-Regexp bugix
Credits to @alig for this patch.
Re-enable zk:// urls for Mesos SparkContexts
This was broken in PR #71 when we explicitly disallow anything that didn't fit a mesos:// url.
Although it is not really clear that a zk:// url should match Mesos, it is what the docs say and it is necessary for backwards compatibility.
Additionally added a unit test for the creation of all types of TaskSchedulers. Since YARN and Mesos are not necessarily available in the system, they are allowed to pass as long as the YARN/Mesos code paths are exercised.
Scheduler quits when newStage fails
The current scheduler thread does not handle exceptions from newStage stage while launching new jobs. The thread fails on any exception that gets triggered at that level, leaving the cluster hanging with no schduler.
Bugfix: SPARK-965 & SPARK-966
SPARK-965: https://spark-project.atlassian.net/browse/SPARK-965
SPARK-966: https://spark-project.atlassian.net/browse/SPARK-966
* Add back `DAGScheduler.start()`, `eventProcessActor` is created and started here.
Notice that function is only called by `SparkContext`.
* Cancel the scheduled stage resubmission task when stopping `eventProcessActor`
* Add a new `DAGSchedulerEvent` `ResubmitFailedStages`
This event message is sent by the scheduled stage resubmission task to `eventProcessActor`. In this way, `DAGScheduler.resubmitFailedStages()` is guaranteed to be executed from the same thread that runs `DAGScheduler.processEvent()`.
Please refer to discussion in [SPARK-966](https://spark-project.atlassian.net/browse/SPARK-966) for details.
The current scheduler thread does not handle exceptions from createStage stage while launching new jobs. The thread fails on any exception that gets triggered at that level, leaving the cluster hanging with no schduler.
This was broken in PR #71 when we explicitly disallow anything that
didn't fit a mesos:// url.
Although it is not really clear that a zk:// url should match Mesos,
it is what the docs say and it is necessary for backwards compatibility.
SPARK-965: https://spark-project.atlassian.net/browse/SPARK-965
SPARK-966: https://spark-project.atlassian.net/browse/SPARK-966
* Add back DAGScheduler.start(), eventProcessActor is created and started here.
Notice that function is only called by SparkContext.
* Cancel the scheduled stage resubmission task when stopping eventProcessActor
* Add a new DAGSchedulerEvent ResubmitFailedStages
This event message is sent by the scheduled stage resubmission task to eventProcessActor. In this way, DAGScheduler.resubmitFailedStages is guaranteed to be executed from the same thread that runs DAGScheduler.processEvent.
Please refer to discussion in SPARK-966 for details.
add http timeout for httpbroadcast
While pulling task bytecode from HttpBroadcast server, there's no timeout value set. This may cause spark executor code hang and other task in the same executor process wait for the lock. I have encountered the issue in my cluster. Here's the stacktrace I captured : https://gist.github.com/haitaoyao/7655830
So add a time out value to ensure the task fail fast.
Custom Serializers for PySpark
This pull request adds support for custom serializers to PySpark. For now, all Python-transformed (or parallelize()d RDDs) are serialized with the same serializer that's specified when creating SparkContext.
For now, PySpark includes `PickleSerDe` and `MarshalSerDe` classes for using Python's `pickle` and `marshal` serializers. It's pretty easy to add support for other serializers, although I still need to add instructions on this.
A few notable changes:
- The Scala `PythonRDD` class no longer manipulates Pickled objects; data from `textFile` is written to Python as MUTF-8 strings. The Python code performs the appropriate bookkeeping to track which deserializer should be used when reading an underlying JavaRDD. This mechanism could also be used to support other data exchange formats, such as MsgPack.
- Several magic numbers were refactored into constants.
- Batching is implemented by wrapping / decorating an unbatched SerDe.
Log a warning if a task's serialized size is very big
As per Reynold's instructions, we now create a warning level log entry if a task's serialized size is too big. "Too big" is currently defined as 100kb. This warning message is generated at most once for each stage.
OpenHashSet fixes
Incorporated ideas from pull request #200.
- Use Murmur Hash 3 finalization step to scramble the bits of HashCode
instead of the simpler version in java.util.HashMap; the latter one
had trouble with ranges of consecutive integers. Murmur Hash 3 is used
by fastutil.
- Don't check keys for equality when re-inserting due to growing the
table; the keys will already be unique.
- Remember the grow threshold instead of recomputing it on each insert
Also added unit tests for size estimation for specialized hash sets and maps.
Use the proper partition index in mapPartitionsWIthIndex
mapPartitionsWithIndex uses TaskContext.partitionId as the partition index. TaskContext.partitionId used to be identical to the partition index in a RDD. However, pull request #186 introduced a scenario (with partition pruning) that the two can be different. This pull request uses the right partition index in all mapPartitionsWithIndex related calls.
Also removed the extra MapPartitionsWIthContextRDD and put all the mapPartitions related functionality in MapPartitionsRDD.