[SPARK-20679][ML] Support recommending for a subset of users/items in ALSModel

This PR adds methods `recommendForUserSubset` and `recommendForItemSubset` to `ALSModel`. These allow recommending for a specified set of user / item ids rather than for every user / item (as in the `recommendForAllX` methods).

The subset methods take a `DataFrame` as input, containing ids in the column specified by the param `userCol` or `itemCol`. The model will generate recommendations for each _unique_ id in this input dataframe.

## How was this patch tested?
New unit tests in `ALSSuite` and Python doctests in `ALS`. Ran updated examples locally.

Author: Nick Pentreath <nickp@za.ibm.com>

Closes #18748 from MLnick/als-recommend-df.
This commit is contained in:
Nick Pentreath 2017-10-09 10:42:33 +02:00
parent fe7b219ae3
commit 98057583dd
6 changed files with 205 additions and 8 deletions

View file

@ -118,9 +118,18 @@ public class JavaALSExample {
Dataset<Row> userRecs = model.recommendForAllUsers(10);
// Generate top 10 user recommendations for each movie
Dataset<Row> movieRecs = model.recommendForAllItems(10);
// Generate top 10 movie recommendations for a specified set of users
Dataset<Row> users = ratings.select(als.getUserCol()).distinct().limit(3);
Dataset<Row> userSubsetRecs = model.recommendForUserSubset(users, 10);
// Generate top 10 user recommendations for a specified set of movies
Dataset<Row> movies = ratings.select(als.getItemCol()).distinct().limit(3);
Dataset<Row> movieSubSetRecs = model.recommendForItemSubset(movies, 10);
// $example off$
userRecs.show();
movieRecs.show();
userSubsetRecs.show();
movieSubSetRecs.show();
spark.stop();
}

View file

@ -60,8 +60,17 @@ if __name__ == "__main__":
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)
# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
# $example off$
userRecs.show()
movieRecs.show()
userSubsetRecs.show()
movieSubSetRecs.show()
spark.stop()

View file

@ -80,9 +80,18 @@ object ALSExample {
val userRecs = model.recommendForAllUsers(10)
// Generate top 10 user recommendations for each movie
val movieRecs = model.recommendForAllItems(10)
// Generate top 10 movie recommendations for a specified set of users
val users = ratings.select(als.getUserCol).distinct().limit(3)
val userSubsetRecs = model.recommendForUserSubset(users, 10)
// Generate top 10 user recommendations for a specified set of movies
val movies = ratings.select(als.getItemCol).distinct().limit(3)
val movieSubSetRecs = model.recommendForItemSubset(movies, 10)
// $example off$
userRecs.show()
movieRecs.show()
userSubsetRecs.show()
movieSubSetRecs.show()
spark.stop()
}

View file

