...and make sure that DAGScheduler data structures are cleaned up on job completion.
Initial effort and discussion at https://github.com/mesos/spark/pull/842
Re-enable zk:// urls for Mesos SparkContexts
This was broken in PR #71 when we explicitly disallow anything that didn't fit a mesos:// url.
Although it is not really clear that a zk:// url should match Mesos, it is what the docs say and it is necessary for backwards compatibility.
Additionally added a unit test for the creation of all types of TaskSchedulers. Since YARN and Mesos are not necessarily available in the system, they are allowed to pass as long as the YARN/Mesos code paths are exercised.
Scheduler quits when newStage fails
The current scheduler thread does not handle exceptions from newStage stage while launching new jobs. The thread fails on any exception that gets triggered at that level, leaving the cluster hanging with no schduler.
The current scheduler thread does not handle exceptions from createStage stage while launching new jobs. The thread fails on any exception that gets triggered at that level, leaving the cluster hanging with no schduler.
This was broken in PR #71 when we explicitly disallow anything that
didn't fit a mesos:// url.
Although it is not really clear that a zk:// url should match Mesos,
it is what the docs say and it is necessary for backwards compatibility.
SPARK-965: https://spark-project.atlassian.net/browse/SPARK-965
SPARK-966: https://spark-project.atlassian.net/browse/SPARK-966
* Add back DAGScheduler.start(), eventProcessActor is created and started here.
Notice that function is only called by SparkContext.
* Cancel the scheduled stage resubmission task when stopping eventProcessActor
* Add a new DAGSchedulerEvent ResubmitFailedStages
This event message is sent by the scheduled stage resubmission task to eventProcessActor. In this way, DAGScheduler.resubmitFailedStages is guaranteed to be executed from the same thread that runs DAGScheduler.processEvent.
Please refer to discussion in SPARK-966 for details.
add http timeout for httpbroadcast
While pulling task bytecode from HttpBroadcast server, there's no timeout value set. This may cause spark executor code hang and other task in the same executor process wait for the lock. I have encountered the issue in my cluster. Here's the stacktrace I captured : https://gist.github.com/haitaoyao/7655830
So add a time out value to ensure the task fail fast.
Custom Serializers for PySpark
This pull request adds support for custom serializers to PySpark. For now, all Python-transformed (or parallelize()d RDDs) are serialized with the same serializer that's specified when creating SparkContext.
For now, PySpark includes `PickleSerDe` and `MarshalSerDe` classes for using Python's `pickle` and `marshal` serializers. It's pretty easy to add support for other serializers, although I still need to add instructions on this.
A few notable changes:
- The Scala `PythonRDD` class no longer manipulates Pickled objects; data from `textFile` is written to Python as MUTF-8 strings. The Python code performs the appropriate bookkeeping to track which deserializer should be used when reading an underlying JavaRDD. This mechanism could also be used to support other data exchange formats, such as MsgPack.
- Several magic numbers were refactored into constants.
- Batching is implemented by wrapping / decorating an unbatched SerDe.
Log a warning if a task's serialized size is very big
As per Reynold's instructions, we now create a warning level log entry if a task's serialized size is too big. "Too big" is currently defined as 100kb. This warning message is generated at most once for each stage.
OpenHashSet fixes
Incorporated ideas from pull request #200.
- Use Murmur Hash 3 finalization step to scramble the bits of HashCode
instead of the simpler version in java.util.HashMap; the latter one
had trouble with ranges of consecutive integers. Murmur Hash 3 is used
by fastutil.
- Don't check keys for equality when re-inserting due to growing the
table; the keys will already be unique.
- Remember the grow threshold instead of recomputing it on each insert
Also added unit tests for size estimation for specialized hash sets and maps.
Use the proper partition index in mapPartitionsWIthIndex
mapPartitionsWithIndex uses TaskContext.partitionId as the partition index. TaskContext.partitionId used to be identical to the partition index in a RDD. However, pull request #186 introduced a scenario (with partition pruning) that the two can be different. This pull request uses the right partition index in all mapPartitionsWithIndex related calls.
Also removed the extra MapPartitionsWIthContextRDD and put all the mapPartitions related functionality in MapPartitionsRDD.
For SPARK-527, Support spark-shell when running on YARN
sync to trunk and resubmit here
In current YARN mode approaching, the application is run in the Application Master as a user program thus the whole spark context is on remote.
This approaching won't support application that involve local interaction and need to be run on where it is launched.
So In this pull request I have a YarnClientClusterScheduler and backend added.
With this scheduler, the user application is launched locally,While the executor will be launched by YARN on remote nodes with a thin AM which only launch the executor and monitor the Driver Actor status, so that when client app is done, it can finish the YARN Application as well.
This enables spark-shell to run upon YARN.
This also enable other Spark applications to have the spark context to run locally with a master-url "yarn-client". Thus e.g. SparkPi could have the result output locally on console instead of output in the log of the remote machine where AM is running on.
Docs also updated to show how to use this yarn-client mode.
- Use Murmur Hash 3 finalization step to scramble the bits of HashCode
instead of the simpler version in java.util.HashMap; the latter one
had trouble with ranges of consecutive integers. Murmur Hash 3 is used
by fastutil.
- Don't check keys for equality when re-inserting due to growing the
table; the keys will already be unique
- Remember the grow threshold instead of recomputing it on each insert
Add graphite sink for metrics
This adds a metrics sink for graphite. The sink must
be configured with the host and port of a graphite node
and optionally may be configured with a prefix that will
be prepended to all metrics that are sent to graphite.
XORShift RNG with unit tests and benchmark
This patch was introduced to address SPARK-950 - the discussion below the ticket explains not only the rationale, but also the design and testing decisions: https://spark-project.atlassian.net/browse/SPARK-950
To run unit test, start SBT console and type:
compile
test-only org.apache.spark.util.XORShiftRandomSuite
To run benchmark, type:
project core
console
Once the Scala console starts, type:
org.apache.spark.util.XORShiftRandom.benchmark(100000000)
XORShiftRandom is also an object with a main method taking the
number of iterations as an argument, so you can also run it
from the command line.
Fix 'timeWriting' stat for shuffle files
Due to concurrent git branches, changes from shuffle file consolidation patch
caused the shuffle write timing patch to no longer actually measure the time,
since it requires time be measured after the stream has been closed.
Also changed the semantics of the index parameter in mapPartitionsWithIndex from the partition index of the output partition to the partition index in the current RDD.
- Don't check keys for equality when re-inserting due to growing the
table; the keys will already be unique
- Remember the grow threshold instead of recomputing it on each insert
- Use Murmur Hash 3 finalization step to scramble the bits of HashCode
instead of the simpler version in java.util.HashMap; the latter one
had trouble with ranges of consecutive integers. Murmur Hash 3 is used
by fastutil.
- Use Object.equals() instead of Scala's == to compare keys, because the
latter does extra casts for numeric types (see the equals method in
https://github.com/scala/scala/blob/master/src/library/scala/runtime/BoxesRunTime.java)
Due to concurrent git branches, changes from shuffle file consolidation patch
caused the shuffle write timing patch to no longer actually measure the time,
since it requires time be measured after the stream has been closed.
With this scheduler, the user application is launched locally,
While the executor will be launched by YARN on remote nodes.
This enables spark-shell to run upon YARN.
PartitionPruningRDD is using index from parent
I was getting a ArrayIndexOutOfBoundsException exception after doing union on pruned RDD. The index it was using on the partition was the index in the original RDD not the new pruned RDD.
To run unit test, start SBT console and type:
compile
test-only org.apache.spark.util.XORShiftRandomSuite
To run benchmark, type:
project core
console
Once the Scala console starts, type:
org.apache.spark.util.XORShiftRandom.benchmark(100000000)
Simple cleanup on Spark's Scala code
Simple cleanup on Spark's Scala code while testing some modules:
-) Remove some of unused imports as I found them
-) Remove ";" in the imports statements
-) Remove () at the end of method calls like size that does not have size effect.
-) Remove some of unused imports as I found them
-) Remove ";" in the imports statements
-) Remove () at the end of method call like size that does not have size effect.
Fix bug where scheduler could hang after task failure.
When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager indicating which tasks are
pending may be updated after revive offers is called (there's a
race condition here), so when revive offers is called, the task set
manager does not yet realize that there are failed tasks that need
to be relaunched.
This isn't currently unit tested but will be once my pull request for
merging the cluster and local schedulers goes in -- at which point
many more of the unit tests will exercise the code paths through
the cluster scheduler (currently the failure test suite uses the local
scheduler, which is why we didn't see this bug before).
I've diff'd this patch against my own -- since they were both created
independently, this means that two sets of eyes have gone over all the
merge conflicts that were created, so I'm feeling significantly more
confident in the resulting PR.
@rxin has looked at the changes to the repl and is resoundingly
confident that they are correct.
Don't retry tasks when they fail due to a NotSerializableException
As with my previous pull request, this will be unit tested once the Cluster and Local schedulers get merged.
When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager indicating which tasks are
pending may be updated after revive offers is called (there's a
race condition here), so when revive offers is called, the task set
manager does not yet realize that there are failed tasks that need
to be relaunched.
Don't ignore spark.cores.max when using Mesos Coarse mode
totalCoresAcquired is decremented but never incremented, causing Spark to effectively ignore spark.cores.max in coarse grained Mesos mode.
Migrate the daemon thread started by DAGScheduler to Akka actor
`DAGScheduler` adopts an event queue and a daemon thread polling the it to process events sent to a `DAGScheduler`. This is a classical actor use case. By migrating this thread to Akka actor, we may benefit from both cleaner code and better performance (context switching cost of Akka actor is much less than that of a native thread).
But things become a little complicated when taking existing test code into consideration.
Code in `DAGSchedulerSuite` is somewhat tightly coupled with `DAGScheduler`, and directly calls `DAGScheduler.processEvent` instead of posting event messages to `DAGScheduler`. To minimize code change, I chose to let the actor to delegate messages to `processEvent`. Maybe this doesn't follow conventional actor usage, but I tried to make it apparently correct.
Another tricky part is that, since `DAGScheduler` depends on the `ActorSystem` provided by its field `env`, `env` cannot be null. But the `dagScheduler` field created in `DAGSchedulerSuite.before` was given a null `env`. What's more, `BlockManager.blockIdsToBlockManagers` checks whether `env` is null to determine whether to run the production code or the test code (bad smell here, huh?). I went through all callers of `BlockManager.blockIdsToBlockManagers`, and made sure that if `env != null` holds, then `blockManagerMaster == null` must also hold. That's the logic behind `BlockManager.scala` [line 896](https://github.com/liancheng/incubator-spark/compare/dagscheduler-actor-refine?expand=1#diff-2b643ea78c1add0381754b1f47eec132L896).
At last, since `DAGScheduler` instances are always `start()`ed after creation, I removed the `start()` method, and starts the `eventProcessActor` within the constructor.