This is a bug fix for #42 (79d07d6604).
In Master, we do not bind() each SparkUI because we do not want to start a server for each finished application. However, when we remove the associated application, we call stop() on the SparkUI, which throws an assertion failure.
This fix ensures we don't call stop() on a SparkUI that was never bind()'ed.
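A minimal sketch of the kind of guard this fix describes; the class, field, and method names below are illustrative, not the actual SparkUI implementation:

```scala
// Hedged sketch of the guard described above; `started` is a hypothetical flag,
// not necessarily the field SparkUI actually uses.
class BindAwareUiSketch {
  private var started = false

  def bind(): Unit = {
    // ... start the Jetty server here ...
    started = true
  }

  def stop(): Unit = {
    // Only stop if bind() was actually called, so we never trip the assertion.
    if (started) {
      // ... stop the server ...
      started = false
    }
  }
}
```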
Author: Andrew Or <andrewor14@gmail.com>
Closes#188 from andrewor14/ui-fix and squashes the following commits:
94a925f [Andrew Or] Do not stop SparkUI if bind() is not called
This is for JIRA: https://spark-project.atlassian.net/browse/SPARK-1099
And this is what I do in this patch (also commented in the JIRA) @aarondav
This is really a behavioral change, so I do it with great caution and welcome any review advice:
1. I change the "MASTER=local" pattern of creating LocalBackend. In the past, we passed 1 core to it; now it uses a default number of cores.
The reason is that when someone uses spark-shell to start local mode, the REPL uses this "MASTER=local" pattern by default.
So if one also specifies cores on the spark-shell command line, it all goes through here, and passing a hard-coded 1 core no longer fits the change made here.
2. In LocalBackend, the "totalCores" value is now determined by a different rule (in the past it simply took the user-passed cores: 1 in the "MASTER=local" pattern, 2 in the "MASTER=local[2]" pattern).
The rules, sketched in code after this list, are:
a. The second constructor argument of LocalBackend, indicating cores, has a default value of Int.MaxValue. If the user didn't pass it, it stays at Int.MaxValue.
b. In getMaxCores, we first compare that value to Int.MaxValue. If it is not equal, we assume the user passed their desired value and just use it.
c. If (b) is not satisfied, we read cores from spark.cores.max and the real number of logical cores from the Runtime. If the cores specified by spark.cores.max exceed the logical cores, we use the logical cores; otherwise we use spark.cores.max.
3. In SparkContextSchedulerCreationSuite's test("local") case, the assertion is changed from 1 to the number of logical cores, because the "MASTER=local" pattern now uses default values.
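A minimal sketch of rules (a) through (c), assuming a standalone helper; the function shape, names, and the fallback default for spark.cores.max are illustrative, not the actual LocalBackend code:

```scala
import org.apache.spark.SparkConf

object LocalCoresSketch {
  // Hedged sketch of getMaxCores; the real logic lives in the "MASTER=local" code path.
  def getMaxCores(conf: SparkConf, requestedCores: Int = Int.MaxValue): Int = {
    val logicalCores = Runtime.getRuntime.availableProcessors()
    if (requestedCores != Int.MaxValue) {
      requestedCores  // (b) the user passed an explicit value, use it as-is
    } else {
      // (c) fall back to spark.cores.max, capped at the machine's logical cores
      // (defaulting to logicalCores when spark.cores.max is unset is an assumption here)
      math.min(conf.getInt("spark.cores.max", logicalCores), logicalCores)
    }
  }
}
```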
Author: qqsun8819 <jin.oyj@alibaba-inc.com>
Closes#110 from qqsun8819/local-cores and squashes the following commits:
731aefa [qqsun8819] 1 LocalBackend not change 2 In SparkContext do some process to the cores and pass it to original LocalBackend constructor
78b9c60 [qqsun8819] 1 SparkContext MASTER=local pattern use default cores instead of 1 to construct LocalBackEnd , for use of spark-shell and cores specified in cmd line 2 some test case change from local to local[1]. 3 SparkContextSchedulerCreationSuite test spark.cores.max config in local pattern
6ae1ee8 [qqsun8819] Add a static function in LocalBackEnd to let it use spark.cores.max specified cores when no cores are passed to it
The fleeting nature of the Spark Web UI has long been a problem reported by many users: The existing Web UI disappears as soon as the associated application terminates. This is because SparkUI is tightly coupled with SparkContext, and cannot be instantiated independently from it. To solve this, some state must be saved to persistent storage while the application is still running.
The approach taken by this PR involves persisting the UI state through SparkListenerEvents. This requires a major refactor of the SparkListener interface because existing events (1) maintain deep references, making de/serialization difficult, and (2) do not encode all the information displayed on the UI. In this design, each existing listener for the UI (e.g. ExecutorsListener) maintains state that can be fully constructed from SparkListenerEvents. This state is then supplied to the parent UI (e.g. ExecutorsUI), which renders the associated page(s) on demand.
This PR introduces two important classes: the **EventLoggingListener**, and the **ReplayListenerBus**. In a live application, SparkUI registers an EventLoggingListener with the SparkContext in addition to the existing listeners. Over the course of the application, this listener serializes and logs all events to persistent storage. Then, after the application has finished, the SparkUI can be revived by replaying all the logged events to the existing UI listeners through the ReplayListenerBus.
This feature is currently integrated with the Master Web UI, which optionally rebuilds a SparkUI from event logs as soon as the corresponding application finishes.
More details can be found in the commit messages, comments within the code, and the [design doc](https://spark-project.atlassian.net/secure/attachment/12900/PersistingSparkWebUI.pdf). Comments and feedback are most welcome.
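A rough, self-contained sketch of the record-and-replay idea described above. The event type, file format, and class/method names below are stand-ins for illustration only, not the actual EventLoggingListener or ReplayListenerBus API:

```scala
import java.io.{File, PrintWriter}
import scala.io.Source

// Hypothetical event type standing in for SparkListenerEvents.
case class LoggedEvent(json: String)

trait UiListener { def onEvent(e: LoggedEvent): Unit }

// Logs every event as one JSON line, roughly as the EventLoggingListener is described to do.
class SimpleEventLogger(file: File) extends UiListener {
  private val out = new PrintWriter(file)
  def onEvent(e: LoggedEvent): Unit = { out.println(e.json); out.flush() }
  def close(): Unit = out.close()
}

// Replays logged events into existing listeners, in the spirit of the ReplayListenerBus.
class SimpleReplayBus(listeners: Seq[UiListener]) {
  def replay(file: File): Unit = {
    for (line <- Source.fromFile(file).getLines()) {
      listeners.foreach(_.onEvent(LoggedEvent(line)))
    }
  }
}
```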
Author: Andrew Or <andrewor14@gmail.com>
Author: andrewor14 <andrewor14@gmail.com>
Closes#42 from andrewor14/master and squashes the following commits:
e5f14fa [Andrew Or] Merge github.com:apache/spark
a1c5cd9 [Andrew Or] Merge github.com:apache/spark
b8ba817 [Andrew Or] Remove UI from map when removing application in Master
83af656 [Andrew Or] Scraps and pieces (no functionality change)
222adcd [Andrew Or] Merge github.com:apache/spark
124429f [Andrew Or] Clarify LiveListenerBus behavior + Add tests for new behavior
f80bd31 [Andrew Or] Simplify static handler and BlockManager status update logic
9e14f97 [Andrew Or] Moved around functionality + renamed classes per Patrick
6740e49 [Andrew Or] Fix comment nits
650eb12 [Andrew Or] Add unit tests + Fix bugs found through tests
45fd84c [Andrew Or] Remove now deprecated test
c5c2c8f [Andrew Or] Remove list of (TaskInfo, TaskMetrics) from StageInfo
3456090 [Andrew Or] Address Patrick's comments
bf80e3d [Andrew Or] Imports, comments, and code formatting, once again (minor)
ac69ec8 [Andrew Or] Fix test fail
d801d11 [Andrew Or] Merge github.com:apache/spark (major)
dc93915 [Andrew Or] Imports, comments, and code formatting (minor)
77ba283 [Andrew Or] Address Kay's and Patrick's comments
b6eaea7 [Andrew Or] Treating SparkUI as a handler of MasterUI
d59da5f [Andrew Or] Avoid logging all the blocks on each executor
d6e3b4a [Andrew Or] Merge github.com:apache/spark
ca258a4 [Andrew Or] Master UI - add support for reading compressed event logs
176e68e [Andrew Or] Fix deprecated message for JavaSparkContext (minor)
4f69c4a [Andrew Or] Master UI - Rebuild SparkUI on application finish
291b2be [Andrew Or] Correct directory in log message "INFO: Logging events to <dir>"
1ba3407 [Andrew Or] Add a few configurable options to event logging
e375431 [Andrew Or] Add new constructors for SparkUI
18b256d [Andrew Or] Refactor out event logging and replaying logic from UI
bb4c503 [Andrew Or] Use a more mnemonic path for logging
aef411c [Andrew Or] Fix bug: storage status was not reflected on UI in the local case
03eda0b [Andrew Or] Fix HDFS flush behavior
36b3e5d [Andrew Or] Add HDFS support for event logging
cceff2b [andrewor14] Fix 100 char format fail
2fee310 [Andrew Or] Address Patrick's comments
2981d61 [Andrew Or] Move SparkListenerBus out of DAGScheduler + Clean up
5d2cec1 [Andrew Or] JobLogger: ID -> Id
0503e4b [Andrew Or] Fix PySpark tests + remove sc.clearFiles/clearJars
4d2fb0c [Andrew Or] Fix format fail
faa113e [Andrew Or] General clean up
d47585f [Andrew Or] Clean up FileLogger
472fd8a [Andrew Or] Fix a couple of tests
996d7a2 [Andrew Or] Reflect RDD unpersist on UI
7b2f811 [Andrew Or] Guard against TaskMetrics NPE + Fix tests
d1f4285 [Andrew Or] Migrate from lift-json to json4s-jackson
28019ca [Andrew Or] Merge github.com:apache/spark
bbe3501 [Andrew Or] Embed storage status and RDD info in Task events
6631c02 [Andrew Or] More formatting changes, this time mainly for Json DSL
70e7e7a [Andrew Or] Formatting changes
e9e1c6d [Andrew Or] Move all JSON de/serialization logic to JsonProtocol
d646df6 [Andrew Or] Completely decouple SparkUI from SparkContext
6814da0 [Andrew Or] Explicitly register each UI listener rather than through some magic
64d2ce1 [Andrew Or] Fix BlockManagerUI bug by introducing new event
4273013 [Andrew Or] Add a gateway SparkListener to simplify event logging
904c729 [Andrew Or] Fix another major bug
5ac906d [Andrew Or] Mostly naming, formatting, and code style changes
3fd584e [Andrew Or] Fix two major bugs
f3fc13b [Andrew Or] General refactor
4dfcd22 [Andrew Or] Merge git://git.apache.org/incubator-spark into persist-ui
b3976b0 [Andrew Or] Add functionality of reconstructing a persisted UI from SparkContext
8add36b [Andrew Or] JobProgressUI: Add JSON functionality
d859efc [Andrew Or] BlockManagerUI: Add JSON functionality
c4cd480 [Andrew Or] Also deserialize new events
8a2ebe6 [Andrew Or] Fix bugs for EnvironmentUI and ExecutorsUI
de8a1cd [Andrew Or] Serialize events both to and from JSON (rather than just to)
bf0b2e9 [Andrew Or] ExecutorUI: Serialize events rather than arbitary executor information
bb222b9 [Andrew Or] ExecutorUI: render completely from JSON
dcbd312 [Andrew Or] Add JSON Serializability for all SparkListenerEvent's
10ed49d [Andrew Or] Merge github.com:apache/incubator-spark into persist-ui
8e09306 [Andrew Or] Use JSON for ExecutorsUI
e3ae35f [Andrew Or] Merge github.com:apache/incubator-spark
3ddeb7e [Andrew Or] Also privatize fields
090544a [Andrew Or] Privatize methods
13920c9 [Andrew Or] Update docs
bd5a1d7 [Andrew Or] Typo: phyiscal -> physical
287ef44 [Andrew Or] Avoid reading the entire batch into memory; also simplify streaming logic
3df7005 [Andrew Or] Merge branch 'master' of github.com:andrewor14/incubator-spark
a531d2e [Andrew Or] Relax assumptions on compressors and serializers when batching
164489d [Andrew Or] Relax assumptions on compressors and serializers when batching
This moves PR #517 from apache/incubator-spark over to apache/spark.
Author: Mridul Muralidharan <mridul@gmail.com>
Closes#159 from mridulm/master and squashes the following commits:
5ff59c2 [Mridul Muralidharan] Change property in suite also
167fad8 [Mridul Muralidharan] Address review comments
9bda70e [Mridul Muralidharan] Address review comments, akwats add to failedExecutors
270d841 [Mridul Muralidharan] Address review comments
fa5d9f1 [Mridul Muralidharan] Bugfixes/improvements to scheduler : PR #517
Author: Thomas Graves <tgraves@apache.org>
Closes#173 from tgravescs/SPARK-1203 and squashes the following commits:
4fd5ded [Thomas Graves] adding import
964e3f7 [Thomas Graves] SPARK-1203 fix saving to hdfs from yarn
If a stage that has completed once loses part of its data, it will be resubmitted. At this time, it appears that stage.completionTime > stage.submissionTime.
Author: shiyun.wxm <shiyun.wxm@taobao.com>
Closes#170 from BlackNiuza/duration_problem and squashes the following commits:
a86d261 [shiyun.wxm] tow space indent
c0d7b24 [shiyun.wxm] change the style
3b072e1 [shiyun.wxm] fix scala style
f20701e [shiyun.wxm] bugfix: "Duration" in "Active Stages" in stages page
Author: witgo <witgo@qq.com>
Closes#150 from witgo/SPARK-1256 and squashes the following commits:
08044a2 [witgo] Merge branch 'master' of https://github.com/apache/spark into SPARK-1256
c99b030 [witgo] Fix SPARK-1256
https://spark-project.atlassian.net/browse/SPARK-1102
Create a saveAsNewAPIHadoopDataset method
By @mateiz: "Right now RDDs can only be saved as files using the new Hadoop API, not as "datasets" with no filename and just a JobConf. See http://codeforhire.com/2014/02/18/using-spark-with-mongodb/ for an example of how you have to give a bogus filename. For the old Hadoop API, we have saveAsHadoopDataset."
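A hedged usage sketch of the new method. The whole point is that only a Configuration/Job is needed (e.g. for HBase's TableOutputFormat, no bogus filename), but for a self-contained example this sketch uses a plain TextOutputFormat; the path and app name are placeholders:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD functions via implicits (1.0-era style)

object SaveAsDatasetSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("save-dataset"))
    val counts = sc.parallelize(Seq("a" -> 1, "b" -> 2))
      .map { case (k, v) => (new Text(k), new IntWritable(v)) }

    // Configure the output entirely through the Job; no filename argument is passed to Spark.
    val job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
    FileOutputFormat.setOutputPath(job, new Path("/tmp/save-dataset-output"))

    counts.saveAsNewAPIHadoopDataset(job.getConfiguration)
    sc.stop()
  }
}
```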
Author: CodingCat <zhunansjtu@gmail.com>
Closes#12 from CodingCat/SPARK-1102 and squashes the following commits:
6ba0c83 [CodingCat] add test cases for saveAsHadoopDataSet (new&old API)
a8d11ba [CodingCat] style fix.........
95a6929 [CodingCat] code clean
7643c88 [CodingCat] change the parameter type back to Configuration
a8583ee [CodingCat] Create a saveAsNewAPIHadoopDataset method
This reverts commit ca4bf8c572.
Jetty 9 requires JDK7 which is probably not a dependency we want to bump right now. Before Spark 1.0 we should consider upgrading to Jetty 8. However, in the mean time to ease some pain let's revert this. Sorry for not catching this during the initial review. cc/ @rxin
Author: Patrick Wendell <pwendell@gmail.com>
Closes#167 from pwendell/jetty-revert and squashes the following commits:
811b1c5 [Patrick Wendell] Revert "SPARK-1236 - Upgrade Jetty to 9.1.3.v20140225."
Here's the addition of min and max to statscounter.py and min and max methods to rdd.py.
Author: Dan McClary <dan.mcclary@gmail.com>
Closes#144 from dwmclary/SPARK-1246-add-min-max-to-stat-counter and squashes the following commits:
fd3fd4b [Dan McClary] fixed error, updated test
82cde0e [Dan McClary] flipped incorrectly assigned inf values in StatCounter
5d96799 [Dan McClary] added max and min to StatCounter repr for pyspark
21dd366 [Dan McClary] added max and min to StatCounter output, updated doc
1a97558 [Dan McClary] added max and min to StatCounter output, updated doc
a5c13b0 [Dan McClary] Added min and max to Scala and Java RDD, added min and max to StatCounter
ed67136 [Dan McClary] broke min/max out into separate transaction, added to rdd.py
1e7056d [Dan McClary] added underscore to getBucket
37a7dea [Dan McClary] cleaned up boundaries for histogram -- uses real min/max when buckets are derived
29981f2 [Dan McClary] fixed indentation on doctest comment
eaf89d9 [Dan McClary] added correct doctest for histogram
4916016 [Dan McClary] added histogram method, added max and min to statscounter
This is a very small change on top of @andrewor14's patch in #147.
Author: Patrick Wendell <pwendell@gmail.com>
Author: Andrew Or <andrewor14@gmail.com>
Closes#152 from pwendell/akka-frame and squashes the following commits:
e5fb3ff [Patrick Wendell] Reversing test order
393af4c [Patrick Wendell] Small improvement suggested by Andrew Or
8045103 [Patrick Wendell] Breaking out into two tests
2b4e085 [Patrick Wendell] Consolidate Executor use of akka frame size
c9b6109 [Andrew Or] Simplify test + make access to akka frame size more modular
281d7c9 [Andrew Or] Throw exception on spark.akka.frameSize exceeded + Unit tests
https://spark-project.atlassian.net/browse/SPARK-1240
It seems that the current implementation does not handle the empty RDD case when running takeSample.
In this patch, before calling sample() inside the takeSample API, I add a check for this case and return an empty Array when the RDD is empty; also, in sample(), I add a check for an invalid fraction value.
I also add several lines to the test cases to cover this case.
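A minimal sketch of the guards described above. The helper names are made up and the takeSample/sample signatures follow later Spark releases; the real change lives inside RDD.takeSample and RDD.sample themselves:

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

object SampleGuards {
  // Empty-RDD guard: return an empty array instead of ever calling sample().
  def takeSampleOrEmpty[T: ClassTag](rdd: RDD[T], withReplacement: Boolean,
                                     num: Int, seed: Long): Array[T] = {
    if (num == 0 || rdd.count() == 0) Array.empty[T]
    else rdd.takeSample(withReplacement, num, seed)
  }

  // Invalid-fraction guard: fail fast with a clear message instead of sampling anyway.
  def checkedSample[T](rdd: RDD[T], withReplacement: Boolean,
                       fraction: Double, seed: Long): RDD[T] = {
    require(fraction >= 0.0, s"Invalid fraction value: $fraction")
    rdd.sample(withReplacement, fraction, seed)
  }
}
```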
Author: CodingCat <zhunansjtu@gmail.com>
Closes#135 from CodingCat/SPARK-1240 and squashes the following commits:
fef57d4 [CodingCat] fix the same problem in PySpark
36db06b [CodingCat] create new test cases for takeSample from an empty red
810948d [CodingCat] further fix
a40e8fb [CodingCat] replace if with require
ad483fd [CodingCat] handle the case with empty RDD when take sample
This is more general than simply passing a string name and leaves more room for performance optimizations.
Note that this is technically an API breaking change in the following two ways:
1. The shuffle serializer specification in ShuffleDependency now requires an object instead of a String (of the class name), but I suspect nobody else in this world has used this API other than me in GraphX and Shark.
2. Serializers in Spark are now required to be serializable.
Author: Reynold Xin <rxin@apache.org>
Closes#149 from rxin/serializer and squashes the following commits:
5acaccd [Reynold Xin] Properly call serializer's constructors.
2a8d75a [Reynold Xin] Added more documentation for the serializer option in ShuffleDependency.
7420185 [Reynold Xin] Allow user to pass Serializer object instead of class name for shuffle.
Author: Michael Armbrust <michael@databricks.com>
Closes#141 from marmbrus/mutablePair and squashes the following commits:
f5c4783 [Michael Armbrust] Change function name to update
8bfd973 [Michael Armbrust] Fix serialization of MutablePair. Also provide an interface for easy updating.
Author: Reynold Xin <rxin@apache.org>
Closes#113 from rxin/jetty9 and squashes the following commits:
867a2ce [Reynold Xin] Updated Jetty version to 9.1.3.v20140225 in Maven build file.
d7c97ca [Reynold Xin] Return the correctly bound port.
d14706f [Reynold Xin] Upgrade Jetty to 9.1.3.v20140225.
Author: Patrick Wendell <pwendell@gmail.com>
Closes#112 from pwendell/pyspark-take and squashes the following commits:
daae80e [Patrick Wendell] SPARK-1019: pyspark RDD take() throws an NPE
Author: CodingCat <zhunansjtu@gmail.com>
Closes#133 from CodingCat/SPARK-1160-2 and squashes the following commits:
6607155 [CodingCat] hot fix for PR105 - change to Java annotation
https://spark-project.atlassian.net/browse/SPARK-1160
reported by @mateiz: "It's redundant with collect() and the name doesn't make sense in Java, where we return a List (we can't return an array due to the way Java generics work). It's also missing in Python."
In this patch, I deprecated the method and changed the source files using it by replacing toArray with collect() directly
Author: CodingCat <zhunansjtu@gmail.com>
Closes#105 from CodingCat/SPARK-1060 and squashes the following commits:
286f163 [CodingCat] deprecate in JavaRDDLike
ee17b4e [CodingCat] add message and since
2ff7319 [CodingCat] deprecate toArray in RDD
Author: liguoqiang <liguoqiang@rd.tuan800.com>
Closes#44 from witgo/SPARK-1149 and squashes the following commits:
3dcdcaf [liguoqiang] Merge branch 'master' into SPARK-1149
8425395 [liguoqiang] Merge remote-tracking branch 'upstream/master' into SPARK-1149
3dad595 [liguoqiang] review comment
e3e56aa [liguoqiang] Merge branch 'master' into SPARK-1149
b0d5c07 [liguoqiang] review comment
d0a6005 [liguoqiang] review comment
3395ee7 [liguoqiang] Merge remote-tracking branch 'upstream/master' into SPARK-1149
ac006a3 [liguoqiang] code Formatting
3feb3a8 [liguoqiang] Merge branch 'master' into SPARK-1149
adc443e [liguoqiang] partitions check bugfix
928e1e3 [liguoqiang] Added a unit test for PairRDDFunctions.lookup with bad partitioner
db6ecc5 [liguoqiang] Merge branch 'master' into SPARK-1149
1e3331e [liguoqiang] Merge branch 'master' into SPARK-1149
3348619 [liguoqiang] Optimize performance for partitions check
61e5a87 [liguoqiang] Merge branch 'master' into SPARK-1149
e68210a [liguoqiang] add partition index check to submitJob
3a65903 [liguoqiang] make the code more readable
6bb725e [liguoqiang] fix #SPARK-1149 Bad partitioners can cause Spark to hang
This patch removes Ganglia integration from the default build. It
allows users willing to link against LGPL code to use Ganglia
by adding build flags or linking against a new Spark artifact called
spark-ganglia-lgpl.
This brings Spark in line with the Apache policy on LGPL code
enumerated here:
https://www.apache.org/legal/3party.html#options-optional
Author: Patrick Wendell <pwendell@gmail.com>
Closes#108 from pwendell/ganglia and squashes the following commits:
326712a [Patrick Wendell] Responding to review feedback
5f28ee4 [Patrick Wendell] SPARK-1167: Remove metrics-ganglia from default build due to LGPL issues.
This patch removes the `generator` field and simplifies + documents
the tracking of callsites.
There are two places where we care about call sites, when a job is
run and when an RDD is created. This patch retains both of those
features but does a slight refactoring and renaming to make things
less confusing.
There was another feature of an RDD called the `generator`, which was by default the user class in which the RDD was created. It is used exclusively in the JobLogger. It has been subsumed by the ability to name a job group. The job logger can later be refactored to read the job group directly (this will require some work), but for now this just preserves the default logged value of the user class.
I'm not sure any users ever used the ability to override this.
Author: Patrick Wendell <pwendell@gmail.com>
Closes#106 from pwendell/callsite and squashes the following commits:
fc1d009 [Patrick Wendell] Compile fix
e17fb76 [Patrick Wendell] Review feedback: callSite -> creationSite
62e77ef [Patrick Wendell] Review feedback
576e60b [Patrick Wendell] SPARK-1205: Clean up callSite/origin/generator.
This makes two changes.
1) Spark uses the shaded version of asm that is (conveniently) published
with Kryo.
2) Existing exclude rules around asm are updated to reflect the new groupId
of `org.ow2.asm`. This made all of the old rules not work with newer Hadoop
versions that pull in new asm versions.
Author: Patrick Wendell <pwendell@gmail.com>
Closes#100 from pwendell/asm and squashes the following commits:
9235f3f [Patrick Wendell] SPARK-782 Clean up for ASM dependency.
Currently, when fetching a file, the connection's connect timeout and read timeout are based on the default JVM settings. This change makes them use spark.worker.timeout instead. This can be useful when the connection between workers is not perfect, and it prevents task sets from being removed prematurely.
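A hedged sketch of applying the configured timeout to the fetch connection; reusing spark.worker.timeout (in seconds) follows the description above, while the helper itself is illustrative:

```scala
import java.io.InputStream
import java.net.URL
import org.apache.spark.SparkConf

object FetchWithTimeoutSketch {
  // Hedged sketch: apply a configured timeout to both connect and read,
  // instead of relying on the JVM defaults.
  def openWithTimeout(url: String, conf: SparkConf): InputStream = {
    val timeoutMs = conf.getInt("spark.worker.timeout", 60) * 1000
    val connection = new URL(url).openConnection()
    connection.setConnectTimeout(timeoutMs)
    connection.setReadTimeout(timeoutMs)
    connection.getInputStream
  }
}
```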
Author: Jiacheng Guo <guojc03@gmail.com>
Closes#98 from guojc/master and squashes the following commits:
abfe698 [Jiacheng Guo] add space according request
2a37c34 [Jiacheng Guo] Add timeout for fetch file Currently, when fetch a file, the connection's connect timeout and read timeout is based on the default jvm setting, in this change, I change it to use spark.worker.timeout. This can be usefull, when the connection status between worker is not perfect. And prevent prematurely remove task set.
(Continued from old repo, prior discussion at https://github.com/apache/incubator-spark/pull/615)
This patch cements our deprecation of the SPARK_MEM environment variable by replacing it with three more specialized variables:
SPARK_DAEMON_MEMORY, SPARK_EXECUTOR_MEMORY, and SPARK_DRIVER_MEMORY
The creation of the latter two variables means that we can safely set driver/job memory without accidentally setting the executor memory. Neither is public.
SPARK_EXECUTOR_MEMORY is only used by the Mesos scheduler (and set within SparkContext). The proper way of configuring executor memory is through the "spark.executor.memory" property.
SPARK_DRIVER_MEMORY is the new way of specifying the amount of memory used by jobs launched by spark-class, without possibly affecting executor memory.
Other memory considerations:
- The repl's memory can be set through the "--drivermem" command-line option, which really just sets SPARK_DRIVER_MEMORY.
- run-example doesn't use spark-class, so the only way to modify examples' memory is actually an unusual use of SPARK_JAVA_OPTS (which is normally overridden in all cases by spark-class).
This patch also fixes a lurking bug where spark-shell misused spark-class (the first argument is supposed to be the main class name, not java options), as well as a bug in the Windows spark-class2.cmd. I have not yet tested this patch on either Windows or Mesos, however.
Author: Aaron Davidson <aaron@databricks.com>
Closes#99 from aarondav/sparkmem and squashes the following commits:
9df4c68 [Aaron Davidson] SPARK-929: Fully deprecate usage of SPARK_MEM
Author: Patrick Wendell <pwendell@gmail.com>
Closes#107 from pwendell/logging and squashes the following commits:
be21c11 [Patrick Wendell] Logging fix
SPARK-1194: https://spark-project.atlassian.net/browse/SPARK-1194
In the current implementation, when selecting candidate blocks to be swapped out, once we find a block from the same RDD that the block to be stored belongs to, cache eviction fails and aborts.
In this PR, we keep selecting blocks *not* from the RDD that the block to be stored belongs to until either enough free space can be ensured (cache eviction succeeds) or all such blocks are checked (cache eviction fails).
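A simplified sketch of the selection loop described above; the Entry type and its fields are stand-ins for the MemoryStore internals, not the real code:

```scala
object EvictionSketch {
  // Hypothetical, simplified view of a cached block; the real code walks MemoryStore's map.
  case class Entry(blockId: String, rddId: Option[Int], size: Long)

  // Keep selecting candidate blocks *not* belonging to `targetRddId` until enough space
  // is freed (eviction succeeds) or all candidates are exhausted (eviction fails).
  def selectBlocksToEvict(entries: Seq[Entry],
                          targetRddId: Option[Int],
                          spaceNeeded: Long): Option[Seq[Entry]] = {
    var freed = 0L
    val selected = scala.collection.mutable.ArrayBuffer.empty[Entry]
    val it = entries.iterator
    while (freed < spaceNeeded && it.hasNext) {
      val e = it.next()
      if (targetRddId.isEmpty || e.rddId != targetRddId) {
        selected += e
        freed += e.size
      }
    }
    if (freed >= spaceNeeded) Some(selected) else None
  }
}
```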
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes#96 from liancheng/fix-spark-1194 and squashes the following commits:
2524ab9 [Cheng Lian] Added regression test case for SPARK-1194
6e40c22 [Cheng Lian] Remove redundant comments
40cdcb2 [Cheng Lian] Bug fix, and addressed PR comments from @mridulm
62c92ac [Cheng Lian] Fixed SPARK-1194 https://spark-project.atlassian.net/browse/SPARK-1194
Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Prashant Sharma <scrapcodes@gmail.com>
Closes#80 from ScrapCodes/SPARK-1165/RDD.intersection and squashes the following commits:
9b015e9 [Prashant Sharma] Added a note, shuffle is required for intersection.
1fea813 [Prashant Sharma] correct the lines wrapping
d0c71f3 [Prashant Sharma] SPARK-1165 RDD.intersection in java
d6effee [Prashant Sharma] SPARK-1165 Implemented RDD.intersection in python.
Hadoop uses the config mapreduce.map.input.file to indicate the input filename to the map when the input split is of type FileSplit. Some of the hadoop input and output formats set or use this config. This config can also be used by user code.
PipedRDD runs an external process and the configs aren't available to that process. Hadoop Streaming does something very similar and the way they make configs available is exporting them into the environment replacing '.' with '_'. Spark should also export this variable when launching the pipe command so the user code has access to that config.
Note that the config mapreduce.map.input.file is the new one, the old one which is deprecated but not yet removed is map.input.file. So we should handle both.
Perhaps it would be better to abstract this out somehow so it goes into the HadoopPartition code?
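A small sketch of the environment-variable translation described above, Hadoop Streaming style ('.' becomes '_'); the helper name is invented:

```scala
object PipeEnvSketch {
  // Hedged sketch: expose the input-split filename to the piped process the way Hadoop
  // Streaming does, i.e. dots in the config name become underscores in the env var name.
  def inputFileEnvVars(inputFile: String): Map[String, String] = {
    Seq("mapreduce.map.input.file", "map.input.file")   // new name plus the deprecated old name
      .map(key => key.replace('.', '_') -> inputFile)
      .toMap
  }
}

// e.g. PipeEnvSketch.inputFileEnvVars("hdfs:///data/part-00000") yields
//   Map("mapreduce_map_input_file" -> "hdfs:///...", "map_input_file" -> "hdfs:///...")
```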
Author: Thomas Graves <tgraves@apache.org>
Closes#94 from tgravescs/map_input_file and squashes the following commits:
cc97a6a [Thomas Graves] Update test to check for existence of command, add a getPipeEnvVars function to HadoopRDD
e3401dc [Thomas Graves] Merge remote-tracking branch 'upstream/master' into map_input_file
2ba805e [Thomas Graves] set map_input_file environment variable in PipedRDD
This patch allows the FaultToleranceTest to work in newer versions of Docker.
See https://spark-project.atlassian.net/browse/SPARK-1136 for more details.
Besides changing the Docker and FaultToleranceTest internals, this patch also changes the behavior of Master to accept new Workers which share an address with a Worker that we are currently trying to recover. This can only happen when the Worker itself was restarted and got the same IP address/port at the same time as a Master recovery occurs.
Finally, this adds a good bit of ASCII art to the test to make failures, successes, and actions more apparent. This is very much needed.
Author: Aaron Davidson <aaron@databricks.com>
Closes#5 from aarondav/zookeeper and squashes the following commits:
5d7a72a [Aaron Davidson] SPARK-1136: Fix FaultToleranceTest for Docker 0.8.1
This patch changes "yarn-standalone" to "yarn-cluster" (but still supports the former). It also cleans up the Running on YARN docs and adds a section on how to view logs.
Author: Sandy Ryza <sandy@cloudera.com>
Closes#95 from sryza/sandy-spark-1197 and squashes the following commits:
563ef3a [Sandy Ryza] Review feedback
6ad06d4 [Sandy Ryza] Change yarn-standalone to yarn-cluster and fix up running on YARN docs
This resubmits the pull request; it was previously https://github.com/apache/incubator-spark/pull/332.
Author: Thomas Graves <tgraves@apache.org>
Closes#33 from tgravescs/security-branch-0.9-with-client-rebase and squashes the following commits:
dfe3918 [Thomas Graves] Fix merge conflict since startUserClass now using runAsUser
05eebed [Thomas Graves] Fix dependency lost in upmerge
d1040ec [Thomas Graves] Fix up various imports
05ff5e0 [Thomas Graves] Fix up imports after upmerging to master
ac046b3 [Thomas Graves] Merge remote-tracking branch 'upstream/master' into security-branch-0.9-with-client-rebase
13733e1 [Thomas Graves] Pass securityManager and SparkConf around where we can. Switch to use sparkConf for reading config whereever possible. Added ConnectionManagerSuite unit tests.
4a57acc [Thomas Graves] Change UI createHandler routines to createServlet since they now return servlets
2f77147 [Thomas Graves] Rework from comments
50dd9f2 [Thomas Graves] fix header in SecurityManager
ecbfb65 [Thomas Graves] Fix spacing and formatting
b514bec [Thomas Graves] Fix reference to config
ed3d1c1 [Thomas Graves] Add security.md
6f7ddf3 [Thomas Graves] Convert SaslClient and SaslServer to scala, change spark.authenticate.ui to spark.ui.acls.enable, and fix up various other things from review comments
2d9e23e [Thomas Graves] Merge remote-tracking branch 'upstream/master' into security-branch-0.9-with-client-rebase_rework
5721c5a [Thomas Graves] update AkkaUtilsSuite test for the actorSelection changes, fix typos based on comments, and remove extra lines I missed in rebase from AkkaUtils
f351763 [Thomas Graves] Add Security to Spark - Akka, Http, ConnectionManager, UI to use servlets
This is a port of a pull request original targeted at incubator-spark: https://github.com/apache/incubator-spark/pull/180
Essentially, if a user returns a generative iterator (from a flatMap operation), then when trying to persist the data, Spark would first unroll the iterator into an ArrayBuffer and then try to figure out if it could store the data. In cases where the user provided an iterator that generated more data than available memory, this would cause a crash. With this patch, if the user requests a persist with 'StorageLevel.DISK_ONLY', the iterator is unrolled as it is fed into the serializer.
To do this, two changes were made:
1) The type of the 'values' argument in the putValues method of the BlockStore interface was changed from ArrayBuffer to Iterator (and all code interfacing with this method was modified to connect correctly).
2) The JavaSerializer now calls the ObjectOutputStream 'reset' method every 1000 objects. This was done because the ObjectOutputStream caches objects (thus preventing them from being GC'd) in order to write more compact serialization. If reset is never called, memory eventually fills up; if it is called too often, the serialization streams become much larger because of redundant class descriptions.
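A minimal sketch of change (2): periodically calling reset() so the ObjectOutputStream's back-reference cache does not pin every written object in memory. The wrapper class is illustrative; the interval of 1000 comes from the description above and is later made configurable via spark.serializer.objectStreamReset:

```scala
import java.io.{ObjectOutputStream, OutputStream}

// Hedged sketch of a serialization stream that calls reset() every `resetInterval`
// objects, trading a little stream size for bounded memory in the output cache.
class ResettingObjectStream(out: OutputStream, resetInterval: Int = 1000) {
  private val oos = new ObjectOutputStream(out)
  private var written = 0

  def writeObject(obj: AnyRef): Unit = {
    oos.writeObject(obj)
    written += 1
    if (written % resetInterval == 0) {
      oos.reset()   // drop back-references so previously written objects can be GC'd
    }
  }

  def close(): Unit = oos.close()
}
```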
Author: Kyle Ellrott <kellrott@gmail.com>
Closes#50 from kellrott/iterator-to-disk and squashes the following commits:
9ef7cb8 [Kyle Ellrott] Fixing formatting issues.
60e0c57 [Kyle Ellrott] Fixing issues (formatting, variable names, etc.) from review comments
8aa31cd [Kyle Ellrott] Merge ../incubator-spark into iterator-to-disk
33ac390 [Kyle Ellrott] Merge branch 'iterator-to-disk' of github.com:kellrott/incubator-spark into iterator-to-disk
2f684ea [Kyle Ellrott] Refactoring the BlockManager to replace the Either[Either[A,B]] usage. Now using trait 'Values'. Also modified BlockStore.putBytes call to return PutResult, so that it behaves like putValues.
f70d069 [Kyle Ellrott] Adding docs for spark.serializer.objectStreamReset configuration
7ccc74b [Kyle Ellrott] Moving the 'LargeIteratorSuite' to simply test persistance of iterators. It doesn't try to invoke a OOM error any more
16a4cea [Kyle Ellrott] Streamlined the LargeIteratorSuite unit test. It should now run in ~25 seconds. Confirmed that it still crashes an unpatched copy of Spark.
c2fb430 [Kyle Ellrott] Removing more un-needed array-buffer to iterator conversions
627a8b7 [Kyle Ellrott] Wrapping a few long lines
0f28ec7 [Kyle Ellrott] Adding second putValues to BlockStore interface that accepts an ArrayBuffer (rather then an Iterator). This will allow BlockStores to have slightly different behaviors dependent on whether they get an Iterator or ArrayBuffer. In the case of the MemoryStore, it needs to duplicate and cache an Iterator into an ArrayBuffer, but if handed a ArrayBuffer, it can skip the duplication.
656c33e [Kyle Ellrott] Fixing the JavaSerializer to read from the SparkConf rather then the System property.
8644ee8 [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
00c98e0 [Kyle Ellrott] Making the Java ObjectStreamSerializer reset rate configurable by the system variable 'spark.serializer.objectStreamReset', default is not 10000.
40fe1d7 [Kyle Ellrott] Removing rouge space
31fe08e [Kyle Ellrott] Removing un-needed semi-colons
9df0276 [Kyle Ellrott] Added check to make sure that streamed-to-dist RDD actually returns good data in the LargeIteratorSuite
a6424ba [Kyle Ellrott] Wrapping long line
2eeda75 [Kyle Ellrott] Fixing dumb mistake ("||" instead of "&&")
0e6f808 [Kyle Ellrott] Deleting temp output directory when done
95c7f67 [Kyle Ellrott] Simplifying StorageLevel checks
56f71cd [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
44ec35a [Kyle Ellrott] Adding some comments.
5eb2b7e [Kyle Ellrott] Changing the JavaSerializer reset to occur every 1000 objects.
f403826 [Kyle Ellrott] Merge branch 'master' into iterator-to-disk
81d670c [Kyle Ellrott] Adding unit test for straight to disk iterator methods.
d32992f [Kyle Ellrott] Merge remote-tracking branch 'origin/master' into iterator-to-disk
cac1fad [Kyle Ellrott] Fixing MemoryStore, so that it converts incoming iterators to ArrayBuffer objects. This was previously done higher up the stack.
efe1102 [Kyle Ellrott] Changing CacheManager and BlockManager to pass iterators directly to the serializer when a 'DISK_ONLY' persist is called. This is in response to SPARK-942.
https://spark-project.atlassian.net/browse/SPARK-1171
When an executor is removed, the current implementation only subtracts that executor's freeCores. Actually we should subtract its totalCores...
Author: CodingCat <zhunansjtu@gmail.com>
Author: Nan Zhu <CodingCat@users.noreply.github.com>
Closes#63 from CodingCat/simplify_CoarseGrainedSchedulerBackend and squashes the following commits:
f6bf93f [Nan Zhu] code clean
19c2bb4 [CodingCat] use copy idiom to reconstruct the workerOffers
43c13e9 [CodingCat] keep WorkerOffer immutable
af470d3 [CodingCat] style fix
0c0e409 [CodingCat] simplify the implementation of CoarseGrainedSchedulerBackend
Author: Prashant Sharma <prashant.s@imaginea.com>
Closes#72 from ScrapCodes/SPARK-1164/deprecate-reducebykeytodriver and squashes the following commits:
ee521cd [Prashant Sharma] SPARK-1164 Deprecated reduceByKeyToDriver as it is an alias for reduceByKeyLocally
Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Patrick Wendell <pwendell@gmail.com>
Closes#17 from ScrapCodes/java8-lambdas and squashes the following commits:
95850e6 [Patrick Wendell] Some doc improvements and build changes to the Java 8 patch.
85a954e [Prashant Sharma] Nit. import orderings.
673f7ac [Prashant Sharma] Added support for -java-home as well
80a13e8 [Prashant Sharma] Used fake class tag syntax
26eb3f6 [Prashant Sharma] Patrick's comments on PR.
35d8d79 [Prashant Sharma] Specified java 8 building in the docs
31d4cd6 [Prashant Sharma] Maven build to support -Pjava8-tests flag.
4ab87d3 [Prashant Sharma] Review feedback on the pr
c33dc2c [Prashant Sharma] SPARK-964, Java 8 API Support.
This method appears to be broken -- since it never removes
anything from messages, and it adds new messages to it,
the while loop is an infinite loop. The method also does not appear
to have ever been used since the code was added in 2012, so
this commit removes it.
cc @mateiz who originally added this method in case there's a reason it should be here! (63051dd2bc)
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#69 from kayousterhout/remove_get_fifo and squashes the following commits:
053bc59 [Kay Ousterhout] Remove broken/unused Connection.getChunkFIFO method.
Lookup didn't have a unit test. Added two tests, one for with a partitioner, and one for without.
Author: Bryn Keller <bryn.keller@intel.com>
Closes#36 from xoltar/lookup and squashes the following commits:
3bc0d44 [Bryn Keller] Added a unit test for PairRDDFunctions.lookup
This metric is confusing: it adds up all of the time to fetch
shuffle inputs, but fetches often happen in parallel, so
remoteFetchTime can be much longer than the task execution time.
@squito it looks like you added this metric -- do you have a use case for it?
cc @shivaram -- I know you've looked at the shuffle performance a lot so chime in here if this metric has turned out to be useful for you!
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#62 from kayousterhout/remove_fetch_variable and squashes the following commits:
43341eb [Kay Ousterhout] Remote the remoteFetchTime metric.
It looks like this comment was added a while ago by @mridulm as part of a merge and was accidentally checked in. We should remove it.
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#61 from kayousterhout/remove_comment and squashes the following commits:
0b2b3f2 [Kay Ousterhout] Removed accidentally checked in comment
Previously, ZooKeeperPersistenceEngine would crash the whole Master process if
there was stored data from a prior Spark version. Now, we just delete these files.
Author: Aaron Davidson <aaron@databricks.com>
Closes#4 from aarondav/zookeeper2 and squashes the following commits:
fa8b40f [Aaron Davidson] SPARK-1137: Make ZK PersistenceEngine not crash for wrong serialVersionUID
Thanks to Diana Carroll for reporting this issue (https://spark-project.atlassian.net/browse/SPARK-1100).
The current saveAsTextFile/SequenceFile will silently overwrite the output directory if the directory already exists. This behaviour is not desirable because:
- overwriting the data silently is not user-friendly
- if the partition number of two write operations changes, the output directory will contain the results generated by both runs
My fix includes:
- adding some new APIs with a flag for users to define whether they want to overwrite the directory:
  - if the flag is set to true, the output directory is deleted first and then the new data is written into it, preventing the output directory from containing results from multiple runs;
  - if the flag is set to false, Spark throws an exception if the output directory already exists
- changing the Java API part accordingly
- keeping the default behaviour as overwriting
Two questions:
- should we deprecate the old APIs without such a flag?
- I noticed that Spark Streaming also calls these APIs; I thought we don't need to change the related part in streaming? @tdas
Author: CodingCat <zhunansjtu@gmail.com>
Closes#11 from CodingCat/SPARK-1100 and squashes the following commits:
6a4e3a3 [CodingCat] code clean
ef2d43f [CodingCat] add new test cases and code clean
ac63136 [CodingCat] checkOutputSpecs not applicable to FSOutputFormat
ec490e8 [CodingCat] prevent Spark from overwriting directory silently and leaving dirty directory
https://spark-project.atlassian.net/browse/SPARK-1150
fix the repo location in create_release script
Author: Mark Grover <mark@apache.org>
Closes#48 from CodingCat/script_fixes and squashes the following commits:
01f4bf7 [Mark Grover] Fixing some nitpicks
d2244d4 [Mark Grover] SPARK-676: Abbreviation in SPARK_MEM but not in SPARK_WORKER_MEMORY
This commit randomizes the order of resource offers to avoid scheduling
all tasks on the same small set of machines.
This is a much simpler solution to SPARK-979 than #7.
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#27 from kayousterhout/randomize and squashes the following commits:
435d817 [Kay Ousterhout] [SPARK-979] Randomize order of offers.
This reopens https://github.com/apache/incubator-spark/pull/538 against the new repo
Author: Sandy Ryza <sandy@cloudera.com>
Closes#29 from sryza/sandy-spark-1051 and squashes the following commits:
708ce49 [Sandy Ryza] SPARK-1051. doAs submitting user in YARN
This trait seems to have been created a while ago when there
were multiple implementations; now that there's just one, I think it
makes sense to merge it into the BlockFetcherIterator trait.
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#39 from kayousterhout/remove_tracker and squashes the following commits:
8173939 [Kay Ousterhout] Remote BlockFetchTracker.
(Ported from https://github.com/apache/incubator-spark/pull/637 )
Author: Sean Owen <sowen@cloudera.com>
Closes#31 from srowen/SPARK-1084.1 and squashes the following commits:
6c4a32c [Sean Owen] Suppress warnings about legitimate unchecked array creations, or change code to avoid it
f35b833 [Sean Owen] Fix two misc javadoc problems
254e8ef [Sean Owen] Fix one new style error introduced in scaladoc warning commit
5b2fce2 [Sean Owen] Fix scaladoc invocation warning, and enable javac warnings properly, with plugin config updates
007762b [Sean Owen] Remove dead scaladoc links
b8ff8cb [Sean Owen] Replace deprecated Ant <tasks> with <target>
For standalone HA mode, a status field is useful to identify the current master, and it is already available in JSON format too.
Author: Raymond Liu <raymond.liu@intel.com>
Closes#24 from colorant/status and squashes the following commits:
df630b3 [Raymond Liu] Show Master status on UI page
If the seed is zero, XORShift generates all zeros, which would produce unexpected results.
JIRA: https://spark-project.atlassian.net/browse/SPARK-1129
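A hedged sketch of the fix in the final commit: hash the incoming seed (e.g. with MurmurHash3) so that a seed of zero never leaves the XORShift state at zero. The class shape below is illustrative, not the actual XORShiftRandom code:

```scala
import java.nio.ByteBuffer
import scala.util.hashing.MurmurHash3

// Hedged sketch: hash the user-supplied seed so that a seed of 0 never leaves the
// XORShift state at 0 (an all-zero state would generate an all-zero sequence).
class XorShift64Sketch(initSeed: Long) {
  private var state: Long = {
    val hashed = MurmurHash3.bytesHash(ByteBuffer.allocate(8).putLong(initSeed).array())
    if (hashed == 0) 1L else hashed.toLong
  }

  def nextLong(): Long = {
    state ^= (state << 21)
    state ^= (state >>> 35)
    state ^= (state << 4)
    state
  }
}
```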
Author: Xiangrui Meng <meng@databricks.com>
Closes#645 from mengxr/xor and squashes the following commits:
1b086ab [Xiangrui Meng] use MurmurHash3 to set seed in XORShiftRandom
45c6f16 [Xiangrui Meng] minor style change
51f4050 [Xiangrui Meng] use a predefined seed when seed is zero in XORShiftRandom
ClusterScheduler was renamed to TaskSchedulerImpl; this commit
updates comments and tests accordingly.
Author: Kay Ousterhout <kayousterhout@gmail.com>
Closes#9 from kayousterhout/cluster_scheduler_death and squashes the following commits:
d6fd119 [Kay Ousterhout] Remove references to ClusterScheduler.
PR [402](https://github.com/apache/incubator-spark/pull/402) from incubator repo.
Author: Prashant Sharma <prashant.s@imaginea.com>
Closes#19 from ScrapCodes/java-api-completeness and squashes the following commits:
11d0c2b [Prashant Sharma] Integer -> java.lang.Integer
737819a [Prashant Sharma] SPARK-1095 add explicit return types to APIs.
3ddc8bb [Prashant Sharma] Deprected *With functions in scala and added a few missing Java APIs
The aim of the Json4s project is to provide a common API for
Scala JSON libraries. It is Apache-licensed, easier for
downstream distributions to package, and mostly API-compatible
with lift-json. Furthermore, the Jackson-backed implementation
parses faster than lift-json on all but the smallest inputs.
Author: William Benton <willb@redhat.com>
Closes#582 from willb/json4s and squashes the following commits:
7ca62c4 [William Benton] Replace lift-json with json4s-jackson.
[SPARK-1108] This allows us to use, e.g. HBase's TableOutputFormat with PairRDDFunctions.saveAsNewAPIHadoopFile, which otherwise would throw NullPointerException because the output table name hasn't been configured.
Note this bug also affects branch-0.9
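A tiny sketch of the check the fix describes, with the surrounding write path elided; the helper name is invented:

```scala
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.mapreduce.OutputFormat

object OutputFormatSetupSketch {
  // Hedged sketch: for output formats that are Configurable (e.g. HBase's TableOutputFormat),
  // hand them the Configuration before any records are written to them.
  def configureIfNeeded(outputFormat: OutputFormat[_, _], conf: Configuration): Unit = {
    outputFormat match {
      case configurable: Configurable => configurable.setConf(conf)
      case _ =>   // nothing to configure
    }
  }
}
```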
Author: Bryn Keller <bryn.keller@intel.com>
Closes#638 from xoltar/SPARK-1108 and squashes the following commits:
7e94e7d [Bryn Keller] Import, comment, and format cleanup per code review
7cbcaa1 [Bryn Keller] For outputformats that are Configurable, call setConf before sending data to them. This allows us to use, e.g. HBase TableOutputFormat, which otherwise would throw NullPointerException because the output table name hasn't been configured
In the previous code, if you had a failing map stage and then tried to
run reduce stages on it repeatedly, the first reduce stage would fail
correctly, but the later ones would mistakenly believe that all map
outputs are available and start failing infinitely with fetch failures
from "null".
These classes can't be migrated:
StorageLevels: impossible to create static fields in Scala
JavaSparkContextVarargsWorkaround: incompatible varargs
JavaAPISuite: should test Java APIs in pure Java (for sanity)
Author: Punya Biswal <pbiswal@palantir.com>
Closes#605 from punya/move-java-sources and squashes the following commits:
25b00b2 [Punya Biswal] Remove redundant type param; reformat
853da46 [Punya Biswal] Use factory method rather than constructor
e5d53d9 [Punya Biswal] Migrate Java code to Scala or move it to src/main/java
The current doc hints that Spark doesn't support accumulators of type `Long`, which is wrong.
JIRA: https://spark-project.atlassian.net/browse/SPARK-1117
Author: Xiangrui Meng <meng@databricks.com>
Closes#631 from mengxr/acc and squashes the following commits:
45ecd25 [Xiangrui Meng] update accumulator docs
The original poster of this bug is @guojc, who opened a PR that preceded this one at https://github.com/apache/incubator-spark/pull/612.
ExternalAppendOnlyMap uses key hash code to order the buffer streams from which spilled files are read back into memory. When a buffer stream is empty, the default hash code for that stream is equal to Int.MaxValue. This is, however, a perfectly legitimate candidate for a key hash code. When reading from a spilled map containing such a key, a hash collision may occur, in which case we attempt to read from an empty stream and throw NoSuchElementException.
The fix is to maintain the invariant that empty buffer streams are never added back to the merge queue to be considered. This guarantees that we never read from an empty buffer stream, ever again.
This PR also includes two new tests for hash collisions.
Author: Andrew Or <andrewor14@gmail.com>
Closes#624 from andrewor14/spilling-bug and squashes the following commits:
9e7263d [Andrew Or] Slightly optimize next()
2037ae2 [Andrew Or] Move a few comments around...
cf95942 [Andrew Or] Remove default value of Int.MaxValue for minKeyHash
c11f03b [Andrew Or] Fix Int.MaxValue hash collision bug in ExternalAppendOnlyMap
21c1a39 [Andrew Or] Add hash collision tests to ExternalAppendOnlyMapSuite
Fixes an error where HDFS URL's cause an exception. Should be merged into master and 0.9.
Author: Patrick Wendell <pwendell@gmail.com>
Closes#625 from pwendell/url-validation and squashes the following commits:
d14bfe3 [Patrick Wendell] SPARK-1111: URL Validation Throws Error for HDFS URL's
We changed the behavior in 0.9.0 from requiring that mergeCombiners be null when mapSideCombine was false to requiring that mergeCombiners *never* be null, for external sorting. This patch adds a require() to make this behavior change explicitly messaged rather than resulting in a NPE.
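A small sketch of what the behavioral change means in practice: combineByKey now fails fast with a clear message rather than a NullPointerException when mergeCombiners is missing. The wrapper below is illustrative and uses 1.0-era implicit imports:

```scala
import org.apache.spark.SparkContext._   // pair-RDD functions via implicits (1.0-era style)
import org.apache.spark.rdd.RDD

object CombineByKeyCheckSketch {
  // Hedged sketch: fail fast with a clear message rather than an NPE deep inside
  // the external-sorting path when mergeCombiners is missing.
  def checkedSumByKey(rdd: RDD[(String, Int)],
                      mergeCombiners: (Int, Int) => Int): RDD[(String, Int)] = {
    require(mergeCombiners != null, "mergeCombiners must be defined for combineByKey")
    rdd.combineByKey((v: Int) => v, (c: Int, v: Int) => c + v, mergeCombiners)
  }
}
```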
Author: Aaron Davidson <aaron@databricks.com>
Closes#623 from aarondav/master and squashes the following commits:
520b80c [Aaron Davidson] Super minor: Add require for mergeCombiners in combineByKey
Optimized imports and arranged according to scala style guide @
https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports
Author: NirmalReddy <nirmal.reddy@imaginea.com>
Author: NirmalReddy <nirmal_reddy2000@yahoo.com>
Closes#613 from NirmalReddy/opt-imports and squashes the following commits:
578b4f5 [NirmalReddy] imported java.lang.Double as JDouble
a2cbcc5 [NirmalReddy] addressed the comments
776d664 [NirmalReddy] Optimized imports in core
Our usage of fake ClassTags in this manner is probably not healthy, but I'm not sure if there's a better solution available, so I just cleaned up and documented the current one.
Author: Aaron Davidson <aaron@databricks.com>
Closes#604 from aarondav/master and squashes the following commits:
b398e89 [Aaron Davidson] SPARK-1098: Minor cleanup of ClassTag usage in Java API
Author: Andrew Ash <andrew@andrewash.com>
Closes#608 from ash211/patch-7 and squashes the following commits:
bd85f2a [Andrew Ash] Worker registration logging fix
Author: Punya Biswal <pbiswal@palantir.com>
Closes#600 from punya/subtractByKey-java and squashes the following commits:
e961913 [Punya Biswal] Hide implicit ClassTags from Java API
c5d317b [Punya Biswal] Add subtractByKey to the JavaPairRDD wrapper
https://spark-project.atlassian.net/browse/SPARK-1092?jql=project%20%3D%20SPARK
Print warning information if the user sets SPARK_MEM to regulate the memory usage of executors.
----
OUTDATED:
Currently, users will usually set SPARK_MEM to control the memory usage of driver programs (in spark-class):

```
JAVA_OPTS="$OUR_JAVA_OPTS"
JAVA_OPTS="$JAVA_OPTS -Djava.library.path=$SPARK_LIBRARY_PATH"
JAVA_OPTS="$JAVA_OPTS -Xms$SPARK_MEM -Xmx$SPARK_MEM"
```

If they didn't set spark.executor.memory, the value of this environment variable will also affect the memory usage of executors, because of the following lines in SparkContext:

```scala
private[spark] val executorMemory = conf.getOption("spark.executor.memory")
  .orElse(Option(System.getenv("SPARK_MEM")))
  .map(Utils.memoryStringToMb)
  .getOrElse(512)
```
also
since SPARK_MEM has been (proposed to) deprecated in SPARK-929 (https://spark-project.atlassian.net/browse/SPARK-929) and the corresponding PR (https://github.com/apache/incubator-spark/pull/104)
we should remove this line
Author: CodingCat <zhunansjtu@gmail.com>
Closes#602 from CodingCat/clean_spark_mem and squashes the following commits:
302bb28 [CodingCat] print warning information if user use SPARK_MEM to regulate executor memory usage
SPARK-1076: [Fix#578] add @transient to some vals
I'll try to be more careful next time.
Author: Xiangrui Meng <meng@databricks.com>
Closes#591 and squashes the following commits:
2b4f044 [Xiangrui Meng] add @transient to prev in ZippedWithIndexRDD add @transient to seed in PartitionwiseSampledRDD
SPARK-1076: Convert Int to Long to avoid overflow
Patch for PR #578.
Author: Xiangrui Meng <meng@databricks.com>
Closes#589 and squashes the following commits:
98c435e [Xiangrui Meng] cast Int to Long to avoid Int overflow
SPARK-1076: zipWithIndex and zipWithUniqueId to RDD
Assigning ranks to an ordered or unordered data set is a common operation. This could be done by first counting records in each partition and then assigning ranks in parallel.
The purpose of assigning ranks to an unordered set is usually to get a unique id for each item, e.g., to map feature names to feature indices. In such cases, the assignment could be done without counting records, saving one spark job.
https://spark-project.atlassian.net/browse/SPARK-1076
== update ==
Because assigning ranks is very similar to Scala's zipWithIndex, I changed the method name to zipWithIndex and put the index in the value field.
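A short usage sketch of the two new methods (local-mode setup and sample data assumed):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ZipWithIndexSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("zip-ids"))
    val names = sc.parallelize(Seq("alpha", "beta", "gamma", "delta"), 2)

    // zipWithIndex: contiguous indices 0..n-1; needs one extra job to count partition sizes.
    names.zipWithIndex().collect().foreach(println)

    // zipWithUniqueId: ids are unique but not contiguous; no counting job is needed.
    names.zipWithUniqueId().collect().foreach(println)

    sc.stop()
  }
}
```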
Author: Xiangrui Meng <meng@databricks.com>
Closes#578 and squashes the following commits:
52a05e1 [Xiangrui Meng] changed assignRanks to zipWithIndex changed assignUniqueIds to zipWithUniqueId minor updates
756881c [Xiangrui Meng] simplified RankedRDD by implementing assignUniqueIds separately moved couting iterator size to Utils do not count items in the last partition and skip counting if there is only one partition
630868c [Xiangrui Meng] newline
21b434b [Xiangrui Meng] add assignRanks and assignUniqueIds to RDD
Minor fix for ZooKeeperPersistenceEngine to use configured working dir
Author: Raymond Liu <raymond.liu@intel.com>
Closes#583 and squashes the following commits:
91b0609 [Raymond Liu] Minor fix for ZooKeeperPersistenceEngine to use configured working dir
SPARK-1072 Use binary search when needed in RangePartitioner
Author: Holden Karau <holden@pigscanfly.ca>
Closes#571 and squashes the following commits:
f31a2e1 [Holden Karau] Swith to using CollectionsUtils in Partitioner
4c7a0c3 [Holden Karau] Add CollectionsUtil as suggested by aarondav
7099962 [Holden Karau] Add the binary search to only init once
1bef01d [Holden Karau] CR feedback
a21e097 [Holden Karau] Use binary search if we have more than 1000 elements inside of RangePartitioner
SPARK-1058, Fix Style Errors and Add Scala Style to Spark Build. Pt 2
Continuation of PR #557
With this all scala style errors are fixed across the code base !!
The reason for creating a separate PR was to not interrupt an already reviewed and ready to merge PR. Hope this gets reviewed soon and merged too.
Author: Prashant Sharma <prashant.s@imaginea.com>
Closes#567 and squashes the following commits:
3b1ec30 [Prashant Sharma] scala style fixes
[SPARK-1038] Add more fields in JsonProtocol and add tests that verify the JSON itself
This is a PR for SPARK-1038. Two major changes:
1. Add some fields to JsonProtocol that are new and important to standalone-related data structures.
2. Use Diff in liftweb.json to verify the stringified JSON output, in order to detect someone changing a type T to Option[T].
Author: qqsun8819 <jin.oyj@alibaba-inc.com>
Closes#551 and squashes the following commits:
fdf0b4e [qqsun8819] [SPARK-1038] 1. Change code style for more readable according to rxin review 2. change submitdate hard-coded string to a date object toString for more complexiblity
095a26f [qqsun8819] [SPARK-1038] mod according to review of pwendel, use hard-coded json string for json data validation. Each test use its own json string
0524e41 [qqsun8819] Merge remote-tracking branch 'upstream/master' into json-protocol
d203d5c [qqsun8819] [SPARK-1038] Add more fields in JsonProtocol and add tests that verify the JSON itself
[SPARK-1060] startJettyServer should explicitly use IP information
https://spark-project.atlassian.net/browse/SPARK-1060
In the current implementation, the web server in Master/Worker is started with

```scala
val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
```

and inside startJettyServer:

```scala
val server = new Server(currentPort) // here the Server takes "0.0.0.0" as the hostname, i.e. it always binds to the IP address of the first NIC
```

This can cause the wrong IP binding. For example, if the host has two NICs, N1 and N2, and the user sets SPARK_LOCAL_IP to N2's IP address, then when starting the web server, for the reason stated above, it will always bind to N1's address.
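A hedged sketch of binding the embedded Jetty server to an explicit host (via Jetty's Server(InetSocketAddress) constructor) rather than a bare port; the helper is illustrative, not the actual JettyUtils change:

```scala
import java.net.InetSocketAddress
import org.eclipse.jetty.server.Server

object BoundJettySketch {
  // Hedged sketch: bind the embedded Jetty server to the host the user asked for
  // (e.g. SPARK_LOCAL_IP) instead of constructing it from a bare port.
  def startBoundServer(host: String, port: Int): Server = {
    val server = new Server(new InetSocketAddress(host, port))
    server.start()
    server
  }
}
```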
Author: CodingCat <zhunansjtu@gmail.com>
== Merge branch commits ==
commit 6c6d9a8ccc9ec4590678a3b34cb03df19092029d
Author: CodingCat <zhunansjtu@gmail.com>
Date: Thu Feb 6 14:53:34 2014 -0500
startJettyServer should explicitly use IP information
[WIP] SPARK-1067: Default log4j initialization causes errors for those not using log4j
To fix this - we add a check when initializing log4j.
Author: Patrick Wendell <pwendell@gmail.com>
== Merge branch commits ==
commit ffdce513877f64b6eed6d36138c3e0003d392889
Author: Patrick Wendell <pwendell@gmail.com>
Date: Fri Feb 7 15:22:29 2014 -0800
Logging fix
Kill drivers in postStop() for Worker.
JIRA SPARK-1068:https://spark-project.atlassian.net/browse/SPARK-1068
Author: Qiuzhuang Lian <Qiuzhuang.Lian@gmail.com>
== Merge branch commits ==
commit 9c19ce63637eee9369edd235979288d3d9fc9105
Author: Qiuzhuang Lian <Qiuzhuang.Lian@gmail.com>
Date: Sat Feb 8 16:07:39 2014 +0800
Kill drivers in postStop() for Worker.
JIRA SPARK-1068:https://spark-project.atlassian.net/browse/SPARK-1068
External spilling - generalize batching logic
The existing implementation consists of a hack for Kryo specifically and only works for LZF compression. Introducing an intermediate batch-level stream takes care of pre-fetching and other arbitrary behavior of higher level streams in a more general way.
Author: Andrew Or <andrewor14@gmail.com>
== Merge branch commits ==
commit 3ddeb7ef89a0af2b685fb5d071aa0f71c975cc82
Author: Andrew Or <andrewor14@gmail.com>
Date: Wed Feb 5 12:09:32 2014 -0800
Also privatize fields
commit 090544a87a0767effd0c835a53952f72fc8d24f0
Author: Andrew Or <andrewor14@gmail.com>
Date: Wed Feb 5 10:58:23 2014 -0800
Privatize methods
commit 13920c918efe22e66a1760b14beceb17a61fd8cc
Author: Andrew Or <andrewor14@gmail.com>
Date: Tue Feb 4 16:34:15 2014 -0800
Update docs
commit bd5a1d7350467ed3dc19c2de9b2c9f531f0e6aa3
Author: Andrew Or <andrewor14@gmail.com>
Date: Tue Feb 4 13:44:24 2014 -0800
Typo: phyiscal -> physical
commit 287ef44e593ad72f7434b759be3170d9ee2723d2
Author: Andrew Or <andrewor14@gmail.com>
Date: Tue Feb 4 13:38:32 2014 -0800
Avoid reading the entire batch into memory; also simplify streaming logic
Additionally, address formatting comments.
commit 3df700509955f7074821e9aab1e74cb53c58b5a5
Merge: a531d2e 164489d
Author: Andrew Or <andrewor14@gmail.com>
Date: Mon Feb 3 18:27:49 2014 -0800
Merge branch 'master' of github.com:andrewor14/incubator-spark
commit a531d2e347acdcecf2d0ab72cd4f965ab5e145d8
Author: Andrew Or <andrewor14@gmail.com>
Date: Mon Feb 3 18:18:04 2014 -0800
Relax assumptions on compressors and serializers when batching
This commit introduces an intermediate layer of an input stream on the batch level.
This guards against interference from higher level streams (i.e. compression and
deserialization streams), especially pre-fetching, without specifically targeting
particular libraries (Kryo) and forcing shuffle spill compression to use LZF.
commit 164489d6f176bdecfa9dabec2dfce5504d1ee8af
Author: Andrew Or <andrewor14@gmail.com>
Date: Mon Feb 3 18:18:04 2014 -0800
Relax assumptions on compressors and serializers when batching
This commit introduces an intermediate layer of an input stream on the batch level.
This guards against interference from higher level streams (i.e. compression and
deserialization streams), especially pre-fetching, without specifically targeting
particular libraries (Kryo) and forcing shuffle spill compression to use LZF.
Only run ResubmitFailedStages event after a fetch fails
Previously, the ResubmitFailedStages event was called every
200 milliseconds, leading to a lot of unnecessary event processing
and clogged DAGScheduler logs.
Author: Kay Ousterhout <kayousterhout@gmail.com>
== Merge branch commits ==
commit e603784b3a562980e6f1863845097effe2129d3b
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Feb 5 11:34:41 2014 -0800
Re-add check for empty set of failed stages
commit d258f0ef50caff4bbb19fb95a6b82186db1935bf
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Jan 15 23:35:41 2014 -0800
Only run ResubmitFailedStages event after a fetch fails
Previously, the ResubmitFailedStages event was called every
200 milliseconds, leading to a lot of unnecessary event processing
and clogged DAGScheduler logs.
Inform DAG scheduler about all started/finished tasks.
Previously, the DAG scheduler was not always informed
when tasks started and finished. The simplest example here
is for speculated tasks: the DAGScheduler was only told about
the first attempt of a task, meaning that SparkListeners were
also not told about multiple task attempts, so users can't see
what's going on with speculation in the UI. The DAGScheduler
also wasn't always told about finished tasks, so in the UI, some
tasks will never be shown as finished (this occurs, for example,
if a task set gets killed).
The other problem is that the fairness accounting was wrong
-- the number of running tasks in a pool was decreased when a
task set was considered done, even if all of its tasks hadn't
yet finished.
Author: Kay Ousterhout <kayousterhout@gmail.com>
== Merge branch commits ==
commit c8d547d0f7a17f5a193bef05f5872b9f475675c5
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Jan 15 16:47:33 2014 -0800
Addressed Reynold's review comments.
Always use a TaskEndReason (remove the option), and explicitly
signal when we don't know the reason. Also, always tell
DAGScheduler (and associated listeners) about started tasks, even
when they're speculated.
commit 3fee1e2e3c06b975ff7f95d595448f38cce97a04
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Jan 8 22:58:13 2014 -0800
Fixed broken test and improved logging
commit ff12fcaa2567c5d02b75a1d5db35687225bcd46f
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Sun Dec 29 21:08:20 2013 -0800
Inform DAG scheduler about all finished tasks.
Previously, the DAG scheduler was not always informed
when tasks finished. For example, when a task set was
aborted, the DAG scheduler was never told when the tasks
in that task set finished. The DAG scheduler was also
never told about the completion of speculated tasks.
This led to confusion with SparkListeners because information
about the completion of those tasks was never passed on to
the listeners (so in the UI, for example, some tasks will never
be shown as finished).
The other problem is that the fairness accounting was wrong
-- the number of running tasks in a pool was decreased when a
task set was considered done, even if all of its tasks hadn't
yet finished.
SPARK-1056. Fix header comment in Executor to not imply that it's only u...
...sed for Mesos and Standalone.
Author: Sandy Ryza <sandy@cloudera.com>
== Merge branch commits ==
commit 1f2443d902a26365a5c23e4af9077e1539ed2eab
Author: Sandy Ryza <sandy@cloudera.com>
Date: Thu Feb 6 15:03:50 2014 -0800
SPARK-1056. Fix header comment in Executor to not imply that it's only used for Mesos and Standalone
remove actorToWorker in master.scala, which is actually not used
actorToWorker is actually not used in the code... just remove it
Author: CodingCat <zhunansjtu@gmail.com>
== Merge branch commits ==
commit 52656c2d4bbf9abcd8bef65d454badb9cb14a32c
Author: CodingCat <zhunansjtu@gmail.com>
Date: Thu Feb 6 00:28:26 2014 -0500
remove actorToWorker in master.scala, which is actually not used
Fixed warnings in test compilation.
This commit fixes two problems: a redundant import, and a
deprecated function.
Author: Kay Ousterhout <kayousterhout@gmail.com>
== Merge branch commits ==
commit da9d2e13ee4102bc58888df0559c65cb26232a82
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date: Wed Feb 5 11:41:51 2014 -0800
Fixed warnings in test compilation.
This commit fixes two problems: a redundant import, and a
deprecated function.
Refactor RDD sampling and add randomSplit to RDD (update)
Replace SampledRDD by PartitionwiseSampledRDD, which accepts a RandomSampler instance as input. The current sample with/without replacement can be easily integrated via BernoulliSampler and PoissonSampler. The benefits are:
1) RDD.randomSplit is implemented in the same way, related to https://github.com/apache/incubator-spark/pull/513
2) Stratified sampling and importance sampling can be implemented in the same manner as well.
Unit tests are included for samplers and RDD.randomSplit.
This should perform better than my previous pull request, where the BernoulliSampler creates many Iterator instances:
https://github.com/apache/incubator-spark/pull/513
Author: Xiangrui Meng <meng@databricks.com>
== Merge branch commits ==
commit e8ce957e5f0a600f2dec057924f4a2ca6adba373
Author: Xiangrui Meng <meng@databricks.com>
Date: Mon Feb 3 12:21:08 2014 -0800
more docs to PartitionwiseSampledRDD
commit fbb4586d0478ff638b24bce95f75ff06f713d43b
Author: Xiangrui Meng <meng@databricks.com>
Date: Mon Feb 3 00:44:23 2014 -0800
move XORShiftRandom to util.random and use it in BernoulliSampler
commit 987456b0ee8612fd4f73cb8c40967112dc3c4c2d
Author: Xiangrui Meng <meng@databricks.com>
Date: Sat Feb 1 11:06:59 2014 -0800
relax assertions in SortingSuite because the RangePartitioner has large variance in this case
commit 3690aae416b2dc9b2f9ba32efa465ba7948477f4
Author: Xiangrui Meng <meng@databricks.com>
Date: Sat Feb 1 09:56:28 2014 -0800
test split ratio of RDD.randomSplit
commit 8a410bc933a60c4d63852606f8bbc812e416d6ae
Author: Xiangrui Meng <meng@databricks.com>
Date: Sat Feb 1 09:25:22 2014 -0800
add a test to ensure seed distribution and minor style update
commit ce7e866f674c30ab48a9ceb09da846d5362ab4b6
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 31 18:06:22 2014 -0800
minor style change
commit 750912b4d77596ed807d361347bd2b7e3b9b7a74
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 31 18:04:54 2014 -0800
fix some long lines
commit c446a25c38d81db02821f7f194b0ce5ab4ed7ff5
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 31 17:59:59 2014 -0800
add complement to BernoulliSampler and minor style changes
commit dbe2bc2bd888a7bdccb127ee6595840274499403
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 31 17:45:08 2014 -0800
switch to partition-wise sampling for better performance
commit a1fca5232308feb369339eac67864c787455bb23
Merge: ac712e4 cf6128f
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 31 16:33:09 2014 -0800
Merge branch 'sample' of github.com:mengxr/incubator-spark into sample
commit cf6128fb672e8c589615adbd3eaa3cbdb72bd461
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 14:40:07 2014 -0800
set SampledRDD deprecated in 1.0
commit f430f847c3df91a3894687c513f23f823f77c255
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 14:38:59 2014 -0800
update code style
commit a8b5e2021a9204e318c80a44d00c5c495f1befb6
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 12:56:27 2014 -0800
move package random to util.random
commit ab0fa2c4965033737a9e3a9bf0a59cbb0df6a6f5
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 12:50:35 2014 -0800
add Apache headers and update code style
commit 985609fe1a55655ad11966e05a93c18c138a403d
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 11:49:25 2014 -0800
add new lines
commit b21bddf29850a2c006a868869b8f91960a029322
Author: Xiangrui Meng <meng@databricks.com>
Date: Sun Jan 26 11:46:35 2014 -0800
move samplers to random.IndependentRandomSampler and add tests
commit c02dacb4a941618e434cefc129c002915db08be6
Author: Xiangrui Meng <meng@databricks.com>
Date: Sat Jan 25 15:20:24 2014 -0800
add RandomSampler
commit 8ff7ba3c5cf1fc338c29ae8b5fa06c222640e89c
Author: Xiangrui Meng <meng@databricks.com>
Date: Fri Jan 24 13:23:22 2014 -0800
init impl of IndependentlySampledRDD
Remove explicit conversion to PairRDDFunctions in cogroup()
As SparkContext._ is already imported, using the implicit conversion appears to make the code much cleaner. Perhaps there was some sinister reason for doing the conversion explicitly, however.
Author: Aaron Davidson <aaron@databricks.com>
== Merge branch commits ==
commit aa4a63f1bfd5b5178fe67364dd7ce4d84c357996
Author: Aaron Davidson <aaron@databricks.com>
Date: Sun Feb 2 23:48:04 2014 -0800
Remove explicit conversion to PairRDDFunctions in cogroup()
As SparkContext._ is already imported, using the implicit conversion
appears to make the code much cleaner. Perhaps there was some sinister
reason for doing the conversion explicitly, however.
Issue with failed worker registrations
I've been going through the spark source after having some odd issues with workers dying and not coming back. After some digging (I'm very new to scala and spark) I believe I've found a worker registration issue. It looks to me like a failed registration follows the same code path as a successful registration, which ends up with workers believing they are connected (since they received a `RegisteredWorker` event) even though they are not registered on the Master.
This is a quick fix that I hope addresses this issue (assuming I didn't completely misread the code and I'm about to look like a silly person :P)
I'm opening this pr now to start a chat with you guys while I do some more testing on my side :)
Author: Erik Selin <erik.selin@jadedpixel.com>
== Merge branch commits ==
commit 973012f8a2dcf1ac1e68a69a2086a1b9a50f401b
Author: Erik Selin <erik.selin@jadedpixel.com>
Date: Tue Jan 28 23:36:12 2014 -0500
break logwarning into two lines to respect line character limit.
commit e3754dc5b94730f37e9806974340e6dd93400f85
Author: Erik Selin <erik.selin@jadedpixel.com>
Date: Tue Jan 28 21:16:21 2014 -0500
add log warning when worker registration fails due to attempt to re-register on same address.
commit 14baca241fa7823e1213cfc12a3ff2a9b865b1ed
Author: Erik Selin <erik.selin@jadedpixel.com>
Date: Wed Jan 22 21:23:26 2014 -0500
address code style comment
commit 71c0d7e6f59cd378d4e24994c21140ab893954ee
Author: Erik Selin <erik.selin@jadedpixel.com>
Date: Wed Jan 22 16:01:42 2014 -0500
Make a failed registration not persist, not send a `RegisteredWorker` event and not run `schedule`, but rather send a `RegisterWorkerFailed` message to the worker attempting to register.
This fixes SPARK-1043, a bug introduced in 0.9.0
where PySpark couldn't serialize strings > 64kB.
This fix was written by @tyro89 and @bouk in #512.
This commit squashes and rebases their pull request
in order to fix some merge conflicts.
Allow files added through SparkContext.addFile() to be overwritten
This is useful for the cases when a file needs to be refreshed and downloaded by the executors periodically. For example, a possible use case is: the driver periodically renews a Hadoop delegation token and writes it to a token file. The token file needs to be downloaded by the executors whenever it gets renewed. However, the current implementation throws an exception when the target file exists and its contents do not match those of the new source. This PR adds an option to allow files to be overwritten to support use cases similar to the above.
Replace the check for None Option with isDefined and isEmpty in Scala code
Propose to replace the Scala check for Option "!= None" with Option.isDefined and "=== None" with Option.isEmpty.
I think using a method call where possible, rather than an operator function plus argument, will make the Scala code easier to read and understand.
Pass compile and tests.
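As a small illustration of the proposed style (the value and variable name here are hypothetical, not code from this patch):
```scala
// Hypothetical example of the style being proposed.
val assignedJar: Option[String] = Some("app.jar")

// Before: comparing the Option directly against None.
if (assignedJar != None) { println(s"jar: ${assignedJar.get}") }

// After: using the Option API, which reads as plain English.
if (assignedJar.isDefined) { println(s"jar: ${assignedJar.get}") }
if (assignedJar.isEmpty) { println("no jar was assigned") }
```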
Fix PySpark hang when input files are deleted (SPARK-1025)
This pull request addresses [SPARK-1025](https://spark-project.atlassian.net/browse/SPARK-1025), an issue where PySpark could hang if its input files were deleted.
This fixes an issue where collectAsMap() could
fail when called on a JavaPairRDD that was derived
by transforming a non-JavaPairRDD.
The root problem was that we were creating the
JavaPairRDD's ClassTag by casting a
ClassTag[AnyRef] to a ClassTag[Tuple2[K2, V2]].
To fix this, I cast a ClassTag[Tuple2[_, _]]
instead, since this actually produces a ClassTag
of the appropriate type because ClassTags don't
capture type parameters:
```
scala> implicitly[ClassTag[Tuple2[_, _]]] == implicitly[ClassTag[Tuple2[Int, Int]]]
res8: Boolean = true

scala> implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[Int, Int]]] == implicitly[ClassTag[Tuple2[Int, Int]]]
res9: Boolean = false
```
Remove Hadoop object cloning and warn users making Hadoop RDD's.
The code introduced in #359 used Hadoop's WritableUtils.clone() to
duplicate objects when reading from Hadoop files. Some users have
reported exceptions when cloning data in various file formats,
including Avro and another custom format.
This patch removes that functionality to ensure stability for the
0.9 release. Instead, it puts a clear warning in the documentation
that copying may be necessary for Hadoop data sets.
Fix two bugs in PySpark cartesian(): SPARK-978 and SPARK-1034
This pull request fixes two bugs in PySpark's `cartesian()` method:
- [SPARK-978](https://spark-project.atlassian.net/browse/SPARK-978): PySpark's cartesian method throws ClassCastException exception
- [SPARK-1034](https://spark-project.atlassian.net/browse/SPARK-1034): Py4JException on PySpark Cartesian Result
The JIRAs have more details describing the fixes.
The code introduced in #359 used Hadoop's WritableUtils.clone() to
duplicate objects when reading from Hadoop files. Some users have
reported exceptions when cloning data in various file formats,
including Avro and another custom format.
This patch removes that functionality to ensure stability for the
0.9 release. Instead, it puts a clear warning in the documentation
that copying may be necessary for Hadoop data sets.
Fix bug in worker clean-up in UI
Introduced in d5a96fec (/cc @aarondav).
This should be picked into 0.8 and 0.9 as well. The bug causes old (zombie) workers on a node to not disappear immediately from the UI when a new one registers.
fix for SPARK-1027
fix for SPARK-1027 (https://spark-project.atlassian.net/browse/SPARK-1027)
FIXES
1. change sparkHome from String to Option[String] in ApplicationDesc
2. remove sparkhome parameter in LaunchExecutor message
3. adjust involved files
This bug leads to a small performance hit because task
set managers will get offered each rejected resource
offer twice, but doesn't lead to any incorrect functionality.
Minor api usability changes
- Expose checkpoint directory - since it is autogenerated now
- null check for jars
- Expose SparkHadoopUtil, so that configuration creation is abstracted even from user code and we avoid duplicating functionality already in Spark.
Remove Typesafe Config usage and conf files to fix nested property names
With Typesafe Config we had the subtle problem of no longer allowing
nested property names, which are used for a few of our properties:
http://apache-spark-developers-list.1001551.n3.nabble.com/Config-properties-broken-in-master-td208.html
This PR is for branch 0.9 but should be added into master too.
(cherry picked from commit 34e911ce9a)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
This is useful for the cases when a file needs to be refreshed and downloaded
by the executors periodically.
Signed-off-by: Yinan Li <liyinan926@gmail.com>
Fail rather than hanging if a task crashes the JVM.
Prior to this commit, if a task crashes the JVM, the task (and
all other tasks running on that executor) is marked as KILLED rather
than FAILED. As a result, the TaskSetManager will retry the task
indefinitely rather than failing the job after maxFailures. Eventually,
this makes the job hang, because the Standalone Scheduler removes
the application after 10 workers have failed, and then the app is left
in a state where it's disconnected from the master and waiting to reconnect.
This commit fixes that problem by marking tasks as FAILED rather than
killed when an executor is lost.
The downside of this commit is that if task A fails because another
task running on the same executor caused the VM to crash, the failure
will incorrectly be counted as a failure of task A. This should not
be an issue because we typically set maxFailures to 3, and it is
unlikely that a task will be co-located with a JVM-crashing task
multiple times.
Prior to this commit, if a task crashes the JVM, the task (and
all other tasks running on that executor) is marked as KILLED rather
than FAILED. As a result, the TaskSetManager will retry the task
indefinitely rather than failing the job after maxFailures. This
commit fixes that problem by marking tasks as FAILED rather than
killed when an executor is lost.
The downside of this commit is that if task A fails because another
task running on the same executor caused the VM to crash, the failure
will incorrectly be counted as a failure of task A. This should not
be an issue because we typically set maxFailures to 3, and it is
unlikely that a task will be co-located with a JVM-crashing task
multiple times.
Workers should use working directory as spark home if it's not specified
If users don't set SPARK_HOME in their environment file when launching an application, the standalone cluster should default to the spark home of the worker.
Note that previously the Broadcast class was accidentally marked as private[spark]. It needs to be public
for broadcast variables to work. Also exposing the broadcast variable id.
GraphX: Unifying Graphs and Tables
GraphX extends Spark's distributed fault-tolerant collections API and interactive console with a new graph API which leverages recent advances in graph systems (e.g., [GraphLab](http://graphlab.org)) to enable users to easily and interactively build, transform, and reason about graph structured data at scale. See http://amplab.github.io/graphx/.
Thanks to @jegonzal, @rxin, @ankurdave, @dcrankshaw, @jianpingjwang, @amatsukawa, @kellrott, and @adamnovak.
Tasks left:
- [x] Graph-level uncache
- [x] Uncache previous iterations in Pregel
- [x] ~~Uncache previous iterations in GraphLab~~ (postponed to post-release)
- [x] - Describe GC issue with GraphLab
- [ ] Write `docs/graphx-programming-guide.md`
- [x] - Mention future Bagel support in docs
- [ ] - Section on caching/uncaching in docs: As with Spark, cache something that is used more than once. In an iterative algorithm, try to cache and force (i.e., materialize) something every iteration, then uncache the cached things that depended on the newly materialized RDD but that won't be referenced again.
- [x] Undo modifications to core collections and instead copy them to org.apache.spark.graphx
- [x] Make Graph serializable to work around capture in Spark shell
- [x] Rename graph -> graphx in package name and subproject
- [x] Remove standalone PageRank
- [x] ~~Fix amplab/graphx#52 by checking `iter.hasNext`~~
Improvements to external sorting
1. Adds the option of compressing outputs.
2. Adds batching to the serialization to prevent OOM on the read side.
3. Slight renaming of config options.
4. Use Spark's buffer size for reads in addition to writes.
Automatically unpersisting RDDs that have been cleaned up from DStreams
Earlier, RDDs generated by DStreams were forgotten but not unpersisted. The system relied on the natural BlockManager LRU to drop the data. The cleaner.ttl was a hammer to clean up RDDs, but it is something that needs to be set separately and very conservatively (at best, a few minutes). This automatic unpersisting allows the system to handle this automatically, which reduces memory usage. As a side effect it will also improve GC performance, as there are fewer objects stored in memory. In fact, for some workloads, it may allow RDDs to be cached as deserialized, which speeds up processing without too much GC overhead.
This is disabled by default. To enable it set configuration spark.streaming.unpersist to true. In future release, this will be set to true by default.
Also, reduced the sleep time in TaskSchedulerImpl.stop() from 5 seconds to 1 second. From my conversation with Matei, there does not seem to be any good reason for the sleep that lets messages be sent out to be so long.
1. Adds the option of compressing outputs.
2. Adds batching to the serialization to prevent OOM on the read side.
3. Slight renaming of config options.
4. Use Spark's buffer size for reads in addition to writes.
Remove now un-needed hostPort option
I noticed this was logging some scary error messages in various places. After I looked into it, this is no longer really used. I removed the option and re-wrote the one remaining use case (it was unnecessary there anyways).
Disable shuffle file consolidation by default
After running various performance tests for the 0.9 release, this still seems to have performance issues even on XFS. So let's keep this off-by-default for 0.9 and users can experiment with it depending on their disk configurations.
Remove simple redundant return statements for Scala methods/functions
Remove simple redundant return statements for Scala methods/functions:
-) Only change simple return statements at the end of method
-) Ignore the complex if-else check
-) Ignore the ones inside synchronized
-) Add small changes to making var to val if possible and remove () for simple get
This hopefully makes the review simpler =)
Pass compile and tests.
Setting load defaults to true in executor
This preserves the behavior in earlier releases. If properties are set for the executors via `spark-env.sh` on the slaves, then they should take precedence over spark defaults. This is useful if system administrators are setting properties for a standalone cluster, such as shuffle locations.
/cc @andrewor14 who initially reported this issue.
Stop SparkListenerBus daemon thread when DAGScheduler is stopped.
Otherwise this leads to hundreds of SparkListenerBus daemon threads in our unit tests (and is also problematic if a user application launches multiple SparkContexts).
We clone hadoop key and values by default and reuse objects if asked to.
We try to clone the most common types of Writables ourselves and call WritableUtils.clone otherwise. The intention is to optimize: for example, for NullWritable there is no need to clone at all, and for Long, Int and String, creating a new object with the value set should hopefully be faster than copying the object.
There is another way to do this PR, where we ask separately whether to clone the key and the value, but I could not think of a use case for it except when either of them is actually a NullWritable, which I have already worked around. So I thought that would be unnecessary.
API for automatic driver recovery for streaming programs and other bug fixes
1. Added Scala and Java API for automatically loading checkpoint if it exists in the provided checkpoint directory.
Scala API: `StreamingContext.getOrCreate(<checkpoint dir>, <function to create new StreamingContext>)` returns a StreamingContext
Java API: `JavaStreamingContext.getOrCreate(<checkpoint dir>, <factory obj of type JavaStreamingContextFactory>)`, return a JavaStreamingContext
See the RecoverableNetworkWordCount below as an example of how to use it.
2. Refactored streaming.Checkpoint*** code to fix bugs and make the DStream metadata checkpoint writing and reading more robust. Specifically, it fixes and improves the logic behind backing up and writing metadata checkpoint files. Also, it ensures that spark.driver.* and spark.hostPort are cleared from SparkConf before being written to the checkpoint.
3. Fixed bug in cleaning up of checkpointed RDDs created by DStream. Specifically, this fix ensures that checkpointed RDD's files are not prematurely cleaned up, thus ensuring reliable recovery.
4. TimeStampedHashMap is upgraded to optionally update the timestamp on map.get(key). This allows clearing of data based on access time (i.e., clearing records that were last accessed before a threshold timestamp).
5. Added caching for file modification time in FileInputDStream using the updated TimeStampedHashMap. Without the caching, enumerating the mod times to find new files can take seconds if there are 1000s of files. This cache is automatically cleared.
This PR is not entirely final as I may make some minor additions - a Java example, and adding StreamingContext.getOrCreate to a unit test.
Edit: Java example to be added later, unit test added.
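As a minimal sketch of the recovery pattern from point 1 (the checkpoint path, app name and batch interval below are illustrative, not taken from this patch):
```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative only: the path and setup logic are placeholders.
val checkpointDir = "hdfs://namenode:8020/checkpoints/my-app"

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("RecoverableApp")
  val ssc = new StreamingContext(conf, Seconds(1))
  ssc.checkpoint(checkpointDir)          // enable metadata checkpointing
  // ... define the DStream transformations here ...
  ssc
}

// Reconstructs the context from the checkpoint if one exists,
// otherwise calls createContext() to build a fresh one.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()
```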
External Sorting for Aggregator and CoGroupedRDDs (Revisited)
(This pull request is re-opened from https://github.com/apache/incubator-spark/pull/303, which was closed because Jenkins / github was misbehaving)
The target issue for this patch is the out-of-memory exceptions triggered by aggregate operations such as reduce, groupBy, join, and cogroup. The existing AppendOnlyMap used by these operations resides purely in memory, and grows with the size of the input data until the amount of allocated memory is exceeded. Under large workloads, this problem is aggravated by the fact that OOM frequently occurs only after a very long (> 1 hour) map phase, in which case the entire job must be restarted.
The solution is to spill the contents of this map to disk once a certain memory threshold is exceeded. This functionality is provided by ExternalAppendOnlyMap, which additionally sorts this buffer before writing it out to disk, and later merges these buffers back in sorted order.
Under normal circumstances in which OOM is not triggered, ExternalAppendOnlyMap is simply a wrapper around AppendOnlyMap and incurs little overhead. Only when the memory usage is expected to exceed the given threshold does ExternalAppendOnlyMap spill to disk.
Aside from trivial formatting changes, use nulls instead of Options for
DiskMapIterator, and add documentation for spark.shuffle.externalSorting
and spark.shuffle.memoryFraction.
Also, set spark.shuffle.memoryFraction to 0.3, and spark.storage.memoryFraction = 0.6.
Yarn client addjar and misc fixes
Fix the addJar functionality in yarn-client mode, add support for the other options supported in yarn-standalone mode, set the application type on YARN in Hadoop 2.X, add documentation, and change the heartbeat interval to use the same code as yarn-standalone so it doesn't take so long to get containers and exit.
Make DEBUG-level logs consumable.
Removes two things that caused issues with the debug logs:
(a) Internal polling in the DAGScheduler was polluting the logs.
(b) The Scala REPL logs were really noisy.
Removes two things that caused issues with the debug logs:
(a) Internal polling in the DAGScheduler was polluting the logs.
(b) The Scala REPL logs were really noisy.
Fix bug added when we changed AppDescription.maxCores to an Option
The Scala compiler warned about this -- we were comparing an Option against an integer now.
This is an alternative to the existing approach, which evenly distributes the
collective shuffle memory among all running tasks. In the new approach, each
thread requests a chunk of memory whenever its map is about to multiplicatively
grow. If there is sufficient memory in the global pool, the thread allocates it
and grows its map. Otherwise, it spills.
A danger with the previous approach is that a new task may quickly fill up its
map before old tasks finish spilling, potentially causing an OOM. This approach
prevents this scenario as it favors existing tasks over new tasks; any thread
that may step over the boundary of other threads defensively backs off and
starts spilling.
Testing through spark-perf reveals: (1) When no spills have occurred, the
performance of external sorting using this memory management approach is
essentially the same as without external sorting. (2) When one or more spills
have occurred, the performance of external sorting is a small multiple (3x) worse
Add some missing Java API methods
These are primarily for setting job groups, canceling jobs, and setting names on RDDs. Seemed like useful stuff to expose in Java.
Bug fixes for updating the RDD block's memory and disk usage information
Bug fixes for updating the RDD block's memory and disk usage information.
From the code context, we can find that the memSize and diskSize here are both always equal to the size of the block. Actually, they are never zero. Thus, the logic here is wrong for recording the block usage in BlockStatus, especially for the blocks which are dropped from memory to ensure space for the new input rdd blocks. I have tested that this causes the storage metrics shown on the Storage web page to be wrong and misleading. With this patch, the metrics will be okay.
Finally, Merry Christmas, guys :)
SPARK-998: Support Launching Driver Inside of Standalone Mode
[NOTE: I need to bring the tests up to date with new changes, so for now they will fail]
This patch provides support for launching driver programs inside of a standalone cluster manager. It also supports monitoring and re-launching of driver programs, which is useful for long running, recoverable applications such as Spark Streaming jobs. For those jobs, this patch allows a deployment mode which is resilient to the failure of any worker node, failure of a master node (provided a multi-master setup), and even failures of the application itself, provided they are recoverable on a restart. Driver information, such as the status and logs from a driver, is displayed in the UI.
There are a few small TODO's here, but the code is generally feature-complete. They are:
- Bring tests up to date and add test coverage
- Restarting on failure should be optional and maybe off by default.
- See if we can re-use akka connections to facilitate clients behind a firewall
A sensible place to start for review would be to look at the `DriverClient` class which presents users the ability to launch their driver program. I've also added an example program (`DriverSubmissionTest`) that allows you to test this locally and play around with killing workers, etc. Most of the code is devoted to persisting driver state in the cluster manager, exposing it in the UI, and dealing correctly with various types of failures.
Instructions to test locally:
- `sbt/sbt assembly/assembly examples/assembly`
- start a local version of the standalone cluster manager
```
./spark-class org.apache.spark.deploy.client.DriverClient \
-j -Dspark.test.property=something \
-e SPARK_TEST_KEY=SOMEVALUE \
launch spark://10.99.1.14:7077 \
../path-to-examples-assembly-jar \
org.apache.spark.examples.DriverSubmissionTest 1000 some extra options --some-option-here -X 13
```
- Go in the UI and make sure it started correctly, look at the output etc
- Kill workers, the driver program, masters, etc.
Minor style cleanup. Mostly on indenting & line width changes.
Focused on the few important files since they are the files that new contributors usually read first.
Set boolean param name for call to SparkHadoopMapReduceUtil.newTaskAttemptID
Set boolean param name for call to SparkHadoopMapReduceUtil.newTaskAttemptID to make it clear which param is being set.
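A generic illustration of why naming the boolean helps at the call site (the helper below is hypothetical, not the real SparkHadoopMapReduceUtil signature):
```scala
// Hypothetical helper, only to show the named-argument style.
def newAttemptId(jobId: Int, isMap: Boolean, taskId: Int): String = {
  val taskType = if (isMap) "m" else "r"
  s"attempt_${jobId}_${taskType}_$taskId"
}

// Before: it is unclear what the bare `true` controls.
val before = newAttemptId(42, true, 0)

// After: the named parameter documents the intent.
val after = newAttemptId(42, isMap = true, taskId = 0)
```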
Remove calls to deprecated mapred's OutputCommitter.cleanupJob
Since Hadoop 1.0.4, the mapred OutputCommitter.commitJob should do the cleanup job via a call to OutputCommitter.cleanupJob.
Remove SparkHadoopWriter.cleanup since it is used only by PairRDDFunctions.
In fact, the implementation of mapred OutputCommitter.commitJob looks like this:
```
public void commitJob(JobContext jobContext) throws IOException {
  cleanupJob(jobContext);
}
```
The mapred OutputCommitter.commitJob should do the cleanup job.
In fact, the implementation of mapred OutputCommitter.commitJob looks like this:
```
public void commitJob(JobContext jobContext) throws IOException {
  cleanupJob(jobContext);
}
```
(The jobContext input argument is of type org.apache.hadoop.mapred.JobContext)
Get rid of `Either[ActorRef, ActorSelection]'
In this pull request, instead of returning an `Either[ActorRef, ActorSelection]`, `registerOrLookup` identifies the remote actor blockingly to obtain an `ActorRef`, or throws an exception if the remote actor doesn't exist or the lookup times out (configured by `spark.akka.lookupTimeout`). This function is only called when a `SparkEnv` is constructed (instantiating driver or executor), so the blocking call is considered acceptable. Executor side `ActorSelection`s/`ActorRef`s to driver side `MapOutputTrackerMasterActor` and `BlockManagerMasterActor` are affected by this pull request.
`ActorSelection` is dangerous and should be used with care. It's only absolutely safe to send messages via an `ActorSelection` when the remote actor is stateless, so that actor incarnation is irrelevant. But as pointed out by @ScrapCodes in the comments below, the executor exits immediately once the connection to the driver is lost, so `ActorSelection`s are not harmful in this scenario. So this pull request is mostly a code style patch.
Add way to limit default # of cores used by apps in standalone mode
Also documents the spark.deploy.spreadOut option, and fixes a config option that had a dash in its name.
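A brief sketch of how this looks from the application side (the master URL and values are illustrative; the cluster-wide default for apps that set no limit is assumed to be the spark.deploy.defaultCores property this change introduces):
```scala
import org.apache.spark.SparkConf

// Illustrative only: an application can still cap its own usage explicitly;
// apps that leave spark.cores.max unset fall back to the master's default
// (assumed here to be spark.deploy.defaultCores, per this change).
val conf = new SparkConf()
  .setMaster("spark://master:7077")
  .setAppName("CappedApp")
  .set("spark.cores.max", "8") // explicit per-app cap overrides the cluster default
```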
Don't leave os.arch unset after BlockManagerSuite
Recent SparkConf changes meant that BlockManagerSuite was now leaving the os.arch System property unset. That's a problem for any subsequent tests that rely upon having a valid os.arch. This is true for CompressionCodecSuite in the usual maven build test order, even though it isn't usually true for the sbt build.
To make this work I had to rename the defaults file. Otherwise
maven's pattern matching rules included it when trying to match
other log4j.properties files.
I also fixed a bug in the existing maven build where two
<transformers> tags were present in assembly/pom.xml
such that one overwrote the other.
Suggested small changes to Java code for slightly more standard style, encapsulation and in some cases performance
Sorry if this is too abrupt or not a welcome set of changes, but thought I'd see if I could contribute a little. I'm a Java developer and just getting seriously into Spark. So I thought I'd suggest a number of small changes to the couple Java parts of the code to make it a little tighter, more standard and even a bit faster.
Feel free to take all, some or none of this. Happy to explain any of it.
```
[error] /pod/home/anovak/build/graphx/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala:172: not enough arguments for constructor PrimitiveKeyOpenHashMap: (initialCapacity: Int)(implicit evidence$3: ClassManifest[Int], implicit evidence$4: ClassManifest[Int])org.apache.spark.util.collection.PrimitiveKeyOpenHashMap[Int,Int]
[error] private val mapIdToIndex = new PrimitiveKeyOpenHashMap[Int, Int]()
[error] ^
[info] No documentation generated with unsucessful compiler run
[error] one error found
[error] (core/compile:doc) Scaladoc generation failed
[error] Total time: 67 s, completed Jan 6, 2014 2:20:51 PM
```
In theory a no-argument constructor ought not to differ from one with a single argument that has a default value, but in practice there seems to be an issue.
Fix handling of empty SPARK_EXAMPLES_JAR
Currently if SPARK_EXAMPLES_JAR is left unset you get a null pointer exception when running the examples (at least on Spark on YARN). The null now gets turned into a string of "null" when it's put into the SparkConf, so addJar no longer properly ignores it. This fixes that so that it can be left unset.
Quiet ERROR-level Akka Logs
This fixes an issue I've seen where akka logs a bunch of things at ERROR level when connecting to a standalone cluster, even in the normal case. I noticed that even when lifecycle logging was disabled, the netty code inside of akka still logged away via akka's EndpointWriter class. There are also some other log streams that I think are new in akka 2.2.1 that I've disabled.
Finally, I added some better logging to the standalone client. This makes it more clear when a connection failure occurs what is going on. Previously it never explicitly said if a connection attempt had failed.
The commit messages here have some more detail.
Removing SPARK_EXAMPLES_JAR in the code
This re-writes all of the examples to use the `SparkContext.jarOfClass` mechanism for loading the examples jar. This is necessary for environments like YARN and the Standalone mode where example programs will be submitted from inside the cluster rather than at the client using `./spark-example`.
This still leaves SPARK_EXAMPLES_JAR in place in the shell scripts for setting up the classpath if `./spark-example` is run.
Although we can send messages via an ActorSelection, it would be better to identify the actor and obtain an ActorRef first, so that we can get informed earlier if the remote actor doesn't exist, and get rid of the annoying Either wrapper.
Without these it's a bit less clear what's going on for the user.
One thing I realized when doing this is that akka itself actually retries
the initial association. So the retry we currently have is redundant with
akka's.
I noticed when connecting to a standalone cluster Spark gives a bunch
of Akka ERROR logs that make it seem like something is failing.
This patch does two things:
1. Akka dead letter logging is turned on/off according to the existing
lifecycle spark property.
2. We explicitly silence akka's EndpointWriter log in log4j. This is necessary
because for some reason that log doesn't pick up on the lifecycle
logging settings. After a few hours of debugging this was the only solution
I found that worked.
Further, divide this threshold by the number of tasks running concurrently.
Note that this does not guard against the following scenario: a new task
quickly fills up its share of the memory before old tasks finish spilling
their contents, in which case the total memory used by such maps may exceed
what was specified. Currently, spark.shuffle.safetyFraction mitigates the
effect of this.
Remove erroneous FAILED state for killed tasks.
Currently, when tasks are killed, the Executor first sends a
status update for the task with a "KILLED" state, and then
sends a second status update with a "FAILED" state saying that
the task failed due to an exception. The second FAILED state is
misleading/unnecessary, and occurs due to a NonLocalReturnControl
Exception that gets thrown due to the way we kill tasks. This
commit eliminates that problem.
I'm not at all sure that this is the best way to fix this problem,
so alternate suggestions welcome. @rxin guessing you're the right
person to look at this.
Improvements to DStream window ops and refactoring of Spark's CheckpointSuite
- Added a new RDD - PartitionerAwareUnionRDD. Using this RDD, one can take multiple RDDs partitioned by the same partitioner and unify them into a single RDD while preserving the partitioner. So m RDDs with p partitions each will be unified to a single RDD with p partitions and the same partitioner. The preferred location for each partition of the unified RDD will be the most common preferred location of the corresponding partitions of the parent RDDs. For example, location of partition 0 of the unified RDD will be where most of partition 0 of the parent RDDs are located.
- Improved the performance of DStream's reduceByKeyAndWindow and groupByKeyAndWindow. Both these operations work by doing per-batch reduceByKey/groupByKey and then using PartitionerAwareUnionRDD to union the RDDs across the window. This eliminates a shuffle related to the window operation, which can reduce batch processing time by 30-40% for simple workloads.
- Fixed bugs and simplified Spark's CheckpointSuite. Some of the tests were incorrect and unreliable. Added missing tests for ZippedRDD. I can go into greater detail if necessary.
- Added mapSideCombine option to combineByKeyAndWindow.
SPARK-991: Report information gleaned from a Python stacktrace in the UI
Scala:
- Added setCallSite/clearCallSite to SparkContext and JavaSparkContext.
These functions mutate a LocalProperty called "externalCallSite."
- Add a wrapper, getCallSite, that checks for an externalCallSite and, if
none is found, calls the usual Utils.formatSparkCallSite.
- Change everything that calls Utils.formatSparkCallSite to call
getCallSite instead. Except getCallSite.
- Add wrappers to setCallSite/clearCallSite wrappers to JavaSparkContext.
Python:
- Add a gruesome hack to rdd.py that inspects the traceback and guesses
what you want to see in the UI.
- Add a RAII wrapper around said gruesome hack that calls
setCallSite/clearCallSite as appropriate.
- Wire said RAII wrapper up around three calls into the Scala code.
I'm not sure that I hit all the spots with the RAII wrapper. I'm also
not sure that my gruesome hack does exactly what we want.
One could also approach this change by refactoring
runJob/submitJob/runApproximateJob to take a call site, then threading
that parameter through everything that needs to know it.
One might object to the pointless-looking wrappers in JavaSparkContext.
Unfortunately, I can't directly access the SparkContext from
Python---or, if I can, I don't know how---so I need to wrap everything
that matters in JavaSparkContext.
Conflicts:
core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
Currently, when tasks are killed, the Executor first sends a
status update for the task with a "KILLED" state, and then
sends a second status update with a "FAILED" state saying that
the task failed due to an exception. The second FAILED state is
misleading/unnecessary, and occurs due to a NonLocalReturnControl
Exception that gets thrown due to the way we kill tasks. This
commit eliminates that problem.
Also replaced SparkConf.getOrElse with just a "get" that takes a default
value, and added getInt, getLong, etc to make code that uses this
simpler later on.
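A short sketch of the resulting SparkConf usage (keys and default values below are illustrative):
```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()

// "get" with a default value replaces the old getOrElse style.
val master  = conf.get("spark.master", "local[*]")

// Typed helpers avoid parsing strings at every call site.
val retries = conf.getInt("spark.task.maxFailures", 4)
val ttl     = conf.getLong("spark.cleaner.ttl", -1L)
```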
Approximate distinct count
Added countApproxDistinct() to RDD and countApproxDistinctByKey() to PairRDDFunctions to approximately count distinct number of elements and distinct number of values per key, respectively. Both functions use HyperLogLog from stream-lib for counting. Both functions take a parameter that controls the trade-off between accuracy and memory consumption. Also added Scala docs and test suites for both methods.
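A brief usage sketch of the two methods (assumes an existing SparkContext `sc` and `import org.apache.spark.SparkContext._` for the pair-RDD functions; the data and the 0.05 relative-accuracy argument are illustrative):
```scala
// Approximate number of distinct elements in an RDD.
val nums = sc.parallelize(1 to 100000)
val approxCount = nums.countApproxDistinct(0.05)

// Approximate number of distinct values per key.
val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 1), ("b", 3)))
val approxPerKey = pairs.countApproxDistinctByKey(0.05).collect()
```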
Bug fixes for file input stream and checkpointing
- Fixed bugs in the file input stream that led the stream to fail due to transient HDFS errors (e.g. listing files while a background thread is deleting them caused errors, etc.)
- Updated Spark's CheckpointRDD and Streaming's CheckpointWriter to use SparkContext.hadoopConfiguration, to allow checkpoints to be written to any HDFS compatible store requiring special configuration.
- Changed the API of SparkContext.setCheckpointDir() - eliminated the unnecessary 'useExisting' parameter. Now SparkContext will always create a unique subdirectory within the user specified checkpoint directory. This is to ensure that previous checkpoint files are not accidentally overwritten.
- Fixed bug where setting checkpoint directory as a relative local path caused the checkpointing to fail.
This gives us a couple advantages:
- Uses spark.local.dir and randomly selects a directory/disk.
- Ensure files are deleted on normal DiskBlockManager cleanup.
- Availability of same stats as usual DiskBlockObjectWriter (currently unused).
Also enable basic cleanup when iterator is fully drained.
Still requires cleanup for operations that fail or don't go through all elements.
Changed naming of StageCompleted event to be consistent
The rest of the SparkListener events are named with "SparkListener"
as the prefix of the name; this commit renames the StageCompleted
event to SparkListenerStageCompleted for consistency.
1. Adds a default log4j file that gets loaded if users haven't specified a log4j file.
2. Isolates use of the tools assembly jar. I found this produced SLF4J warnings
after building with SBT (and I've seen similar warnings on the mailing list).
- Got rid of global SparkContext.globalConf
- Pass SparkConf to serializers and compression codecs
- Made SparkConf public instead of private[spark]
- Improved API of SparkContext and SparkConf
- Switched executor environment vars to be passed through SparkConf
- Fixed some places that were still using system properties
- Fixed some tests, though others are still failing
This still fails several tests in core, repl and streaming, likely due
to properties not being set or cleared correctly (some of the tests run
fine in isolation).
Removed unused OtherFailure TaskEndReason.
The OtherFailure TaskEndReason was added by @mateiz 3 years ago in this commit: 24a1e7f838
Unless I am missing something, it doesn't seem to have been used then, and is not used now, so seems safe for deletion.
The rest of the SparkListener events are named with "SparkListener"
as the prefix of the name; this commit renames the StageCompleted
event to SparkListenerStageCompleted for consistency.
Deduplicate Local and Cluster schedulers.
The code in LocalScheduler/LocalTaskSetManager was nearly identical
to the code in ClusterScheduler/ClusterTaskSetManager. The redundancy
made updating the schedulers unnecessarily painful and error-
prone. This commit combines the two into a single TaskScheduler/
TaskSetManager.
Unfortunately the diff makes this change look much more invasive than it is -- TaskScheduler.scala is only superficially changed (names updated, overrides removed) from the old ClusterScheduler.scala, and the same with
TaskSetManager.scala.
Thanks @rxin for suggesting this change!
Clean up shuffle files once their metadata is gone
Previously, we would only clean the in-memory metadata for consolidated shuffle files.
Additionally, fixes a bug where the Metadata Cleaner was ignoring type-specific TTLs.
Refactored the streaming scheduler and added StreamingListener interface
- Refactored the streaming scheduler for cleaner code. Specifically, the JobManager was renamed to JobScheduler, as it does the actual scheduling of Spark jobs to the SparkContext. The earlier Scheduler was renamed to JobGenerator, as it actually generates the jobs from the DStreams. The JobScheduler starts the JobGenerator. Also, moved all the scheduler related code from spark.streaming to spark.streaming.scheduler package.
- Implemented the StreamingListener interface, similar to SparkListener. The streaming version of StatusReportListener prints the batch processing time statistics (for now). Added StreamingListenerSuite to test it.
- Refactored streaming TestSuiteBase for deduping code in the other streaming testsuites.
Track and report task result serialisation time.
- DirectTaskResult now has a ByteBuffer valueBytes instead of a T value.
- DirectTaskResult now has a member function T value() that deserialises valueBytes.
- Executor serialises value into a ByteBuffer and passes it to DTR's ctor.
- Executor tracks the time taken to do so and puts it in a new field in TaskMetrics.
- StagePage now reports serialisation time from TaskMetrics along with the other things it reported.
Previously, we would only clean the in-memory metadata for consolidated
shuffle files.
Additionally, fixes a bug where the Metadata Cleaner was ignoring type-
specific TTLs.
Add collectPartition to JavaRDD interface.
This interface is useful for implementing `take` from other language frontends where the data is serialized. Also remove `takePartition` from PythonRDD and use `collectPartition` in rdd.py.
Thanks @concretevitamin for the original change and tests.
Change the implementation to use runJob instead of PartitionPruningRDD.
Also update the unit tests and the python take implementation
to use the new interface.
despite having a low number of nodes and relatively small workload (16 nodes, <1.5 TB data).
This would cause an entire job to fail at the beginning of the reduce phase.
There is no particular reason for this value to be small as a timeout should only occur
in an exceptional situation.
Also centralized the reading of spark.akka.askTimeout to AkkaUtils (surely this can later
be cleaned up to use Typesafe).
Finally, deleted some lurking implicits. If anyone can think of a reason they should still
be there, please let me know.
Fix for spark.task.maxFailures not enforced correctly.
Docs at http://spark.incubator.apache.org/docs/latest/configuration.html say:
```
spark.task.maxFailures
Number of individual task failures before giving up on the job. Should be greater than or equal to 1. Number of allowed retries = this value - 1.
```
The previous implementation worked incorrectly. When, for example, `spark.task.maxFailures` was set to 1, the job was aborted only after the second task failure, not after the first one.
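For example, under the documented semantics (a hedged illustration, not code from this patch):
```scala
import org.apache.spark.SparkConf

// With maxFailures = 1 the job should abort after the first task failure
// (zero retries), which is the behavior this fix restores.
val conf = new SparkConf().set("spark.task.maxFailures", "1")
```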
- Refactored Scheduler + JobManager to JobGenerator + JobScheduler and
added JobSet for cleaner code. Moved scheduler related code to
streaming.scheduler package.
- Added StreamingListener trait (similar to SparkListener) to enable
gathering of streaming stats like processing times and delays.
Use StreamingContext.addListener() to add listeners.
- Deduped some code in streaming tests by modifying TestSuiteBase, and
added StreamingListenerSuite.
- Made file stream more robust to transient failures.
- Changed Spark.setCheckpointDir API to not have the second
'useExisting' parameter. Spark will always create a unique directory
for checkpointing underneath the directory provided to the function.
- Fixed bug wrt local relative paths as checkpoint directory.
- Made DStream and RDD checkpointing use
SparkContext.hadoopConfiguration, so that more HDFS compatible
filesystems are supported for checkpointing.
stageId <--> jobId mapping in DAGScheduler
Okay, I think this one is ready to go -- or at least it's ready for review and discussion. It's a carry-over of https://github.com/mesos/spark/pull/842 with updates for the newer job cancellation functionality. The prior discussion still applies. I've actually changed the job cancellation flow a bit: Instead of ``cancelTasks`` going to the TaskScheduler and then ``taskSetFailed`` coming back to the DAGScheduler (resulting in ``abortStage`` there), the DAGScheduler now takes care of figuring out which stages should be cancelled, tells the TaskScheduler to cancel tasks for those stages, then does the cleanup within the DAGScheduler directly without the need for any further prompting by the TaskScheduler.
I know of three outstanding issues, each of which can and should, I believe, be handled in follow-up pull requests:
1) https://spark-project.atlassian.net/browse/SPARK-960
2) JobLogger should be re-factored to eliminate duplication
3) Related to 2), the WebUI should also become a consumer of the DAGScheduler's new understanding of the relationship between jobs and stages so that it can display progress indication and the like grouped by job. Right now, some of this information is just being sent out as part of ``SparkListenerJobStart`` messages, but more or different job <--> stage information may need to be exported from the DAGScheduler to meet listeners' needs.
Except for the eventQueue -> Actor commit, the rest can be cherry-picked almost cleanly into branch-0.8. A little merging is needed in MapOutputTracker and the DAGScheduler. Merged versions of those files are in aba2b40ce0
Note that between the recent Actor change in the DAGScheduler and the cleaning up of DAGScheduler data structures on job completion in this PR, some races have been introduced into the DAGSchedulerSuite. Those tests usually pass, and I don't think that better-behaved code that doesn't directly inspect DAGScheduler data structures should be seeing any problems, but I'll work on fixing DAGSchedulerSuite as either an addition to this PR or as a separate request.
UPDATE: Fixed the race that I introduced. Created a JIRA issue (SPARK-965) for the one that was introduced with the switch to eventProcessorActor in the DAGScheduler.
Change the name of input argument in ClusterScheduler#initialize from context to backend.
The SchedulerBackend used to be called ClusterSchedulerContext, so this just makes a small
change to the input param in ClusterScheduler#initialize to reflect this.
Added logging of scheduler delays to UI
This commit adds two metrics to the UI:
1) The time to get task results, if they're fetched remotely
2) The scheduler delay. When the scheduler starts getting overwhelmed (because it can't keep up with the rate at which tasks are being submitted), the result is that tasks get delayed on the tail-end: the message from the worker saying that the task has completed ends up in a long queue and takes a while to be processed by the scheduler. This commit records that delay in the UI so that users can tell when the scheduler is becoming the bottleneck.
Memoize preferred locations in ZippedPartitionsBaseRDD
so preferred location computation doesn't lead to exponential explosion.
This was a problem in GraphX where we have a whole chain of RDDs that are ZippedPartitionsRDD's, and the preferred locations were taking an eternity to compute.
(cherry picked from commit e36fe55a03)
Signed-off-by: Reynold Xin <rxin@apache.org>
The SchedulerBackend used to be called ClusterSchedulerContext, so this just makes a small
change to the input param in ClusterScheduler#initialize to reflect this.
Hadoop 2.2 migration
Includes support for the YARN API stabilized in the Hadoop 2.2 release, and a few style patches.
Short description for each set of commits:
a98f5a0 - "Misc style changes in the 'yarn' package"
a67ebf4 - "A few more style fixes in the 'yarn' package"
Both of these are some minor style changes, such as fixing lines over 100 chars, to the existing YARN code.
ab8652f - "Add a 'new-yarn' directory ... "
Copies everything from `SPARK_HOME/yarn` to `SPARK_HOME/new-yarn`. No actual code changes here.
4f1c3fa - "Hadoop 2.2 YARN API migration ..."
API patches to code in the `SPARK_HOME/new-yarn` directory. There are a few more small style changes mixed in, too.
Based on @colorant's Hadoop 2.2 support for the scala-2.10 branch in #141.
a1a1c62 - "Add optional Hadoop 2.2 settings in sbt build ... "
If Spark should be built against Hadoop 2.2, then:
a) the `org.apache.spark.deploy.yarn` package will be compiled from the `new-yarn` directory.
b) Protobuf v2.5 will be used as a Spark dependency, since Hadoop 2.2 depends on it. Also, Spark will be built against a version of Akka v2.0.5 that's built against Protobuf 2.5, named `akka-2.0.5-protobuf-2.5`. The patched Akka is here: https://github.com/harveyfeng/akka/tree/2.0.5-protobuf-2.5, and was published to local Ivy during testing.
There's also a new boolean environment variable, `SPARK_IS_NEW_HADOOP`, that users can manually set if their `SPARK_HADOOP_VERSION` specification does not start with `2.2`, which is how the build file tries to detect a 2.2 version. Not sure if this is necessary or done in the best way, though...
Fix small bug in web UI and minor clean-up.
There was a bug where sorting order didn't work correctly for write time metrics.
I also cleaned up some earlier code that fixed the same issue for read and
write bytes.
There was a bug where sorting order didn't work correctly for write time metrics.
I also cleaned up some earlier code that fixed the same issue for read and
write bytes.
...and make sure that DAGScheduler data structures are cleaned up on job completion.
Initial effort and discussion at https://github.com/mesos/spark/pull/842
Re-enable zk:// urls for Mesos SparkContexts
This was broken in PR #71 when we explicitly disallow anything that didn't fit a mesos:// url.
Although it is not really clear that a zk:// url should match Mesos, it is what the docs say and it is necessary for backwards compatibility.
Additionally added a unit test for the creation of all types of TaskSchedulers. Since YARN and Mesos are not necessarily available in the system, they are allowed to pass as long as the YARN/Mesos code paths are exercised.
Scheduler quits when newStage fails
The current scheduler thread does not handle exceptions from newStage while launching new jobs. The thread fails on any exception that gets triggered at that level, leaving the cluster hanging with no scheduler.
The current scheduler thread does not handle exceptions from createStage while launching new jobs. The thread fails on any exception that gets triggered at that level, leaving the cluster hanging with no scheduler.
This was broken in PR #71 when we explicitly disallow anything that
didn't fit a mesos:// url.
Although it is not really clear that a zk:// url should match Mesos,
it is what the docs say and it is necessary for backwards compatibility.
SPARK-965: https://spark-project.atlassian.net/browse/SPARK-965
SPARK-966: https://spark-project.atlassian.net/browse/SPARK-966
* Add back DAGScheduler.start(), eventProcessActor is created and started here.
Notice that this function is only called by SparkContext.
* Cancel the scheduled stage resubmission task when stopping eventProcessActor
* Add a new DAGSchedulerEvent ResubmitFailedStages
This event message is sent by the scheduled stage resubmission task to eventProcessActor. In this way, DAGScheduler.resubmitFailedStages is guaranteed to be executed from the same thread that runs DAGScheduler.processEvent.
Please refer to discussion in SPARK-966 for details.
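A minimal sketch of the threading idea, with simplified names (not the actual DAGScheduler code): the scheduled resubmission task only posts a message, so the actor thread is the only thread that touches scheduler state.

```scala
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.duration._

case object ResubmitFailedStages

class EventProcessActor extends Actor {
  def receive = {
    case ResubmitFailedStages =>
      // runs on the actor's thread, the same thread that handles every other event
      println("resubmitting failed stages")
  }
}

object ResubmissionSketch extends App {
  val system = ActorSystem("dag-scheduler-sketch")
  val eventProcessActor = system.actorOf(Props[EventProcessActor])
  import system.dispatcher
  // the periodic task just sends the event; cancel() this when stopping the actor
  val resubmissionTask =
    system.scheduler.schedule(10.seconds, 10.seconds, eventProcessActor, ResubmitFailedStages)
}
```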
add http timeout for httpbroadcast
While pulling task bytecode from the HttpBroadcast server, no timeout value is set. This may cause Spark executor code to hang while other tasks in the same executor process wait for the lock. I have encountered this issue in my cluster. Here's the stacktrace I captured: https://gist.github.com/haitaoyao/7655830
So add a timeout value to ensure the task fails fast.
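The idea can be sketched roughly as follows, assuming a plain java.net HTTP fetch (the helper name and timeout parameter are illustrative, not the actual HttpBroadcast configuration):

```scala
import java.io.InputStream
import java.net.URL

// Hedged sketch: set explicit connect/read timeouts so a stuck server
// cannot hang the executor indefinitely.
def openWithTimeout(url: String, timeoutMs: Int): InputStream = {
  val connection = new URL(url).openConnection()
  connection.setConnectTimeout(timeoutMs) // fail fast if the broadcast server is unreachable
  connection.setReadTimeout(timeoutMs)    // fail fast if the transfer stalls mid-stream
  connection.getInputStream
}
```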
Custom Serializers for PySpark
This pull request adds support for custom serializers to PySpark. For now, all Python-transformed (or parallelize()d) RDDs are serialized with the same serializer that's specified when creating SparkContext.
For now, PySpark includes `PickleSerDe` and `MarshalSerDe` classes for using Python's `pickle` and `marshal` serializers. It's pretty easy to add support for other serializers, although I still need to add instructions on this.
A few notable changes:
- The Scala `PythonRDD` class no longer manipulates Pickled objects; data from `textFile` is written to Python as MUTF-8 strings. The Python code performs the appropriate bookkeeping to track which deserializer should be used when reading an underlying JavaRDD. This mechanism could also be used to support other data exchange formats, such as MsgPack.
- Several magic numbers were refactored into constants.
- Batching is implemented by wrapping / decorating an unbatched SerDe.
Log a warning if a task's serialized size is very big
As per Reynold's instructions, we now create a warning level log entry if a task's serialized size is too big. "Too big" is currently defined as 100kb. This warning message is generated at most once for each stage.
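A hedged sketch of that behavior (the threshold constant and message text are illustrative, not the exact ones added to the scheduler):

```scala
import scala.collection.mutable

object TaskSizeWarning {
  private val warnThresholdBytes = 100 * 1024        // "too big" = 100kb
  private val warnedStages = mutable.HashSet[Int]()  // stages we have already warned about

  // Warn at most once per stage when a serialized task exceeds the threshold.
  def maybeWarn(stageId: Int, serializedSize: Int): Unit = {
    if (serializedSize > warnThresholdBytes && warnedStages.add(stageId)) {
      println(s"Warning: stage $stageId contains a task of very large size ($serializedSize bytes)")
    }
  }
}
```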
OpenHashSet fixes
Incorporated ideas from pull request #200.
- Use Murmur Hash 3 finalization step to scramble the bits of HashCode
instead of the simpler version in java.util.HashMap; the latter one
had trouble with ranges of consecutive integers. Murmur Hash 3 is used
by fastutil.
- Don't check keys for equality when re-inserting due to growing the
table; the keys will already be unique.
- Remember the grow threshold instead of recomputing it on each insert
Also added unit tests for size estimation for specialized hash sets and maps.
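For reference, the MurmurHash3 finalization ("fmix") step referred to above looks roughly like this (the constants are the standard MurmurHash3 ones; the method name here is illustrative):

```scala
// Scramble the bits of a hash code before masking it into a table index,
// so ranges of consecutive integers do not cluster.
def scramble(hashCode: Int): Int = {
  var h = hashCode
  h ^= h >>> 16
  h *= 0x85ebca6b
  h ^= h >>> 13
  h *= 0xc2b2ae35
  h ^= h >>> 16
  h
}
```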
Use the proper partition index in mapPartitionsWithIndex
mapPartitionsWithIndex uses TaskContext.partitionId as the partition index. TaskContext.partitionId used to be identical to the partition index in an RDD. However, pull request #186 introduced a scenario (with partition pruning) where the two can be different. This pull request uses the right partition index in all mapPartitionsWithIndex related calls.
Also removed the extra MapPartitionsWithContextRDD and put all the mapPartitions related functionality in MapPartitionsRDD.
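To make the distinction concrete, here is a small self-contained Scala sketch (not Spark's classes) of how the two indices diverge after pruning:

```scala
// Illustrative only: after pruning, a partition keeps its parent's index, so
// "index within this RDD" and "index within the parent RDD" are no longer equal.
case class FakePartition(indexInParent: Int)

val parentPartitions = (0 until 10).map(i => FakePartition(i))
val pruned = parentPartitions.filter(_.indexInParent % 2 == 1)  // keep odd parent partitions

pruned.zipWithIndex.foreach { case (p, indexInPrunedRdd) =>
  // mapPartitionsWithIndex should see indexInPrunedRdd (0, 1, 2, ...),
  // not p.indexInParent (1, 3, 5, ...), which is what TaskContext.partitionId exposed.
  println(s"index in pruned RDD = $indexInPrunedRdd, index in parent = ${p.indexInParent}")
}
```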
For SPARK-527, Support spark-shell when running on YARN
sync to trunk and resubmit here
In the current YARN mode, the application runs inside the Application Master as a user program, so the whole Spark context lives on the remote side.
This approach does not support applications that involve local interaction and need to run where they are launched.
So in this pull request, a YarnClientClusterScheduler and a corresponding backend are added.
With this scheduler, the user application is launched locally, while the executors are launched by YARN on remote nodes through a thin AM that only launches the executors and monitors the Driver Actor's status, so that when the client app is done, it can finish the YARN application as well.
This enables spark-shell to run upon YARN.
This also enables other Spark applications to run the Spark context locally with the master URL "yarn-client". Thus, e.g., SparkPi can print its result on the local console instead of in the log of the remote machine where the AM is running.
Docs also updated to show how to use this yarn-client mode.
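A hedged usage sketch of this mode: the driver (and the SparkContext) runs locally, while executors are launched by YARN on the cluster.

```scala
import org.apache.spark.SparkContext

// "yarn-client" as the master URL keeps the driver local; the app name is arbitrary.
val sc = new SparkContext("yarn-client", "SparkPi")
// results such as the estimate of pi are printed on the local console,
// rather than in the logs of the remote Application Master node
```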
Add graphite sink for metrics
This adds a metrics sink for graphite. The sink must
be configured with the host and port of a graphite node
and optionally may be configured with a prefix that will
be prepended to all metrics that are sent to graphite.
XORShift RNG with unit tests and benchmark
This patch was introduced to address SPARK-950 - the discussion below the ticket explains not only the rationale, but also the design and testing decisions: https://spark-project.atlassian.net/browse/SPARK-950
To run unit test, start SBT console and type:
compile
test-only org.apache.spark.util.XORShiftRandomSuite
To run benchmark, type:
project core
console
Once the Scala console starts, type:
org.apache.spark.util.XORShiftRandom.benchmark(100000000)
XORShiftRandom is also an object with a main method taking the
number of iterations as an argument, so you can also run it
from the command line.
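A minimal sketch of the xorshift update step, assuming the common 21/35/4 shift variant over a 64-bit seed (the real XORShiftRandom plugs this recurrence into a java.util.Random subclass; this standalone version only illustrates why each draw is so cheap):

```scala
// Three shifts and three XORs per draw; no synchronization, no table lookups.
class TinyXorShift(private var seed: Long) {
  def nextLong(): Long = {
    seed ^= (seed << 21)
    seed ^= (seed >>> 35)
    seed ^= (seed << 4)
    seed
  }
}
```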
Fix 'timeWriting' stat for shuffle files
Due to concurrent git branches, changes from shuffle file consolidation patch
caused the shuffle write timing patch to no longer actually measure the time,
since it requires time be measured after the stream has been closed.
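A hedged sketch of the timing requirement described above (names are illustrative, not the actual shuffle-writer code):

```scala
import java.io.OutputStream

// Take the elapsed time only after close(), so the stat includes the flush-on-close cost.
def timedWrite(out: OutputStream)(writeAll: OutputStream => Unit): Long = {
  val start = System.nanoTime()
  writeAll(out)
  out.close()                      // measuring before this point misses part of the write
  System.nanoTime() - start
}
```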
Also changed the semantics of the index parameter in mapPartitionsWithIndex from the partition index of the output partition to the partition index in the current RDD.
- Use Object.equals() instead of Scala's == to compare keys, because the
latter does extra casts for numeric types (see the equals method in
https://github.com/scala/scala/blob/master/src/library/scala/runtime/BoxesRunTime.java)
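A small illustration of that last point, runnable in the Scala REPL: Scala's == performs cooperative numeric equality via BoxesRunTime, while Object.equals does not.

```scala
val x: Any = 1    // boxes to java.lang.Integer
val y: Any = 1L   // boxes to java.lang.Long
println(x == y)       // true  -- == routes through BoxesRunTime and widens numerics
println(x.equals(y))  // false -- plain equals, no extra casts, which is cheaper for the hash set
```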
PartitionPruningRDD is using index from parent
I was getting an ArrayIndexOutOfBoundsException after doing a union on a pruned RDD. The index it was using on the partition was the index in the original RDD, not the new pruned RDD.
Simple cleanup on Spark's Scala code
Simple cleanup on Spark's Scala code while testing some modules:
-) Remove some unused imports as I found them
-) Remove ";" in the import statements
-) Remove () at the end of method calls like size that do not have side effects.
Fix bug where scheduler could hang after task failure.
When a task fails, we need to call reviveOffers() so that the
task can be rescheduled on a different machine. In the current code,
the state in ClusterTaskSetManager indicating which tasks are
pending may be updated after revive offers is called (there's a
race condition here), so when revive offers is called, the task set
manager does not yet realize that there are failed tasks that need
to be relaunched.
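A hedged sketch of the ordering requirement described above (field and method names are illustrative, not the actual ClusterTaskSetManager code):

```scala
import scala.collection.mutable

object FailureHandlingSketch {
  val pendingTaskIndices = mutable.Queue[Int]()   // tasks waiting to be (re)scheduled

  def reviveOffers(): Unit = {
    // in the real scheduler this asks the backend for resource offers;
    // it can only reschedule tasks that are already marked as pending
    println(s"reviving offers, ${pendingTaskIndices.size} task(s) pending")
  }

  def handleFailedTask(taskIndex: Int): Unit = {
    pendingTaskIndices.enqueue(taskIndex)  // 1) record the failed task first
    reviveOffers()                         // 2) then revive, so the failed task is visible
  }
}
```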
This isn't currently unit tested but will be once my pull request for
merging the cluster and local schedulers goes in -- at which point
many more of the unit tests will exercise the code paths through
the cluster scheduler (currently the failure test suite uses the local
scheduler, which is why we didn't see this bug before).
I've diff'd this patch against my own -- since they were both created
independently, this means that two sets of eyes have gone over all the
merge conflicts that were created, so I'm feeling significantly more
confident in the resulting PR.
@rxin has looked at the changes to the repl and is resoundingly
confident that they are correct.
Don't retry tasks when they fail due to a NotSerializableException
As with my previous pull request, this will be unit tested once the Cluster and Local schedulers get merged.
Don't ignore spark.cores.max when using Mesos Coarse mode
totalCoresAcquired is decremented but never incremented, causing Spark to effectively ignore spark.cores.max in coarse grained Mesos mode.
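A hedged sketch of the accounting involved (field names are illustrative): cores taken from an accepted offer must be added to totalCoresAcquired, otherwise the spark.cores.max cap never binds.

```scala
object CoarseGrainedCoreAccounting {
  var totalCoresAcquired = 0
  val maxCores = 8   // stand-in for the spark.cores.max setting

  def coresToAcceptFrom(offeredCores: Int): Int = {
    val toAcquire = math.min(offeredCores, maxCores - totalCoresAcquired)
    totalCoresAcquired += toAcquire   // the increment that makes the cap take effect
    toAcquire
  }
}
```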
Migrate the daemon thread started by DAGScheduler to Akka actor
`DAGScheduler` adopts an event queue and a daemon thread that polls it to process events sent to a `DAGScheduler`. This is a classic actor use case. By migrating this thread to an Akka actor, we may benefit from both cleaner code and better performance (the context switching cost of an Akka actor is much lower than that of a native thread).
But things become a little complicated when taking existing test code into consideration.
Code in `DAGSchedulerSuite` is somewhat tightly coupled with `DAGScheduler`, and directly calls `DAGScheduler.processEvent` instead of posting event messages to `DAGScheduler`. To minimize code change, I chose to let the actor delegate messages to `processEvent`. Maybe this doesn't follow conventional actor usage, but I tried to make it obviously correct.
Another tricky part is that, since `DAGScheduler` depends on the `ActorSystem` provided by its field `env`, `env` cannot be null. But the `dagScheduler` field created in `DAGSchedulerSuite.before` was given a null `env`. What's more, `BlockManager.blockIdsToBlockManagers` checks whether `env` is null to determine whether to run the production code or the test code (bad smell here, huh?). I went through all callers of `BlockManager.blockIdsToBlockManagers`, and made sure that if `env != null` holds, then `blockManagerMaster == null` must also hold. That's the logic behind `BlockManager.scala` [line 896](https://github.com/liancheng/incubator-spark/compare/dagscheduler-actor-refine?expand=1#diff-2b643ea78c1add0381754b1f47eec132L896).
At last, since `DAGScheduler` instances are always `start()`ed after creation, I removed the `start()` method and start the `eventProcessActor` within the constructor.
For now, this only adds MarshalSerializer, but it lays the groundwork
for supporting other custom serializers. Many of these mechanisms
can also be used to support deserialization of different data formats
sent by Java, such as data encoded by MsgPack.
This also fixes a bug in SparkContext.union().
Fix secure hdfs access for spark on yarn
https://github.com/apache/incubator-spark/pull/23 broke secure HDFS access. Not sure if it works with secure HDFS in standalone mode. Fixing it at least for Spark on YARN.
The broadcasting of the jobconf also broke secure HDFS access, as it didn't take into account code that calls getPartitions before the SparkContext is fully initialized. The DAGScheduler does this as it tries to getShuffleMapStage.
Include appId in executor cmd line args
add the appId back into the executor cmd line args.
I also made a pretty lame regression test, just to make sure it doesn't get dropped in the future. Not sure it will run on the build server, though, b/c `ExecutorRunner.buildCommandSeq()` expects to be able to run the scripts in `bin`.
Removed unused return value in SparkContext.runJob
Return type of this `runJob` version is `Unit`:
    def runJob[T, U: ClassManifest](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        allowLocal: Boolean,
        resultHandler: (Int, U) => Unit) {
      ...
    }
It's obviously unnecessary to "return" `result`.