#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from shutil import rmtree
import tempfile
import unittest

import numpy as np

from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, \
    MultilayerPerceptronClassifier, OneVsRest
from pyspark.ml.clustering import DistributedLDAModel, KMeans, LocalLDAModel, LDA, LDAModel
from pyspark.ml.fpm import FPGrowth
from pyspark.ml.linalg import Matrices, Vectors
from pyspark.ml.recommendation import ALS
from pyspark.ml.regression import GeneralizedLinearRegression, LinearRegression
from pyspark.sql import Row
from pyspark.testing.mlutils import SparkSessionTestCase


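# Bound-constrained logistic regression: coefficient bounds are supplied as a
# matrix of shape (1 x numFeatures) for binomial regression and
# (numClasses x numFeatures) for multinomial regression; intercept bounds are a
# vector of matching length.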
class LogisticRegressionTest(SparkSessionTestCase):

    def test_binomial_logistic_regression_with_bound(self):

        df = self.spark.createDataFrame(
            [(1.0, 1.0, Vectors.dense(0.0, 5.0)),
             (0.0, 2.0, Vectors.dense(1.0, 2.0)),
             (1.0, 3.0, Vectors.dense(2.0, 1.0)),
             (0.0, 4.0, Vectors.dense(3.0, 3.0)), ], ["label", "weight", "features"])

        lor = LogisticRegression(regParam=0.01, weightCol="weight",
                                 lowerBoundsOnCoefficients=Matrices.dense(1, 2, [-1.0, -1.0]),
                                 upperBoundsOnIntercepts=Vectors.dense(0.0))
        model = lor.fit(df)
        self.assertTrue(
            np.allclose(model.coefficients.toArray(), [-0.2944, -0.0484], atol=1E-4))
        self.assertTrue(np.isclose(model.intercept, 0.0, atol=1E-4))

    def test_multinomial_logistic_regression_with_bound(self):

        data_path = "data/mllib/sample_multiclass_classification_data.txt"
        df = self.spark.read.format("libsvm").load(data_path)

        lor = LogisticRegression(regParam=0.01,
                                 lowerBoundsOnCoefficients=Matrices.dense(3, 4, range(12)),
                                 upperBoundsOnIntercepts=Vectors.dense(0.0, 0.0, 0.0))
        model = lor.fit(df)
        expected = [[4.593, 4.5516, 9.0099, 12.2904],
                    [1.0, 8.1093, 7.0, 10.0],
                    [3.041, 5.0, 8.0, 11.0]]
        for i in range(0, len(expected)):
            self.assertTrue(
                np.allclose(model.coefficientMatrix.toArray()[i], expected[i], atol=1E-4))
        self.assertTrue(
            np.allclose(model.interceptVector.toArray(), [-0.9057, -1.1392, -0.0033], atol=1E-4))


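# The expected values below pin down the raw network output, the class
# probabilities, and the argmax prediction for a fixed seed and layer layout.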
class MultilayerPerceptronClassifierTest(SparkSessionTestCase):

    def test_raw_and_probability_prediction(self):

        data_path = "data/mllib/sample_multiclass_classification_data.txt"
        df = self.spark.read.format("libsvm").load(data_path)

        mlp = MultilayerPerceptronClassifier(maxIter=100, layers=[4, 5, 4, 3],
                                             blockSize=128, seed=123)
        model = mlp.fit(df)
        test = self.sc.parallelize([Row(features=Vectors.dense(0.1, 0.1, 0.25, 0.25))]).toDF()
        result = model.transform(test).head()
        expected_prediction = 2.0
        expected_probability = [0.0, 0.0, 1.0]
        expected_rawPrediction = [-11.6081922998, -8.15827998691, 22.17757045]
        # assertEqual, not assertTrue: assertTrue(x, y) treats y as a failure
        # message and passes whenever the prediction is truthy.
        self.assertEqual(result.prediction, expected_prediction)
        self.assertTrue(np.allclose(result.probability, expected_probability, atol=1E-4))
        self.assertTrue(np.allclose(result.rawPrediction, expected_rawPrediction, atol=1E-4))


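# OneVsRest reduces multiclass classification to one binary classifier per class;
# the tests below cover param copying, output columns, parallel fitting, and
# weightCol handling.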
class OneVsRestTests(SparkSessionTestCase):

    def test_copy(self):
        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
                                         (1.0, Vectors.sparse(2, [], [])),
                                         (2.0, Vectors.dense(0.5, 0.5))],
                                        ["label", "features"])
        lr = LogisticRegression(maxIter=5, regParam=0.01)
        ovr = OneVsRest(classifier=lr)
        ovr1 = ovr.copy({lr.maxIter: 10})
        self.assertEqual(ovr.getClassifier().getMaxIter(), 5)
        self.assertEqual(ovr1.getClassifier().getMaxIter(), 10)
        model = ovr.fit(df)
        model1 = model.copy({model.predictionCol: "indexed"})
        self.assertEqual(model1.getPredictionCol(), "indexed")

    def test_output_columns(self):
        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
                                         (1.0, Vectors.sparse(2, [], [])),
                                         (2.0, Vectors.dense(0.5, 0.5))],
                                        ["label", "features"])
        lr = LogisticRegression(maxIter=5, regParam=0.01)
        ovr = OneVsRest(classifier=lr, parallelism=1)
        model = ovr.fit(df)
        output = model.transform(df)
        self.assertEqual(output.columns, ["label", "features", "rawPrediction", "prediction"])

    def test_parallelism_doesnt_change_output(self):
        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8)),
                                         (1.0, Vectors.sparse(2, [], [])),
                                         (2.0, Vectors.dense(0.5, 0.5))],
                                        ["label", "features"])
        ovrPar1 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=1)
        modelPar1 = ovrPar1.fit(df)
        ovrPar2 = OneVsRest(classifier=LogisticRegression(maxIter=5, regParam=.01), parallelism=2)
        modelPar2 = ovrPar2.fit(df)
        for i, model in enumerate(modelPar1.models):
            self.assertTrue(np.allclose(model.coefficients.toArray(),
                                        modelPar2.models[i].coefficients.toArray(), atol=1E-4))
            self.assertTrue(np.allclose(model.intercept, modelPar2.models[i].intercept, atol=1E-4))

    def test_support_for_weightCol(self):
        df = self.spark.createDataFrame([(0.0, Vectors.dense(1.0, 0.8), 1.0),
                                         (1.0, Vectors.sparse(2, [], []), 1.0),
                                         (2.0, Vectors.dense(0.5, 0.5), 1.0)],
                                        ["label", "features", "weight"])
        # classifier inherits hasWeightCol
        lr = LogisticRegression(maxIter=5, regParam=0.01)
        ovr = OneVsRest(classifier=lr, weightCol="weight")
        self.assertIsNotNone(ovr.fit(df))
        # classifier doesn't inherit hasWeightCol
        dt = DecisionTreeClassifier()
        ovr2 = OneVsRest(classifier=dt, weightCol="weight")
        self.assertIsNotNone(ovr2.fit(df))


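# With distanceMeasure="cosine", points are clustered by direction rather than
# magnitude: each pair below lies along (roughly) the same ray from the origin.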
class KMeansTests(SparkSessionTestCase):

    def test_kmeans_cosine_distance(self):
        data = [(Vectors.dense([1.0, 1.0]),), (Vectors.dense([10.0, 10.0]),),
                (Vectors.dense([1.0, 0.5]),), (Vectors.dense([10.0, 4.4]),),
                (Vectors.dense([-1.0, 1.0]),), (Vectors.dense([-100.0, 90.0]),)]
        df = self.spark.createDataFrame(data, ["features"])
        kmeans = KMeans(k=3, seed=1, distanceMeasure="cosine")
        model = kmeans.fit(df)
        result = model.transform(df).collect()
        self.assertTrue(result[0].prediction == result[1].prediction)
        self.assertTrue(result[2].prediction == result[3].prediction)
        self.assertTrue(result[4].prediction == result[5].prediction)


class LDATest(SparkSessionTestCase):

    def _compare(self, m1, m2):
        """
        Temp method for comparing instances.
        TODO: Replace with generic implementation once SPARK-14706 is merged.
        """
        self.assertEqual(m1.uid, m2.uid)
        self.assertEqual(type(m1), type(m2))
        self.assertEqual(len(m1.params), len(m2.params))
        for p in m1.params:
            if m1.isDefined(p):
                self.assertEqual(m1.getOrDefault(p), m2.getOrDefault(p))
                self.assertEqual(p.parent, m2.getParam(p.name).parent)
        if isinstance(m1, LDAModel):
            self.assertEqual(m1.vocabSize(), m2.vocabSize())
            self.assertEqual(m1.topicsMatrix(), m2.topicsMatrix())

    def test_persistence(self):
        # Test save/load for LDA, LocalLDAModel, DistributedLDAModel.
        df = self.spark.createDataFrame([
            [1, Vectors.dense([0.0, 1.0])],
            [2, Vectors.sparse(2, {0: 1.0})],
        ], ["id", "features"])
        # Fit model
        lda = LDA(k=2, seed=1, optimizer="em")
        distributedModel = lda.fit(df)
        self.assertTrue(distributedModel.isDistributed())
        localModel = distributedModel.toLocal()
        self.assertFalse(localModel.isDistributed())
        # Define paths
        path = tempfile.mkdtemp()
        lda_path = path + "/lda"
        dist_model_path = path + "/distLDAModel"
        local_model_path = path + "/localLDAModel"
        # Test LDA
        lda.save(lda_path)
        lda2 = LDA.load(lda_path)
        self._compare(lda, lda2)
        # Test DistributedLDAModel
        distributedModel.save(dist_model_path)
        distributedModel2 = DistributedLDAModel.load(dist_model_path)
        self._compare(distributedModel, distributedModel2)
        # Test LocalLDAModel
        localModel.save(local_model_path)
        localModel2 = LocalLDAModel.load(local_model_path)
        self._compare(localModel, localModel2)
        # Clean up
        try:
            rmtree(path)
        except OSError:
            pass


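# FPGrowth() is used with its default minSupport and minConfidence; expected
# itemsets and rules are compared set-wise via subtract(), so row order is
# irrelevant.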
class FPGrowthTests(SparkSessionTestCase):
    def setUp(self):
        super(FPGrowthTests, self).setUp()
        self.data = self.spark.createDataFrame(
            [([1, 2], ), ([1, 2], ), ([1, 2, 3], ), ([1, 3], )],
            ["items"])

    def test_association_rules(self):
        fp = FPGrowth()
        fpm = fp.fit(self.data)

        expected_association_rules = self.spark.createDataFrame(
            [([3], [1], 1.0, 1.0), ([2], [1], 1.0, 1.0)],
            ["antecedent", "consequent", "confidence", "lift"]
        )
        actual_association_rules = fpm.associationRules

        self.assertEqual(actual_association_rules.subtract(expected_association_rules).count(), 0)
        self.assertEqual(expected_association_rules.subtract(actual_association_rules).count(), 0)

    def test_freq_itemsets(self):
        fp = FPGrowth()
        fpm = fp.fit(self.data)

        expected_freq_itemsets = self.spark.createDataFrame(
            [([1], 4), ([2], 3), ([2, 1], 3), ([3], 2), ([3, 1], 2)],
            ["items", "freq"]
        )
        actual_freq_itemsets = fpm.freqItemsets

        self.assertEqual(actual_freq_itemsets.subtract(expected_freq_itemsets).count(), 0)
        self.assertEqual(expected_freq_itemsets.subtract(actual_freq_itemsets).count(), 0)

    def tearDown(self):
        del self.data


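# Storage levels are asserted on both the Python wrapper and the underlying Java
# object (als._java_obj) to verify that the params propagate across the Py4J
# bridge rather than only being set on the Python side.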
class ALSTest(SparkSessionTestCase):

    def test_storage_levels(self):
        df = self.spark.createDataFrame(
            [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), (2, 1, 1.0), (2, 2, 5.0)],
            ["user", "item", "rating"])
        als = ALS().setMaxIter(1).setRank(1)
        # test default params
        als.fit(df)
        self.assertEqual(als.getIntermediateStorageLevel(), "MEMORY_AND_DISK")
        self.assertEqual(als._java_obj.getIntermediateStorageLevel(), "MEMORY_AND_DISK")
        self.assertEqual(als.getFinalStorageLevel(), "MEMORY_AND_DISK")
        self.assertEqual(als._java_obj.getFinalStorageLevel(), "MEMORY_AND_DISK")
        # test non-default params
        als.setIntermediateStorageLevel("MEMORY_ONLY_2")
        als.setFinalStorageLevel("DISK_ONLY")
        als.fit(df)
        self.assertEqual(als.getIntermediateStorageLevel(), "MEMORY_ONLY_2")
        self.assertEqual(als._java_obj.getIntermediateStorageLevel(), "MEMORY_ONLY_2")
        self.assertEqual(als.getFinalStorageLevel(), "DISK_ONLY")
        self.assertEqual(als._java_obj.getFinalStorageLevel(), "DISK_ONLY")


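# For the Tweedie family, a variancePower between 1 and 2 (here 1.6) gives a
# compound Poisson-gamma distribution; when linkPower is unset, Spark defaults
# it to 1 - variancePower, so setLinkPower(-1.0) switches to the inverse link.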
class GeneralizedLinearRegressionTest(SparkSessionTestCase):

    def test_tweedie_distribution(self):

        df = self.spark.createDataFrame(
            [(1.0, Vectors.dense(0.0, 0.0)),
             (1.0, Vectors.dense(1.0, 2.0)),
             (2.0, Vectors.dense(0.0, 0.0)),
             (2.0, Vectors.dense(1.0, 1.0)), ], ["label", "features"])

        glr = GeneralizedLinearRegression(family="tweedie", variancePower=1.6)
        model = glr.fit(df)
        self.assertTrue(np.allclose(model.coefficients.toArray(), [-0.4645, 0.3402], atol=1E-4))
        self.assertTrue(np.isclose(model.intercept, 0.7841, atol=1E-4))

        model2 = glr.setLinkPower(-1.0).fit(df)
        self.assertTrue(np.allclose(model2.coefficients.toArray(), [-0.6667, 0.5], atol=1E-4))
        self.assertTrue(np.isclose(model2.intercept, 0.6667, atol=1E-4))

    def test_offset(self):

        df = self.spark.createDataFrame(
            [(0.2, 1.0, 2.0, Vectors.dense(0.0, 5.0)),
             (0.5, 2.1, 0.5, Vectors.dense(1.0, 2.0)),
             (0.9, 0.4, 1.0, Vectors.dense(2.0, 1.0)),
             (0.7, 0.7, 0.0, Vectors.dense(3.0, 3.0))], ["label", "weight", "offset", "features"])

        glr = GeneralizedLinearRegression(family="poisson", weightCol="weight", offsetCol="offset")
        model = glr.fit(df)
        self.assertTrue(np.allclose(model.coefficients.toArray(), [0.664647, -0.3192581],
                                    atol=1E-4))
        self.assertTrue(np.isclose(model.intercept, -1.561613, atol=1E-4))


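# Huber loss: epsilon controls where the loss switches from squared to absolute
# error; model.scale exposes the scale (sigma) estimated jointly with the
# coefficients in the robust objective.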
class LinearRegressionTest(SparkSessionTestCase):

    def test_linear_regression_with_huber_loss(self):

        data_path = "data/mllib/sample_linear_regression_data.txt"
        df = self.spark.read.format("libsvm").load(data_path)

        lir = LinearRegression(loss="huber", epsilon=2.0)
        model = lir.fit(df)

        expectedCoefficients = [0.136, 0.7648, -0.7761, 2.4236, 0.537,
                                1.2612, -0.333, -0.5694, -0.6311, 0.6053]
        expectedIntercept = 0.1607
        expectedScale = 9.758

        self.assertTrue(
            np.allclose(model.coefficients.toArray(), expectedCoefficients, atol=1E-3))
        self.assertTrue(np.isclose(model.intercept, expectedIntercept, atol=1E-3))
        self.assertTrue(np.isclose(model.scale, expectedScale, atol=1E-3))


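# When the optional xmlrunner package is available, emit JUnit-style XML reports
# (useful for CI); otherwise fall back to the default text test runner.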
if __name__ == "__main__":
    from pyspark.ml.tests.test_algorithms import *

    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports')
    except ImportError:
        testRunner = None
    unittest.main(testRunner=testRunner, verbosity=2)