9fcf0ea718
Disallow the use of unused imports: - Unnecessary increases the memory footprint of the application - Removes the imports that are required for the examples in the docstring from the file-scope to the example itself. This keeps the files itself clean, and gives a more complete example as it also includes the imports :) ``` fokkodriesprongFan spark % flake8 python | grep -i "imported but unused" python/pyspark/cloudpickle.py:46:1: F401 'functools.partial' imported but unused python/pyspark/cloudpickle.py:55:1: F401 'traceback' imported but unused python/pyspark/heapq3.py:868:5: F401 '_heapq.*' imported but unused python/pyspark/__init__.py:61:1: F401 'pyspark.version.__version__' imported but unused python/pyspark/__init__.py:62:1: F401 'pyspark._globals._NoValue' imported but unused python/pyspark/__init__.py:115:1: F401 'pyspark.sql.SQLContext' imported but unused python/pyspark/__init__.py:115:1: F401 'pyspark.sql.HiveContext' imported but unused python/pyspark/__init__.py:115:1: F401 'pyspark.sql.Row' imported but unused python/pyspark/rdd.py:21:1: F401 're' imported but unused python/pyspark/rdd.py:29:1: F401 'tempfile.NamedTemporaryFile' imported but unused python/pyspark/mllib/regression.py:26:1: F401 'pyspark.mllib.linalg.SparseVector' imported but unused python/pyspark/mllib/clustering.py:28:1: F401 'pyspark.mllib.linalg.SparseVector' imported but unused python/pyspark/mllib/clustering.py:28:1: F401 'pyspark.mllib.linalg.DenseVector' imported but unused python/pyspark/mllib/classification.py:26:1: F401 'pyspark.mllib.linalg.SparseVector' imported but unused python/pyspark/mllib/feature.py:28:1: F401 'pyspark.mllib.linalg.DenseVector' imported but unused python/pyspark/mllib/feature.py:28:1: F401 'pyspark.mllib.linalg.SparseVector' imported but unused python/pyspark/mllib/feature.py:30:1: F401 'pyspark.mllib.regression.LabeledPoint' imported but unused python/pyspark/mllib/tests/test_linalg.py:18:1: F401 'sys' imported but unused 
python/pyspark/mllib/tests/test_linalg.py:642:5: F401 'pyspark.mllib.tests.test_linalg.*' imported but unused python/pyspark/mllib/tests/test_feature.py:21:1: F401 'numpy.random' imported but unused python/pyspark/mllib/tests/test_feature.py:21:1: F401 'numpy.exp' imported but unused python/pyspark/mllib/tests/test_feature.py:23:1: F401 'pyspark.mllib.linalg.Vector' imported but unused python/pyspark/mllib/tests/test_feature.py:23:1: F401 'pyspark.mllib.linalg.VectorUDT' imported but unused python/pyspark/mllib/tests/test_feature.py:185:5: F401 'pyspark.mllib.tests.test_feature.*' imported but unused python/pyspark/mllib/tests/test_util.py:97:5: F401 'pyspark.mllib.tests.test_util.*' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.Vector' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.SparseVector' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.DenseVector' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.VectorUDT' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg._convert_to_vector' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.DenseMatrix' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.SparseMatrix' imported but unused python/pyspark/mllib/tests/test_stat.py:23:1: F401 'pyspark.mllib.linalg.MatrixUDT' imported but unused python/pyspark/mllib/tests/test_stat.py:181:5: F401 'pyspark.mllib.tests.test_stat.*' imported but unused python/pyspark/mllib/tests/test_streaming_algorithms.py:18:1: F401 'time.time' imported but unused python/pyspark/mllib/tests/test_streaming_algorithms.py:18:1: F401 'time.sleep' imported but unused python/pyspark/mllib/tests/test_streaming_algorithms.py:470:5: F401 'pyspark.mllib.tests.test_streaming_algorithms.*' imported but 
unused python/pyspark/mllib/tests/test_algorithms.py:295:5: F401 'pyspark.mllib.tests.test_algorithms.*' imported but unused python/pyspark/tests/test_serializers.py:90:13: F401 'xmlrunner' imported but unused python/pyspark/tests/test_rdd.py:21:1: F401 'sys' imported but unused python/pyspark/tests/test_rdd.py:29:1: F401 'pyspark.resource.ResourceProfile' imported but unused python/pyspark/tests/test_rdd.py:885:5: F401 'pyspark.tests.test_rdd.*' imported but unused python/pyspark/tests/test_readwrite.py:19:1: F401 'sys' imported but unused python/pyspark/tests/test_readwrite.py:22:1: F401 'array.array' imported but unused python/pyspark/tests/test_readwrite.py:309:5: F401 'pyspark.tests.test_readwrite.*' imported but unused python/pyspark/tests/test_join.py:62:5: F401 'pyspark.tests.test_join.*' imported but unused python/pyspark/tests/test_taskcontext.py:19:1: F401 'shutil' imported but unused python/pyspark/tests/test_taskcontext.py:325:5: F401 'pyspark.tests.test_taskcontext.*' imported but unused python/pyspark/tests/test_conf.py:36:5: F401 'pyspark.tests.test_conf.*' imported but unused python/pyspark/tests/test_broadcast.py:148:5: F401 'pyspark.tests.test_broadcast.*' imported but unused python/pyspark/tests/test_daemon.py:76:5: F401 'pyspark.tests.test_daemon.*' imported but unused python/pyspark/tests/test_util.py:77:5: F401 'pyspark.tests.test_util.*' imported but unused python/pyspark/tests/test_pin_thread.py:19:1: F401 'random' imported but unused python/pyspark/tests/test_pin_thread.py:149:5: F401 'pyspark.tests.test_pin_thread.*' imported but unused python/pyspark/tests/test_worker.py:19:1: F401 'sys' imported but unused python/pyspark/tests/test_worker.py:26:5: F401 'resource' imported but unused python/pyspark/tests/test_worker.py:203:5: F401 'pyspark.tests.test_worker.*' imported but unused python/pyspark/tests/test_profiler.py:101:5: F401 'pyspark.tests.test_profiler.*' imported but unused python/pyspark/tests/test_shuffle.py:18:1: F401 'sys' 
imported but unused python/pyspark/tests/test_shuffle.py:171:5: F401 'pyspark.tests.test_shuffle.*' imported but unused python/pyspark/tests/test_rddbarrier.py:43:5: F401 'pyspark.tests.test_rddbarrier.*' imported but unused python/pyspark/tests/test_context.py:129:13: F401 'userlibrary.UserClass' imported but unused python/pyspark/tests/test_context.py:140:13: F401 'userlib.UserClass' imported but unused python/pyspark/tests/test_context.py:310:5: F401 'pyspark.tests.test_context.*' imported but unused python/pyspark/tests/test_appsubmit.py:241:5: F401 'pyspark.tests.test_appsubmit.*' imported but unused python/pyspark/streaming/dstream.py:18:1: F401 'sys' imported but unused python/pyspark/streaming/tests/test_dstream.py:27:1: F401 'pyspark.RDD' imported but unused python/pyspark/streaming/tests/test_dstream.py:647:5: F401 'pyspark.streaming.tests.test_dstream.*' imported but unused python/pyspark/streaming/tests/test_kinesis.py:83:5: F401 'pyspark.streaming.tests.test_kinesis.*' imported but unused python/pyspark/streaming/tests/test_listener.py:152:5: F401 'pyspark.streaming.tests.test_listener.*' imported but unused python/pyspark/streaming/tests/test_context.py:178:5: F401 'pyspark.streaming.tests.test_context.*' imported but unused python/pyspark/testing/utils.py:30:5: F401 'scipy.sparse' imported but unused python/pyspark/testing/utils.py:36:5: F401 'numpy as np' imported but unused python/pyspark/ml/regression.py:25:1: F401 'pyspark.ml.tree._TreeEnsembleParams' imported but unused python/pyspark/ml/regression.py:25:1: F401 'pyspark.ml.tree._HasVarianceImpurity' imported but unused python/pyspark/ml/regression.py:29:1: F401 'pyspark.ml.wrapper.JavaParams' imported but unused python/pyspark/ml/util.py:19:1: F401 'sys' imported but unused python/pyspark/ml/__init__.py:25:1: F401 'pyspark.ml.pipeline' imported but unused python/pyspark/ml/pipeline.py:18:1: F401 'sys' imported but unused python/pyspark/ml/stat.py:22:1: F401 'pyspark.ml.linalg.DenseMatrix' 
imported but unused python/pyspark/ml/stat.py:22:1: F401 'pyspark.ml.linalg.Vectors' imported but unused python/pyspark/ml/tests/test_training_summary.py:18:1: F401 'sys' imported but unused python/pyspark/ml/tests/test_training_summary.py:364:5: F401 'pyspark.ml.tests.test_training_summary.*' imported but unused python/pyspark/ml/tests/test_linalg.py:381:5: F401 'pyspark.ml.tests.test_linalg.*' imported but unused python/pyspark/ml/tests/test_tuning.py:427:9: F401 'pyspark.sql.functions as F' imported but unused python/pyspark/ml/tests/test_tuning.py:757:5: F401 'pyspark.ml.tests.test_tuning.*' imported but unused python/pyspark/ml/tests/test_wrapper.py:120:5: F401 'pyspark.ml.tests.test_wrapper.*' imported but unused python/pyspark/ml/tests/test_feature.py:19:1: F401 'sys' imported but unused python/pyspark/ml/tests/test_feature.py:304:5: F401 'pyspark.ml.tests.test_feature.*' imported but unused python/pyspark/ml/tests/test_image.py:19:1: F401 'py4j' imported but unused python/pyspark/ml/tests/test_image.py:22:1: F401 'pyspark.testing.mlutils.PySparkTestCase' imported but unused python/pyspark/ml/tests/test_image.py:71:5: F401 'pyspark.ml.tests.test_image.*' imported but unused python/pyspark/ml/tests/test_persistence.py:456:5: F401 'pyspark.ml.tests.test_persistence.*' imported but unused python/pyspark/ml/tests/test_evaluation.py:56:5: F401 'pyspark.ml.tests.test_evaluation.*' imported but unused python/pyspark/ml/tests/test_stat.py:43:5: F401 'pyspark.ml.tests.test_stat.*' imported but unused python/pyspark/ml/tests/test_base.py:70:5: F401 'pyspark.ml.tests.test_base.*' imported but unused python/pyspark/ml/tests/test_param.py:20:1: F401 'sys' imported but unused python/pyspark/ml/tests/test_param.py:375:5: F401 'pyspark.ml.tests.test_param.*' imported but unused python/pyspark/ml/tests/test_pipeline.py:62:5: F401 'pyspark.ml.tests.test_pipeline.*' imported but unused python/pyspark/ml/tests/test_algorithms.py:333:5: F401 'pyspark.ml.tests.test_algorithms.*' 
imported but unused python/pyspark/ml/param/__init__.py:18:1: F401 'sys' imported but unused python/pyspark/resource/tests/test_resources.py:17:1: F401 'random' imported but unused python/pyspark/resource/tests/test_resources.py:20:1: F401 'pyspark.resource.ResourceProfile' imported but unused python/pyspark/resource/tests/test_resources.py:75:5: F401 'pyspark.resource.tests.test_resources.*' imported but unused python/pyspark/sql/functions.py:32:1: F401 'pyspark.sql.udf.UserDefinedFunction' imported but unused python/pyspark/sql/functions.py:34:1: F401 'pyspark.sql.pandas.functions.pandas_udf' imported but unused python/pyspark/sql/session.py:30:1: F401 'pyspark.sql.types.Row' imported but unused python/pyspark/sql/session.py:30:1: F401 'pyspark.sql.types.StringType' imported but unused python/pyspark/sql/readwriter.py:1084:5: F401 'pyspark.sql.Row' imported but unused python/pyspark/sql/context.py:26:1: F401 'pyspark.sql.types.IntegerType' imported but unused python/pyspark/sql/context.py:26:1: F401 'pyspark.sql.types.Row' imported but unused python/pyspark/sql/context.py:26:1: F401 'pyspark.sql.types.StringType' imported but unused python/pyspark/sql/context.py:27:1: F401 'pyspark.sql.udf.UDFRegistration' imported but unused python/pyspark/sql/streaming.py:1212:5: F401 'pyspark.sql.Row' imported but unused python/pyspark/sql/tests/test_utils.py:55:5: F401 'pyspark.sql.tests.test_utils.*' imported but unused python/pyspark/sql/tests/test_pandas_map.py:18:1: F401 'sys' imported but unused python/pyspark/sql/tests/test_pandas_map.py:22:1: F401 'pyspark.sql.functions.pandas_udf' imported but unused python/pyspark/sql/tests/test_pandas_map.py:22:1: F401 'pyspark.sql.functions.PandasUDFType' imported but unused python/pyspark/sql/tests/test_pandas_map.py:119:5: F401 'pyspark.sql.tests.test_pandas_map.*' imported but unused python/pyspark/sql/tests/test_catalog.py:193:5: F401 'pyspark.sql.tests.test_catalog.*' imported but unused 
python/pyspark/sql/tests/test_group.py:39:5: F401 'pyspark.sql.tests.test_group.*' imported but unused python/pyspark/sql/tests/test_session.py:361:5: F401 'pyspark.sql.tests.test_session.*' imported but unused python/pyspark/sql/tests/test_conf.py:49:5: F401 'pyspark.sql.tests.test_conf.*' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:19:1: F401 'sys' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:21:1: F401 'pyspark.sql.functions.sum' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:21:1: F401 'pyspark.sql.functions.PandasUDFType' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:29:5: F401 'pandas.util.testing.assert_series_equal' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:32:5: F401 'pyarrow as pa' imported but unused python/pyspark/sql/tests/test_pandas_cogrouped_map.py:248:5: F401 'pyspark.sql.tests.test_pandas_cogrouped_map.*' imported but unused python/pyspark/sql/tests/test_udf.py:24:1: F401 'py4j' imported but unused python/pyspark/sql/tests/test_pandas_udf_typehints.py:246:5: F401 'pyspark.sql.tests.test_pandas_udf_typehints.*' imported but unused python/pyspark/sql/tests/test_functions.py:19:1: F401 'sys' imported but unused python/pyspark/sql/tests/test_functions.py:362:9: F401 'pyspark.sql.functions.exists' imported but unused python/pyspark/sql/tests/test_functions.py:387:5: F401 'pyspark.sql.tests.test_functions.*' imported but unused python/pyspark/sql/tests/test_pandas_udf_scalar.py:21:1: F401 'sys' imported but unused python/pyspark/sql/tests/test_pandas_udf_scalar.py:45:5: F401 'pyarrow as pa' imported but unused python/pyspark/sql/tests/test_pandas_udf_window.py:355:5: F401 'pyspark.sql.tests.test_pandas_udf_window.*' imported but unused python/pyspark/sql/tests/test_arrow.py:38:5: F401 'pyarrow as pa' imported but unused python/pyspark/sql/tests/test_pandas_grouped_map.py:20:1: F401 'sys' imported but unused 
python/pyspark/sql/tests/test_pandas_grouped_map.py:38:5: F401 'pyarrow as pa' imported but unused python/pyspark/sql/tests/test_dataframe.py:382:9: F401 'pyspark.sql.DataFrame' imported but unused python/pyspark/sql/avro/functions.py:125:5: F401 'pyspark.sql.Row' imported but unused python/pyspark/sql/pandas/functions.py:19:1: F401 'sys' imported but unused ``` After: ``` fokkodriesprongFan spark % flake8 python | grep -i "imported but unused" fokkodriesprongFan spark % ``` ### What changes were proposed in this pull request? Removing unused imports from the Python files to keep everything nice and tidy. ### Why are the changes needed? Cleaning up of the imports that aren't used, and suppressing the imports that are used as references to other modules, preserving backward compatibility. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Adding the rule to the existing Flake8 checks. Closes #29121 from Fokko/SPARK-32319. Authored-by: Fokko Driesprong <fokko@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
477 lines
18 KiB
Python
477 lines
18 KiB
Python
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright ownership.
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
# (the "License"); you may not use this file except in compliance with
|
|
# the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import unittest
|
|
|
|
from numpy import array, random, exp, dot, all, mean, abs
|
|
from numpy import sum as array_sum
|
|
|
|
from pyspark import SparkContext
|
|
from pyspark.mllib.clustering import StreamingKMeans, StreamingKMeansModel
|
|
from pyspark.mllib.classification import StreamingLogisticRegressionWithSGD
|
|
from pyspark.mllib.linalg import Vectors
|
|
from pyspark.mllib.regression import LabeledPoint, StreamingLinearRegressionWithSGD
|
|
from pyspark.mllib.util import LinearDataGenerator
|
|
from pyspark.streaming import StreamingContext
|
|
from pyspark.testing.utils import eventually
|
|
|
|
|
|
class MLLibStreamingTestCase(unittest.TestCase):
    """
    Base class for the streaming MLlib tests in this module.

    Before each test it creates a ``SparkContext`` (``self.sc``) and a
    ``StreamingContext`` (``self.ssc``) on top of it; after each test both
    are stopped again.
    """

    def setUp(self):
        """Create the Spark and streaming contexts used by a single test."""
        self.sc = SparkContext('local[4]', "MLlib tests")
        try:
            self.ssc = StreamingContext(self.sc, 1.0)
        except Exception:
            # unittest does not call tearDown() when setUp() raises, so the
            # SparkContext has to be released here or it would stay alive
            # for the following tests.
            self.sc.stop()
            raise

    def tearDown(self):
        """Stop the streaming context first, then the Spark context."""
        try:
            # False is passed so that this call leaves the SparkContext
            # alone (per the PySpark API the flag is `stopSparkContext`);
            # it is stopped explicitly below.
            self.ssc.stop(False)
        finally:
            # Always release the SparkContext, even if stopping the
            # streaming context failed.
            self.sc.stop()
