Show "GETTING_RESULTS" state in UI.
This commit adds a set of calls using the SparkListener interface
that indicate when a task is remotely fetching results, so that
we can display this (potentially time-consuming) phase of execution
to users through the UI.
MQTT Adapter for Spark Streaming
MQTT is a machine-to-machine (M2M)/Internet of Things connectivity protocol.
It was designed as an extremely lightweight publish/subscribe messaging transport. You may read more about it here http://mqtt.org/
Message Queue Telemetry Transport (MQTT) is an open message protocol for M2M communications. It enables the transfer of telemetry-style data in the form of messages from devices like sensors and actuators, to mobile phones, embedded systems on vehicles, or laptops and full scale computers.
The protocol was invented by Andy Stanford-Clark of IBM, and Arlen Nipper of Cirrus Link Solutions
This protocol enables a publish/subscribe messaging model in an extremely lightweight way. It is useful for connections with remote locations where line of code and network bandwidth is a constraint.
MQTT is one of the widely used protocol for 'Internet of Things'. This protocol is getting much attraction as anything and everything is getting connected to internet and they all produce data. Researchers and companies predict some 25 billion devices will be connected to the internet by 2015.
Plugin/Support for MQTT is available in popular MQs like RabbitMQ, ActiveMQ etc.
Support for MQTT in Spark will help people with Internet of Things (IoT) projects to use Spark Streaming for their real time data processing needs (from sensors and other embedded devices etc).
Add classmethod to SparkContext to set system properties.
Add a new classmethod to SparkContext to set system properties like is
possible in Scala/Java. Unlike the Java/Scala implementations, there's
no access to System until the JVM bridge is created. Since
SparkContext handles that, move the initialization of the JVM
connection to a separate classmethod that can safely be called
repeatedly as long as the same instance (or no instance) is provided.
Remove redundant Java Function call() definitions
This should fix [SPARK-902](https://spark-project.atlassian.net/browse/SPARK-902), an issue where some Java API Function classes could cause AbstractMethodErrors when user code is compiled using the Eclipse compiler.
Thanks to @MartinWeindel for diagnosing this problem.
(This PR subsumes #30).
This should fix SPARK-902, an issue where some
Java API Function classes could cause
AbstractMethodErrors when user code is compiled
using the Eclipse compiler.
Thanks to @MartinWeindel for diagnosing this
problem.
(This PR subsumes / closes#30)
SPARK-940: Do not directly pass Stage objects to SparkListener.
This patch updates the SparkListener interface to pass StageInfo objects rather than directly pass spark Stages. The reason for this patch is explained in detail in SPARK-940.
The constructor for SparkContext should pass in self so that we track
the current context and produce errors if another one is created. Add
a doctest to make sure creating multiple contexts triggers the
exception.
This patch fixes a bug where the Spark UI didn't display the correct number of total
tasks if the number of tasks in a Stage doesn't equal the number of RDD partitions.
It also cleans up the listener API a bit by embedding this information in the
StageInfo class rather than passing it seperately.
Split MapOutputTracker into Master/Worker classes
Previously, MapOutputTracker contained fields and methods that were only applicable to the master or worker instances. This commit introduces a MasterMapOutputTracker class to prevent the master-specific methods from being accessed on workers.
I also renamed a few methods and made others protected/private.
Add a new classmethod to SparkContext to set system properties like is
possible in Scala/Java. Unlike the Java/Scala implementations, there's
no access to System until the JVM bridge is created. Since
SparkContext handles that, move the initialization of the JVM
connection to a separate classmethod that can safely be called
repeatedly as long as the same instance (or no instance) is provided.
Fix the Worker to use CoarseGrainedExecutorBackend and modify classpath ...
...to be explicit about inclusion of spark.jar and app.jar. Be explicit so if there are any conflicts in packaging between spark.jar and app.jar we don't get random results due to the classpath having /*, which can including things in different order.
Basic shuffle file consolidation
The Spark shuffle phase can produce a large number of files, as one file is created
per mapper per reducer. For large or repeated jobs, this often produces millions of
shuffle files, which sees extremely degredaded performance from the OS file system.
This patch seeks to reduce that burden by combining multipe shuffle files into one.
This PR draws upon the work of @jason-dai in https://github.com/mesos/spark/pull/669.
However, it simplifies the design in order to get the majority of the gain with less
overall intellectual and code burden. The vast majority of code in this pull request
is a refactor to allow the insertion of a clean layer of indirection between logical
block ids and physical files. This, I feel, provides some design clarity in addition
to enabling shuffle file consolidation.
The main goal is to produce one shuffle file per reducer per active mapper thread.
This allows us to isolate the mappers (simplifying the failure modes), while still
allowing us to reduce the number of mappers tremendously for large tasks. In order
to accomplish this, we simply create a new set of shuffle files for every parallel
task, and return the files to a pool which will be given out to the next run task.
I have run some ad hoc query testing on 5 m1.xlarge EC2 nodes with 2g of executor memory and the following microbenchmark:
scala> val nums = sc.parallelize(1 to 1000, 1000).flatMap(x => (1 to 1e6.toInt))
scala> def time(x: => Unit) = { val now = System.currentTimeMillis; x; System.currentTimeMillis - now }
scala> (1 to 8).map(_ => time(nums.map(x => (x % 100000, 2000, x)).reduceByKey(_ + _).count) / 1000.0)
For this particular workload, with 1000 mappers and 2000 reducers, I saw the old method running at around 15 minutes, with the consolidated shuffle files running at around 4 minutes. There was a very sharp increase in running time for the non-consolidated version after around 1 million total shuffle files. Below this threshold, however, there wasn't a significant difference between the two.
Better performance measurement of this patch is warranted, and I plan on doing so in the near future as part of a general investigation of our shuffle file bottlenecks and performance.
This commit adds a set of calls using the SparkListener interface
that indicate when a task is remotely fetching results, so that
we can display this (potentially time-consuming) phase of execution
to users through the UI.
Made the following traits/interfaces/classes non-public:
Made the following traits/interfaces/classes non-public:
SparkHadoopWriter
SparkHadoopMapRedUtil
SparkHadoopMapReduceUtil
SparkHadoopUtil
PythonAccumulatorParam
BlockManagerSlaveActor
Provide Instrumentation for Shuffle Write Performance
Shuffle write performance can have a major impact on the performance of jobs. This patch adds a few pieces of instrumentation related to shuffle writes. They are:
1. A listing of the time spent performing blocking writes for each task. This is implemented by keeping track of the aggregate delay seen by many individual writes.
2. An undocumented option `spark.shuffle.sync` which forces shuffle data to sync to disk. This is necessary for measuring shuffle performance in the absence of the OS buffer cache.
3. An internal utility which micro-benchmarks write throughput for simulated shuffle outputs.
I'm going to do some performance testing on this to see whether these small timing calls add overhead. From a feature perspective, however, I consider this complete. Any feedback is appreciated.
Don't setup the uncaught exception handler in local mode.
This avoids unit test failures for Spark streaming.
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.JobManager$JobHandler@38cf728d rejected from java.util.concurrent.ThreadPoolExecutor@3b69a41e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 14]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
at org.apache.spark.streaming.JobManager.runJob(JobManager.scala:54)
at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.streaming.Scheduler.generateJobs(Scheduler.scala:108)
at org.apache.spark.streaming.Scheduler$$anonfun$1.apply$mcVJ$sp(Scheduler.scala:41)
at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:66)
at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:34)