Currently PythonPartitioner determines partition ID by hashing a
byte-array representation of PySpark's key. This PR lets
PythonPartitioner use the actual partition ID, which is required e.g.
for sorting via PySpark.
This patch implements full distributed fault tolerance for standalone scheduler Masters.
There is only one master Leader at a time, which is actively serving scheduling
requests. If this Leader crashes, another master will eventually be elected, reconstruct
the state from the first Master, and continue serving scheduling requests.
Leader election is performed using the ZooKeeper leader election pattern. We try to minimize
the use of ZooKeeper and the assumptions about ZooKeeper's behavior, so there is a layer of
retries and session monitoring on top of the ZooKeeper client.
Master failover follows directly from the single-node Master recovery via the file
system (patch 194ba4b8), save that the Master state is stored in ZooKeeper instead.
Configuration:
By default, no recovery mechanism is enabled (spark.deploy.recoveryMode = NONE).
By setting spark.deploy.recoveryMode to ZOOKEEPER and setting spark.deploy.zookeeper.url
to an appropriate ZooKeeper URL, ZooKeeper recovery mode is enabled.
By setting spark.deploy.recoveryMode to FILESYSTEM and setting spark.deploy.recoveryDirectory
to an appropriate directory accessible by the Master, we will keep the behavior of from 194ba4b8.
Additionally, places where a Master could be specificied by a spark:// url can now take
comma-delimited lists to specify backup masters. Note that this is only used for registration
of NEW Workers and application Clients. Once a Worker or Client has registered with the
Master Leader, it is "in the system" and will never need to register again.
Forthcoming:
Documentation, tests (! - only ad hoc testing has been performed so far)
I do not intend for this commit to be merged until tests are added, but this patch should
still be mostly reviewable until then.
Implements a basic form of Standalone Scheduler fault recovery. In particular,
this allows faults to be manually recovered from by means of restarting the
Master process on the same machine. This is the majority of the code necessary
for general fault tolerance, which will first elect a leader and then recover
the Master state.
In order to enable fault recovery, the Master will persist a small amount of state related
to the registration of Workers and Applications to disk. If the Master is started and
sees that this state is still around, it will enter Recovery mode, during which time it
will not schedule any new Executors on Workers (but it does accept the registration of
new Clients and Workers).
At this point, the Master attempts to reconnect to all Workers and Client applications
that were registered at the time of failure. After confirming either the existence
or nonexistence of all such nodes (within a certain timeout), the Master will exit
Recovery mode and resume normal scheduling.
Improved organization of scheduling packages.
This commit does not change any code -- only file organization.
Please let me know if there was some masterminded strategy behind
the existing organization that I failed to understand!
There are two components of this change:
(1) Moving files out of the cluster package, and down
a level to the scheduling package. These files are all used by
the local scheduler in addition to the cluster scheduler(s), so
should not be in the cluster package. As a result of this change,
none of the files in the local package reference files in the
cluster package.
(2) Moving the mesos package to within the cluster package.
The mesos scheduling code is for a cluster, and represents a
specific case of cluster scheduling (the Mesos-related classes
often subclass cluster scheduling classes). Thus, the most logical
place for it seems to be within the cluster package.
The one thing about the scheduling code that seems a little funny to me
is the naming of the SchedulerBackends. The StandaloneSchedulerBackend
is not just for Standalone mode, but instead is used by Mesos coarse grained
mode and Yarn, and the backend that *is* just for Standalone mode is instead called SparkDeploySchedulerBackend. I didn't change this because I wasn't sure if there
was a reason for this naming that I'm just not aware of.
some minor fixes to MemoryStore
This is a repeat of #5, moved to its own branch in my repo.
This makes all updates to on ; it skips on synchronizing the reads where it can get away with it.
This commit does not change any code -- only file organization.
There are two components of this change:
(1) Moving files out of the cluster package, and down
a level to the scheduling package. These files are all used by
the local scheduler in addition to the cluster scheduler(s), so
should not be in the cluster package. As a result of this change,
none of the files in the local package reference files in the
cluster package.
(2) Moving the mesos package to within the cluster package.
The mesos scheduling code is for a cluster, and represents a
specific case of cluster scheduling (the Mesos-related classes
often subclass cluster scheduling classes). Thus, the most logical
place for it is within the cluster package.
This change requires adding an extra failure mode: tasks can complete
successfully, but the result gets lost or flushed from the block manager
before it's been fetched.
Make "currentMemory" @volatile, so that it's reads in ensureFreeSpace() are atomic and up-to-date--i.e., currentMemory can't increase while putLock is held (though it could decrease, which would only help ensureFreeSpace()).
In MapOutputTrackerSuite, the "remote fetch" test sets spark.driver.port
and spark.hostPort, assuming that they will be cleared by
LocalSparkContext. However, the test never sets sc, so it remains null,
causing LocalSparkContext to skip clearing these properties. Subsequent
tests therefore fail with java.net.BindException: "Address already in
use".
This commit makes LocalSparkContext clear the properties even if sc is
null.
The fact that DynamicVariable uses an InheritableThreadLocal
can cause problems where the properties end up being shared
across threads in certain circumstances.
StandaloneSchedulerBackend instead of the smaller IDs used within Spark
(that lack the application name).
This was reported by ClearStory in
https://github.com/clearstorydata/spark/pull/9.
Also fixed some messages that said slave instead of executor.
Include the useful tip that if shuffle=true, coalesce can actually
increase the number of partitions.
This makes coalesce more like a generic `RDD.repartition` operation.
(Ideally this `RDD.repartition` could automatically choose either a coalesce or
a shuffle if numPartitions was either less than or greater than, respectively,
the current number of partitions.)
The sc.StorageLevel -> StorageLevel pathway is a bit janky, but otherwise
the shell would have to call a private method of SparkContext. Having
StorageLevel available in sc also doesn't seem like the end of the world.
There may be a better solution, though.
As for creating the StorageLevel object itself, this seems to be the best
way in Python 2 for creating singleton, enum-like objects:
http://stackoverflow.com/questions/36932/how-can-i-represent-an-enum-in-python
Caching the results of local actions (e.g., rdd.first()) causes the driver to
store entire partitions in its own memory, which may be highly constrained.
This patch simply makes the CacheManager avoid caching the result of all locally-run computations.
- Use "fluid" layout that can expand to wide browser windows, instead of
the old one's limit of 1200 px
- Remove unnecessary <hr> elements
- Switch back to Bootstrap's default theme and tweak progress bar colors
- Make headers more consistent between deploy and app UIs
- Replace some inline CSS with stylesheets
This includes the following changes:
- The "assembly" package now builds in Maven by default, and creates an
assembly containing both hadoop-client and Spark, unlike the old
BigTop distribution assembly that skipped hadoop-client
- There is now a bigtop-dist package to build the old BigTop assembly
- The repl-bin package is no longer built by default since the scripts
don't reply on it; instead it can be enabled with -Prepl-bin
- Py4J is now included in the assembly/lib folder as a local Maven repo,
so that the Maven package can link to it
- run-example now adds the original Spark classpath as well because the
Maven examples assembly lists spark-core and such as provided
- The various Maven projects add a spark-yarn dependency correctly
This commit makes Spark invocation saner by using an assembly JAR to
find all of Spark's dependencies instead of adding all the JARs in
lib_managed. It also packages the examples into an assembly and uses
that as SPARK_EXAMPLES_JAR. Finally, it replaces the old "run" script
with two better-named scripts: "run-examples" for examples, and
"spark-class" for Spark internal classes (e.g. REPL, master, etc). This
is also designed to minimize the confusion people have in trying to use
"run" to run their own classes; it's not meant to do that, but now at
least if they look at it, they can modify run-examples to do a decent
job for them.
As part of this, Bagel's examples are also now properly moved to the
examples package instead of bagel.
- Use SPARK_PUBLIC_DNS environment variable if set (for EC2)
- Use a non-ephemeral port (3030 instead of 33000) by default
- Updated test to use non-ephemeral port too
- When a resourceOffers() call has multiple offers, force the TaskSets
to consider them in increasing order of locality levels so that they
get a chance to launch stuff locally across all offers
- Simplify ClusterScheduler.prioritizeContainers
- Add docs on the new configuration options
- Added periodic revival of offers in StandaloneSchedulerBackend
- Replaced task scheduling aggression with multi-level delay scheduling
in ClusterTaskSetManager
- Fixed ZippedRDD preferred locations because they can't currently be
process-local
- Fixed some uses of hostPort
- Replace use of hostPort vs host in Task.preferredLocations with a
TaskLocation class that contains either an executorId and a host or
just a host. This is part of a bigger effort to eliminate hostPort
based data structures and just use executorID, since the hostPort vs
host stuff is confusing (and not checkable with static typing, leading
to ugly debug code), and hostPorts are not provided by Mesos.
- Replaced most hostPort-based data structures and fields as above.
- Simplified ClusterTaskSetManager to deal with preferred locations in a
more concise way and generally be more concise.
- Updated the way ClusterTaskSetManager handles racks: instead of
enqueueing a task to a separate queue for all the hosts in the rack,
which would create lots of large queues, have one queue per rack name.
- Removed non-local fallback stuff in ClusterScheduler that tried to
launch less-local tasks on a node once the local ones were all
assigned. This change didn't work because many cluster schedulers send
offers for just one node at a time (even the standalone and YARN ones
do so as nodes join the cluster one by one). Thus, lots of non-local
tasks would be assigned even though a node with locality for them
would be able to receive tasks just a short time later.
- Renamed MapOutputTracker "generations" to "epochs".
It made the JSON creation slightly more complicated, but reduces one external dependency. The scala library also properly escape "/" (which lift-json doesn't).
These are used all over the place now and they are not specific to memory at all.
memoryBytesToString --> bytesToString
memoryMegabytesToString --> megabytesToString
Ops teams need to ensure that the cluster is functional and performant. Having to scrape the html source for worker state won't work reliably, and will be slow. By exposing the state in the json output, ops teams are able to ensure a fully functional environment by querying for the json output and parsing for dead nodes.
1. Renamed SparkContext.addLocalProperty to setLocalProperty. And allow this function to unset a property.
2. Renamed SparkContext.setDescription to setCurrentJobDescription.
3. Throw an exception if the fair scheduler allocation file is invalid.
1) UI crashed if the executor UI was loaded before any tasks started.
2) The total tasks was incorrectly reported due to using string (rather
than int) arithmetic.
The TaskState class's isFinished() method didn't return true for
KILLED tasks, which means some resources are never reclaimed
for tasks that are killed. This also made it inconsistent with the
isFinished() method used by CoarseMesosSchedulerBackend.
JobConf's constructor loads default config files in some verisons of
Hadoop, which is quite slow, and we only need the Configuration object
to pass the correct ClassLoader.
This commit fixes issues where SparkListeners that take a while to
process events slow the DAGScheduler.
This commit also fixes a bug in the UI where if a user goes to a
web page of a stage that does not exist, they can create a memory
leak (granted, this is not an issue at small scale -- probably only
an issue if someone actively tried to DOS the UI).
One bug caused the UI to crash if you try to look at a job's status
before any of the tasks have finished.
The second bug was a concurrency issue where two different threads
(the scheduling thread and a UI thread) could be reading/updating
the data structures in JobProgressListener concurrently.
The third bug mis-used an Option, also causing the UI to crash
under certain conditions.
Removal of items from ArrayBuffers in the UI code was slow and
significantly impacted scheduler throughput. This commit
improves scheduler throughput by 5x.
- Changes ALS to accept RDD[Rating] instead of (Int, Int, Double) making it
easier to call from Java
- Renames class methods from `train` to `run` to enable static methods to be
called from Java.
- Add unit tests which check if both static / class methods can be called.
- Also add examples which port the main() function in ALS, KMeans to the
examples project.
Couple of minor changes to existing code:
- Add a toJavaRDD method in RDD to convert scala RDD to java RDD easily
- Workaround a bug where using double[] from Java leads to class cast exception in
KMeans init
A ThreadLocal SparkEnv.env is facing various situations leading to
NullPointerExceptions, where SparkEnv.env set in one thread is not
gettable in another thread, but often assumed to be available.
See, e.g., https://groups.google.com/forum/#!topic/spark-developers/GLx8yunSj0A
This hotfixes SparkEnv.env to return either (a) the ThreadLocal
value if non-null, or (b) the previously set value in any thread.
This approach preserves SparkEnv.set() thread safety needed by
RDD.compute() and possibly other places. A refactoring that
parameterizes SparkEnv should be addressed subsequently.
On branch adatao-global-SparkEnv
Changes to be committed:
modified: core/src/main/scala/spark/SparkEnv.scala