- ShuffleBlocks has been removed and replaced by ShuffleWriterGroup.
- ShuffleWriterGroup no longer contains a reference to a ShuffleFileGroup.
- ShuffleFile has been removed and its contents are now within ShuffleFileGroup.
- ShuffleBlockManager.forShuffle has been replaced by a more stateful forMapTask.
For some reason, even calling
java.nio.file.Files.createTempDirectory(prefix).toFile.deleteOnExit()
does not delete the directory on exit. Guava's analogous function
seems to work, however.
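For reference, a likely explanation is that `File.deleteOnExit()` ultimately calls `File.delete()`, which only removes *empty* directories. A minimal sketch of the stdlib call in question (class and method names here are mine, not Spark's):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class TempDirExample {
    // Create a temp directory and register it for deletion at JVM exit.
    // Caveat: File.deleteOnExit() ultimately calls File.delete(), which
    // only removes *empty* directories -- a likely reason the call appears
    // not to work once files have been written into the directory.
    static File createTempDirDeletedOnExit(String prefix) {
        try {
            File dir = Files.createTempDirectory(prefix).toFile();
            dir.deleteOnExit();
            return dir;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

A recursive-delete shutdown hook (which is roughly what a working workaround has to do) is the other common approach for non-empty directories.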
Overhead of each shuffle block for consolidation has been reduced from >300 bytes
to 8 bytes (1 primitive Long). Verified via profiler testing with 1 million shuffle
blocks; net overhead was ~8,400,000 bytes.
Despite the memory-optimized implementation incurring extra CPU overhead, the runtime
of the shuffle phase in this test was only around 2% slower, while the reduce phase
was 40% faster, when compared to not using any shuffle file consolidation.
Fast, memory-efficient hash set and hash table implementations optimized for primitive data types.

This pull request adds two hash table implementations optimized for primitive data types. For primitive types, the new hash tables are much faster than the current Spark AppendOnlyMap (3X faster; note that the current AppendOnlyMap is already much better than the Java map) while using much less space (1/4 of the space).
Details:
This PR first adds an open hash set implementation (OpenHashSet) optimized for primitive types (using Scala's specialization feature). This OpenHashSet is designed to serve as a building block for more advanced structures. It is currently used to build the following two hash tables, but can be used in the future to build multi-valued hash tables as well (GraphX has this use case). Note that there are some peculiarities in the code for working around some Scala compiler bugs.
Building on top of OpenHashSet, this PR adds two different hash table implementations:
1. OpenHashMap: for nullable keys, with optional specialization for primitive values
2. PrimitiveKeyOpenHashMap: for non-nullable primitive keys, with optional specialization for primitive values
I tested the update speed of these two implementations using the changeValue function (which is what Aggregator and cogroup would use). Runtime relative to AppendOnlyMap for inserting 10 million items:
- Int to Int: ~30%
- java.lang.Integer to java.lang.Integer: ~100%
- Int to java.lang.Integer: ~50%
- java.lang.Integer to Int: ~85%
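To make the open-addressing idea concrete, here is a minimal sketch of a primitive-keyed map with a changeValue-style update, the operation Aggregator/cogroup-style code would use. This is illustrative only: Spark's actual OpenHashSet and PrimitiveKeyOpenHashMap additionally use Scala specialization, a bitset for occupancy, hash mixing, and different probing/growth details. All names below are hypothetical.

```java
import java.util.function.IntUnaryOperator;

// Minimal open-addressing map from int keys to int values (linear probing).
// Sketch only -- not Spark's implementation. Keys and values live in flat
// primitive arrays, which is where the space savings over a boxed map come
// from (no per-entry objects, no boxing).
public class IntIntOpenHashMap {
    private int capacity;
    private int mask;
    private int size;
    private int[] keys;
    private int[] values;
    private boolean[] occupied;  // Spark tracks occupancy with a bitset

    public IntIntOpenHashMap() {
        this(16);
    }

    public IntIntOpenHashMap(int initialCapacity) {
        capacity = Integer.highestOneBit(Math.max(initialCapacity - 1, 1)) * 2;
        mask = capacity - 1;
        keys = new int[capacity];
        values = new int[capacity];
        occupied = new boolean[capacity];
    }

    // Find the slot holding k, or the empty slot where k would be inserted.
    private int pos(int k) {
        int i = k & mask;                 // real implementations mix the hash first
        while (occupied[i] && keys[i] != k) {
            i = (i + 1) & mask;           // linear probing
        }
        return i;
    }

    // Insert k with defaultValue, or update an existing value with merge.
    public int changeValue(int k, int defaultValue, IntUnaryOperator merge) {
        int i = pos(k);
        if (occupied[i]) {
            values[i] = merge.applyAsInt(values[i]);
            return values[i];
        }
        keys[i] = k;
        values[i] = defaultValue;
        occupied[i] = true;
        size++;
        if (size * 2 > capacity) {
            grow();
        }
        return defaultValue;
    }

    public boolean contains(int k) {
        return occupied[pos(k)];
    }

    public int get(int k) {
        return values[pos(k)];
    }

    public int size() {
        return size;
    }

    // Double the table and re-insert all entries.
    private void grow() {
        int[] oldKeys = keys;
        int[] oldValues = values;
        boolean[] oldOccupied = occupied;
        capacity *= 2;
        mask = capacity - 1;
        size = 0;
        keys = new int[capacity];
        values = new int[capacity];
        occupied = new boolean[capacity];
        for (int i = 0; i < oldKeys.length; i++) {
            if (oldOccupied[i]) {
                changeValue(oldKeys[i], oldValues[i], v -> v);
            }
        }
    }
}
```

Note the ~8-byte-per-entry bookkeeping this layout makes possible, versus the per-entry object headers and boxed keys/values of a java.util.HashMap.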
Document & finish support for local: URIs
Document all the supported URI schemes for addJar / addFile on the Cluster Overview page.
Add support for local: URI to addFile.
Large parts of the VertexSetRDD were restructured to take advantage of:
1) the OpenHashSet as an index map
2) view-based lazy mapValues and mapValuesWithVertices
In addition, the cogroup code is currently disabled (since it is not used in any of the tests).
The GraphImpl was updated to also use the OpenHashSet and PrimitiveKeyOpenHashMap
wherever possible:
1) the LocalVidMaps (used to track replicated vertices) are now implemented
using the OpenHashSet
2) an OpenHashMap is temporarily constructed to combine the local OpenHashSet
with the local (replicated) vertex attribute arrays
3) because the OpenHashSet constructor grabs a class manifest, all operations
that construct OpenHashSets have been moved to the GraphImpl singleton to prevent
implicit variable capture within closures.
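The capture problem in item 3 has a close Java analogue that can be sketched without Spark or GraphX: a closure that touches an instance member captures `this`, so if the enclosing object is not serializable, the closure is not either; building the closure in a static (singleton-like) context avoids the capture. The Scala original is the same shape, with the implicit class manifest playing the role of the instance member. Class and method names below are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

public class CaptureExample {

    // A serializable function type, so lambdas can be serialized at all.
    interface SerSupplier extends Supplier<int[]>, Serializable {}

    private final int n;

    CaptureExample(int n) {
        this.n = n;
    }

    // BAD: reads the field `n`, so the lambda captures the (non-serializable)
    // enclosing CaptureExample instance and fails to serialize.
    SerSupplier capturingClosure() {
        return () -> new int[n];
    }

    // GOOD: takes what it needs as a parameter, capturing only a primitive.
    // This mirrors moving OpenHashSet construction to a singleton so the
    // manifest is resolved outside the closure.
    static SerSupplier standaloneClosure(int n) {
        return () -> new int[n];
    }

    static boolean isSerializable(Object o) {
        try {
            new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }
}
```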
Handle ConcurrentModificationExceptions in SparkContext init.
System.getProperties.toMap will fail fast when concurrently modified,
and it seems like some other thread started by SparkContext does
a System.setProperty during its initialization.
Handle this by just looping on ConcurrentModificationException, which
seems the safest option, since the non-fail-fast methods (Hashtable.entrySet)
have undefined behavior under concurrent modification.
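The looping approach can be sketched as follows. This is illustrative only, not SparkContext's actual code: retry the snapshot until a full pass completes without a ConcurrentModificationException.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PropertiesSnapshot {
    // Copy the system properties into an immutable-per-call snapshot,
    // retrying if a concurrent System.setProperty triggers a
    // ConcurrentModificationException mid-copy.
    static Map<String, String> copySystemProperties() {
        while (true) {
            try {
                Properties props = System.getProperties();
                Map<String, String> snapshot = new HashMap<>();
                // Iterating may fail fast if another thread mutates the
                // underlying table while we copy.
                for (String name : props.stringPropertyNames()) {
                    snapshot.put(name, props.getProperty(name));
                }
                return snapshot;
            } catch (ConcurrentModificationException e) {
                // Another thread called System.setProperty; just retry.
            }
        }
    }
}
```

Looping is preferable to catching and continuing mid-iteration, since a fail-fast iterator gives no guarantee about what has been seen once it throws.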