|
|
|
|
|
|
class StreamingKMeansTest(MLLibStreamingTestCase):
    """Tests for ``StreamingKMeans``: parameters, training and prediction."""

    def test_model_params(self):
        """Test that the model params are set correctly"""
        stkm = StreamingKMeans()
        stkm.setK(5).setDecayFactor(0.0)
        self.assertEqual(stkm._k, 5)
        self.assertEqual(stkm._decayFactor, 0.0)

        # Model not set yet.
        self.assertIsNone(stkm.latestModel())
        self.assertRaises(ValueError, stkm.trainOn, [0.0, 1.0])

        stkm.setInitialCenters(
            centers=[[0.0, 0.0], [1.0, 1.0]], weights=[1.0, 1.0])
        self.assertEqual(
            stkm.latestModel().centers, [[0.0, 0.0], [1.0, 1.0]])
        self.assertEqual(stkm.latestModel().clusterWeights, [1.0, 1.0])

    def test_accuracy_for_single_center(self):
        """Test that parameters obtained are correct for a single center."""
        centers, batches = self.streamingKMeansDataGenerator(
            batches=5, numPoints=5, k=1, d=5, r=0.1, seed=0)
        stkm = StreamingKMeans(1)
        stkm.setInitialCenters([[0., 0., 0., 0., 0.]], [0.])
        input_stream = self.ssc.queueStream(
            [self.sc.parallelize(batch, 1) for batch in batches])
        stkm.trainOn(input_stream)

        self.ssc.start()

        def condition():
            # 5 batches of 5 points each, starting from weight 0.
            self.assertEqual(stkm.latestModel().clusterWeights, [25.0])
            return True
        eventually(condition, catch_assertions=True)

        # With a single center (k=1) the sum over centers is that center.
        realCenters = array_sum(array(centers), axis=0)
        for i in range(5):
            modelCenters = stkm.latestModel().centers[0][i]
            self.assertAlmostEqual(centers[0][i], modelCenters, 1)
            self.assertAlmostEqual(realCenters[i], modelCenters, 1)

    def streamingKMeansDataGenerator(self, batches, numPoints,
                                     k, d, r, seed, centers=None):
        """
        Generate clustered data for the streaming k-means tests.

        :param batches: number of batches to generate.
        :param numPoints: number of points in each batch.
        :param k: number of cluster centers.
        :param d: dimension of every center and point.
        :param r: scale of the random offset added to a center to get a point.
        :param seed: seed for the ``numpy.random.RandomState`` used.
        :param centers: optional sequence of ``k`` centers to use; when
            omitted, ``k`` centers are drawn with ``rng.randn(d)``.
        :return: tuple ``(centers, data)`` where ``data`` is a list of
            ``batches`` lists, each holding ``numPoints`` dense vectors.
        """
        rng = random.RandomState(seed)

        # Generate centers, unless the caller supplied them.
        if centers is None:
            centers = [rng.randn(d) for _ in range(k)]

        # Point j of every batch is scattered around center j % k.
        return centers, [[Vectors.dense(centers[j % k] + r * rng.randn(d))
                          for j in range(numPoints)]
                         for _ in range(batches)]

    def test_trainOn_model(self):
        """Test the model on toy data with four clusters."""
        stkm = StreamingKMeans()
        initCenters = [[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]]
        stkm.setInitialCenters(
            centers=initCenters, weights=[1.0, 1.0, 1.0, 1.0])

        # Create a toy dataset by setting a tiny offset for each point.
        # Every batch shifts all four centers by the same offset.
        offsets = [[0, 0.1], [0, -0.1], [0.1, 0], [-0.1, 0]]
        batches = [[[offset[0] + center[0], offset[1] + center[1]]
                    for center in initCenters]
                   for offset in offsets]

        batches = [self.sc.parallelize(batch, 1) for batch in batches]
        input_stream = self.ssc.queueStream(batches)
        stkm.trainOn(input_stream)
        self.ssc.start()

        # Give enough time to train the model.
        def condition():
            finalModel = stkm.latestModel()
            self.assertTrue(all(finalModel.centers == array(initCenters)))
            # Initial weight 1.0 plus one point per center in each of 4 batches.
            self.assertEqual(finalModel.clusterWeights, [5.0, 5.0, 5.0, 5.0])
            return True
        eventually(condition, catch_assertions=True)

    def test_predictOn_model(self):
        """Test that the model predicts correctly on toy data."""
        stkm = StreamingKMeans()
        stkm._model = StreamingKMeansModel(
            clusterCenters=[[1.0, 1.0], [-1.0, 1.0], [-1.0, -1.0], [1.0, -1.0]],
            clusterWeights=[1.0, 1.0, 1.0, 1.0])

        # One single-point batch per cluster, in cluster order.
        predict_data = [[[1.5, 1.5]], [[-1.5, 1.5]], [[-1.5, -1.5]], [[1.5, -1.5]]]
        predict_data = [self.sc.parallelize(batch, 1) for batch in predict_data]
        predict_stream = self.ssc.queueStream(predict_data)
        predict_val = stkm.predictOn(predict_stream)

        result = []

        def update(rdd):
            # Only non-empty collected batches are recorded.
            rdd_collect = rdd.collect()
            if rdd_collect:
                result.append(rdd_collect)

        predict_val.foreachRDD(update)
        self.ssc.start()

        def condition():
            self.assertEqual(result, [[0], [1], [2], [3]])
            return True

        eventually(condition, catch_assertions=True)

    @unittest.skip("SPARK-10086: Flaky StreamingKMeans test in PySpark")
    def test_trainOn_predictOn(self):
        """Test that prediction happens on the updated model."""
        stkm = StreamingKMeans(decayFactor=0.0, k=2)
        stkm.setInitialCenters([[0.0], [1.0]], [1.0, 1.0])

        # Since decay factor is set to zero, once the first batch
        # is passed the clusterCenters are updated to [-0.5, 0.7]
        # which causes 0.2 & 0.3 to be classified as 1, even though the
        # classification based on the initial model would have been 0
        # proving that the model is updated.
        batches = [[[-0.5], [0.6], [0.8]], [[0.2], [-0.1], [0.3]]]
        batches = [self.sc.parallelize(batch) for batch in batches]
        input_stream = self.ssc.queueStream(batches)
        predict_results = []

        def collect(rdd):
            # Only non-empty collected batches are recorded.
            rdd_collect = rdd.collect()
            if rdd_collect:
                predict_results.append(rdd_collect)

        stkm.trainOn(input_stream)
        predict_stream = stkm.predictOn(input_stream)
        predict_stream.foreachRDD(collect)

        self.ssc.start()

        def condition():
            self.assertEqual(predict_results, [[0, 1, 1], [1, 0, 1]])
            return True

        eventually(condition, catch_assertions=True)
