Add a new classmethod to SparkContext to set system properties like is
possible in Scala/Java. Unlike the Java/Scala implementations, there's
no access to System until the JVM bridge is created. Since
SparkContext handles that, move the initialization of the JVM
connection to a separate classmethod that can safely be called
repeatedly as long as the same instance (or no instance) is provided.
Fix the Worker to use CoarseGrainedExecutorBackend and modify classpath ...
...to be explicit about inclusion of spark.jar and app.jar. Be explicit so if there are any conflicts in packaging between spark.jar and app.jar we don't get random results due to the classpath having /*, which can including things in different order.
Basic shuffle file consolidation
The Spark shuffle phase can produce a large number of files, as one file is created
per mapper per reducer. For large or repeated jobs, this often produces millions of
shuffle files, which sees extremely degredaded performance from the OS file system.
This patch seeks to reduce that burden by combining multipe shuffle files into one.
This PR draws upon the work of @jason-dai in https://github.com/mesos/spark/pull/669.
However, it simplifies the design in order to get the majority of the gain with less
overall intellectual and code burden. The vast majority of code in this pull request
is a refactor to allow the insertion of a clean layer of indirection between logical
block ids and physical files. This, I feel, provides some design clarity in addition
to enabling shuffle file consolidation.
The main goal is to produce one shuffle file per reducer per active mapper thread.
This allows us to isolate the mappers (simplifying the failure modes), while still
allowing us to reduce the number of mappers tremendously for large tasks. In order
to accomplish this, we simply create a new set of shuffle files for every parallel
task, and return the files to a pool which will be given out to the next run task.
I have run some ad hoc query testing on 5 m1.xlarge EC2 nodes with 2g of executor memory and the following microbenchmark:
scala> val nums = sc.parallelize(1 to 1000, 1000).flatMap(x => (1 to 1e6.toInt))
scala> def time(x: => Unit) = { val now = System.currentTimeMillis; x; System.currentTimeMillis - now }
scala> (1 to 8).map(_ => time(nums.map(x => (x % 100000, 2000, x)).reduceByKey(_ + _).count) / 1000.0)
For this particular workload, with 1000 mappers and 2000 reducers, I saw the old method running at around 15 minutes, with the consolidated shuffle files running at around 4 minutes. There was a very sharp increase in running time for the non-consolidated version after around 1 million total shuffle files. Below this threshold, however, there wasn't a significant difference between the two.
Better performance measurement of this patch is warranted, and I plan on doing so in the near future as part of a general investigation of our shuffle file bottlenecks and performance.
This commit adds a set of calls using the SparkListener interface
that indicate when a task is remotely fetching results, so that
we can display this (potentially time-consuming) phase of execution
to users through the UI.
Made the following traits/interfaces/classes non-public:
Made the following traits/interfaces/classes non-public:
SparkHadoopWriter
SparkHadoopMapRedUtil
SparkHadoopMapReduceUtil
SparkHadoopUtil
PythonAccumulatorParam
BlockManagerSlaveActor
Provide Instrumentation for Shuffle Write Performance
Shuffle write performance can have a major impact on the performance of jobs. This patch adds a few pieces of instrumentation related to shuffle writes. They are:
1. A listing of the time spent performing blocking writes for each task. This is implemented by keeping track of the aggregate delay seen by many individual writes.
2. An undocumented option `spark.shuffle.sync` which forces shuffle data to sync to disk. This is necessary for measuring shuffle performance in the absence of the OS buffer cache.
3. An internal utility which micro-benchmarks write throughput for simulated shuffle outputs.
I'm going to do some performance testing on this to see whether these small timing calls add overhead. From a feature perspective, however, I consider this complete. Any feedback is appreciated.
Don't setup the uncaught exception handler in local mode.
This avoids unit test failures for Spark streaming.
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.JobManager$JobHandler@38cf728d rejected from java.util.concurrent.ThreadPoolExecutor@3b69a41e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 14]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at org.apache.spark.streaming.JobManager.runJob(JobManager.scala:54)
at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.streaming.Scheduler.generateJobs(Scheduler.scala:108)
at org.apache.spark.streaming.Scheduler$$anonfun$1.apply$mcVJ$sp(Scheduler.scala:41)
at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:66)
at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:34)
Code de-duplication in BlockManager
The BlockManager has a few methods that duplicate most of their code. This pull request extracts the duplicated code into private doPut(), doGetLocal(), and doGetRemote() methods that unify the storing/reading of bytes or objects.
I believe that I preserved the logic of the original code, but I'd appreciate some help in reviewing this.
The Spark shuffle phase can produce a large number of files, as one file is created
per mapper per reducer. For large or repeated jobs, this often produces millions of
shuffle files, which sees extremely degredaded performance from the OS file system.
This patch seeks to reduce that burden by combining multipe shuffle files into one.
This PR draws upon the work of Jason Dai in https://github.com/mesos/spark/pull/669.
However, it simplifies the design in order to get the majority of the gain with less
overall intellectual and code burden. The vast majority of code in this pull request
is a refactor to allow the insertion of a clean layer of indirection between logical
block ids and physical files. This, I feel, provides some design clarity in addition
to enabling shuffle file consolidation.
The main goal is to produce one shuffle file per reducer per active mapper thread.
This allows us to isolate the mappers (simplifying the failure modes), while still
allowing us to reduce the number of mappers tremendously for large tasks. In order
to accomplish this, we simply create a new set of shuffle files for every parallel
task, and return the files to a pool which will be given out to the next run task.
The main goal of this refactor was to allow the interposition of a new layer which
maps logical BlockIds to physical locations other than a file with the same name
as the BlockId. In particular, BlockIds will need to be mappable to chunks of files,
as multiple will be stored in the same file.
In order to accomplish this, the following changes have been made:
- Creation of DiskBlockManager, which manages the association of logical BlockIds
to physical disk locations (called FileSegments). By default, Blocks are simply
mapped to physical files of the same name, as before.
- The DiskStore now indirects all requests for a given BlockId through the DiskBlockManager
in order to resolve the actual File location.
- DiskBlockObjectWriter has been merged into BlockObjectWriter.
- The Netty PathResolver has been changed to map BlockIds into FileSegments, as this
codepath is the only one that uses Netty, and that is likely to remain the case.
Overall, I think this refactor produces a clearer division between the logical Block
paradigm and their physical on-disk location. There is now an explicit (and documented)
mapping from one to the other.
Add an add() method to pyspark accumulators.
Add a regular method for adding a term to accumulators in
pyspark. Currently if you have a non-global accumulator, adding to it
is awkward. The += operator can't be used for non-global accumulators
captured via closure because it's involves an assignment. The only way
to do it is using __iadd__ directly.
Adding this method lets you write code like this:
def main():
sc = SparkContext()
accum = sc.accumulator(0)
rdd = sc.parallelize([1,2,3])
def f(x):
accum.add(x)
rdd.foreach(f)
print accum.value
where using accum += x instead would have caused UnboundLocalError
exceptions in workers. Currently it would have to be written as
accum.__iadd__(x).
Previously, MapOutputTracker contained fields and methods that
were only applicable to the master or worker instances. This
commit introduces a MasterMapOutputTracker class to prevent
the master-specific methods from being accessed on workers.
I also renamed a few methods and made others protected/private.
Add a regular method for adding a term to accumulators in
pyspark. Currently if you have a non-global accumulator, adding to it
is awkward. The += operator can't be used for non-global accumulators
captured via closure because it's involves an assignment. The only way
to do it is using __iadd__ directly.
Adding this method lets you write code like this:
def main():
sc = SparkContext()
accum = sc.accumulator(0)
rdd = sc.parallelize([1,2,3])
def f(x):
accum.add(x)
rdd.foreach(f)
print accum.value
where using accum += x instead would have caused UnboundLocalError
exceptions in workers. Currently it would have to be written as
accum.__iadd__(x).