@ -344,6 +344,21 @@ class ALSModel private[ml] (
recommendForAll(userFactors, itemFactors, $(userCol), $(itemCol), numItems)
}
/**
* Returns top `numItems` items recommended for each user id in the input data set. Note that if
* there are duplicate ids in the input dataset, only one set of recommendations per unique id
* will be returned.
* @param dataset a Dataset containing a column of user ids. The column name must match `userCol`.
* @param numItems max number of recommendations for each user.
* @return a DataFrame of (userCol: Int, recommendations), where recommendations are
* stored as an array of (itemCol: Int, rating: Float) Rows.
*/
@Since("2.3.0")
def recommendForUserSubset(dataset: Dataset[_], numItems: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, userFactors, $(userCol))
recommendForAll(srcFactorSubset, itemFactors, $(userCol), $(itemCol), numItems)
}
/**
* Returns top `numUsers` users recommended for each item, for all items.
* @param numUsers max number of recommendations for each item
@ -355,6 +370,39 @@ class ALSModel private[ml] (
recommendForAll(itemFactors, userFactors, $(itemCol), $(userCol), numUsers)
}
/**
* Returns top `numUsers` users recommended for each item id in the input data set. Note that if
* there are duplicate ids in the input dataset, only one set of recommendations per unique id
* will be returned.
* @param dataset a Dataset containing a column of item ids. The column name must match `itemCol`.
* @param numUsers max number of recommendations for each item.
* @return a DataFrame of (itemCol: Int, recommendations), where recommendations are
* stored as an array of (userCol: Int, rating: Float) Rows.
*/
@Since("2.3.0")
def recommendForItemSubset(dataset: Dataset[_], numUsers: Int): DataFrame = {
val srcFactorSubset = getSourceFactorSubset(dataset, itemFactors, $(itemCol))
recommendForAll(srcFactorSubset, userFactors, $(itemCol), $(userCol), numUsers)
}
/**
* Returns a subset of a factor DataFrame limited to only those unique ids contained
* in the input dataset.
* @param dataset input Dataset containing id column to user to filter factors.
* @param factors factor DataFrame to filter.
* @param column column name containing the ids in the input dataset.
* @return DataFrame containing factors only for those ids present in both the input dataset and
* the factor DataFrame.
*/
private def getSourceFactorSubset(
dataset: Dataset[_],
factors: DataFrame,
column: String): DataFrame = {
factors
.join(dataset.select(column), factors("id") === dataset(column), joinType = "left_semi")
.select(factors("id"), factors("features"))
}
/**
* Makes recommendations for all users (or items).
*

View file

@ -723,9 +723,9 @@ class ALSSuite
val numUsers = model.userFactors.count
val numItems = model.itemFactors.count
val expected = Map(
0 -> Array((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
1 -> Array((3, 39f), (5, 33f), (4, 26f), (6, 16f)),
2 -> Array((3, 51f), (5, 45f), (4, 30f), (6, 18f))
0 -> Seq((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
1 -> Seq((3, 39f), (5, 33f), (4, 26f), (6, 16f)),
2 -> Seq((3, 51f), (5, 45f), (4, 30f), (6, 18f))
)
Seq(2, 4, 6).foreach { k =>
@ -743,10 +743,10 @@ class ALSSuite
val numUsers = model.userFactors.count
val numItems = model.itemFactors.count
val expected = Map(
3 -> Array((0, 54f), (2, 51f), (1, 39f)),
4 -> Array((0, 44f), (2, 30f), (1, 26f)),
5 -> Array((2, 45f), (0, 42f), (1, 33f)),
6 -> Array((0, 28f), (2, 18f), (1, 16f))
3 -> Seq((0, 54f), (2, 51f), (1, 39f)),
4 -> Seq((0, 44f), (2, 30f), (1, 26f)),
5 -> Seq((2, 45f), (0, 42f), (1, 33f)),
6 -> Seq((0, 28f), (2, 18f), (1, 16f))
)
Seq(2, 3, 4).foreach { k =>
@ -759,9 +759,93 @@ class ALSSuite
}
}
test("recommendForUserSubset with k <, = and > num_items") {
val spark = this.spark
import spark.implicits._
val model = getALSModel
val numItems = model.itemFactors.count
val expected = Map(
0 -> Seq((3, 54f), (4, 44f), (5, 42f), (6, 28f)),
2 -> Seq((3, 51f), (5, 45f), (4, 30f), (6, 18f))
)
val userSubset = expected.keys.toSeq.toDF("user")
val numUsersSubset = userSubset.count
Seq(2, 4, 6).foreach { k =>
val n = math.min(k, numItems).toInt
val expectedUpToN = expected.mapValues(_.slice(0, n))
val topItems = model.recommendForUserSubset(userSubset, k)
assert(topItems.count() == numUsersSubset)
assert(topItems.columns.contains("user"))
checkRecommendations(topItems, expectedUpToN, "item")
}
}
test("recommendForItemSubset with k <, = and > num_users") {
val spark = this.spark
import spark.implicits._
val model = getALSModel
val numUsers = model.userFactors.count
val expected = Map(
3 -> Seq((0, 54f), (2, 51f), (1, 39f)),
6 -> Seq((0, 28f), (2, 18f), (1, 16f))
)
val itemSubset = expected.keys.toSeq.toDF("item")
val numItemsSubset = itemSubset.count
Seq(2, 3, 4).foreach { k =>
val n = math.min(k, numUsers).toInt
val expectedUpToN = expected.mapValues(_.slice(0, n))
val topUsers = model.recommendForItemSubset(itemSubset, k)
assert(topUsers.count() == numItemsSubset)
assert(topUsers.columns.contains("item"))
checkRecommendations(topUsers, expectedUpToN, "user")
}
}
test("subset recommendations eliminate duplicate ids, returns same results as unique ids") {
val spark = this.spark
import spark.implicits._
val model = getALSModel
val k = 2
val users = Seq(0, 1).toDF("user")
val dupUsers = Seq(0, 1, 0, 1).toDF("user")
val singleUserRecs = model.recommendForUserSubset(users, k)
val dupUserRecs = model.recommendForUserSubset(dupUsers, k)
.as[(Int, Seq[(Int, Float)])].collect().toMap
assert(singleUserRecs.count == dupUserRecs.size)
checkRecommendations(singleUserRecs, dupUserRecs, "item")
val items = Seq(3, 4, 5).toDF("item")
val dupItems = Seq(3, 4, 5, 4, 5).toDF("item")
val singleItemRecs = model.recommendForItemSubset(items, k)
val dupItemRecs = model.recommendForItemSubset(dupItems, k)
.as[(Int, Seq[(Int, Float)])].collect().toMap
assert(singleItemRecs.count == dupItemRecs.size)
checkRecommendations(singleItemRecs, dupItemRecs, "user")
}
test("subset recommendations on full input dataset equivalent to recommendForAll") {
val spark = this.spark
import spark.implicits._
val model = getALSModel
val k = 2
val userSubset = model.userFactors.withColumnRenamed("id", "user").drop("features")
val userSubsetRecs = model.recommendForUserSubset(userSubset, k)
val allUserRecs = model.recommendForAllUsers(k).as[(Int, Seq[(Int, Float)])].collect().toMap
checkRecommendations(userSubsetRecs, allUserRecs, "item")
val itemSubset = model.itemFactors.withColumnRenamed("id", "item").drop("features")
val itemSubsetRecs = model.recommendForItemSubset(itemSubset, k)
val allItemRecs = model.recommendForAllItems(k).as[(Int, Seq[(Int, Float)])].collect().toMap
checkRecommendations(itemSubsetRecs, allItemRecs, "user")
}
private def checkRecommendations(
topK: DataFrame,
expected: Map[Int, Array[(Int, Float)]],
expected: Map[Int, Seq[(Int, Float)]],
dstColName: String): Unit = {
val spark = this.spark
import spark.implicits._

View file

@ -90,6 +90,14 @@ class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, Ha
>>> item_recs.where(item_recs.item == 2)\
.select("recommendations.user", "recommendations.rating").collect()
[Row(user=[2, 1, 0], rating=[4.901..., 3.981..., -0.138...])]
>>> user_subset = df.where(df.user == 2)
>>> user_subset_recs = model.recommendForUserSubset(user_subset, 3)
>>> user_subset_recs.select("recommendations.item", "recommendations.rating").first()
Row(item=[2, 1, 0], rating=[4.901..., 1.056..., -1.501...])
>>> item_subset = df.where(df.item == 0)
>>> item_subset_recs = model.recommendForItemSubset(item_subset, 3)
>>> item_subset_recs.select("recommendations.user", "recommendations.rating").first()
Row(user=[0, 1, 2], rating=[3.910..., 2.625..., -1.501...])
>>> als_path = temp_path + "/als"
>>> als.save(als_path)
>>> als2 = ALS.load(als_path)
@ -414,6 +422,36 @@ class ALSModel(JavaModel, JavaMLWritable, JavaMLReadable):
"""
return self._call_java("recommendForAllItems", numUsers)
@since("2.3.0")
def recommendForUserSubset(self, dataset, numItems):
"""
Returns top `numItems` items recommended for each user id in the input data set. Note that
if there are duplicate ids in the input dataset, only one set of recommendations per unique
id will be returned.
:param dataset: a Dataset containing a column of user ids. The column name must match
`userCol`.
:param numItems: max number of recommendations for each user
:return: a DataFrame of (userCol, recommendations), where recommendations are
stored as an array of (itemCol, rating) Rows.
"""
return self._call_java("recommendForUserSubset", dataset, numItems)
@since("2.3.0")
def recommendForItemSubset(self, dataset, numUsers):
"""
Returns top `numUsers` users recommended for each item id in the input data set. Note that
if there are duplicate ids in the input dataset, only one set of recommendations per unique
id will be returned.
:param dataset: a Dataset containing a column of item ids. The column name must match
`itemCol`.
:param numUsers: max number of recommendations for each item
:return: a DataFrame of (itemCol, recommendations), where recommendations are
stored as an array of (userCol, rating) Rows.
"""
return self._call_java("recommendForItemSubset", dataset, numUsers)
if __name__ == "__main__":
import doctest