SPARK-900 Use coarser grained naming for metrics
The new metric name is now formatted as `XXX.YYY.ZZZ.COUNTER_UNIT`, where `XXX.YYY.ZZZ` is the group name; this groups several metrics under the same Ganglia view.
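For illustration, hypothetical metric names under this format might look like:
```
master.apps.number       -> grouped under "master.apps"
master.apps.waiting      -> grouped under "master.apps"
worker.executors.count   -> grouped under "worker.executors"
```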
Adding algorithm for implicit feedback data to ALS
This PR adds the commonly used "implicit feedback" variant to ALS.
The implementation is based in part on Mahout's implementation, which is in turn based on [Collaborative Filtering for Implicit Feedback Datasets](http://research.yahoo.com/pub/2433). It has been adapted for the blocked approach used in MLlib.
I have tested this implementation against the MovieLens 100k, 1m and 10m datasets, and confirmed that it produces the same RMSE score as Mahout, as well as my own port of Mahout's implicit ALS implementation to Spark (not that RMSE is necessarily the best metric to judge by for implicit feedback, but it provides a consistent metric for comparison).
It turned out to be more straightforward than I had thought to add this. The main additions are:
1. Added an `implicitPrefs` boolean flag and an `alpha` parameter
2. Added the `computeYtY` method. In each least-squares step, the algorithm requires the computation of `YtY`, where `Y` is the {user, item} factor matrix. Since the factors are already block-distributed in an `RDD`, this is quite straightforward to compute but does add an extra operation over the explicit version (but only twice per iteration)
3. Finally, the actual solve step in `updateBlock` boils down to (see the sketch after this list):
* a multiplication of the `XtX` matrix by `alpha * rating`
* a multiplication of the `Xty` vector by `1 + alpha * rating`
* when solving for the factor vector, the implicit variant adds the `YtY` matrix to the LHS
4. Added `trainImplicit` methods in the `ALS` object
5. Added test cases for both Scala and Java - based on achieving a confidence-weighted RMSE score < 0.4 (this is taken from Mahout's test cases)
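For reference, here is a minimal standalone sketch of the per-user solve described in point 3, assuming dense arrays and a `YtY` Gramian precomputed as in point 2. The object and method names (`ImplicitAlsSketch`, `solveUser`) are hypothetical; this is not the actual `updateBlock` code:
```scala
// Hypothetical standalone sketch of the per-user implicit solve, not the
// actual MLlib updateBlock code. It solves the normal equations
//   (YtY + Yt(C - I)Y + lambda * I) x = Yt C p
// with confidence c = 1 + alpha * r and preference p = 1 for observed ratings.
object ImplicitAlsSketch {

  def solveUser(
      rank: Int,
      itemFactors: Array[Array[Double]], // rows of Y for the items this user rated
      ratings: Array[Double],            // the corresponding r values
      YtY: Array[Array[Double]],         // Gramian of the full Y, precomputed once per step
      alpha: Double,
      lambda: Double): Array[Double] = {
    // LHS starts from the shared YtY ("adds the YtY matrix to the LHS")
    val a = Array.tabulate(rank, rank)((i, j) => YtY(i)(j))
    val b = new Array[Double](rank)
    for ((y, r) <- itemFactors.zip(ratings)) {
      val confidence = 1.0 + alpha * r
      for (i <- 0 until rank) {
        b(i) += confidence * y(i) // RHS scaled by 1 + alpha * rating
        for (j <- 0 until rank) {
          a(i)(j) += (confidence - 1.0) * y(i) * y(j) // outer product scaled by alpha * rating
        }
      }
    }
    for (i <- 0 until rank) a(i)(i) += lambda // regularization
    solve(a, b)
  }

  // Naive Gauss-Jordan elimination; fine for a small rank x rank SPD system
  private def solve(a: Array[Array[Double]], b: Array[Double]): Array[Double] = {
    val n = b.length
    for (k <- 0 until n) {
      val pivot = a(k)(k)
      for (j <- k until n) a(k)(j) /= pivot
      b(k) /= pivot
      for (i <- 0 until n if i != k) {
        val f = a(i)(k)
        for (j <- k until n) a(i)(j) -= f * a(k)(j)
        b(i) -= f * b(k)
      }
    }
    b
  }
}
```
Since the system is only rank x rank, the dense solve itself is cheap; the cost is dominated by accumulating the outer products and by the per-iteration `YtY` computation.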
It would be great to get some feedback on this and have people test it out against some datasets (MovieLens and others, and perhaps proprietary datasets), both locally and on a cluster if possible. I have not yet tested on a cluster but will try to do that soon.
I have tried to make things as efficient as possible but if there are potential improvements let me know.
The results of a run against ml-1m are below (note the vanilla RMSE scores will be very different from those of the explicit variant):
**INPUTS**
```
iterations=10
factors=10
lambda=0.01
alpha=1
implicitPrefs=true
```
**RESULTS**
```
Spark MLlib 0.8.0-SNAPSHOT
RMSE = 3.1544
Time: 24.834 sec
```
```
My own port of Mahout's ALS to Spark (updated to 0.8.0-SNAPSHOT)
RMSE = 3.1543
Time: 58.708 sec
```
```
Mahout 0.8
time ./factorize-movielens-1M.sh /path/to/ratings/ml-1m/ratings.dat
real 3m48.648s
user 6m39.254s
sys 0m14.505s
RMSE = 3.1539
```
Results of a run against ml-10m:
```
Spark MLlib
RMSE = 3.1200
Time: 162.348 sec
```
```
Mahout 0.8
real 23m2.220s
user 43m39.185s
sys 0m25.316s
RMSE = 3.1187
```
Don't allocate Kryo buffers unless needed
I noticed that the Kryo serializer could be slower than the Java one by 2-3x on small shuffles because it spends a lot of time initializing Kryo Input and Output objects. This is because our default buffer size for them is very large. Since the serializer is often used on streams, I made the initialization lazy for that case, and used a smaller buffer (auto-managed by Kryo) for input.
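A minimal sketch of the pattern (hypothetical class, not the actual `KryoSerializer` change): the `Output` buffer is allocated lazily on first use, and the `Input` starts small since Kryo manages refilling it:
```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Hypothetical sketch of the lazy-buffer idea, not the actual Spark serializer:
// defer the large Output allocation until the serializer is first used, and
// start Input with a small buffer that Kryo manages as it reads.
class LazyKryoSketch(kryo: Kryo, outputBufferBytes: Int) {
  // `lazy val` means code paths that never serialize pay nothing for this buffer
  private lazy val output = new Output(outputBufferBytes)
  private lazy val input = new Input(4096) // small; acts as a window over the data

  def serialize(obj: AnyRef): Array[Byte] = {
    output.clear()
    kryo.writeClassAndObject(output, obj)
    output.toBytes
  }

  def deserialize[T](bytes: Array[Byte]): T = {
    input.setBuffer(bytes)
    kryo.readClassAndObject(input).asInstanceOf[T]
  }
}
```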
Fix inconsistent and incorrect log messages in shuffle read path
The user-facing messages generated by the CacheManager are currently wrong and somewhat misleading. This patch makes the messages more accurate. It also uses a consistent representation of the partition being fetched (`rdd_xx_yy`) so that it's easier for users to trace what is going on when reading logs.
Resolving package conflicts with hadoop 0.23.9
Hadoop 0.23.9 has a package conflict with easymock's dependencies.
(cherry picked from commit 023e3fdf00)
Signed-off-by: Reynold Xin <rxin@apache.org>
The problem was with the way the EdgeTripletRDD iterator worked: calling `toList` on it returned the last value repeatedly. Fixed by overriding `toList` in the iterator.
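To illustrate the failure mode (a hypothetical reconstruction, not the actual GraphX code): when an iterator reuses one mutable object, `toList` collects n references to the same instance, so every element shows the final value. Copying each element in an overridden `toList` fixes it:
```scala
// Hypothetical illustration of the bug class, not the actual EdgeTripletRDD code.
class Triplet(var value: Int)

def brokenIterator(n: Int): Iterator[Triplet] = new Iterator[Triplet] {
  private val reused = new Triplet(0) // single instance handed out every time
  private var i = 0
  def hasNext: Boolean = i < n
  def next(): Triplet = { i += 1; reused.value = i; reused }
}

def fixedIterator(n: Int): Iterator[Triplet] = new Iterator[Triplet] {
  private val reused = new Triplet(0)
  private var i = 0
  def hasNext: Boolean = i < n
  def next(): Triplet = { i += 1; reused.value = i; reused }
  // The fix: materialize copies so the list does not alias the reused object
  override def toList: List[Triplet] = this.map(t => new Triplet(t.value)).toList
}

// Every element aliases the same object, so this prints "3 3 3":
println(brokenIterator(3).toList.map(_.value).mkString(" "))
// With the copying override, this prints "1 2 3":
println(fixedIterator(3).toList.map(_.value).mkString(" "))
```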
merge in remaining changes from `branch-0.8`
This merges in the following changes from `branch-0.8`:
- The Scala version is included in the published Maven artifact names
- A unit test which had non-deterministic failures is ignored (see SPARK-908)
- A minor documentation change shows the short version instead of the full version
- The Kafka jar is marked as "provided"
- The default Spark EC2 version is changed
- Some spacing changes caused by Maven's release plugin
Note that I've squashed this into a single commit rather than pull in the branch-0.8 history. There are a bunch of release/revert commits there that make the history super ugly.
Allow users to pass broadcasted Configurations and cache InputFormats across Hadoop file reads.
Note: originally from https://github.com/mesos/spark/pull/942
Currently motivated by Shark queries on Hive-partitioned tables, where there's a JobConf broadcast for every Hive partition (i.e., every subdirectory read). The only thing different about those JobConfs is the input path; the Hadoop Configuration that the JobConfs are constructed from remains the same.
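A minimal usage sketch of that idea (hypothetical code, assuming Spark's `SerializableWritable` wrapper to make the Hadoop `Configuration` serializable; this is not the code in the PR):
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.{SerializableWritable, SparkContext}

// Hypothetical illustration: broadcast the shared Configuration once, then
// build a cheap per-read JobConf locally that differs only in its input path.
object BroadcastConfSketch {
  def run(sc: SparkContext, sharedConf: Configuration, paths: Seq[String]): Unit = {
    // Shipped to executors once, instead of one JobConf broadcast per partition
    val confBroadcast = sc.broadcast(new SerializableWritable(sharedConf))
    paths.foreach { path =>
      val jobConf = new JobConf(confBroadcast.value.value) // cheap local copy
      FileInputFormat.setInputPaths(jobConf, path)
      // ... hand jobConf to a HadoopRDD-style read for this path ...
    }
  }
}
```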
This PR only modifies the old Hadoop API RDDs, but similar additions to the new API might reduce computation latencies a little bit for high-frequency FileInputDStreams (which currently use only the new API).
As a small bonus, added InputFormat caching to avoid reflection calls on every RDD#compute().
A few other notes:
- Added a general soft-reference hashmap in SparkHadoopUtil because I wanted to avoid adding another class to SparkEnv.
- SparkContext's default hadoopConfiguration isn't cached; there's no equals() method for Configuration, so there isn't a good way to determine when configuration properties have changed.
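As a rough illustration of that soft-reference hashmap (a minimal sketch with hypothetical names, not the actual SparkHadoopUtil code), the cache hands back the live instance while it survives and rebuilds it once the JVM has reclaimed it:
```scala
import java.lang.ref.SoftReference
import scala.collection.mutable

// Hypothetical sketch of a general soft-reference map. Soft references let
// the JVM reclaim cached values under memory pressure instead of pinning
// them forever.
class SoftReferenceCache[K, V <: AnyRef] {
  private val map = mutable.HashMap.empty[K, SoftReference[V]]

  def getOrElseUpdate(key: K, create: => V): V = synchronized {
    map.get(key).flatMap(ref => Option(ref.get)) match {
      case Some(value) => value // still alive in the cache
      case None =>
        val value = create // e.g. instantiate the InputFormat via reflection
        map.put(key, new SoftReference(value))
        value
    }
  }
}

// Usage sketch: cache InputFormat instances so RDD#compute() skips reflection
// val formats = new SoftReferenceCache[String, AnyRef]
// val fmt = formats.getOrElseUpdate(formatClassName,
//   ReflectionUtils.newInstance(formatClass, jobConf))
```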