Deduplicate Local and Cluster schedulers.
The code in LocalScheduler/LocalTaskSetManager was nearly identical
to the code in ClusterScheduler/ClusterTaskSetManager. The redundancy
made making updating the schedulers unnecessarily painful and error-
prone. This commit combines the two into a single TaskScheduler/
TaskSetManager.
Unfortunately the diff makes this change look much more invasive than it is -- TaskScheduler.scala is only superficially changed (names updated, overrides removed) from the old ClusterScheduler.scala, and the same with
TaskSetManager.scala.
Thanks @rxin for suggesting this change!
Clean up shuffle files once their metadata is gone
Previously, we would only clean the in-memory metadata for consolidated shuffle files.
Additionally, fixes a bug where the Metadata Cleaner was ignoring type-specific TTLs.
Refactored the streaming scheduler and added StreamingListener interface
- Refactored the streaming scheduler for cleaner code. Specifically, the JobManager was renamed to JobScheduler, as it does the actual scheduling of Spark jobs to the SparkContext. The earlier Scheduler was renamed to JobGenerator, as it actually generates the jobs from the DStreams. The JobScheduler starts the JobGenerator. Also, moved all the scheduler related code from spark.streaming to spark.streaming.scheduler package.
- Implemented the StreamingListener interface, similar to SparkListener. The streaming version of StatusReportListener prints the batch processing time statistics (for now). Added StreamingListernerSuite to test it.
- Refactored streaming TestSuiteBase for deduping code in the other streaming testsuites.
Track and report task result serialisation time.
- DirectTaskResult now has a ByteBuffer valueBytes instead of a T value.
- DirectTaskResult now has a member function T value() that deserialises valueBytes.
- Executor serialises value into a ByteBuffer and passes it to DTR's ctor.
- Executor tracks the time taken to do so and puts it in a new field in TaskMetrics.
- StagePage now reports serialisation time from TaskMetrics along with the other things it reported.
Previously, we would only clean the in-memory metadata for consolidated
shuffle files.
Additionally, fixes a bug where the Metadata Cleaner was ignoring type-
specific TTLs.
Add collectPartition to JavaRDD interface.
This interface is useful for implementing `take` from other language frontends where the data is serialized. Also remove `takePartition` from PythonRDD and use `collectPartition` in rdd.py.
Thanks @concretevitamin for the original change and tests.
Change the implementation to use runJob instead of PartitionPruningRDD.
Also update the unit tests and the python take implementation
to use the new interface.
despite having a low number of nodes and relatively small workload (16 nodes, <1.5 TB data).
This would cause an entire job to fail at the beginning of the reduce phase.
There is no particular reason for this value to be small as a timeout should only occur
in an exceptional situation.
Also centralized the reading of spark.akka.askTimeout to AkkaUtils (surely this can later
be cleaned up to use Typesafe).
Finally, deleted some lurking implicits. If anyone can think of a reason they should still
be there, please let me know.
Fix for spark.task.maxFailures not enforced correctly.
Docs at http://spark.incubator.apache.org/docs/latest/configuration.html say:
```
spark.task.maxFailures
Number of individual task failures before giving up on the job. Should be greater than or equal to 1. Number of allowed retries = this value - 1.
```
Previous implementation worked incorrectly. When for example `spark.task.maxFailures` was set to 1, the job was aborted only after the second task failure, not after the first one.
- Refactored Scheduler + JobManager to JobGenerator + JobScheduler and
added JobSet for cleaner code. Moved scheduler related code to
streaming.scheduler package.
- Added StreamingListener trait (similar to SparkListener) to enable
gathering to streaming stats like processing times and delays.
StreamingContext.addListener() to added listeners.
- Deduped some code in streaming tests by modifying TestSuiteBase, and
added StreamingListenerSuite.
- Made file stream more robust to transient failures.
- Changed Spark.setCheckpointDir API to not have the second
'useExisting' parameter. Spark will always create a unique directory
for checkpointing underneath the directory provide to the funtion.
- Fixed bug wrt local relative paths as checkpoint directory.
- Made DStream and RDD checkpointing use
SparkContext.hadoopConfiguration, so that more HDFS compatible
filesystems are supported for checkpointing.
stageId <--> jobId mapping in DAGScheduler
Okay, I think this one is ready to go -- or at least it's ready for review and discussion. It's a carry-over of https://github.com/mesos/spark/pull/842 with updates for the newer job cancellation functionality. The prior discussion still applies. I've actually changed the job cancellation flow a bit: Instead of ``cancelTasks`` going to the TaskScheduler and then ``taskSetFailed`` coming back to the DAGScheduler (resulting in ``abortStage`` there), the DAGScheduler now takes care of figuring out which stages should be cancelled, tells the TaskScheduler to cancel tasks for those stages, then does the cleanup within the DAGScheduler directly without the need for any further prompting by the TaskScheduler.
I know of three outstanding issues, each of which can and should, I believe, be handled in follow-up pull requests:
1) https://spark-project.atlassian.net/browse/SPARK-960
2) JobLogger should be re-factored to eliminate duplication
3) Related to 2), the WebUI should also become a consumer of the DAGScheduler's new understanding of the relationship between jobs and stages so that it can display progress indication and the like grouped by job. Right now, some of this information is just being sent out as part of ``SparkListenerJobStart`` messages, but more or different job <--> stage information may need to be exported from the DAGScheduler to meet listeners needs.
Except for the eventQueue -> Actor commit, the rest can be cherry-picked almost cleanly into branch-0.8. A little merging is needed in MapOutputTracker and the DAGScheduler. Merged versions of those files are in aba2b40ce0
Note that between the recent Actor change in the DAGScheduler and the cleaning up of DAGScheduler data structures on job completion in this PR, some races have been introduced into the DAGSchedulerSuite. Those tests usually pass, and I don't think that better-behaved code that doesn't directly inspect DAGScheduler data structures should be seeing any problems, but I'll work on fixing DAGSchedulerSuite as either an addition to this PR or as a separate request.
UPDATE: Fixed the race that I introduced. Created a JIRA issue (SPARK-965) for the one that was introduced with the switch to eventProcessorActor in the DAGScheduler.
Change the name of input argument in ClusterScheduler#initialize from context to backend.
The SchedulerBackend used to be called ClusterSchedulerContext so just want to make small
change of the input param in the ClusterScheduler#initialize to reflect this.
Added logging of scheduler delays to UI
This commit adds two metrics to the UI:
1) The time to get task results, if they're fetched remotely
2) The scheduler delay. When the scheduler starts getting overwhelmed (because it can't keep up with the rate at which tasks are being submitted), the result is that tasks get delayed on the tail-end: the message from the worker saying that the task has completed ends up in a long queue and takes a while to be processed by the scheduler. This commit records that delay in the UI so that users can tell when the scheduler is becoming the bottleneck.
Memoize preferred locations in ZippedPartitionsBaseRDD
so preferred location computation doesn't lead to exponential explosion.
This was a problem in GraphX where we have a whole chain of RDDs that are ZippedPartitionsRDD's, and the preferred locations were taking eternity to compute.
(cherry picked from commit e36fe55a03)
Signed-off-by: Reynold Xin <rxin@apache.org>
The SchedulerBackend used to be called ClusterSchedulerContext so just want to make small
change of the input param in the ClusterScheduler#initialize to reflect this.
Hadoop 2.2 migration
Includes support for the YARN API stabilized in the Hadoop 2.2 release, and a few style patches.
Short description for each set of commits:
a98f5a0 - "Misc style changes in the 'yarn' package"
a67ebf4 - "A few more style fixes in the 'yarn' package"
Both of these are some minor style changes, such as fixing lines over 100 chars, to the existing YARN code.
ab8652f - "Add a 'new-yarn' directory ... "
Copies everything from `SPARK_HOME/yarn` to `SPARK_HOME/new-yarn`. No actual code changes here.
4f1c3fa - "Hadoop 2.2 YARN API migration ..."
API patches to code in the `SPARK_HOME/new-yarn` directory. There are a few more small style changes mixed in, too.
Based on @colorant's Hadoop 2.2 support for the scala-2.10 branch in #141.
a1a1c62 - "Add optional Hadoop 2.2 settings in sbt build ... "
If Spark should be built against Hadoop 2.2, then:
a) the `org.apache.spark.deploy.yarn` package will be compiled from the `new-yarn` directory.
b) Protobuf v2.5 will be used as a Spark dependency, since Hadoop 2.2 depends on it. Also, Spark will be built against a version of Akka v2.0.5 that's built against Protobuf 2.5, named `akka-2.0.5-protobuf-2.5`. The patched Akka is here: https://github.com/harveyfeng/akka/tree/2.0.5-protobuf-2.5, and was published to local Ivy during testing.
There's also a new boolean environment variable, `SPARK_IS_NEW_HADOOP`, that users can manually set if their `SPARK_HADOOP_VERSION` specification does not start with `2.2`, which is how the build file tries to detect a 2.2 version. Not sure if this is necessary or done in the best way, though...
Fix small bug in web UI and minor clean-up.
There was a bug where sorting order didn't work correctly for write time metrics.
I also cleaned up some earlier code that fixed the same issue for read and
write bytes.
There was a bug where sorting order didn't work correctly for write time metrics.
I also cleaned up some earlier code that fixed the same issue for read and
write bytes.
...and make sure that DAGScheduler data structures are cleaned up on job completion.
Initial effort and discussion at https://github.com/mesos/spark/pull/842
Re-enable zk:// urls for Mesos SparkContexts
This was broken in PR #71 when we explicitly disallow anything that didn't fit a mesos:// url.
Although it is not really clear that a zk:// url should match Mesos, it is what the docs say and it is necessary for backwards compatibility.
Additionally added a unit test for the creation of all types of TaskSchedulers. Since YARN and Mesos are not necessarily available in the system, they are allowed to pass as long as the YARN/Mesos code paths are exercised.
Scheduler quits when newStage fails
The current scheduler thread does not handle exceptions from newStage stage while launching new jobs. The thread fails on any exception that gets triggered at that level, leaving the cluster hanging with no schduler.
The current scheduler thread does not handle exceptions from createStage stage while launching new jobs. The thread fails on any exception that gets triggered at that level, leaving the cluster hanging with no schduler.
This was broken in PR #71 when we explicitly disallow anything that
didn't fit a mesos:// url.
Although it is not really clear that a zk:// url should match Mesos,
it is what the docs say and it is necessary for backwards compatibility.
SPARK-965: https://spark-project.atlassian.net/browse/SPARK-965
SPARK-966: https://spark-project.atlassian.net/browse/SPARK-966
* Add back DAGScheduler.start(), eventProcessActor is created and started here.
Notice that function is only called by SparkContext.
* Cancel the scheduled stage resubmission task when stopping eventProcessActor
* Add a new DAGSchedulerEvent ResubmitFailedStages
This event message is sent by the scheduled stage resubmission task to eventProcessActor. In this way, DAGScheduler.resubmitFailedStages is guaranteed to be executed from the same thread that runs DAGScheduler.processEvent.
Please refer to discussion in SPARK-966 for details.
add http timeout for httpbroadcast
While pulling task bytecode from HttpBroadcast server, there's no timeout value set. This may cause spark executor code hang and other task in the same executor process wait for the lock. I have encountered the issue in my cluster. Here's the stacktrace I captured : https://gist.github.com/haitaoyao/7655830
So add a time out value to ensure the task fail fast.
Custom Serializers for PySpark
This pull request adds support for custom serializers to PySpark. For now, all Python-transformed (or parallelize()d RDDs) are serialized with the same serializer that's specified when creating SparkContext.
For now, PySpark includes `PickleSerDe` and `MarshalSerDe` classes for using Python's `pickle` and `marshal` serializers. It's pretty easy to add support for other serializers, although I still need to add instructions on this.
A few notable changes:
- The Scala `PythonRDD` class no longer manipulates Pickled objects; data from `textFile` is written to Python as MUTF-8 strings. The Python code performs the appropriate bookkeeping to track which deserializer should be used when reading an underlying JavaRDD. This mechanism could also be used to support other data exchange formats, such as MsgPack.
- Several magic numbers were refactored into constants.
- Batching is implemented by wrapping / decorating an unbatched SerDe.
Log a warning if a task's serialized size is very big
As per Reynold's instructions, we now create a warning level log entry if a task's serialized size is too big. "Too big" is currently defined as 100kb. This warning message is generated at most once for each stage.
OpenHashSet fixes
Incorporated ideas from pull request #200.
- Use Murmur Hash 3 finalization step to scramble the bits of HashCode
instead of the simpler version in java.util.HashMap; the latter one
had trouble with ranges of consecutive integers. Murmur Hash 3 is used
by fastutil.
- Don't check keys for equality when re-inserting due to growing the
table; the keys will already be unique.
- Remember the grow threshold instead of recomputing it on each insert
Also added unit tests for size estimation for specialized hash sets and maps.
Use the proper partition index in mapPartitionsWIthIndex
mapPartitionsWithIndex uses TaskContext.partitionId as the partition index. TaskContext.partitionId used to be identical to the partition index in a RDD. However, pull request #186 introduced a scenario (with partition pruning) that the two can be different. This pull request uses the right partition index in all mapPartitionsWithIndex related calls.
Also removed the extra MapPartitionsWIthContextRDD and put all the mapPartitions related functionality in MapPartitionsRDD.
For SPARK-527, Support spark-shell when running on YARN
sync to trunk and resubmit here
In current YARN mode approaching, the application is run in the Application Master as a user program thus the whole spark context is on remote.
This approaching won't support application that involve local interaction and need to be run on where it is launched.
So In this pull request I have a YarnClientClusterScheduler and backend added.
With this scheduler, the user application is launched locally,While the executor will be launched by YARN on remote nodes with a thin AM which only launch the executor and monitor the Driver Actor status, so that when client app is done, it can finish the YARN Application as well.
This enables spark-shell to run upon YARN.
This also enable other Spark applications to have the spark context to run locally with a master-url "yarn-client". Thus e.g. SparkPi could have the result output locally on console instead of output in the log of the remote machine where AM is running on.
Docs also updated to show how to use this yarn-client mode.
- Use Murmur Hash 3 finalization step to scramble the bits of HashCode
instead of the simpler version in java.util.HashMap; the latter one
had trouble with ranges of consecutive integers. Murmur Hash 3 is used
by fastutil.
- Don't check keys for equality when re-inserting due to growing the
table; the keys will already be unique
- Remember the grow threshold instead of recomputing it on each insert
Add graphite sink for metrics
This adds a metrics sink for graphite. The sink must
be configured with the host and port of a graphite node
and optionally may be configured with a prefix that will
be prepended to all metrics that are sent to graphite.
XORShift RNG with unit tests and benchmark
This patch was introduced to address SPARK-950 - the discussion below the ticket explains not only the rationale, but also the design and testing decisions: https://spark-project.atlassian.net/browse/SPARK-950
To run unit test, start SBT console and type:
compile
test-only org.apache.spark.util.XORShiftRandomSuite
To run benchmark, type:
project core
console
Once the Scala console starts, type:
org.apache.spark.util.XORShiftRandom.benchmark(100000000)
XORShiftRandom is also an object with a main method taking the
number of iterations as an argument, so you can also run it
from the command line.
Fix 'timeWriting' stat for shuffle files
Due to concurrent git branches, changes from shuffle file consolidation patch
caused the shuffle write timing patch to no longer actually measure the time,
since it requires time be measured after the stream has been closed.
Also changed the semantics of the index parameter in mapPartitionsWithIndex from the partition index of the output partition to the partition index in the current RDD.
- Don't check keys for equality when re-inserting due to growing the
table; the keys will already be unique
- Remember the grow threshold instead of recomputing it on each insert
- Use Murmur Hash 3 finalization step to scramble the bits of HashCode
instead of the simpler version in java.util.HashMap; the latter one
had trouble with ranges of consecutive integers. Murmur Hash 3 is used
by fastutil.
- Use Object.equals() instead of Scala's == to compare keys, because the
latter does extra casts for numeric types (see the equals method in
https://github.com/scala/scala/blob/master/src/library/scala/runtime/BoxesRunTime.java)
Due to concurrent git branches, changes from shuffle file consolidation patch
caused the shuffle write timing patch to no longer actually measure the time,
since it requires time be measured after the stream has been closed.
With this scheduler, the user application is launched locally,
While the executor will be launched by YARN on remote nodes.
This enables spark-shell to run upon YARN.
PartitionPruningRDD is using index from parent
I was getting a ArrayIndexOutOfBoundsException exception after doing union on pruned RDD. The index it was using on the partition was the index in the original RDD not the new pruned RDD.
To run unit test, start SBT console and type:
compile
test-only org.apache.spark.util.XORShiftRandomSuite
To run benchmark, type:
project core
console
Once the Scala console starts, type:
org.apache.spark.util.XORShiftRandom.benchmark(100000000)
Simple cleanup on Spark's Scala code
Simple cleanup on Spark's Scala code while testing some modules:
-) Remove some of unused imports as I found them
-) Remove ";" in the imports statements
-) Remove () at the end of method calls like size that does not have size effect.
-) Remove some of unused imports as I found them
-) Remove ";" in the imports statements
-) Remove () at the end of method call like size that does not have size effect.
Fix bug where scheduler could hang after task failure.
When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager indicating which tasks are
pending may be updated after revive offers is called (there's a
race condition here), so when revive offers is called, the task set
manager does not yet realize that there are failed tasks that need
to be relaunched.
This isn't currently unit tested but will be once my pull request for
merging the cluster and local schedulers goes in -- at which point
many more of the unit tests will exercise the code paths through
the cluster scheduler (currently the failure test suite uses the local
scheduler, which is why we didn't see this bug before).
I've diff'd this patch against my own -- since they were both created
independently, this means that two sets of eyes have gone over all the
merge conflicts that were created, so I'm feeling significantly more
confident in the resulting PR.
@rxin has looked at the changes to the repl and is resoundingly
confident that they are correct.
Don't retry tasks when they fail due to a NotSerializableException
As with my previous pull request, this will be unit tested once the Cluster and Local schedulers get merged.
When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager indicating which tasks are
pending may be updated after revive offers is called (there's a
race condition here), so when revive offers is called, the task set
manager does not yet realize that there are failed tasks that need
to be relaunched.
When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager indicating which tasks are
pending may be updated after revive offers is called (there's a
race condition here), so when revive offers is called, the task set
manager does not yet realize that there are failed tasks that need
to be relaunched.
Don't ignore spark.cores.max when using Mesos Coarse mode
totalCoresAcquired is decremented but never incremented, causing Spark to effectively ignore spark.cores.max in coarse grained Mesos mode.
Migrate the daemon thread started by DAGScheduler to Akka actor
`DAGScheduler` adopts an event queue and a daemon thread polling the it to process events sent to a `DAGScheduler`. This is a classical actor use case. By migrating this thread to Akka actor, we may benefit from both cleaner code and better performance (context switching cost of Akka actor is much less than that of a native thread).
But things become a little complicated when taking existing test code into consideration.
Code in `DAGSchedulerSuite` is somewhat tightly coupled with `DAGScheduler`, and directly calls `DAGScheduler.processEvent` instead of posting event messages to `DAGScheduler`. To minimize code change, I chose to let the actor to delegate messages to `processEvent`. Maybe this doesn't follow conventional actor usage, but I tried to make it apparently correct.
Another tricky part is that, since `DAGScheduler` depends on the `ActorSystem` provided by its field `env`, `env` cannot be null. But the `dagScheduler` field created in `DAGSchedulerSuite.before` was given a null `env`. What's more, `BlockManager.blockIdsToBlockManagers` checks whether `env` is null to determine whether to run the production code or the test code (bad smell here, huh?). I went through all callers of `BlockManager.blockIdsToBlockManagers`, and made sure that if `env != null` holds, then `blockManagerMaster == null` must also hold. That's the logic behind `BlockManager.scala` [line 896](https://github.com/liancheng/incubator-spark/compare/dagscheduler-actor-refine?expand=1#diff-2b643ea78c1add0381754b1f47eec132L896).
At last, since `DAGScheduler` instances are always `start()`ed after creation, I removed the `start()` method, and starts the `eventProcessActor` within the constructor.
For now, this only adds MarshalSerializer, but it lays the groundwork
for other supporting custom serializers. Many of these mechanisms
can also be used to support deserialization of different data formats
sent by Java, such as data encoded by MsgPack.
This also fixes a bug in SparkContext.union().
Fix secure hdfs access for spark on yarn
https://github.com/apache/incubator-spark/pull/23 broke secure hdfs access. Not sure if it works with secure hdfs on standalone. Fixing it at least for spark on yarn.
The broadcasting of jobconf change also broke secure hdfs access as it didn't take into account things calling the getPartitions before sparkContext is initialized. The DAGScheduler does this as it tries to getShuffleMapStage.
This adds a metrics sink for graphite. The sink must
be configured with the host and port of a graphite node
and optionally may be configured with a prefix that will
be prepended to all metrics that are sent to graphite.
Include appId in executor cmd line args
add the appId back into the executor cmd line args.
I also made a pretty lame regression test, just to make sure it doesn't get dropped in the future. not sure it will run on the build server, though, b/c `ExecutorRunner.buildCommandSeq()` expects to be abel to run the scripts in `bin`.
Never store shuffle blocks in BlockManager
After the BlockId refactor (PR #114), it became very clear that ShuffleBlocks are of no use
within BlockManager (they had a no-arg constructor!). This patch completely eliminates
them, saving us around 100-150 bytes per shuffle block.
The total, system-wide overhead per shuffle block is now a flat 8 bytes, excluding
state saved by the MapOutputTracker.
Note: This should *not* be merged directly into 0.8.0 -- see #138
After the BlockId refactor (PR #114), it became very clear that ShuffleBlocks are of no use
within BlockManager (they had a no-arg constructor!). This patch completely eliminates
them, saving us around 100-150 bytes per shuffle block.
The total, system-wide overhead per shuffle block is now a flat 8 bytes, excluding
state saved by the MapOutputTracker.
add javadoc to JobLogger, and some small fix
against Spark-941
add javadoc to JobLogger, output more info for RDD, modify recordStageDepGraph to avoid output duplicate stage dependency information
(cherry picked from commit 518cf22eb2)
Signed-off-by: Reynold Xin <rxin@apache.org>
- ShuffleBlocks has been removed and replaced by ShuffleWriterGroup.
- ShuffleWriterGroup no longer contains a reference to a ShuffleFileGroup.
- ShuffleFile has been removed and its contents are now within ShuffleFileGroup.
- ShuffleBlockManager.forShuffle has been replaced by a more stateful forMapTask.
Overhead of each shuffle block for consolidation has been reduced from >300 bytes
to 8 bytes (1 primitive Long). Verified via profiler testing with 1 mil shuffle blocks,
net overhead was ~8,400,000 bytes.
Despite the memory-optimized implementation incurring extra CPU overhead, the runtime
of the shuffle phase in this test was only around 2% slower, while the reduce phase
was 40% faster, when compared to not using any shuffle file consolidation.
If we support custom serializers, the Python
worker will know what type of input to expect,
so we won't need to wrap Tuple2 and Strings into
pickled tuples and strings.
Handle ConcurrentModificationExceptions in SparkContext init.
System.getProperties.toMap will fail-fast when concurrently modified,
and it seems like some other thread started by SparkContext does
a System.setProperty during it's initialization.
Handle this by just looping on ConcurrentModificationException, which
seems the safest, since the non-fail-fast methods (Hastable.entrySet)
have undefined behavior under concurrent modification.
The code in LocalScheduler/LocalTaskSetManager was nearly identical
to the code in ClusterScheduler/ClusterTaskSetManager. The redundancy
made making updating the schedulers unnecessarily painful and error-
prone. This commit combines the two into a single TaskScheduler/
TaskSetManager.
Fixed incorrect log message in local scheduler
This change is especially relevant at the moment, because some users are seeing this failure, and the log message is misleading/incorrect (because for the tests, the max failures is set to 0, not 4)
Pull SparkHadoopUtil out of SparkEnv (jira SPARK-886)
Having the logic to initialize the correct SparkHadoopUtil in SparkEnv prevents it from being used until after the SparkContext is initialized. This causes issues like https://spark-project.atlassian.net/browse/SPARK-886. It also makes it hard to use in singleton objects. For instance I want to use it in the security code.
Add support for local:// URI scheme for addJars()
This PR adds support for a new URI scheme for SparkContext.addJars(): `local://file/path`.
The *local* scheme indicates that the `/file/path` exists on every worker node. The reason for its existence is for big library JARs, which would be really expensive to serve using the standard HTTP fileserver distribution method, especially for big clusters. Today the only inexpensive method (assuming such a file is on every host, via say NFS, rsync, etc.) of doing this is to add the JAR to the SPARK_CLASSPATH, but we want a method where the user does not need to modify the Spark configuration.
I would add something to the docs, but it's not obvious where to add it.
Oh, and it would be great if this could be merged in time for 0.8.1.
Display both task ID and task attempt ID in UI, and rename taskId to taskAttemptId
Previously only the task attempt ID was shown in the UI; this was confusing because the job can be shown as complete while there are tasks still running. Showing the task ID in addition to the attempt ID makes it clear which tasks are redundant.
This commit also renames taskId to taskAttemptId in TaskInfo and in the local/cluster schedulers. This identifier was used to uniquely identify attempts, not tasks, so the current naming was confusing. The new naming is also more consistent with map reduce.
System.getProperties.toMap will fail-fast when concurrently modified,
and it seems like some other thread started by SparkContext does
a System.setProperty during it's initialization.
Handle this by just looping on ConcurrentModificationException, which
seems the safest, since the non-fail-fast methods (Hastable.entrySet)
have undefined behavior under concurrent modification.
Added new Spark Streaming operations
New operations
- transformWith which allows arbitrary 2-to-1 DStream transform, added to Scala and Java API
- StreamingContext.transform to allow arbitrary n-to-1 DStream
- leftOuterJoin and rightOuterJoin between 2 DStreams, added to Scala and Java API
- missing variations of join and cogroup added to Scala Java API
- missing JavaStreamingContext.union
Updated a number of Java and Scala API docs
Properly display the name of a stage in the UI.
This fixes a bug introduced by the fix for SPARK-940, which
changed the UI to display the RDD name rather than the stage
name. As a result, no name for the stage was shown when
using the Spark shell, which meant that there was no way to
click on the stage to see more details (e.g., the running
tasks). This commit changes the UI back to using the
stage name.
@pwendell -- let me know if this change was intentional
This fixes a bug introduced by the fix for SPARK-940, which
changed the UI to display the RDD name rather than the stage
name. As a result, no name for the stage was shown when
using the Spark shell, which meant that there was no way to
click on the stage to see more details (e.g., the running
tasks). This commit changes the UI back to using the
stage name.
This patch adds an operator called repartition with more straightforward
semantics than the current `coalesce` operator. There are a few use cases
where this operator is useful:
1. If a user wants to increase the number of partitions in the RDD. This
is more common now with streaming. E.g. a user is ingesting data on one
node but they want to add more partitions to ensure parallelism of
subsequent operations across threads or the cluster.
Right now they have to call rdd.coalesce(numSplits, shuffle=true) - that's
super confusing.
2. If a user has input data where the number of partitions is not known. E.g.
> sc.textFile("some file").coalesce(50)....
This is both vague semantically (am I growing or shrinking this RDD) but also,
may not work correctly if the base RDD has fewer than 50 partitions.
The new operator forces shuffles every time, so it will always produce exactly
the number of new partitions. It also throws an exception rather than silently
not-working if a bad input is passed.
I am currently adding streaming tests (requires refactoring some of the test
suite to allow testing at partition granularity), so this is not ready for
merge yet. But feedback is welcome.
Show "GETTING_RESULTS" state in UI.
This commit adds a set of calls using the SparkListener interface
that indicate when a task is remotely fetching results, so that
we can display this (potentially time-consuming) phase of execution
to users through the UI.
This should fix SPARK-902, an issue where some
Java API Function classes could cause
AbstractMethodErrors when user code is compiled
using the Eclipse compiler.
Thanks to @MartinWeindel for diagnosing this
problem.
(This PR subsumes / closes#30)
This patch fixes a bug where the Spark UI didn't display the correct number of total
tasks if the number of tasks in a Stage doesn't equal the number of RDD partitions.
It also cleans up the listener API a bit by embedding this information in the
StageInfo class rather than passing it seperately.
Split MapOutputTracker into Master/Worker classes
Previously, MapOutputTracker contained fields and methods that were only applicable to the master or worker instances. This commit introduces a MasterMapOutputTracker class to prevent the master-specific methods from being accessed on workers.
I also renamed a few methods and made others protected/private.
This commit adds a set of calls using the SparkListener interface
that indicate when a task is remotely fetching results, so that
we can display this (potentially time-consuming) phase of execution
to users through the UI.
Made the following traits/interfaces/classes non-public:
Made the following traits/interfaces/classes non-public:
SparkHadoopWriter
SparkHadoopMapRedUtil
SparkHadoopMapReduceUtil
SparkHadoopUtil
PythonAccumulatorParam
BlockManagerSlaveActor
Provide Instrumentation for Shuffle Write Performance
Shuffle write performance can have a major impact on the performance of jobs. This patch adds a few pieces of instrumentation related to shuffle writes. They are:
1. A listing of the time spent performing blocking writes for each task. This is implemented by keeping track of the aggregate delay seen by many individual writes.
2. An undocumented option `spark.shuffle.sync` which forces shuffle data to sync to disk. This is necessary for measuring shuffle performance in the absence of the OS buffer cache.
3. An internal utility which micro-benchmarks write throughput for simulated shuffle outputs.
I'm going to do some performance testing on this to see whether these small timing calls add overhead. From a feature perspective, however, I consider this complete. Any feedback is appreciated.
Don't setup the uncaught exception handler in local mode.
This avoids unit test failures for Spark streaming.
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.JobManager$JobHandler@38cf728d rejected from java.util.concurrent.ThreadPoolExecutor@3b69a41e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 14]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at org.apache.spark.streaming.JobManager.runJob(JobManager.scala:54)
at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.streaming.Scheduler.generateJobs(Scheduler.scala:108)
at org.apache.spark.streaming.Scheduler$$anonfun$1.apply$mcVJ$sp(Scheduler.scala:41)
at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:66)
at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:34)
Code de-duplication in BlockManager
The BlockManager has a few methods that duplicate most of their code. This pull request extracts the duplicated code into private doPut(), doGetLocal(), and doGetRemote() methods that unify the storing/reading of bytes or objects.
I believe that I preserved the logic of the original code, but I'd appreciate some help in reviewing this.
The Spark shuffle phase can produce a large number of files, as one file is created
per mapper per reducer. For large or repeated jobs, this often produces millions of
shuffle files, which sees extremely degredaded performance from the OS file system.
This patch seeks to reduce that burden by combining multipe shuffle files into one.
This PR draws upon the work of Jason Dai in https://github.com/mesos/spark/pull/669.
However, it simplifies the design in order to get the majority of the gain with less
overall intellectual and code burden. The vast majority of code in this pull request
is a refactor to allow the insertion of a clean layer of indirection between logical
block ids and physical files. This, I feel, provides some design clarity in addition
to enabling shuffle file consolidation.
The main goal is to produce one shuffle file per reducer per active mapper thread.
This allows us to isolate the mappers (simplifying the failure modes), while still
allowing us to reduce the number of mappers tremendously for large tasks. In order
to accomplish this, we simply create a new set of shuffle files for every parallel
task, and return the files to a pool which will be given out to the next run task.
The main goal of this refactor was to allow the interposition of a new layer which
maps logical BlockIds to physical locations other than a file with the same name
as the BlockId. In particular, BlockIds will need to be mappable to chunks of files,
as multiple will be stored in the same file.
In order to accomplish this, the following changes have been made:
- Creation of DiskBlockManager, which manages the association of logical BlockIds
to physical disk locations (called FileSegments). By default, Blocks are simply
mapped to physical files of the same name, as before.
- The DiskStore now indirects all requests for a given BlockId through the DiskBlockManager
in order to resolve the actual File location.
- DiskBlockObjectWriter has been merged into BlockObjectWriter.
- The Netty PathResolver has been changed to map BlockIds into FileSegments, as this
codepath is the only one that uses Netty, and that is likely to remain the case.
Overall, I think this refactor produces a clearer division between the logical Block
paradigm and their physical on-disk location. There is now an explicit (and documented)
mapping from one to the other.
Previously, MapOutputTracker contained fields and methods that
were only applicable to the master or worker instances. This
commit introduces a MasterMapOutputTracker class to prevent
the master-specific methods from being accessed on workers.
I also renamed a few methods and made others protected/private.
Job cancellation via job group id.
This PR adds a simple API to group together a set of jobs belonging to a thread and threads spawned from it. It also allows the cancellation of all jobs in this group.
An example:
sc.setJobDescription("this_is_the_group_id", "some job description")
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
In a separate thread:
sc.cancelJobGroup("this_is_the_group_id")
Faster and stable/reliable broadcast
HttpBroadcast is noticeably slow, but the alternatives (TreeBroadcast or BitTorrentBroadcast) are notoriously unreliable. The main problem with them is they try to manage the memory for the pieces of a broadcast themselves. Right now, the BroadcastManager does not know which machines the tasks reading from a broadcast variable is running and when they have finished. Consequently, we try to guess and often guess wrong, which blows up the memory usage and kills/hangs jobs.
This very simple implementation solves the problem by not trying to manage the intermediate pieces; instead, it offloads that duty to the BlockManager which is quite good at juggling blocks. Otherwise, it is very similar to the BitTorrentBroadcast implementation (without fancy optimizations). And it runs much faster than HttpBroadcast we have right now.
I've been using this for another project for last couple of weeks, and just today did some benchmarking against the Http one. The following shows the improvements for increasing broadcast size for cold runs. Each line represent the number of receivers.
![fix-bc-first](https://f.cloud.github.com/assets/232966/1349342/ffa149e4-36e7-11e3-9fa6-c74555829356.png)
After the first broadcast is over, i.e., after JVM is wormed up and for HttpBroadcast the server is already running (I think), the following are the improvements for warm runs.
![fix-bc-succ](https://f.cloud.github.com/assets/232966/1349352/5a948bae-36e8-11e3-98ce-34f19ebd33e0.jpg)
The curves are not as nice as the cold runs, but the improvements are obvious, specially for larger broadcasts and more receivers.
Depending on how it goes, we should deprecate and/or remove old TreeBroadcast and BitTorrentBroadcast implementations, and hopefully, SPARK-889 will not be necessary any more.
Mainly, this occurs if you provide a messed up MASTER url (one that doesn't match one
of our regexes). Previously, we would default to Mesos, fail, and then start the shell
anyway, except that any Spark command would fail.
The interface was used only by the DAG scheduler (so it wasn't necessary
to define the additional interface), and the naming makes it very
confusing when reading the code (because "listener" was used
to describe the DAG scheduler, rather than SparkListeners, which
implement a nearly-identical interface but serve a different
function).
Renamed StandaloneX to CoarseGrainedX.
(as suggested by @rxin here https://github.com/apache/incubator-spark/pull/14)
The previous names were confusing because the components weren't just
used in Standalone mode. The scheduler used for Standalone
mode is called SparkDeploySchedulerBackend, so referring to the base class
as StandaloneSchedulerBackend was misleading.
Refactor BlockId into an actual type
Converts all of our BlockId strings into actual BlockId types. Here are some advantages of doing this now:
+ Type safety
+ Code clarity - it's now obvious what the key of a shuffle or rdd block is, for instance. Additionally, appearing in tuple/map type signatures is a big readability bonus. A Seq[(String, BlockStatus)] is not very clear. Further, we can now use more Scala features, like matching on BlockId types.
+ Explicit usage - we can now formally tell where various BlockIds are being used (without doing string searches); this makes updating current BlockIds a much clearer process, and compiler-supported.
(I'm looking at you, shuffle file consolidation.)
+ It will only get harder to make this change as time goes on.
Downside is, of course, that this is a very invasive change touching a lot of different files, which will inevitably lead to merge conflicts for many.
This is an unfortunately invasive change which converts all of our BlockId
strings into actual BlockId types. Here are some advantages of doing this now:
+ Type safety
+ Code clarity - it's now obvious what the key of a shuffle or rdd block is,
for instance. Additionally, appearing in tuple/map type signatures is a big
readability bonus. A Seq[(String, BlockStatus)] is not very clear.
Further, we can now use more Scala features, like matching on BlockId types.
+ Explicit usage - we can now formally tell where various BlockIds are being used
(without doing string searches); this makes updating current BlockIds a much
clearer process, and compiler-supported.
(I'm looking at you, shuffle file consolidation.)
+ It will only get harder to make this change as time goes on.
Since this touches a lot of files, it'd be best to either get this patch
in quickly or throw it on the ground to avoid too many secondary merge conflicts.
Add an optional closure parameter to HadoopRDD instantiation to use when creating local JobConfs.
Having HadoopRDD accept this optional closure eliminates the need for the HadoopFileRDD added earlier. It makes the HadoopRDD more general, in that the caller can specify any JobConf initialization flow.
Address review comments, move to incubator spark
Also includes a small fix to speculative execution.
<edit> Continued from https://github.com/mesos/spark/pull/914 </edit>
Standalone Scheduler fault tolerance using ZooKeeper
This patch implements full distributed fault tolerance for standalone scheduler Masters.
There is only one master Leader at a time, which is actively serving scheduling
requests. If this Leader crashes, another master will eventually be elected, reconstruct
the state from the first Master, and continue serving scheduling requests.
Leader election is performed using the ZooKeeper leader election pattern. We try to minimize
the use of ZooKeeper and the assumptions about ZooKeeper's behavior, so there is a layer of
retries and session monitoring on top of the ZooKeeper client.
Master failover follows directly from the single-node Master recovery via the file
system (patch d5a96fe), save that the Master state is stored in ZooKeeper instead.
Configuration:
By default, no recovery mechanism is enabled (spark.deploy.recoveryMode = NONE).
By setting spark.deploy.recoveryMode to ZOOKEEPER and setting spark.deploy.zookeeper.url
to an appropriate ZooKeeper URL, ZooKeeper recovery mode is enabled.
By setting spark.deploy.recoveryMode to FILESYSTEM and setting spark.deploy.recoveryDirectory
to an appropriate directory accessible by the Master, we will keep the behavior of from d5a96fe.
Additionally, places where a Master could be specificied by a spark:// url can now take
comma-delimited lists to specify backup masters. Note that this is only used for registration
of NEW Workers and application Clients. Once a Worker or Client has registered with the
Master Leader, it is "in the system" and will never need to register again.
SPARK-900 Use coarser grained naming for metrics
see SPARK-900 Use coarser grained naming for metrics.
Now the new metric name is formatted as {XXX.YYY.ZZZ.COUNTER_UNIT}, XXX.YYY.ZZZ represents the group name, which can group several metrics under the same Ganglia view.
Don't allocate Kryo buffers unless needed
I noticed that the Kryo serializer could be slower than the Java one by 2-3x on small shuffles because it spend a lot of time initializing Kryo Input and Output objects. This is because our default buffer size for them is very large. Since the serializer is often used on streams, I made the initialization lazy for that, and used a smaller buffer (auto-managed by Kryo) for input.
Conflicts:
bagel/pom.xml
core/pom.xml
core/src/test/scala/org/apache/spark/ui/UISuite.scala
examples/pom.xml
mllib/pom.xml
pom.xml
project/SparkBuild.scala
repl/pom.xml
streaming/pom.xml
tools/pom.xml
In scala 2.10, a shorter representation is used for naming artifacts
so changed to shorter scala version for artifacts and made it a property in pom.
Allow users to pass broadcasted Configurations and cache InputFormats across Hadoop file reads.
Note: originally from https://github.com/mesos/spark/pull/942
Currently motivated by Shark queries on Hive-partitioned tables, where there's a JobConf broadcast for every Hive-partition (i.e., every subdirectory read). The only thing different about those JobConfs is the input path - the Hadoop Configuration that the JobConfs are constructed from remain the same.
This PR only modifies the old Hadoop API RDDs, but similar additions to the new API might reduce computation latencies a little bit for high-frequency FileInputDStreams (which only uses the new API right now).
As a small bonus, added InputFormats caching, to avoid reflection calls for every RDD#compute().
Few other notes:
Added a general soft-reference hashmap in SparkHadoopUtil because I wanted to avoid adding another class to SparkEnv.
SparkContext default hadoopConfiguration isn't cached. There's no equals() method for Configuration, so there isn't a good way to determine when configuration properties have changed.
SPARK-920/921 - JSON endpoint updates
920 - Removal of duplicate scheme part of Spark URI, it was appearing as spark://spark//host:port in the JSON field.
JSON now delivered as:
url:spark://127.0.0.1:7077
921 - Adding the URL of the Main Application UI will allow custom interfaces (that use the JSON output) to redirect from the standalone UI.
One major change was the use of messages instead of raw functions as the
parameter of Akka scheduled timers. Since messages are serialized, unlike
raw functions, the behavior is easier to think about and doesn't cause
race conditions when exceptions are thrown.
Another change is to avoid using global pointers that might change without
a lock.
The previous names were confusing because the components weren't just
used in Standalone mode -- in fact, the scheduler used for Standalone
mode is called SparkDeploySchedulerBackend. So, the previous names
were misleading.
Currently PythonPartitioner determines partition ID by hashing a
byte-array representation of PySpark's key. This PR lets
PythonPartitioner use the actual partition ID, which is required e.g.
for sorting via PySpark.
This patch implements full distributed fault tolerance for standalone scheduler Masters.
There is only one master Leader at a time, which is actively serving scheduling
requests. If this Leader crashes, another master will eventually be elected, reconstruct
the state from the first Master, and continue serving scheduling requests.
Leader election is performed using the ZooKeeper leader election pattern. We try to minimize
the use of ZooKeeper and the assumptions about ZooKeeper's behavior, so there is a layer of
retries and session monitoring on top of the ZooKeeper client.
Master failover follows directly from the single-node Master recovery via the file
system (patch 194ba4b8), save that the Master state is stored in ZooKeeper instead.
Configuration:
By default, no recovery mechanism is enabled (spark.deploy.recoveryMode = NONE).
By setting spark.deploy.recoveryMode to ZOOKEEPER and setting spark.deploy.zookeeper.url
to an appropriate ZooKeeper URL, ZooKeeper recovery mode is enabled.
By setting spark.deploy.recoveryMode to FILESYSTEM and setting spark.deploy.recoveryDirectory
to an appropriate directory accessible by the Master, we will keep the behavior of from 194ba4b8.
Additionally, places where a Master could be specificied by a spark:// url can now take
comma-delimited lists to specify backup masters. Note that this is only used for registration
of NEW Workers and application Clients. Once a Worker or Client has registered with the
Master Leader, it is "in the system" and will never need to register again.
Forthcoming:
Documentation, tests (! - only ad hoc testing has been performed so far)
I do not intend for this commit to be merged until tests are added, but this patch should
still be mostly reviewable until then.
Implements a basic form of Standalone Scheduler fault recovery. In particular,
this allows faults to be manually recovered from by means of restarting the
Master process on the same machine. This is the majority of the code necessary
for general fault tolerance, which will first elect a leader and then recover
the Master state.
In order to enable fault recovery, the Master will persist a small amount of state related
to the registration of Workers and Applications to disk. If the Master is started and
sees that this state is still around, it will enter Recovery mode, during which time it
will not schedule any new Executors on Workers (but it does accept the registration of
new Clients and Workers).
At this point, the Master attempts to reconnect to all Workers and Client applications
that were registered at the time of failure. After confirming either the existence
or nonexistence of all such nodes (within a certain timeout), the Master will exit
Recovery mode and resume normal scheduling.
Improved organization of scheduling packages.
This commit does not change any code -- only file organization.
Please let me know if there was some masterminded strategy behind
the existing organization that I failed to understand!
There are two components of this change:
(1) Moving files out of the cluster package, and down
a level to the scheduling package. These files are all used by
the local scheduler in addition to the cluster scheduler(s), so
should not be in the cluster package. As a result of this change,
none of the files in the local package reference files in the
cluster package.
(2) Moving the mesos package to within the cluster package.
The mesos scheduling code is for a cluster, and represents a
specific case of cluster scheduling (the Mesos-related classes
often subclass cluster scheduling classes). Thus, the most logical
place for it seems to be within the cluster package.
The one thing about the scheduling code that seems a little funny to me
is the naming of the SchedulerBackends. The StandaloneSchedulerBackend
is not just for Standalone mode, but instead is used by Mesos coarse grained
mode and Yarn, and the backend that *is* just for Standalone mode is instead called SparkDeploySchedulerBackend. I didn't change this because I wasn't sure if there
was a reason for this naming that I'm just not aware of.
some minor fixes to MemoryStore
This is a repeat of #5, moved to its own branch in my repo.
This makes all updates to on ; it skips on synchronizing the reads where it can get away with it.
This commit does not change any code -- only file organization.
There are two components of this change:
(1) Moving files out of the cluster package, and down
a level to the scheduling package. These files are all used by
the local scheduler in addition to the cluster scheduler(s), so
should not be in the cluster package. As a result of this change,
none of the files in the local package reference files in the
cluster package.
(2) Moving the mesos package to within the cluster package.
The mesos scheduling code is for a cluster, and represents a
specific case of cluster scheduling (the Mesos-related classes
often subclass cluster scheduling classes). Thus, the most logical
place for it is within the cluster package.
This change requires adding an extra failure mode: tasks can complete
successfully, but the result gets lost or flushed from the block manager
before it's been fetched.
Make "currentMemory" @volatile, so that it's reads in ensureFreeSpace() are atomic and up-to-date--i.e., currentMemory can't increase while putLock is held (though it could decrease, which would only help ensureFreeSpace()).
The fact that DynamicVariable uses an InheritableThreadLocal
can cause problems where the properties end up being shared
across threads in certain circumstances.
StandaloneSchedulerBackend instead of the smaller IDs used within Spark
(that lack the application name).
This was reported by ClearStory in
https://github.com/clearstorydata/spark/pull/9.
Also fixed some messages that said slave instead of executor.
Include the useful tip that if shuffle=true, coalesce can actually
increase the number of partitions.
This makes coalesce more like a generic `RDD.repartition` operation.
(Ideally this `RDD.repartition` could automatically choose either a coalesce or
a shuffle if numPartitions was either less than or greater than, respectively,
the current number of partitions.)
The sc.StorageLevel -> StorageLevel pathway is a bit janky, but otherwise
the shell would have to call a private method of SparkContext. Having
StorageLevel available in sc also doesn't seem like the end of the world.
There may be a better solution, though.
As for creating the StorageLevel object itself, this seems to be the best
way in Python 2 for creating singleton, enum-like objects:
http://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python
Caching the results of local actions (e.g., rdd.first()) causes the driver to
store entire partitions in its own memory, which may be highly constrained.
This patch simply makes the CacheManager avoid caching the result of all locally-run computations.
- Use "fluid" layout that can expand to wide browser windows, instead of
the old one's limit of 1200 px
- Remove unnecessary <hr> elements
- Switch back to Bootstrap's default theme and tweak progress bar colors
- Make headers more consistent between deploy and app UIs
- Replace some inline CSS with stylesheets
This commit makes Spark invocation saner by using an assembly JAR to
find all of Spark's dependencies instead of adding all the JARs in
lib_managed. It also packages the examples into an assembly and uses
that as SPARK_EXAMPLES_JAR. Finally, it replaces the old "run" script
with two better-named scripts: "run-examples" for examples, and
"spark-class" for Spark internal classes (e.g. REPL, master, etc). This
is also designed to minimize the confusion people have in trying to use
"run" to run their own classes; it's not meant to do that, but now at
least if they look at it, they can modify run-examples to do a decent
job for them.
As part of this, Bagel's examples are also now properly moved to the
examples package instead of bagel.
- Use SPARK_PUBLIC_DNS environment variable if set (for EC2)
- Use a non-ephemeral port (3030 instead of 33000) by default
- Updated test to use non-ephemeral port too
- When a resourceOffers() call has multiple offers, force the TaskSets
to consider them in increasing order of locality levels so that they
get a chance to launch stuff locally across all offers
- Simplify ClusterScheduler.prioritizeContainers
- Add docs on the new configuration options
- Added periodic revival of offers in StandaloneSchedulerBackend
- Replaced task scheduling aggression with multi-level delay scheduling
in ClusterTaskSetManager
- Fixed ZippedRDD preferred locations because they can't currently be
process-local
- Fixed some uses of hostPort
- Replace use of hostPort vs host in Task.preferredLocations with a
TaskLocation class that contains either an executorId and a host or
just a host. This is part of a bigger effort to eliminate hostPort
based data structures and just use executorID, since the hostPort vs
host stuff is confusing (and not checkable with static typing, leading
to ugly debug code), and hostPorts are not provided by Mesos.
- Replaced most hostPort-based data structures and fields as above.
- Simplified ClusterTaskSetManager to deal with preferred locations in a
more concise way and generally be more concise.
- Updated the way ClusterTaskSetManager handles racks: instead of
enqueueing a task to a separate queue for all the hosts in the rack,
which would create lots of large queues, have one queue per rack name.
- Removed non-local fallback stuff in ClusterScheduler that tried to
launch less-local tasks on a node once the local ones were all
assigned. This change didn't work because many cluster schedulers send
offers for just one node at a time (even the standalone and YARN ones
do so as nodes join the cluster one by one). Thus, lots of non-local
tasks would be assigned even though a node with locality for them
would be able to receive tasks just a short time later.
- Renamed MapOutputTracker "generations" to "epochs".
It made the JSON creation slightly more complicated, but reduces one external dependency. The scala library also properly escape "/" (which lift-json doesn't).
These are used all over the place now and they are not specific to memory at all.
memoryBytesToString --> bytesToString
memoryMegabytesToString --> megabytesToString
Ops teams need to ensure that the cluster is functional and performant. Having to scrape the html source for worker state won't work reliably, and will be slow. By exposing the state in the json output, ops teams are able to ensure a fully functional environment by querying for the json output and parsing for dead nodes.
1. Renamed SparkContext.addLocalProperty to setLocalProperty. And allow this function to unset a property.
2. Renamed SparkContext.setDescription to setCurrentJobDescription.
3. Throw an exception if the fair scheduler allocation file is invalid.
1) UI crashed if the executor UI was loaded before any tasks started.
2) The total tasks was incorrectly reported due to using string (rather
than int) arithmetic.
The TaskState class's isFinished() method didn't return true for
KILLED tasks, which means some resources are never reclaimed
for tasks that are killed. This also made it inconsistent with the
isFinished() method used by CoarseMesosSchedulerBackend.
JobConf's constructor loads default config files in some verisons of
Hadoop, which is quite slow, and we only need the Configuration object
to pass the correct ClassLoader.
This commit fixes issues where SparkListeners that take a while to
process events slow the DAGScheduler.
This commit also fixes a bug in the UI where if a user goes to a
web page of a stage that does not exist, they can create a memory
leak (granted, this is not an issue at small scale -- probably only
an issue if someone actively tried to DOS the UI).
One bug caused the UI to crash if you try to look at a job's status
before any of the tasks have finished.
The second bug was a concurrency issue where two different threads
(the scheduling thread and a UI thread) could be reading/updating
the data structures in JobProgressListener concurrently.
The third bug mis-used an Option, also causing the UI to crash
under certain conditions.
Removal of items from ArrayBuffers in the UI code was slow and
significantly impacted scheduler throughput. This commit
improves scheduler throughput by 5x.
- Changes ALS to accept RDD[Rating] instead of (Int, Int, Double) making it
easier to call from Java
- Renames class methods from `train` to `run` to enable static methods to be
called from Java.
- Add unit tests which check if both static / class methods can be called.
- Also add examples which port the main() function in ALS, KMeans to the
examples project.
Couple of minor changes to existing code:
- Add a toJavaRDD method in RDD to convert scala RDD to java RDD easily
- Workaround a bug where using double[] from Java leads to class cast exception in
KMeans init
A ThreadLocal SparkEnv.env is facing various situations leading to
NullPointerExceptions, where SparkEnv.env set in one thread is not
gettable in another thread, but often assumed to be available.
See, e.g., https://groups.google.com/forum/#!topic/spark-developers/GLx8yunSj0A
This hotfixes SparkEnv.env to return either (a) the ThreadLocal
value if non-null, or (b) the previously set value in any thread.
This approach preserves SparkEnv.set() thread safety needed by
RDD.compute() and possibly other places. A refactoring that
parameterizes SparkEnv should be addressed subsequently.
On branch adatao-global-SparkEnv
Changes to be committed:
modified: core/src/main/scala/spark/SparkEnv.scala
requiring Spark to be installed. Using 'make_distribution.sh' a user
can put a Spark distribution at a URI supported by Mesos (e.g.,
'hdfs://...') and then set that when launching their job. Also added
SPARK_EXECUTOR_URI for the REPL.
This fixes SPARK-832, an issue where PySpark
would not work when the master and workers used
different SPARK_HOME paths.
This change may potentially break code that relied
on the master's PYTHONPATH being used on workers.
To have custom PYTHONPATH additions used on the
workers, users should set a custom PYTHONPATH in
spark-env.sh rather than setting it in the shell.
This adds a tab which displays system property and classpath information. This
can be useful in debugging various types of issues such as:
1. Extra/incorrect Hadoop jars being included in the classpath
2. Spark launching with a different JRE version than intended
3. Spark system properties not being set to intended values
4. User added jars that conflict with Spark jars
1. Set akka log level to ERROR before shutting down the actorSystem.
This avoids akka log messages (like Spray) from falling back to INFO
on the Stdout logger
2. Initialize netty to use SLF4J in LocalSparkContext. This ensures that
stack trace thrown during shutdown is handled by SLF4J instead of stdout
If a task fails before the metrics are initialized, it remains possible
that the metrics field will be `None`. This patch accounts for that possbility
by keeping metrics as an `Option` at all times.
Before, when withReplacement was set to true, we would not get a sample
bigger than the RDD's count().
Conflicts:
core/src/main/scala/spark/RDD.scala
core/src/test/scala/spark/RDDSuite.scala
variable conditional on if its set, and created addCredentials in each of the SparkHadoopUtil classes
to only add the credentials when the profile is hadoop2-yarn.
The previous version assumed that a CLASSPATH environment variable was
set by the "run" script when launching the process that starts the
ExecutorRunner, but unfortunately this is not true in tests. Instead, we
factor the classpath calculation into an extenral script and call that.
NOTE: This includes a Windows version but hasn't yet been tested there.
- Split SPARK_JAVA_OPTS into multiple command-line arguments if it
contains spaces; this splitting follows quoting rules in bash
- Add the Scala JARs to the classpath if they're not in the CLASSPATH
variable because the ExecutorRunner is launched with "scala" (this can
happen when using local-cluster URLs in spark-shell)
The old version reused the object within each task, leading to
overwriting of the object when a mutable type is used, which is expected
to be common in fold.
Conflicts:
core/src/test/scala/spark/ShuffleSuite.scala