[SPARK-14412][.2][ML] rename *RDDStorageLevel to *StorageLevel in ml.ALS
## What changes were proposed in this pull request? As discussed in #12660, this PR renames * intermediateRDDStorageLevel -> intermediateStorageLevel * finalRDDStorageLevel -> finalStorageLevel The argument name in `ALS.train` will be addressed in SPARK-15027. ## How was this patch tested? Existing unit tests. Author: Xiangrui Meng <meng@databricks.com> Closes #12803 from mengxr/SPARK-14412.
This commit is contained in:
parent
5886b6217b
commit
7fbe1bb24d
|
@ -154,37 +154,37 @@ private[recommendation] trait ALSParams extends ALSModelParams with HasMaxIter w
|
|||
def getNonnegative: Boolean = $(nonnegative)
|
||||
|
||||
/**
|
||||
* Param for StorageLevel for intermediate RDDs. Pass in a string representation of
|
||||
* Param for StorageLevel for intermediate datasets. Pass in a string representation of
|
||||
* [[StorageLevel]]. Cannot be "NONE".
|
||||
* Default: "MEMORY_AND_DISK".
|
||||
*
|
||||
* @group expertParam
|
||||
*/
|
||||
val intermediateRDDStorageLevel = new Param[String](this, "intermediateRDDStorageLevel",
|
||||
"StorageLevel for intermediate RDDs. Cannot be 'NONE'. Default: 'MEMORY_AND_DISK'.",
|
||||
val intermediateStorageLevel = new Param[String](this, "intermediateStorageLevel",
|
||||
"StorageLevel for intermediate datasets. Cannot be 'NONE'. Default: 'MEMORY_AND_DISK'.",
|
||||
(s: String) => Try(StorageLevel.fromString(s)).isSuccess && s != "NONE")
|
||||
|
||||
/** @group expertGetParam */
|
||||
def getIntermediateRDDStorageLevel: String = $(intermediateRDDStorageLevel)
|
||||
def getIntermediateStorageLevel: String = $(intermediateStorageLevel)
|
||||
|
||||
/**
|
||||
* Param for StorageLevel for ALS model factor RDDs. Pass in a string representation of
|
||||
* Param for StorageLevel for ALS model factors. Pass in a string representation of
|
||||
* [[StorageLevel]].
|
||||
* Default: "MEMORY_AND_DISK".
|
||||
*
|
||||
* @group expertParam
|
||||
*/
|
||||
val finalRDDStorageLevel = new Param[String](this, "finalRDDStorageLevel",
|
||||
"StorageLevel for ALS model factor RDDs. Default: 'MEMORY_AND_DISK'.",
|
||||
val finalStorageLevel = new Param[String](this, "finalStorageLevel",
|
||||
"StorageLevel for ALS model factors. Default: 'MEMORY_AND_DISK'.",
|
||||
(s: String) => Try(StorageLevel.fromString(s)).isSuccess)
|
||||
|
||||
/** @group expertGetParam */
|
||||
def getFinalRDDStorageLevel: String = $(finalRDDStorageLevel)
|
||||
def getFinalStorageLevel: String = $(finalStorageLevel)
|
||||
|
||||
setDefault(rank -> 10, maxIter -> 10, regParam -> 0.1, numUserBlocks -> 10, numItemBlocks -> 10,
|
||||
implicitPrefs -> false, alpha -> 1.0, userCol -> "user", itemCol -> "item",
|
||||
ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10,
|
||||
intermediateRDDStorageLevel -> "MEMORY_AND_DISK", finalRDDStorageLevel -> "MEMORY_AND_DISK")
|
||||
intermediateStorageLevel -> "MEMORY_AND_DISK", finalStorageLevel -> "MEMORY_AND_DISK")
|
||||
|
||||
/**
|
||||
* Validates and transforms the input schema.
|
||||
|
@ -406,14 +406,14 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
|
|||
|
||||
/** @group expertSetParam */
|
||||
@Since("2.0.0")
|
||||
def setIntermediateRDDStorageLevel(value: String): this.type = {
|
||||
set(intermediateRDDStorageLevel, value)
|
||||
def setIntermediateStorageLevel(value: String): this.type = {
|
||||
set(intermediateStorageLevel, value)
|
||||
}
|
||||
|
||||
/** @group expertSetParam */
|
||||
@Since("2.0.0")
|
||||
def setFinalRDDStorageLevel(value: String): this.type = {
|
||||
set(finalRDDStorageLevel, value)
|
||||
def setFinalStorageLevel(value: String): this.type = {
|
||||
set(finalStorageLevel, value)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -446,8 +446,8 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
|
|||
numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
|
||||
maxIter = $(maxIter), regParam = $(regParam), implicitPrefs = $(implicitPrefs),
|
||||
alpha = $(alpha), nonnegative = $(nonnegative),
|
||||
intermediateRDDStorageLevel = StorageLevel.fromString($(intermediateRDDStorageLevel)),
|
||||
finalRDDStorageLevel = StorageLevel.fromString($(finalRDDStorageLevel)),
|
||||
intermediateRDDStorageLevel = StorageLevel.fromString($(intermediateStorageLevel)),
|
||||
finalRDDStorageLevel = StorageLevel.fromString($(finalStorageLevel)),
|
||||
checkpointInterval = $(checkpointInterval), seed = $(seed))
|
||||
val userDF = userFactors.toDF("id", "features")
|
||||
val itemDF = itemFactors.toDF("id", "features")
|
||||
|
|
|
@ -525,13 +525,13 @@ class ALSStorageSuite
|
|||
|
||||
test("invalid storage params") {
|
||||
intercept[IllegalArgumentException] {
|
||||
new ALS().setIntermediateRDDStorageLevel("foo")
|
||||
new ALS().setIntermediateStorageLevel("foo")
|
||||
}
|
||||
intercept[IllegalArgumentException] {
|
||||
new ALS().setIntermediateRDDStorageLevel("NONE")
|
||||
new ALS().setIntermediateStorageLevel("NONE")
|
||||
}
|
||||
intercept[IllegalArgumentException] {
|
||||
new ALS().setFinalRDDStorageLevel("foo")
|
||||
new ALS().setFinalStorageLevel("foo")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -563,8 +563,8 @@ class ALSStorageSuite
|
|||
val nonDefaultListener = new IntermediateRDDStorageListener
|
||||
sc.addSparkListener(nonDefaultListener)
|
||||
val nonDefaultModel = als
|
||||
.setFinalRDDStorageLevel("MEMORY_ONLY")
|
||||
.setIntermediateRDDStorageLevel("DISK_ONLY")
|
||||
.setFinalStorageLevel("MEMORY_ONLY")
|
||||
.setIntermediateStorageLevel("DISK_ONLY")
|
||||
.fit(data)
|
||||
// check final factor RDD non-default storage levels
|
||||
val levels = sc.getPersistentRDDs.collect {
|
||||
|
@ -617,7 +617,7 @@ object ALSSuite {
|
|||
"alpha" -> 0.9,
|
||||
"nonnegative" -> true,
|
||||
"checkpointInterval" -> 20,
|
||||
"intermediateRDDStorageLevel" -> "MEMORY_ONLY",
|
||||
"finalRDDStorageLevel" -> "MEMORY_AND_DISK_SER"
|
||||
"intermediateStorageLevel" -> "MEMORY_ONLY",
|
||||
"finalStorageLevel" -> "MEMORY_AND_DISK_SER"
|
||||
)
|
||||
}
|
||||
|
|
|
@ -119,35 +119,35 @@ class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, Ha
|
|||
nonnegative = Param(Params._dummy(), "nonnegative",
|
||||
"whether to use nonnegative constraint for least squares",
|
||||
typeConverter=TypeConverters.toBoolean)
|
||||
intermediateRDDStorageLevel = Param(Params._dummy(), "intermediateRDDStorageLevel",
|
||||
"StorageLevel for intermediate RDDs. Cannot be 'NONE'. " +
|
||||
"Default: 'MEMORY_AND_DISK'.",
|
||||
typeConverter=TypeConverters.toString)
|
||||
finalRDDStorageLevel = Param(Params._dummy(), "finalRDDStorageLevel",
|
||||
"StorageLevel for ALS model factor RDDs. " +
|
||||
"Default: 'MEMORY_AND_DISK'.",
|
||||
typeConverter=TypeConverters.toString)
|
||||
intermediateStorageLevel = Param(Params._dummy(), "intermediateStorageLevel",
|
||||
"StorageLevel for intermediate datasets. Cannot be 'NONE'. " +
|
||||
"Default: 'MEMORY_AND_DISK'.",
|
||||
typeConverter=TypeConverters.toString)
|
||||
finalStorageLevel = Param(Params._dummy(), "finalStorageLevel",
|
||||
"StorageLevel for ALS model factors. " +
|
||||
"Default: 'MEMORY_AND_DISK'.",
|
||||
typeConverter=TypeConverters.toString)
|
||||
|
||||
@keyword_only
|
||||
def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
|
||||
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
|
||||
ratingCol="rating", nonnegative=False, checkpointInterval=10,
|
||||
intermediateRDDStorageLevel="MEMORY_AND_DISK",
|
||||
finalRDDStorageLevel="MEMORY_AND_DISK"):
|
||||
intermediateStorageLevel="MEMORY_AND_DISK",
|
||||
finalStorageLevel="MEMORY_AND_DISK"):
|
||||
"""
|
||||
__init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
|
||||
implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=None, \
|
||||
ratingCol="rating", nonnegative=false, checkpointInterval=10, \
|
||||
intermediateRDDStorageLevel="MEMORY_AND_DISK", \
|
||||
finalRDDStorageLevel="MEMORY_AND_DISK")
|
||||
intermediateStorageLevel="MEMORY_AND_DISK", \
|
||||
finalStorageLevel="MEMORY_AND_DISK")
|
||||
"""
|
||||
super(ALS, self).__init__()
|
||||
self._java_obj = self._new_java_obj("org.apache.spark.ml.recommendation.ALS", self.uid)
|
||||
self._setDefault(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
|
||||
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
|
||||
ratingCol="rating", nonnegative=False, checkpointInterval=10,
|
||||
intermediateRDDStorageLevel="MEMORY_AND_DISK",
|
||||
finalRDDStorageLevel="MEMORY_AND_DISK")
|
||||
intermediateStorageLevel="MEMORY_AND_DISK",
|
||||
finalStorageLevel="MEMORY_AND_DISK")
|
||||
kwargs = self.__init__._input_kwargs
|
||||
self.setParams(**kwargs)
|
||||
|
||||
|
@ -156,14 +156,14 @@ class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, Ha
|
|||
def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10,
|
||||
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None,
|
||||
ratingCol="rating", nonnegative=False, checkpointInterval=10,
|
||||
intermediateRDDStorageLevel="MEMORY_AND_DISK",
|
||||
finalRDDStorageLevel="MEMORY_AND_DISK"):
|
||||
intermediateStorageLevel="MEMORY_AND_DISK",
|
||||
finalStorageLevel="MEMORY_AND_DISK"):
|
||||
"""
|
||||
setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, \
|
||||
implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=None, \
|
||||
ratingCol="rating", nonnegative=False, checkpointInterval=10, \
|
||||
intermediateRDDStorageLevel="MEMORY_AND_DISK", \
|
||||
finalRDDStorageLevel="MEMORY_AND_DISK")
|
||||
intermediateStorageLevel="MEMORY_AND_DISK", \
|
||||
finalStorageLevel="MEMORY_AND_DISK")
|
||||
Sets params for ALS.
|
||||
"""
|
||||
kwargs = self.setParams._input_kwargs
|
||||
|
@ -316,34 +316,34 @@ class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, Ha
|
|||
return self.getOrDefault(self.nonnegative)
|
||||
|
||||
@since("2.0.0")
|
||||
def setIntermediateRDDStorageLevel(self, value):
|
||||
def setIntermediateStorageLevel(self, value):
|
||||
"""
|
||||
Sets the value of :py:attr:`intermediateRDDStorageLevel`.
|
||||
Sets the value of :py:attr:`intermediateStorageLevel`.
|
||||
"""
|
||||
self._set(intermediateRDDStorageLevel=value)
|
||||
self._set(intermediateStorageLevel=value)
|
||||
return self
|
||||
|
||||
@since("2.0.0")
|
||||
def getIntermediateRDDStorageLevel(self):
|
||||
def getIntermediateStorageLevel(self):
|
||||
"""
|
||||
Gets the value of intermediateRDDStorageLevel or its default value.
|
||||
Gets the value of intermediateStorageLevel or its default value.
|
||||
"""
|
||||
return self.getOrDefault(self.intermediateRDDStorageLevel)
|
||||
return self.getOrDefault(self.intermediateStorageLevel)
|
||||
|
||||
@since("2.0.0")
|
||||
def setFinalRDDStorageLevel(self, value):
|
||||
def setFinalStorageLevel(self, value):
|
||||
"""
|
||||
Sets the value of :py:attr:`finalRDDStorageLevel`.
|
||||
Sets the value of :py:attr:`finalStorageLevel`.
|
||||
"""
|
||||
self._set(finalRDDStorageLevel=value)
|
||||
self._set(finalStorageLevel=value)
|
||||
return self
|
||||
|
||||
@since("2.0.0")
|
||||
def getFinalRDDStorageLevel(self):
|
||||
def getFinalStorageLevel(self):
|
||||
"""
|
||||
Gets the value of finalRDDStorageLevel or its default value.
|
||||
Gets the value of finalStorageLevel or its default value.
|
||||
"""
|
||||
return self.getOrDefault(self.finalRDDStorageLevel)
|
||||
return self.getOrDefault(self.finalStorageLevel)
|
||||
|
||||
|
||||
class ALSModel(JavaModel, JavaMLWritable, JavaMLReadable):
|
||||
|
|
|
@ -1012,18 +1012,18 @@ class ALSTest(PySparkTestCase):
|
|||
als = ALS().setMaxIter(1).setRank(1)
|
||||
# test default params
|
||||
als.fit(df)
|
||||
self.assertEqual(als.getIntermediateRDDStorageLevel(), "MEMORY_AND_DISK")
|
||||
self.assertEqual(als._java_obj.getIntermediateRDDStorageLevel(), "MEMORY_AND_DISK")
|
||||
self.assertEqual(als.getFinalRDDStorageLevel(), "MEMORY_AND_DISK")
|
||||
self.assertEqual(als._java_obj.getFinalRDDStorageLevel(), "MEMORY_AND_DISK")
|
||||
self.assertEqual(als.getIntermediateStorageLevel(), "MEMORY_AND_DISK")
|
||||
self.assertEqual(als._java_obj.getIntermediateStorageLevel(), "MEMORY_AND_DISK")
|
||||
self.assertEqual(als.getFinalStorageLevel(), "MEMORY_AND_DISK")
|
||||
self.assertEqual(als._java_obj.getFinalStorageLevel(), "MEMORY_AND_DISK")
|
||||
# test non-default params
|
||||
als.setIntermediateRDDStorageLevel("MEMORY_ONLY_2")
|
||||
als.setFinalRDDStorageLevel("DISK_ONLY")
|
||||
als.setIntermediateStorageLevel("MEMORY_ONLY_2")
|
||||
als.setFinalStorageLevel("DISK_ONLY")
|
||||
als.fit(df)
|
||||
self.assertEqual(als.getIntermediateRDDStorageLevel(), "MEMORY_ONLY_2")
|
||||
self.assertEqual(als._java_obj.getIntermediateRDDStorageLevel(), "MEMORY_ONLY_2")
|
||||
self.assertEqual(als.getFinalRDDStorageLevel(), "DISK_ONLY")
|
||||
self.assertEqual(als._java_obj.getFinalRDDStorageLevel(), "DISK_ONLY")
|
||||
self.assertEqual(als.getIntermediateStorageLevel(), "MEMORY_ONLY_2")
|
||||
self.assertEqual(als._java_obj.getIntermediateStorageLevel(), "MEMORY_ONLY_2")
|
||||
self.assertEqual(als.getFinalStorageLevel(), "DISK_ONLY")
|
||||
self.assertEqual(als._java_obj.getFinalStorageLevel(), "DISK_ONLY")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Loading…
Reference in a new issue