[SPARK-15750][MLLIB][PYSPARK] Constructing FPGrowth fails when no numPartitions specified in pyspark

## What changes were proposed in this pull request?

Change FPGrowth's constructor from private to private[spark]. If no numPartitions is specified, the default value -1 is used. However, -1 is only valid in the constructor of FPGrowth, not in setNumPartitions. So I made this change and used the constructor directly rather than the setter method.
## How was this patch tested?

Unit test is added

Author: Jeff Zhang <zjffdu@apache.org>

Closes #13493 from zjffdu/SPARK-15750.
This commit is contained in:
Jeff Zhang 2018-05-07 14:47:58 -07:00 committed by Joseph K. Bradley
parent d83e963724
commit 56a52e0a58
3 changed files with 14 additions and 5 deletions

View file

@@ -572,10 +572,7 @@ private[python] class PythonMLLibAPI extends Serializable {
data: JavaRDD[java.lang.Iterable[Any]],
minSupport: Double,
numPartitions: Int): FPGrowthModel[Any] = {
val fpg = new FPGrowth()
.setMinSupport(minSupport)
.setNumPartitions(numPartitions)
val fpg = new FPGrowth(minSupport, numPartitions)
val model = fpg.run(data.rdd.map(_.asScala.toArray))
new FPGrowthModelWrapper(model)
}

View file

@@ -162,7 +162,7 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {
*
*/
@Since("1.3.0")
class FPGrowth private (
class FPGrowth private[spark] (
private var minSupport: Double,
private var numPartitions: Int) extends Logging with Serializable {

View file

@@ -57,6 +57,7 @@ from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _
DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
from pyspark.mllib.fpm import FPGrowth
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
from pyspark.mllib.random import RandomRDDs
@@ -1762,6 +1763,17 @@ class DimensionalityReductionTests(MLlibTestCase):
self.assertEqualUpToSign(pcs.toArray()[:, k - 1], expected_pcs[:, k - 1])
class FPGrowthTest(MLlibTestCase):
    """Checks that FPGrowth.train produces the same frequent itemsets whether
    numPartitions is given explicitly or left to its default (SPARK-15750)."""

    def test_fpgrowth(self):
        data = [["a", "b", "c"], ["a", "b", "d", "e"], ["a", "c", "e"], ["a", "c", "f"]]
        rdd = self.sc.parallelize(data, 2)
        # explicit numPartitions
        model1 = FPGrowth.train(rdd, 0.6, 2)
        # use default data partition number when numPartitions is not specified
        model2 = FPGrowth.train(rdd, 0.6)
        # sort both sides: freqItemsets() gives no ordering guarantee
        self.assertEqual(sorted(model1.freqItemsets().collect()),
                         sorted(model2.freqItemsets().collect()))
if __name__ == "__main__":
from pyspark.mllib.tests import *
if not _have_scipy: