Refactor BlockId into an actual type
Converts all of our BlockId strings into actual BlockId types. Here are some advantages of doing this now:
+ Type safety
+ Code clarity - it's now obvious what the key of a shuffle or rdd block is, for instance. Additionally, appearing in tuple/map type signatures is a big readability bonus. A Seq[(String, BlockStatus)] is not very clear. Further, we can now use more Scala features, like matching on BlockId types.
+ Explicit usage - we can now formally tell where various BlockIds are being used (without doing string searches); this makes updating current BlockIds a much clearer process, and compiler-supported.
(I'm looking at you, shuffle file consolidation.)
+ It will only get harder to make this change as time goes on.
Downside is, of course, that this is a very invasive change touching a lot of different files, which will inevitably lead to merge conflicts for many.
This is an unfortunately invasive change which converts all of our BlockId
strings into actual BlockId types. Here are some advantages of doing this now:
+ Type safety
+ Code clarity - it's now obvious what the key of a shuffle or rdd block is,
for instance. Additionally, appearing in tuple/map type signatures is a big
readability bonus. A Seq[(String, BlockStatus)] is not very clear.
Further, we can now use more Scala features, like matching on BlockId types.
+ Explicit usage - we can now formally tell where various BlockIds are being used
(without doing string searches); this makes updating current BlockIds a much
clearer process, and compiler-supported.
(I'm looking at you, shuffle file consolidation.)
+ It will only get harder to make this change as time goes on.
Since this touches a lot of files, it'd be best to either get this patch
in quickly or throw it on the ground to avoid too many secondary merge conflicts.
Add an optional closure parameter to HadoopRDD instantiation to use when creating local JobConfs.
Having HadoopRDD accept this optional closure eliminates the need for the HadoopFileRDD added earlier. It makes the HadoopRDD more general, in that the caller can specify any JobConf initialization flow.
Address review comments, move to incubator spark
Also includes a small fix to speculative execution.
<edit> Continued from https://github.com/mesos/spark/pull/914 </edit>
Standalone Scheduler fault tolerance using ZooKeeper
This patch implements full distributed fault tolerance for standalone scheduler Masters.
There is only one master Leader at a time, which is actively serving scheduling
requests. If this Leader crashes, another master will eventually be elected, reconstruct
the state from the first Master, and continue serving scheduling requests.
Leader election is performed using the ZooKeeper leader election pattern. We try to minimize
the use of ZooKeeper and the assumptions about ZooKeeper's behavior, so there is a layer of
retries and session monitoring on top of the ZooKeeper client.
Master failover follows directly from the single-node Master recovery via the file
system (patch d5a96fe), save that the Master state is stored in ZooKeeper instead.
Configuration:
By default, no recovery mechanism is enabled (spark.deploy.recoveryMode = NONE).
By setting spark.deploy.recoveryMode to ZOOKEEPER and setting spark.deploy.zookeeper.url
to an appropriate ZooKeeper URL, ZooKeeper recovery mode is enabled.
By setting spark.deploy.recoveryMode to FILESYSTEM and setting spark.deploy.recoveryDirectory
to an appropriate directory accessible by the Master, we will keep the behavior of from d5a96fe.
Additionally, places where a Master could be specificied by a spark:// url can now take
comma-delimited lists to specify backup masters. Note that this is only used for registration
of NEW Workers and application Clients. Once a Worker or Client has registered with the
Master Leader, it is "in the system" and will never need to register again.
A fast and low-memory append-only map for shuffle operations
This is a continuation of the old repo's pull request https://github.com/mesos/spark/pull/823 to add a more efficient hashmap class for shuffles. I've optimized and tested this more thoroughly now so I think it's good to go. I've also addressed some of the comments that were outstanding there.
The idea is to reduce the cost of shuffles by taking advantage of the properties their hashmaps need. In particular, the hashmaps there are append-only, and a common operation is updating a key's value based on the old value. The included AppendOnlyMap class uses open hashing to use less space than Java's (by not having a linked list per bucket), does not support deletes, and has a changeValue operation to update a key in place without following the hash chain twice. In micro-benchmarks against java.util.HashMap and scala.collection.mutable.HashMap, this is 20-30% smaller and 10-40% faster depending on the number and type of keys. It's also noticeably faster than fastutil's Object2ObjectOpenHashMap.
I've also tested this in Spark apps now. While the speed gain is modest (partly due to other overheads, like serialization), there is some, and I think the lower memory usage is worth it. Here's one example where the speedup is most noticeable, in spark-shell on local mode:
```
scala> val nums = sc.parallelize(1 to 8).flatMap(x => (1 to 5e6.toInt)).cache
scala> nums.count
scala> def time(x: => Unit) = { val now = System.currentTimeMillis; x; System.currentTimeMillis - now }
scala> (1 to 8).map(_ => time(nums.map(x => (x % 100000, x)).reduceByKey(_ + _).count) / 1000.0)
```
This prints the following times before and after this change:
```
Before: Vector(4.368, 2.635, 2.549, 2.522, 2.233, 2.222, 2.214, 2.195)
After: Vector(3.588, 1.741, 1.706, 1.648, 1.777, 1.81, 1.776, 1.731)
```
I've also run the spark-perf suite, enhanced with some tests that use Ints (https://github.com/amplab/spark-perf/pull/9), and it shows some speedup on those, but less on the string ones (presumably due to existing overhead): https://gist.github.com/mateiz/6897121.
Use standard abbreviation in metrics description (MBytes -> MB)
This is a small change - older commits are shown here because Github hasn't sync'ed yet with apache.
SPARK-705: implement sortByKey() in PySpark
This PR contains the implementation of a RangePartitioner in Python and uses its partition ID's to get a global sort in PySpark.
SPARK-900 Use coarser grained naming for metrics
see SPARK-900 Use coarser grained naming for metrics.
Now the new metric name is formatted as {XXX.YYY.ZZZ.COUNTER_UNIT}, XXX.YYY.ZZZ represents the group name, which can group several metrics under the same Ganglia view.
Adding algorithm for implicit feedback data to ALS
This PR adds the commonly used "implicit feedack" variant to ALS.
The implementation is based in part on Mahout's implementation, which is in turn based on [Collaborative Filtering for Implicit Feedback Datasets](http://research.yahoo.com/pub/2433). It has been adapted for the blocked approach used in MLlib.
I have tested this implementation against the MovieLens 100k, 1m and 10m datasets, and confirmed that it produces the same RMSE score as Mahout, as well as my own port of Mahout's implicit ALS implementation to Spark (not that RMSE is necessarily the best metric to judge by for implicit feedback, but it provides a consistent metric for comparison).
It turned out to be more straightforward than I had thought to add this. The main additions are:
1. Adding `implicitPrefs` boolean flag and `alpha` parameter
2. Added the `computeYtY` method. In each least-squares step, the algorithm requires the computation of `YtY`, where `Y` is the {user, item} factor matrix. Since the factors are already block-distributed in an `RDD`, this is quite straightforward to compute but does add an extra operation over the explicit version (but only twice per iteration)
3. Finally the actual solve step in `updateBlock` boils down to:
* a multiplication of the `XtX` matrix by `alpha * rating`
* a multiplication of the `Xty` vector by `1 + alpha * rating`
* when solving for the factor vector, the implicit variant adds the `YtY` matrix to the LHS
4. Added `trainImplicit` methods in the `ALS` object
5. Added test cases for both Scala and Java - based on achieving a confidence-weighted RMSE score < 0.4 (this is taken from Mahout's test cases)
It would be great to get some feedback on this and have people test things out against some datasets (MovieLens and others and perhaps proprietary datasets) both locally and on a cluster if possible. I have not yet tested on a cluster but will try to do that soon.
I have tried to make things as efficient as possible but if there are potential improvements let me know.
The results of a run against ml-1m are below (note the vanilla RMSE scores will be very different from the explicit variant):
**INPUTS**
```
iterations=10
factors=10
lambda=0.01
alpha=1
implicitPrefs=true
```
**RESULTS**
```
Spark MLlib 0.8.0-SNAPSHOT
RMSE = 3.1544
Time: 24.834 sec
```
```
My own port of Mahout's ALS to Spark (updated to 0.8.0-SNAPSHOT)
RMSE = 3.1543
Time: 58.708 sec
```
```
Mahout 0.8
time ./factorize-movielens-1M.sh /path/to/ratings/ml-1m/ratings.dat
real 3m48.648s
user 6m39.254s
sys 0m14.505s
RMSE = 3.1539
```
Results of a run against ml-10m
```
Spark MLlib
RMSE = 3.1200
Time: 162.348 sec
```
```
Mahout 0.8
real 23m2.220s
user 43m39.185s
sys 0m25.316s
RMSE = 3.1187
```
Don't allocate Kryo buffers unless needed
I noticed that the Kryo serializer could be slower than the Java one by 2-3x on small shuffles because it spend a lot of time initializing Kryo Input and Output objects. This is because our default buffer size for them is very large. Since the serializer is often used on streams, I made the initialization lazy for that, and used a smaller buffer (auto-managed by Kryo) for input.
Fix inconsistent and incorrect log messages in shuffle read path
The user-facing messages generated by the CacheManager are currently wrong and somewhat misleading. This patch makes the messages more accurate. It also uses a consistent representation of the partition being fetched (`rdd_xx_yy`) so that it's easier for users to trace what is going on when reading logs.