Commit graph

4571 commits

Author SHA1 Message Date
Raymond Liu 0f2e3c6e31 Merge branch 'master' into scala-2.10 2013-11-13 16:55:11 +08:00
Reynold Xin 3d4ad84b63 Merge pull request #148 from squito/include_appId
Include appId in executor cmd line args

add the appId back into the executor cmd line args.

I also made a pretty lame regression test, just to make sure it doesn't get dropped in the future. Not sure it will run on the build server, though, because `ExecutorRunner.buildCommandSeq()` expects to be able to run the scripts in `bin`.
2013-11-07 11:08:27 -08:00
Imran Rashid ca66f5d5a2 fix formatting 2013-11-07 07:23:59 -06:00
Imran Rashid 8d3cdda9a2 very basic regression test to make sure appId doesn't get dropped in the future 2013-11-07 01:35:48 -06:00
Reynold Xin be7e8da98a Merge pull request #23 from jerryshao/multi-user
Add Spark multi-user support for standalone mode and Mesos

This PR adds multi-user support for Spark in both standalone mode and Mesos (coarse- and fine-grained) mode. Users can specify the name of the user who submitted the app through the environment variable `SPARK_USER`, or fall back to a default. Executors will communicate with Hadoop using the specified user name.

I also fixed a bug in JobLogger that occurred when a different user wrote a job log to a specified folder without the right file permissions.

I separated the previous [PR750](https://github.com/mesos/spark/pull/750) into two PRs; in this PR I only solve the multi-user support problem. I will try to solve the security auth problem in a subsequent PR, because security auth is a complicated problem, especially for long-running apps like Shark Server (both the Kerberos TGT and the HDFS delegation token should be renewed or re-created throughout the app's run time).
2013-11-06 23:22:47 -08:00
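
A minimal sketch of the `SPARK_USER` mechanism described in the entry above, assuming Hadoop's `UserGroupInformation` API; `RunAsSparkUser` and its names are illustrative, not the patch's actual code:

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    object RunAsSparkUser {
      def runAsUser(body: () => Unit) {
        // Resolve the submitting user from SPARK_USER, falling back to the OS user.
        val user = Option(System.getenv("SPARK_USER"))
          .getOrElse(System.getProperty("user.name"))
        // Execute Hadoop-facing work as that user, so HDFS sees the right identity.
        UserGroupInformation.createRemoteUser(user).doAs(
          new PrivilegedExceptionAction[Unit] {
            override def run() { body() }
          })
      }
    }
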
Imran Rashid 36e832bff0 include the appid in the cmd line arguments to Executors 2013-11-07 01:11:49 -06:00
jerryshao 12dc385a49 Add Spark multi-user support for standalone mode and Mesos 2013-11-07 11:18:09 +08:00
Reynold Xin aadeda5e76 Merge pull request #144 from liancheng/runjob-clean
Removed unused return value in SparkContext.runJob

The return type of this `runJob` version is `Unit`:

    def runJob[T, U: ClassManifest](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        allowLocal: Boolean,
        resultHandler: (Int, U) => Unit) {
        ...
    }

It's obviously unnecessary to "return" `result`.
2013-11-06 13:27:47 -08:00
Reynold Xin 951024feea Merge pull request #145 from aarondav/sls-fix
Attempt to fix SparkListenerSuite breakage

Could not reproduce locally, but due to a typo this test could have been flaky if the build machine was too fast. (Index 0 is intentionally slowed down to ensure the total time is >= 1 ms.)

This should be merged into branch-0.8 as well.
2013-11-06 09:36:14 -08:00
Aaron Davidson 80e98d2bd7 Attempt to fix SparkListenerSuite breakage
Could not reproduce locally, but this test could've been flaky if the
build machine was too fast.
2013-11-06 08:03:35 -08:00
Lian, Cheng a0c4565183 Removed unused return value in SparkContext.runJob 2013-11-06 23:18:59 +08:00
Reynold Xin bf4e6131cc Merge pull request #143 from rxin/scheduler-hang
Ignore a task update status if the executor doesn't exist anymore.

Otherwise, if the scheduler receives a task update message after the executor has been removed, the scheduler hangs.

It is pretty hard to add unit tests for these right now because it is hard to mock the cluster scheduler. We should do that once @kayousterhout finishes merging the local scheduler and the cluster scheduler.
2013-11-05 23:14:09 -08:00
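
A hedged sketch of the guard implied above; `activeExecutorIds` and the method shape are illustrative, not the actual cluster scheduler code:

    import scala.collection.mutable.HashSet

    class SchedulerSketch {
      private val activeExecutorIds = new HashSet[String]

      // Drop task status updates from executors that have already been removed,
      // instead of letting them wedge the scheduler.
      def statusUpdate(executorId: String, taskId: Long) {
        if (!activeExecutorIds.contains(executorId)) {
          return // unknown or removed executor: ignore the update
        }
        // ... normal bookkeeping for the task state change ...
      }
    }
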
Reynold Xin a02eed6811 Ignore a task update status if the executor doesn't exist anymore. 2013-11-05 18:46:38 -08:00
Reynold Xin 9f7b9bb1cd Merge pull request #142 from liancheng/dagscheduler-pattern-matching
Using case class deep match to simplify code in DAGScheduler.processEvent

Since all the `XxxEvent`s pushed into `DAGScheduler.eventQueue` are case classes, deep pattern matching is a more convenient way to extract event object components.
2013-11-05 10:42:19 -08:00
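
For illustration, deep matching binds event fields directly in the `case` pattern instead of extracting them from the event object afterwards; the event shapes below are simplified stand-ins for the real `DAGSchedulerEvent`s:

    sealed trait SchedulerEvent
    case class JobSubmitted(jobId: Int, partitions: Seq[Int]) extends SchedulerEvent
    case class JobCancelled(jobId: Int) extends SchedulerEvent

    def processEvent(event: SchedulerEvent) {
      event match {
        // Deep match: the fields are bound by the pattern itself.
        case JobSubmitted(jobId, partitions) =>
          println("job " + jobId + " over " + partitions.size + " partitions")
        case JobCancelled(jobId) =>
          println("job " + jobId + " cancelled")
      }
    }
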
Lian, Cheng 8b4c994e8c Using compact case class pattern matching syntax to simplify code in DAGScheduler.processEvent 2013-11-05 17:18:42 +08:00
Reynold Xin 81065321c0 Merge pull request #139 from aarondav/shuffle-next
Never store shuffle blocks in BlockManager

After the BlockId refactor (PR #114), it became very clear that ShuffleBlocks are of no use
within BlockManager (they had a no-arg constructor!). This patch completely eliminates
them, saving us around 100-150 bytes per shuffle block.
The total, system-wide overhead per shuffle block is now a flat 8 bytes, excluding
state saved by the MapOutputTracker.

Note: This should *not* be merged directly into 0.8.0 -- see #138
2013-11-04 20:47:14 -08:00
Aaron Davidson 93c90844cb Never store shuffle blocks in BlockManager
After the BlockId refactor (PR #114), it became very clear that ShuffleBlocks are of no use
within BlockManager (they had a no-arg constructor!). This patch completely eliminates
them, saving us around 100-150 bytes per shuffle block.
The total, system-wide overhead per shuffle block is now a flat 8 bytes, excluding
state saved by the MapOutputTracker.
2013-11-04 18:43:42 -08:00
Reynold Xin 0b26a392df Merge pull request #128 from shimingfei/joblogger-doc
add javadoc to JobLogger, and some small fixes

against SPARK-941

add javadoc to JobLogger, output more info for RDD, and modify recordStageDepGraph to avoid outputting duplicate stage dependency information

(cherry picked from commit 518cf22eb2)
Signed-off-by: Reynold Xin <rxin@apache.org>
2013-11-04 18:22:06 -08:00
Reynold Xin 7a26104ab7 Merge pull request #130 from aarondav/shuffle
Memory-optimized shuffle file consolidation

Reduces the overhead of each shuffle block for consolidation from >300 bytes to 8 bytes (1 primitive Long). Verified via profiler testing with 1 million shuffle blocks; the net overhead was ~8,400,000 bytes.

Despite the memory-optimized implementation incurring extra CPU overhead, the runtime of the shuffle phase in this test was only around 2% slower, while the reduce phase was 40% faster, when compared to not using any shuffle file consolidation.

This is accomplished by replacing the map from ShuffleBlockId to FileSegment (i.e., block id to where it's located), which had high overhead due to being a gigantic, timestamped, concurrent map, with a more space-efficient structure. Namely, the following are introduced (I have omitted the word "Shuffle" from some names for clarity):
**ShuffleFile** - there is one ShuffleFile per consolidated shuffle file on disk. We store an array of offsets into the physical shuffle file for each ShuffleMapTask that wrote into the file. This is sufficient to reconstruct FileSegments for mappers that are in the file.
**FileGroup** - contains a set of ShuffleFiles, one per reducer, that a MapTask can use to write its output. There is one FileGroup created per _concurrent_ MapTask. The FileGroup contains an array of the mapIds that have been written to all files in the group. The positions of elements in this array map directly onto the positions in each ShuffleFile's offsets array.

In order to locate the FileSegment associated with a BlockId, we have another structure which maps each reducer to the set of ShuffleFiles that were created for it. (There will be as many ShuffleFiles per reducer as there are FileGroups.) To lookup a given ShuffleBlockId (shuffleId, reducerId, mapId), we thus search through all ShuffleFiles associated with that reducer.

As a time optimization, we ensure that FileGroups are only reused for MapTasks with monotonically increasing mapIds. This allows us to perform a binary search to locate a mapId inside a group, and also enables potential future optimization (based on the usual monotonic access order).
2013-11-04 17:54:06 -08:00
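
A rough sketch of the two structures as described above; the field names and types are inferred from the prose, not taken from the patch:

    import java.io.File

    import scala.collection.mutable.ArrayBuffer

    // One per consolidated shuffle file on disk. offsets(i) is the byte offset at
    // which the i-th map task's output begins -- enough to rebuild a FileSegment.
    class ShuffleFileSketch(val file: File) {
      val offsets = new ArrayBuffer[Long]
    }

    // One per concurrent map task: one ShuffleFile per reducer. mapIds(i) is the
    // map task whose output sits at position i of every file's offsets array.
    class FileGroupSketch(val files: Array[ShuffleFileSketch]) {
      val mapIds = new ArrayBuffer[Int]
    }
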
Aaron Davidson 1ba11b1c6a Minor cleanup in ShuffleBlockManager 2013-11-04 17:16:41 -08:00
Aaron Davidson 6201e5e249 Refactor ShuffleBlockManager to reduce public interface
- ShuffleBlocks has been removed and replaced by ShuffleWriterGroup.
- ShuffleWriterGroup no longer contains a reference to a ShuffleFileGroup.
- ShuffleFile has been removed and its contents are now within ShuffleFileGroup.
- ShuffleBlockManager.forShuffle has been replaced by a more stateful forMapTask.
2013-11-04 09:41:04 -08:00
Aaron Davidson b0cf19fe3c Add javadoc and remove unused code 2013-11-03 22:16:58 -08:00
Aaron Davidson 39d93ed4b9 Clean up test files properly
For some reason, even calling
java.nio.file.Files.createTempDirectory(...).toFile.deleteOnExit()
does not delete the directory on exit. Guava's analogous function
seems to work, however.
2013-11-03 21:52:59 -08:00
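
A sketch of a workaround consistent with the commit above. Note that `deleteOnExit()` can only remove a directory that is empty at JVM exit, which is likely why the `java.nio` variant appeared not to work; the helper below is illustrative, not the committed code:

    import java.io.File

    import com.google.common.io.Files

    object TempDirSketch {
      def createCleanedTempDir(): File = {
        val dir = Files.createTempDir()
        // Explicitly delete the whole tree on exit instead of relying on
        // deleteOnExit(), which cannot remove non-empty directories.
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run() { deleteRecursively(dir) }
        })
        dir
      }

      private def deleteRecursively(f: File) {
        Option(f.listFiles).foreach(_.foreach(deleteRecursively))
        f.delete()
      }
    }
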
Aaron Davidson a0bb569a81 use OpenHashMap, remove monotonicity requirement, fix failure bug 2013-11-03 21:34:56 -08:00
Aaron Davidson 8703898d3f Address Reynold's comments 2013-11-03 21:34:44 -08:00
Aaron Davidson 3ca52309f2 Fix test breakage 2013-11-03 21:34:44 -08:00
Aaron Davidson 1592adfa25 Add documentation and address other comments 2013-11-03 21:34:44 -08:00
Aaron Davidson 7d44dec9bd Fix weird bug with specialized PrimitiveVector 2013-11-03 21:34:43 -08:00
Aaron Davidson 7453f31181 Address minor comments 2013-11-03 21:34:43 -08:00
Aaron Davidson 84991a1b91 Memory-optimized shuffle file consolidation
Overhead of each shuffle block for consolidation has been reduced from >300 bytes
to 8 bytes (1 primitive Long). Verified via profiler testing with 1 million shuffle
blocks; the net overhead was ~8,400,000 bytes.

Despite the memory-optimized implementation incurring extra CPU overhead, the runtime
of the shuffle phase in this test was only around 2% slower, while the reduce phase
was 40% faster, when compared to not using any shuffle file consolidation.
2013-11-03 21:34:13 -08:00
Reynold Xin b5dc3393a5 Merge pull request #70 from rxin/hash1
Fast, memory-efficient hash set, hash table implementations optimized for primitive data types.

This pull request adds two hash table implementations optimized for primitive data types. For primitive types, the new hash tables are much faster than the current Spark AppendOnlyMap (3X faster; note that the current AppendOnlyMap is already much better than the Java map) while using much less space (1/4 of the space).

Details:

This PR first adds an open hash set implementation (OpenHashSet) optimized for primitive types (using Scala's specialization feature). This OpenHashSet is designed to serve as a building block for more advanced structures. It is currently used to build the following two hash tables, but can be used in the future to build multi-valued hash tables as well (GraphX has this use case). Note that there are some peculiarities in the code for working around some Scala compiler bugs.

Building on top of OpenHashSet, this PR adds two different hash table implementations:
1. OpenHashMap: for nullable keys, with optional specialization for primitive values
2. PrimitiveKeyOpenHashMap: for primitive keys that are not nullable, with optional specialization for primitive values

I tested the update speed of these two implementations using the changeValue function (which is what Aggregator and cogroup would use). Runtime relative to AppendOnlyMap for inserting 10 million items:

Int to Int: ~30%
java.lang.Integer to java.lang.Integer: ~100%
Int to java.lang.Integer: ~50%
java.lang.Integer to Int: ~85%
2013-11-03 20:43:15 -08:00
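
A toy illustration of the underlying technique (open addressing with linear probing, plus `@specialized` so primitive keys stay unboxed); this is far simpler than the actual OpenHashSet -- no resizing, no bitset, no compiler-bug workarounds:

    // Illustrative only: fixed capacity, no resizing, assumes the set never fills.
    class ToyOpenHashSet[@specialized(Int, Long) T: ClassManifest](capacity: Int) {
      private val data = new Array[T](capacity)
      private val used = new Array[Boolean](capacity)

      def add(k: T) {
        var pos = (k.hashCode & Int.MaxValue) % capacity
        // Linear probing: walk forward until we find the key or an empty slot.
        while (used(pos) && data(pos) != k) pos = (pos + 1) % capacity
        data(pos) = k
        used(pos) = true
      }

      def contains(k: T): Boolean = {
        var pos = (k.hashCode & Int.MaxValue) % capacity
        while (used(pos)) {
          if (data(pos) == k) return true
          pos = (pos + 1) % capacity
        }
        false
      }
    }
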
Reynold Xin eb5f8a3f97 Code review feedback. 2013-11-03 18:11:44 -08:00
Reynold Xin 1e9543b567 Fixed a bug that used twice the amount of memory for the primitive arrays, due to a Scala compiler bug.
Also addressed Matei's code review comment.
2013-11-02 23:19:01 -07:00
Reynold Xin da6bb0aedd Merge branch 'master' into hash1 2013-11-02 22:45:15 -07:00
Reynold Xin 41ead7a745 Merge pull request #133 from Mistobaan/link_fix
update default github
2013-11-02 14:41:50 -07:00
Reynold Xin d407c0732a Merge pull request #134 from rxin/readme
Fixed a typo in Hadoop version in README.
2013-11-02 14:36:37 -07:00
Reynold Xin 895747bb05 Fixed a typo in Hadoop version in README. 2013-11-02 12:58:44 -07:00
Fabrizio (Misto) Milo 4b5d61f31f update default github 2013-11-01 18:41:49 -07:00
Reynold Xin e7c7b804b5 Merge pull request #132 from Mistobaan/doc_fix
fix persistent-hdfs
2013-11-01 17:58:10 -07:00
Fabrizio (Misto) Milo 3f89354c45 fix persistent-hdfs 2013-11-01 17:47:37 -07:00
Matei Zaharia d6d11c2edb Merge pull request #129 from velvia/2013-11/document-local-uris
Document & finish support for local: URIs

Document all the supported URI schemes for addJar / addFile on the Cluster Overview page.
Add support for local: URI to addFile.
2013-11-01 15:40:33 -07:00
Evan Chan f3679fd494 Add local: URI support to addFile as well 2013-11-01 11:08:03 -07:00
Evan Chan e54a37fe15 Document all the URIs for addJar/addFile 2013-11-01 10:58:11 -07:00
Matei Zaharia 8f1098a3f0 Merge pull request #117 from stephenh/avoid_concurrent_modification_exception
Handle ConcurrentModificationExceptions in SparkContext init.

System.getProperties.toMap will fail fast when concurrently modified,
and it seems like some other thread started by SparkContext does
a System.setProperty during its initialization.

Handle this by just looping on ConcurrentModificationException, which
seems the safest, since the non-fail-fast methods (Hashtable.entrySet)
have undefined behavior under concurrent modification.
2013-10-30 20:11:48 -07:00
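
A sketch of the retry loop described above, assuming a simple snapshot helper (not the actual SparkContext code):

    import java.util.ConcurrentModificationException

    import scala.collection.JavaConverters._

    // Retry the fail-fast copy until no other thread mutates the properties
    // mid-iteration; the non-fail-fast views have undefined behavior instead.
    def systemPropertiesSnapshot(): Map[String, String] = {
      while (true) {
        try {
          return System.getProperties.asScala.toMap
        } catch {
          case _: ConcurrentModificationException => // raced with setProperty; retry
        }
      }
      throw new IllegalStateException("unreachable")
    }
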
Matei Zaharia dc9ce16f6b Merge pull request #126 from kayousterhout/local_fix
Fixed incorrect log message in local scheduler

This change is especially relevant at the moment, because some users are seeing this failure, and the log message is misleading/incorrect (because for the tests, the max number of failures is set to 0, not 4).
2013-10-30 17:01:56 -07:00
Matei Zaharia 33de11c51d Merge pull request #124 from tgravescs/sparkHadoopUtilFix
Pull SparkHadoopUtil out of SparkEnv (jira SPARK-886)

Having the logic to initialize the correct SparkHadoopUtil in SparkEnv prevents it from being used until after the SparkContext is initialized. This causes issues like https://spark-project.atlassian.net/browse/SPARK-886. It also makes it hard to use in singleton objects. For instance, I want to use it in the security code.
2013-10-30 16:58:27 -07:00
Kay Ousterhout ff038eb4e0 Fixed incorrect log message in local scheduler 2013-10-30 15:27:23 -07:00
Matei Zaharia 618c1f6cf3 Merge pull request #125 from velvia/2013-10/local-jar-uri
Add support for local:// URI scheme for addJars()

This PR adds support for a new URI scheme for SparkContext.addJars(): `local://file/path`.
The *local* scheme indicates that the `/file/path` exists on every worker node. It exists for the sake of big library JARs, which would be really expensive to serve using the standard HTTP fileserver distribution method, especially for big clusters. Today the only inexpensive method of doing this (assuming such a file is on every host via, say, NFS, rsync, etc.) is to add the JAR to the SPARK_CLASSPATH, but we want a method where the user does not need to modify the Spark configuration.

I would add something to the docs, but it's not obvious where to add it.

Oh, and it would be great if this could be merged in time for 0.8.1.
2013-10-30 12:03:44 -07:00
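
A simplified sketch of the scheme dispatch this implies; `resolveJarPath` is a hypothetical helper, not the real addJars() change:

    import java.net.URI

    // Dispatch on URI scheme: "local" paths are assumed to already exist at the
    // same location on every worker, so nothing is served over HTTP for them.
    def resolveJarPath(path: String): String = {
      val uri = new URI(path)
      Option(uri.getScheme) match {
        case Some("local") => "file:" + uri.getPath // already on each worker node
        case Some(_)       => path                  // http:, hdfs:, etc. unchanged
        case None          => "file:" + path        // bare path: use the fileserver
      }
    }
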
Stephen Haberman 09f3b677cb Avoid match errors when filtering for spark.hadoop settings. 2013-10-30 12:29:39 -05:00
tgravescs f231aaa24c move the hadoopJobMetadata back into SparkEnv 2013-10-30 11:46:12 -05:00