|
|
|
|
|
|
class StreamingLogisticRegressionWithSGDTests(MLLibStreamingTestCase):
    """Tests for ``StreamingLogisticRegressionWithSGD`` on generated data."""

    @staticmethod
    def generateLogisticInput(offset, scale, nPoints, seed):
        """
        Generate labeled points whose labels follow a logistic model.

        The single feature ``x`` of every point is drawn with ``rng.randn``
        and the probability of the label being 1.0 is

            1 / (1 + exp(-(x * scale + offset)))

        where,
        x is randomly distributed and the threshold that decides the
        label of each sample in x is obtained from a random uniform
        distribution (``rng.rand``).

        :param offset: additive term inside the sigmoid.
        :param scale: multiplier applied to ``x`` inside the sigmoid.
        :param nPoints: number of labeled points to generate.
        :param seed: seed for the ``numpy.random.RandomState`` used.
        :return: list of ``nPoints`` ``LabeledPoint`` objects.
        """
        rng = random.RandomState(seed)
        x = rng.randn(nPoints)
        sigmoid = 1. / (1 + exp(-(dot(x, scale) + offset)))
        y_p = rng.rand(nPoints)
        # Compute the mask once, before y_p is overwritten with labels.
        cut_off = y_p <= sigmoid
        y_p[cut_off] = 1.0
        y_p[~cut_off] = 0.0
        return [
            LabeledPoint(y_p[i], Vectors.dense([x[i]]))
            for i in range(nPoints)]

    def test_parameter_accuracy(self):
        """
        Test that the final value of weights is close to the desired value.
        """
        input_batches = [
            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
            for i in range(20)]
        input_stream = self.ssc.queueStream(input_batches)

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0])
        slr.trainOn(input_stream)

        self.ssc.start()

        def condition():
            # Relative distance of the learned weight from the scale (1.5)
            # used to generate the data; expected to be 0.1 to one place.
            rel = (1.5 - slr.latestModel().weights.array[0]) / 1.5
            self.assertAlmostEqual(rel, 0.1, 1)
            return True

        eventually(condition, timeout=60.0, catch_assertions=True)

    def test_convergence(self):
        """
        Test that weights converge to the required value on toy data.
        """
        input_batches = [
            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
            for i in range(20)]
        input_stream = self.ssc.queueStream(input_batches)
        models = []

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0])
        slr.trainOn(input_stream)
        # Record the current first weight once per batch.
        input_stream.foreachRDD(
            lambda x: models.append(slr.latestModel().weights[0]))

        self.ssc.start()

        def condition():
            self.assertEqual(len(models), len(input_batches))
            return True

        # We want all batches to finish for this test.
        eventually(condition, 60.0, catch_assertions=True)

        t_models = array(models)
        diff = t_models[1:] - t_models[:-1]
        # Test that weights improve with a small tolerance
        self.assertTrue(all(diff >= -0.1))
        self.assertTrue(array_sum(diff > 0) > 1)

    @staticmethod
    def calculate_accuracy_error(true, predicted):
        """Return the mean absolute difference between `true` and `predicted`."""
        return sum(abs(array(true) - array(predicted))) / len(true)

    def test_predictions(self):
        """Test predicted values on a toy model."""
        input_batches = []
        for i in range(20):
            batch = self.sc.parallelize(
                self.generateLogisticInput(0, 1.5, 100, 42 + i))
            input_batches.append(batch.map(lambda x: (x.label, x.features)))
        input_stream = self.ssc.queueStream(input_batches)

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.2, numIterations=25)
        # Start from the weight the data was generated with; no training here.
        slr.setInitialWeights([1.5])
        predict_stream = slr.predictOnValues(input_stream)
        true_predicted = []
        predict_stream.foreachRDD(lambda x: true_predicted.append(x.collect()))
        self.ssc.start()

        def condition():
            self.assertEqual(len(true_predicted), len(input_batches))
            return True

        eventually(condition, catch_assertions=True)

        # Test that the accuracy error is no more than 0.4 on each batch.
        for batch in true_predicted:
            true, predicted = zip(*batch)
            self.assertTrue(
                self.calculate_accuracy_error(true, predicted) < 0.4)

    def test_training_and_prediction(self):
        """Test that the model improves on toy data with no. of batches"""
        input_batches = [
            self.sc.parallelize(self.generateLogisticInput(0, 1.5, 100, 42 + i))
            for i in range(40)]
        predict_batches = [
            b.map(lambda lp: (lp.label, lp.features)) for b in input_batches]

        slr = StreamingLogisticRegressionWithSGD(
            stepSize=0.01, numIterations=25)
        slr.setInitialWeights([-0.1])
        errors = []

        def collect_errors(rdd):
            # Each element of the collected RDD is a (label, prediction) pair.
            true, predicted = zip(*rdd.collect())
            errors.append(self.calculate_accuracy_error(true, predicted))

        input_stream = self.ssc.queueStream(input_batches)
        predict_stream = self.ssc.queueStream(predict_batches)
        slr.trainOn(input_stream)
        ps = slr.predictOnValues(predict_stream)
        ps.foreachRDD(collect_errors)

        self.ssc.start()

        def condition():
            # Test that the improvement in error is > 0.3
            if len(errors) == len(predict_batches):
                # All batches are done: fail outright if it never improved.
                self.assertGreater(errors[1] - errors[-1], 0.3)
            if len(errors) >= 3 and errors[1] - errors[-1] > 0.3:
                return True
            return "Latest errors: " + ", ".join(map(str, errors))

        eventually(condition, timeout=180.0)
|
|
|
|
|
|
class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
    """Tests for ``StreamingLinearRegressionWithSGD`` on generated data."""

    def assertArrayAlmostEqual(self, array1, array2, dec):
        """
        Assert that two sequences are element-wise almost equal.

        :param array1: first sequence of numbers.
        :param array2: second sequence of numbers, same length as `array1`.
        :param dec: number of decimal places passed to ``assertAlmostEqual``.
        """
        # zip() stops at the shorter input, so check the lengths explicitly.
        self.assertEqual(len(array1), len(array2))
        # Pair the elements up; iterating over the tuple `array1, array2`
        # would instead unpack each array into (i, j).
        for i, j in zip(array1, array2):
            self.assertAlmostEqual(i, j, dec)

    def test_parameter_accuracy(self):
        """Test that coefs are predicted accurately by fitting on toy data."""

        # Test that fitting (10*X1 + 10*X2), (X1, X2) gives coefficients
        # (10, 10)
        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0, 0.0])
        xMean = [0.0, 0.0]
        xVariance = [1.0 / 3.0, 1.0 / 3.0]

        # Create ten batches with 100 sample points in each.
        batches = []
        for i in range(10):
            batch = LinearDataGenerator.generateLinearInput(
                0.0, [10.0, 10.0], xMean, xVariance, 100, 42 + i, 0.1)
            batches.append(self.sc.parallelize(batch))

        input_stream = self.ssc.queueStream(batches)
        slr.trainOn(input_stream)
        self.ssc.start()

        def condition():
            self.assertArrayAlmostEqual(
                slr.latestModel().weights.array, [10., 10.], 1)
            self.assertAlmostEqual(slr.latestModel().intercept, 0.0, 1)
            return True

        eventually(condition, catch_assertions=True)

    def test_parameter_convergence(self):
        """Test that the model parameters improve with streaming data."""
        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0])

        # Create ten batches with 100 sample points in each.
        batches = []
        for i in range(10):
            batch = LinearDataGenerator.generateLinearInput(
                0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
            batches.append(self.sc.parallelize(batch))

        model_weights = []
        input_stream = self.ssc.queueStream(batches)
        # Record the current first weight once per batch.
        input_stream.foreachRDD(
            lambda x: model_weights.append(slr.latestModel().weights[0]))
        slr.trainOn(input_stream)
        self.ssc.start()

        def condition():
            self.assertEqual(len(model_weights), len(batches))
            return True

        # We want all batches to finish for this test.
        eventually(condition, catch_assertions=True)

        w = array(model_weights)
        diff = w[1:] - w[:-1]
        # Successive weights may not drop by more than a small tolerance.
        self.assertTrue(all(diff >= -0.1))

    def test_prediction(self):
        """Test prediction on a model with weights already set."""
        # Create a model with initial Weights equal to coefs
        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
        slr.setInitialWeights([10.0, 10.0])

        # Create ten batches with 100 sample points in each.
        batches = []
        for i in range(10):
            batch = LinearDataGenerator.generateLinearInput(
                0.0, [10.0, 10.0], [0.0, 0.0], [1.0 / 3.0, 1.0 / 3.0],
                100, 42 + i, 0.1)
            batches.append(
                self.sc.parallelize(batch).map(lambda lp: (lp.label, lp.features)))

        input_stream = self.ssc.queueStream(batches)
        output_stream = slr.predictOnValues(input_stream)
        samples = []
        output_stream.foreachRDD(lambda x: samples.append(x.collect()))

        self.ssc.start()

        def condition():
            self.assertEqual(len(samples), len(batches))
            return True

        # We want all batches to finish for this test.
        eventually(condition, catch_assertions=True)

        # Test that mean absolute error on each batch is less than 0.1
        for batch in samples:
            true, predicted = zip(*batch)
            self.assertTrue(mean(abs(array(true) - array(predicted))) < 0.1)

    def test_train_prediction(self):
        """Test that error on test data improves as model is trained."""
        slr = StreamingLinearRegressionWithSGD(stepSize=0.2, numIterations=25)
        slr.setInitialWeights([0.0])

        # Create fifteen batches with 100 sample points in each.
        batches = []
        for i in range(15):
            batch = LinearDataGenerator.generateLinearInput(
                0.0, [10.0], [0.0], [1.0 / 3.0], 100, 42 + i, 0.1)
            batches.append(self.sc.parallelize(batch))

        predict_batches = [
            b.map(lambda lp: (lp.label, lp.features)) for b in batches]
        errors = []

        def func(rdd):
            # Each element of the collected RDD is a (label, prediction) pair.
            true, predicted = zip(*rdd.collect())
            # NOTE(review): this is mean(|true| - |predicted|), not the mean
            # absolute error mean(|true - predicted|). Left unchanged because
            # the threshold below is calibrated against it -- confirm intent.
            errors.append(mean(abs(true) - abs(predicted)))

        input_stream = self.ssc.queueStream(batches)
        output_stream = self.ssc.queueStream(predict_batches)
        slr.trainOn(input_stream)
        output_stream = slr.predictOnValues(output_stream)
        output_stream.foreachRDD(func)
        self.ssc.start()

        def condition():
            if len(errors) == len(predict_batches):
                # All batches are done: fail outright if it never improved.
                self.assertGreater(errors[1] - errors[-1], 2)
            if len(errors) >= 3 and errors[1] - errors[-1] > 2:
                return True
            return "Latest errors: " + ", ".join(map(str, errors))

        eventually(condition, timeout=180.0)
|
|
|
|
|
|
if __name__ == "__main__":
    from pyspark.mllib.tests.test_streaming_algorithms import *  # noqa: F401

    # Fall back to unittest's default runner unless xmlrunner is available,
    # in which case XML reports are written to target/test-reports.
    testRunner = None
    try:
        import xmlrunner
        testRunner = xmlrunner.XMLTestRunner(output='target/test-reports', verbosity=2)
    except ImportError:
        pass
    unittest.main(testRunner=testRunner, verbosity=2)
|