Adding algorithm for implicit feedback data to ALS
This commit is contained in:
parent
a106ed8b97
commit
737f01a1ef
|
@ -21,7 +21,8 @@ import scala.collection.mutable.{ArrayBuffer, BitSet}
|
||||||
import scala.util.Random
|
import scala.util.Random
|
||||||
import scala.util.Sorting
|
import scala.util.Sorting
|
||||||
|
|
||||||
import org.apache.spark.{HashPartitioner, Partitioner, SparkContext}
|
import org.apache.spark.broadcast.Broadcast
|
||||||
|
import org.apache.spark.{Logging, HashPartitioner, Partitioner, SparkContext}
|
||||||
import org.apache.spark.storage.StorageLevel
|
import org.apache.spark.storage.StorageLevel
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.serializer.KryoRegistrator
|
import org.apache.spark.serializer.KryoRegistrator
|
||||||
|
@ -61,6 +62,12 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
|
||||||
/**
|
/**
|
||||||
* Alternating Least Squares matrix factorization.
|
* Alternating Least Squares matrix factorization.
|
||||||
*
|
*
|
||||||
|
* ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices,
|
||||||
|
* `X` and `Y`, i.e. `Xt * Y = R`. Typically these approximations are called 'factor' matrices.
|
||||||
|
* The general approach is iterative. During each iteration, one of the factor matrices is held
|
||||||
|
* constant, while the other is solved for using least squares. The newly-solved factor matrix is
|
||||||
|
* then held constant while solving for the other factor matrix.
|
||||||
|
*
|
||||||
* This is a blocked implementation of the ALS factorization algorithm that groups the two sets
|
* This is a blocked implementation of the ALS factorization algorithm that groups the two sets
|
||||||
* of factors (referred to as "users" and "products") into blocks and reduces communication by only
|
* of factors (referred to as "users" and "products") into blocks and reduces communication by only
|
||||||
* sending one copy of each user vector to each product block on each iteration, and only for the
|
* sending one copy of each user vector to each product block on each iteration, and only for the
|
||||||
|
@ -70,11 +77,21 @@ case class Rating(val user: Int, val product: Int, val rating: Double)
|
||||||
* vectors it receives from each user block it will depend on). This allows us to send only an
|
* vectors it receives from each user block it will depend on). This allows us to send only an
|
||||||
* array of feature vectors between each user block and product block, and have the product block
|
* array of feature vectors between each user block and product block, and have the product block
|
||||||
* find the users' ratings and update the products based on these messages.
|
* find the users' ratings and update the products based on these messages.
|
||||||
|
*
|
||||||
|
* For implicit preference data, the algorithm used is based on
|
||||||
|
* "Collaborative Filtering for Implicit Feedback Datasets", available at
|
||||||
|
* [[http://research.yahoo.com/pub/2433]], adapted for the blocked approach used here.
|
||||||
|
*
|
||||||
|
* Essentially instead of finding the low-rank approximations to the rating matrix `R`,
|
||||||
|
* this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if r > 0
|
||||||
|
* and 0 if r = 0. The ratings then act as 'confidence' values related to strength of indicated user
|
||||||
|
* preferences rather than explicit ratings given to items.
|
||||||
*/
|
*/
|
||||||
class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double)
|
class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var lambda: Double,
|
||||||
extends Serializable
|
var implicitPrefs: Boolean, var alpha: Double)
|
||||||
|
extends Serializable with Logging
|
||||||
{
|
{
|
||||||
def this() = this(-1, 10, 10, 0.01)
|
def this() = this(-1, 10, 10, 0.01, false, 1.0)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured
|
* Set the number of blocks to parallelize the computation into; pass -1 for an auto-configured
|
||||||
|
@ -103,6 +120,16 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
|
||||||
this
|
this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def setImplicitPrefs(implicitPrefs: Boolean): ALS = {
|
||||||
|
this.implicitPrefs = implicitPrefs
|
||||||
|
this
|
||||||
|
}
|
||||||
|
|
||||||
|
def setAlpha(alpha: Double): ALS = {
|
||||||
|
this.alpha = alpha
|
||||||
|
this
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
|
* Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
|
||||||
* Returns a MatrixFactorizationModel with feature vectors for each user and product.
|
* Returns a MatrixFactorizationModel with feature vectors for each user and product.
|
||||||
|
@ -147,19 +174,24 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (iter <- 0 until iterations) {
|
for (iter <- 1 to iterations) {
|
||||||
// perform ALS update
|
// perform ALS update
|
||||||
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda)
|
logInfo("Re-computing I given U (Iteration %d/%d)".format(iter, iterations))
|
||||||
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda)
|
// YtY / XtX is an Option[DoubleMatrix] and is only required for the implicit feedback model
|
||||||
|
val YtY = computeYtY(users)
|
||||||
|
val YtYb = ratings.context.broadcast(YtY)
|
||||||
|
products = updateFeatures(users, userOutLinks, productInLinks, partitioner, rank, lambda,
|
||||||
|
alpha, YtYb)
|
||||||
|
logInfo("Re-computing U given I (Iteration %d/%d)".format(iter, iterations))
|
||||||
|
val XtX = computeYtY(products)
|
||||||
|
val XtXb = ratings.context.broadcast(XtX)
|
||||||
|
users = updateFeatures(products, productOutLinks, userInLinks, partitioner, rank, lambda,
|
||||||
|
alpha, XtXb)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flatten and cache the two final RDDs to un-block them
|
// Flatten and cache the two final RDDs to un-block them
|
||||||
val usersOut = users.join(userOutLinks).flatMap { case (b, (factors, outLinkBlock)) =>
|
val usersOut = unblockFactors(users, userOutLinks)
|
||||||
for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
|
val productsOut = unblockFactors(products, productOutLinks)
|
||||||
}
|
|
||||||
val productsOut = products.join(productOutLinks).flatMap { case (b, (factors, outLinkBlock)) =>
|
|
||||||
for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
|
|
||||||
}
|
|
||||||
|
|
||||||
usersOut.persist()
|
usersOut.persist()
|
||||||
productsOut.persist()
|
productsOut.persist()
|
||||||
|
@ -167,6 +199,41 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
|
||||||
new MatrixFactorizationModel(rank, usersOut, productsOut)
|
new MatrixFactorizationModel(rank, usersOut, productsOut)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Computes the (`rank x rank`) matrix `YtY`, where `Y` is the (`nui x rank`) matrix of factors
|
||||||
|
* for each user (or product), in a distributed fashion. Here `reduceByKeyLocally` is used as
|
||||||
|
* the driver program requires `YtY` to broadcast it to the slaves
|
||||||
|
* @param factors the (block-distributed) user or product factor vectors
|
||||||
|
* @return Option[YtY] - whose value is only used in the implicit preference model
|
||||||
|
*/
|
||||||
|
def computeYtY(factors: RDD[(Int, Array[Array[Double]])]) = {
|
||||||
|
implicitPrefs match {
|
||||||
|
case true => {
|
||||||
|
Option(
|
||||||
|
factors.flatMapValues{ case factorArray =>
|
||||||
|
factorArray.map{ vector =>
|
||||||
|
val x = new DoubleMatrix(vector)
|
||||||
|
x.mmul(x.transpose())
|
||||||
|
}
|
||||||
|
}.reduceByKeyLocally((a, b) => a.addi(b))
|
||||||
|
.values
|
||||||
|
.reduce((a, b) => a.addi(b))
|
||||||
|
)
|
||||||
|
}
|
||||||
|
case false => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flatten out blocked user or product factors into an RDD of (id, factor vector) pairs
|
||||||
|
*/
|
||||||
|
def unblockFactors(blockedFactors: RDD[(Int, Array[Array[Double]])],
|
||||||
|
outLinks: RDD[(Int, OutLinkBlock)]) = {
|
||||||
|
blockedFactors.join(outLinks).flatMap{ case (b, (factors, outLinkBlock)) =>
|
||||||
|
for (i <- 0 until factors.length) yield (outLinkBlock.elementIds(i), factors(i))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Make the out-links table for a block of the users (or products) dataset given the list of
|
* Make the out-links table for a block of the users (or products) dataset given the list of
|
||||||
* (user, product, rating) values for the users in that block (or the opposite for products).
|
* (user, product, rating) values for the users in that block (or the opposite for products).
|
||||||
|
@ -251,7 +318,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
|
||||||
userInLinks: RDD[(Int, InLinkBlock)],
|
userInLinks: RDD[(Int, InLinkBlock)],
|
||||||
partitioner: Partitioner,
|
partitioner: Partitioner,
|
||||||
rank: Int,
|
rank: Int,
|
||||||
lambda: Double)
|
lambda: Double,
|
||||||
|
alpha: Double,
|
||||||
|
YtY: Broadcast[Option[DoubleMatrix]])
|
||||||
: RDD[(Int, Array[Array[Double]])] =
|
: RDD[(Int, Array[Array[Double]])] =
|
||||||
{
|
{
|
||||||
val numBlocks = products.partitions.size
|
val numBlocks = products.partitions.size
|
||||||
|
@ -265,7 +334,9 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
|
||||||
toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) }
|
toSend.zipWithIndex.map{ case (buf, idx) => (idx, (bid, buf.toArray)) }
|
||||||
}.groupByKey(partitioner)
|
}.groupByKey(partitioner)
|
||||||
.join(userInLinks)
|
.join(userInLinks)
|
||||||
.mapValues{ case (messages, inLinkBlock) => updateBlock(messages, inLinkBlock, rank, lambda) }
|
.mapValues{ case (messages, inLinkBlock) =>
|
||||||
|
updateBlock(messages, inLinkBlock, rank, lambda, alpha, YtY)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -273,7 +344,7 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
|
||||||
* it received from each product and its InLinkBlock.
|
* it received from each product and its InLinkBlock.
|
||||||
*/
|
*/
|
||||||
def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
|
def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
|
||||||
rank: Int, lambda: Double)
|
rank: Int, lambda: Double, alpha: Double, YtY: Broadcast[Option[DoubleMatrix]])
|
||||||
: Array[Array[Double]] =
|
: Array[Array[Double]] =
|
||||||
{
|
{
|
||||||
// Sort the incoming block factor messages by block ID and make them an array
|
// Sort the incoming block factor messages by block ID and make them an array
|
||||||
|
@ -298,8 +369,14 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
|
||||||
fillXtX(x, tempXtX)
|
fillXtX(x, tempXtX)
|
||||||
val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
|
val (us, rs) = inLinkBlock.ratingsForBlock(productBlock)(p)
|
||||||
for (i <- 0 until us.length) {
|
for (i <- 0 until us.length) {
|
||||||
userXtX(us(i)).addi(tempXtX)
|
implicitPrefs match {
|
||||||
SimpleBlas.axpy(rs(i), x, userXy(us(i)))
|
case false =>
|
||||||
|
userXtX(us(i)).addi(tempXtX)
|
||||||
|
SimpleBlas.axpy(rs(i), x, userXy(us(i)))
|
||||||
|
case true =>
|
||||||
|
userXtX(us(i)).addi(tempXtX.mul(alpha * rs(i)))
|
||||||
|
SimpleBlas.axpy(1 + alpha * rs(i), x, userXy(us(i)))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -311,7 +388,10 @@ class ALS private (var numBlocks: Int, var rank: Int, var iterations: Int, var l
|
||||||
// Add regularization
|
// Add regularization
|
||||||
(0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
|
(0 until rank).foreach(i => fullXtX.data(i*rank + i) += lambda)
|
||||||
// Solve the resulting matrix, which is symmetric and positive-definite
|
// Solve the resulting matrix, which is symmetric and positive-definite
|
||||||
Solve.solvePositive(fullXtX, userXy(index)).data
|
implicitPrefs match {
|
||||||
|
case false => Solve.solvePositive(fullXtX, userXy(index)).data
|
||||||
|
case true => Solve.solvePositive(fullXtX.add(YtY.value.get), userXy(index)).data
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,7 +461,7 @@ object ALS {
|
||||||
blocks: Int)
|
blocks: Int)
|
||||||
: MatrixFactorizationModel =
|
: MatrixFactorizationModel =
|
||||||
{
|
{
|
||||||
new ALS(blocks, rank, iterations, lambda).run(ratings)
|
new ALS(blocks, rank, iterations, lambda, false, 1.0).run(ratings)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -419,6 +499,68 @@ object ALS {
|
||||||
train(ratings, rank, iterations, 0.01, -1)
|
train(ratings, rank, iterations, 0.01, -1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Train a matrix factorization model given an RDD of 'implicit preferences' given by users
|
||||||
|
* to some products, in the form of (userID, productID, preference) pairs. We approximate the
|
||||||
|
* ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
|
||||||
|
* To solve for these features, we run a given number of iterations of ALS. This is done using
|
||||||
|
* a level of parallelism given by `blocks`.
|
||||||
|
*
|
||||||
|
* @param ratings RDD of (userID, productID, rating) pairs
|
||||||
|
* @param rank number of features to use
|
||||||
|
* @param iterations number of iterations of ALS (recommended: 10-20)
|
||||||
|
* @param lambda regularization factor (recommended: 0.01)
|
||||||
|
* @param blocks level of parallelism to split computation into
|
||||||
|
* @param alpha confidence parameter (only applies when immplicitPrefs = true)
|
||||||
|
*/
|
||||||
|
def trainImplicit(
|
||||||
|
ratings: RDD[Rating],
|
||||||
|
rank: Int,
|
||||||
|
iterations: Int,
|
||||||
|
lambda: Double,
|
||||||
|
blocks: Int,
|
||||||
|
alpha: Double)
|
||||||
|
: MatrixFactorizationModel =
|
||||||
|
{
|
||||||
|
new ALS(blocks, rank, iterations, lambda, true, alpha).run(ratings)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Train a matrix factorization model given an RDD of 'implicit preferences' given by users to
|
||||||
|
* some products, in the form of (userID, productID, preference) pairs. We approximate the
|
||||||
|
* ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
|
||||||
|
* To solve for these features, we run a given number of iterations of ALS. The level of
|
||||||
|
* parallelism is determined automatically based on the number of partitions in `ratings`.
|
||||||
|
*
|
||||||
|
* @param ratings RDD of (userID, productID, rating) pairs
|
||||||
|
* @param rank number of features to use
|
||||||
|
* @param iterations number of iterations of ALS (recommended: 10-20)
|
||||||
|
* @param lambda regularization factor (recommended: 0.01)
|
||||||
|
*/
|
||||||
|
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int, lambda: Double, alpha: Double)
|
||||||
|
: MatrixFactorizationModel =
|
||||||
|
{
|
||||||
|
trainImplicit(ratings, rank, iterations, lambda, -1, alpha)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Train a matrix factorization model given an RDD of 'implicit preferences' ratings given by
|
||||||
|
* users to some products, in the form of (userID, productID, rating) pairs. We approximate the
|
||||||
|
* ratings matrix as the product of two lower-rank matrices of a given rank (number of features).
|
||||||
|
* To solve for these features, we run a given number of iterations of ALS. The level of
|
||||||
|
* parallelism is determined automatically based on the number of partitions in `ratings`.
|
||||||
|
* Model parameters `alpha` and `lambda` are set to reasonable default values
|
||||||
|
*
|
||||||
|
* @param ratings RDD of (userID, productID, rating) pairs
|
||||||
|
* @param rank number of features to use
|
||||||
|
* @param iterations number of iterations of ALS (recommended: 10-20)
|
||||||
|
*/
|
||||||
|
def trainImplicit(ratings: RDD[Rating], rank: Int, iterations: Int)
|
||||||
|
: MatrixFactorizationModel =
|
||||||
|
{
|
||||||
|
trainImplicit(ratings, rank, iterations, 0.01, -1, 1.0)
|
||||||
|
}
|
||||||
|
|
||||||
private class ALSRegistrator extends KryoRegistrator {
|
private class ALSRegistrator extends KryoRegistrator {
|
||||||
override def registerClasses(kryo: Kryo) {
|
override def registerClasses(kryo: Kryo) {
|
||||||
kryo.register(classOf[Rating])
|
kryo.register(classOf[Rating])
|
||||||
|
@ -426,29 +568,37 @@ object ALS {
|
||||||
}
|
}
|
||||||
|
|
||||||
def main(args: Array[String]) {
|
def main(args: Array[String]) {
|
||||||
if (args.length != 5 && args.length != 6) {
|
if (args.length < 5 || args.length > 9) {
|
||||||
println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]")
|
println("Usage: ALS <master> <ratings_file> <rank> <iterations> <output_dir> " +
|
||||||
|
"[<lambda>] [<implicitPrefs>] [<alpha>] [<blocks>]")
|
||||||
System.exit(1)
|
System.exit(1)
|
||||||
}
|
}
|
||||||
val (master, ratingsFile, rank, iters, outputDir) =
|
val (master, ratingsFile, rank, iters, outputDir) =
|
||||||
(args(0), args(1), args(2).toInt, args(3).toInt, args(4))
|
(args(0), args(1), args(2).toInt, args(3).toInt, args(4))
|
||||||
val blocks = if (args.length == 6) args(5).toInt else -1
|
val lambda = if (args.length >= 6) args(5).toDouble else 0.01
|
||||||
|
val implicitPrefs = if (args.length >= 7) args(6).toBoolean else false
|
||||||
|
val alpha = if (args.length >= 8) args(7).toDouble else 1
|
||||||
|
val blocks = if (args.length == 9) args(8).toInt else -1
|
||||||
|
|
||||||
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
|
||||||
System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
|
System.setProperty("spark.kryo.registrator", classOf[ALSRegistrator].getName)
|
||||||
System.setProperty("spark.kryo.referenceTracking", "false")
|
System.setProperty("spark.kryo.referenceTracking", "false")
|
||||||
System.setProperty("spark.kryoserializer.buffer.mb", "8")
|
System.setProperty("spark.kryoserializer.buffer.mb", "8")
|
||||||
System.setProperty("spark.locality.wait", "10000")
|
System.setProperty("spark.locality.wait", "10000")
|
||||||
|
|
||||||
val sc = new SparkContext(master, "ALS")
|
val sc = new SparkContext(master, "ALS")
|
||||||
val ratings = sc.textFile(ratingsFile).map { line =>
|
val ratings = sc.textFile(ratingsFile).map { line =>
|
||||||
val fields = line.split(',')
|
val fields = line.split("\\D{2}|\\s|,")
|
||||||
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
|
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
|
||||||
}
|
}
|
||||||
val model = ALS.train(ratings, rank, iters, 0.01, blocks)
|
val model = new ALS(rank = rank, iterations = iters, lambda = lambda,
|
||||||
|
numBlocks = blocks, implicitPrefs = implicitPrefs, alpha = alpha).run(ratings)
|
||||||
|
|
||||||
model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
|
model.userFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
|
||||||
.saveAsTextFile(outputDir + "/userFeatures")
|
.saveAsTextFile(outputDir + "/userFeatures")
|
||||||
model.productFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
|
model.productFeatures.map{ case (id, vec) => id + "," + vec.mkString(" ") }
|
||||||
.saveAsTextFile(outputDir + "/productFeatures")
|
.saveAsTextFile(outputDir + "/productFeatures")
|
||||||
println("Final user/product features written to " + outputDir)
|
println("Final user/product features written to " + outputDir)
|
||||||
System.exit(0)
|
sc.stop()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.spark.mllib.recommendation;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.lang.Math;
|
||||||
|
|
||||||
import scala.Tuple2;
|
import scala.Tuple2;
|
||||||
|
|
||||||
|
@ -48,7 +49,7 @@ public class JavaALSSuite implements Serializable {
|
||||||
}
|
}
|
||||||
|
|
||||||
void validatePrediction(MatrixFactorizationModel model, int users, int products, int features,
|
void validatePrediction(MatrixFactorizationModel model, int users, int products, int features,
|
||||||
DoubleMatrix trueRatings, double matchThreshold) {
|
DoubleMatrix trueRatings, double matchThreshold, boolean implicitPrefs, DoubleMatrix truePrefs) {
|
||||||
DoubleMatrix predictedU = new DoubleMatrix(users, features);
|
DoubleMatrix predictedU = new DoubleMatrix(users, features);
|
||||||
List<scala.Tuple2<Object, double[]>> userFeatures = model.userFeatures().toJavaRDD().collect();
|
List<scala.Tuple2<Object, double[]>> userFeatures = model.userFeatures().toJavaRDD().collect();
|
||||||
for (int i = 0; i < features; ++i) {
|
for (int i = 0; i < features; ++i) {
|
||||||
|
@ -68,12 +69,32 @@ public class JavaALSSuite implements Serializable {
|
||||||
|
|
||||||
DoubleMatrix predictedRatings = predictedU.mmul(predictedP.transpose());
|
DoubleMatrix predictedRatings = predictedU.mmul(predictedP.transpose());
|
||||||
|
|
||||||
for (int u = 0; u < users; ++u) {
|
if (!implicitPrefs) {
|
||||||
for (int p = 0; p < products; ++p) {
|
for (int u = 0; u < users; ++u) {
|
||||||
double prediction = predictedRatings.get(u, p);
|
for (int p = 0; p < products; ++p) {
|
||||||
double correct = trueRatings.get(u, p);
|
double prediction = predictedRatings.get(u, p);
|
||||||
Assert.assertTrue(Math.abs(prediction - correct) < matchThreshold);
|
double correct = trueRatings.get(u, p);
|
||||||
|
Assert.assertTrue(String.format("Prediction=%2.4f not below match threshold of %2.2f",
|
||||||
|
prediction, matchThreshold), Math.abs(prediction - correct) < matchThreshold);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// For implicit prefs we use the confidence-weighted RMSE to test (ref Mahout's implicit ALS tests)
|
||||||
|
double sqErr = 0.0;
|
||||||
|
double denom = 0.0;
|
||||||
|
for (int u = 0; u < users; ++u) {
|
||||||
|
for (int p = 0; p < products; ++p) {
|
||||||
|
double prediction = predictedRatings.get(u, p);
|
||||||
|
double truePref = truePrefs.get(u, p);
|
||||||
|
double confidence = 1.0 + /* alpha = */ 1.0 * trueRatings.get(u, p);
|
||||||
|
double err = confidence * (truePref - prediction) * (truePref - prediction);
|
||||||
|
sqErr += err;
|
||||||
|
denom += 1.0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
double rmse = Math.sqrt(sqErr / denom);
|
||||||
|
Assert.assertTrue(String.format("Confidence-weighted RMSE=%2.4f above threshold of %2.2f",
|
||||||
|
rmse, matchThreshold), Math.abs(rmse) < matchThreshold);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,12 +104,12 @@ public class JavaALSSuite implements Serializable {
|
||||||
int iterations = 15;
|
int iterations = 15;
|
||||||
int users = 10;
|
int users = 10;
|
||||||
int products = 10;
|
int products = 10;
|
||||||
scala.Tuple2<List<Rating>, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList(
|
scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList(
|
||||||
users, products, features, 0.7);
|
users, products, features, 0.7, false);
|
||||||
|
|
||||||
JavaRDD<Rating> data = sc.parallelize(testData._1());
|
JavaRDD<Rating> data = sc.parallelize(testData._1());
|
||||||
MatrixFactorizationModel model = ALS.train(data.rdd(), features, iterations);
|
MatrixFactorizationModel model = ALS.train(data.rdd(), features, iterations);
|
||||||
validatePrediction(model, users, products, features, testData._2(), 0.3);
|
validatePrediction(model, users, products, features, testData._2(), 0.3, false, testData._3());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -97,14 +118,46 @@ public class JavaALSSuite implements Serializable {
|
||||||
int iterations = 15;
|
int iterations = 15;
|
||||||
int users = 20;
|
int users = 20;
|
||||||
int products = 30;
|
int products = 30;
|
||||||
scala.Tuple2<List<Rating>, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList(
|
scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList(
|
||||||
users, products, features, 0.7);
|
users, products, features, 0.7, false);
|
||||||
|
|
||||||
JavaRDD<Rating> data = sc.parallelize(testData._1());
|
JavaRDD<Rating> data = sc.parallelize(testData._1());
|
||||||
|
|
||||||
MatrixFactorizationModel model = new ALS().setRank(features)
|
MatrixFactorizationModel model = new ALS().setRank(features)
|
||||||
.setIterations(iterations)
|
.setIterations(iterations)
|
||||||
.run(data.rdd());
|
.run(data.rdd());
|
||||||
validatePrediction(model, users, products, features, testData._2(), 0.3);
|
validatePrediction(model, users, products, features, testData._2(), 0.3, false, testData._3());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void runImplicitALSUsingStaticMethods() {
|
||||||
|
int features = 1;
|
||||||
|
int iterations = 15;
|
||||||
|
int users = 40;
|
||||||
|
int products = 80;
|
||||||
|
scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList(
|
||||||
|
users, products, features, 0.7, true);
|
||||||
|
|
||||||
|
JavaRDD<Rating> data = sc.parallelize(testData._1());
|
||||||
|
MatrixFactorizationModel model = ALS.trainImplicit(data.rdd(), features, iterations);
|
||||||
|
validatePrediction(model, users, products, features, testData._2(), 0.4, true, testData._3());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void runImplicitALSUsingConstructor() {
|
||||||
|
int features = 2;
|
||||||
|
int iterations = 15;
|
||||||
|
int users = 50;
|
||||||
|
int products = 100;
|
||||||
|
scala.Tuple3<List<Rating>, DoubleMatrix, DoubleMatrix> testData = ALSSuite.generateRatingsAsJavaList(
|
||||||
|
users, products, features, 0.7, true);
|
||||||
|
|
||||||
|
JavaRDD<Rating> data = sc.parallelize(testData._1());
|
||||||
|
|
||||||
|
MatrixFactorizationModel model = new ALS().setRank(features)
|
||||||
|
.setIterations(iterations)
|
||||||
|
.setImplicitPrefs(true)
|
||||||
|
.run(data.rdd());
|
||||||
|
validatePrediction(model, users, products, features, testData._2(), 0.4, true, testData._3());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,16 +34,19 @@ object ALSSuite {
|
||||||
users: Int,
|
users: Int,
|
||||||
products: Int,
|
products: Int,
|
||||||
features: Int,
|
features: Int,
|
||||||
samplingRate: Double): (java.util.List[Rating], DoubleMatrix) = {
|
samplingRate: Double,
|
||||||
val (sampledRatings, trueRatings) = generateRatings(users, products, features, samplingRate)
|
implicitPrefs: Boolean): (java.util.List[Rating], DoubleMatrix, DoubleMatrix) = {
|
||||||
(seqAsJavaList(sampledRatings), trueRatings)
|
val (sampledRatings, trueRatings, truePrefs) =
|
||||||
|
generateRatings(users, products, features, samplingRate, implicitPrefs)
|
||||||
|
(seqAsJavaList(sampledRatings), trueRatings, truePrefs)
|
||||||
}
|
}
|
||||||
|
|
||||||
def generateRatings(
|
def generateRatings(
|
||||||
users: Int,
|
users: Int,
|
||||||
products: Int,
|
products: Int,
|
||||||
features: Int,
|
features: Int,
|
||||||
samplingRate: Double): (Seq[Rating], DoubleMatrix) = {
|
samplingRate: Double,
|
||||||
|
implicitPrefs: Boolean = false): (Seq[Rating], DoubleMatrix, DoubleMatrix) = {
|
||||||
val rand = new Random(42)
|
val rand = new Random(42)
|
||||||
|
|
||||||
// Create a random matrix with uniform values from -1 to 1
|
// Create a random matrix with uniform values from -1 to 1
|
||||||
|
@ -52,14 +55,20 @@ object ALSSuite {
|
||||||
|
|
||||||
val userMatrix = randomMatrix(users, features)
|
val userMatrix = randomMatrix(users, features)
|
||||||
val productMatrix = randomMatrix(features, products)
|
val productMatrix = randomMatrix(features, products)
|
||||||
val trueRatings = userMatrix.mmul(productMatrix)
|
val (trueRatings, truePrefs) = implicitPrefs match {
|
||||||
|
case true =>
|
||||||
|
val raw = new DoubleMatrix(users, products, Array.fill(users * products)(rand.nextInt(10).toDouble): _*)
|
||||||
|
val prefs = new DoubleMatrix(users, products, raw.data.map(v => if (v > 0) 1.0 else 0.0): _*)
|
||||||
|
(raw, prefs)
|
||||||
|
case false => (userMatrix.mmul(productMatrix), null)
|
||||||
|
}
|
||||||
|
|
||||||
val sampledRatings = {
|
val sampledRatings = {
|
||||||
for (u <- 0 until users; p <- 0 until products if rand.nextDouble() < samplingRate)
|
for (u <- 0 until users; p <- 0 until products if rand.nextDouble() < samplingRate)
|
||||||
yield Rating(u, p, trueRatings.get(u, p))
|
yield Rating(u, p, trueRatings.get(u, p))
|
||||||
}
|
}
|
||||||
|
|
||||||
(sampledRatings, trueRatings)
|
(sampledRatings, trueRatings, truePrefs)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -85,6 +94,14 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
|
||||||
testALS(20, 30, 2, 15, 0.7, 0.3)
|
testALS(20, 30, 2, 15, 0.7, 0.3)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("rank-1 matrices implicit") {
|
||||||
|
testALS(40, 80, 1, 15, 0.7, 0.4, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
test("rank-2 matrices implicit") {
|
||||||
|
testALS(50, 100, 2, 15, 0.7, 0.4, true)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test if we can correctly factorize R = U * P where U and P are of known rank.
|
* Test if we can correctly factorize R = U * P where U and P are of known rank.
|
||||||
*
|
*
|
||||||
|
@ -96,11 +113,14 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
|
||||||
* @param matchThreshold max difference allowed to consider a predicted rating correct
|
* @param matchThreshold max difference allowed to consider a predicted rating correct
|
||||||
*/
|
*/
|
||||||
def testALS(users: Int, products: Int, features: Int, iterations: Int,
|
def testALS(users: Int, products: Int, features: Int, iterations: Int,
|
||||||
samplingRate: Double, matchThreshold: Double)
|
samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false)
|
||||||
{
|
{
|
||||||
val (sampledRatings, trueRatings) = ALSSuite.generateRatings(users, products,
|
val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
|
||||||
features, samplingRate)
|
features, samplingRate, implicitPrefs)
|
||||||
val model = ALS.train(sc.parallelize(sampledRatings), features, iterations)
|
val model = implicitPrefs match {
|
||||||
|
case false => ALS.train(sc.parallelize(sampledRatings), features, iterations)
|
||||||
|
case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations)
|
||||||
|
}
|
||||||
|
|
||||||
val predictedU = new DoubleMatrix(users, features)
|
val predictedU = new DoubleMatrix(users, features)
|
||||||
for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {
|
for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {
|
||||||
|
@ -112,12 +132,31 @@ class ALSSuite extends FunSuite with BeforeAndAfterAll {
|
||||||
}
|
}
|
||||||
val predictedRatings = predictedU.mmul(predictedP.transpose)
|
val predictedRatings = predictedU.mmul(predictedP.transpose)
|
||||||
|
|
||||||
for (u <- 0 until users; p <- 0 until products) {
|
if (!implicitPrefs) {
|
||||||
val prediction = predictedRatings.get(u, p)
|
for (u <- 0 until users; p <- 0 until products) {
|
||||||
val correct = trueRatings.get(u, p)
|
val prediction = predictedRatings.get(u, p)
|
||||||
if (math.abs(prediction - correct) > matchThreshold) {
|
val correct = trueRatings.get(u, p)
|
||||||
fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format(
|
if (math.abs(prediction - correct) > matchThreshold) {
|
||||||
u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP))
|
fail("Model failed to predict (%d, %d): %f vs %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format(
|
||||||
|
u, p, correct, prediction, trueRatings, predictedRatings, predictedU, predictedP))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// For implicit prefs we use the confidence-weighted RMSE to test (ref Mahout's tests)
|
||||||
|
var sqErr = 0.0
|
||||||
|
var denom = 0.0
|
||||||
|
for (u <- 0 until users; p <- 0 until products) {
|
||||||
|
val prediction = predictedRatings.get(u, p)
|
||||||
|
val truePref = truePrefs.get(u, p)
|
||||||
|
val confidence = 1 + 1.0 * trueRatings.get(u, p)
|
||||||
|
val err = confidence * (truePref - prediction) * (truePref - prediction)
|
||||||
|
sqErr += err
|
||||||
|
denom += 1
|
||||||
|
}
|
||||||
|
val rmse = math.sqrt(sqErr / denom)
|
||||||
|
if (math.abs(rmse) > matchThreshold) {
|
||||||
|
fail("Model failed to predict RMSE: %f\ncorr: %s\npred: %s\nU: %s\n P: %s".format(
|
||||||
|
rmse, truePrefs, predictedRatings, predictedU, predictedP))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue