04e44b37cc
This PR update PySpark to support Python 3 (tested with 3.4). Known issue: unpickle array from Pyrolite is broken in Python 3, those tests are skipped. TODO: ec2/spark-ec2.py is not fully tested with python3. Author: Davies Liu <davies@databricks.com> Author: twneale <twneale@gmail.com> Author: Josh Rosen <joshrosen@databricks.com> Closes #5173 from davies/python3 and squashes the following commits: d7d6323 [Davies Liu] fix tests 6c52a98 [Davies Liu] fix mllib test 99e334f [Davies Liu] update timeout b716610 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 cafd5ec [Davies Liu] adddress comments from @mengxr bf225d7 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 179fc8d [Davies Liu] tuning flaky tests 8c8b957 [Davies Liu] fix ResourceWarning in Python 3 5c57c95 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 4006829 [Davies Liu] fix test 2fc0066 [Davies Liu] add python3 path 71535e9 [Davies Liu] fix xrange and divide 5a55ab4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 125f12c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ed498c8 [Davies Liu] fix compatibility with python 3 820e649 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 e8ce8c9 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ad7c374 [Davies Liu] fix mllib test and warning ef1fc2f [Davies Liu] fix tests 4eee14a [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 20112ff [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 59bb492 [Davies Liu] fix tests 1da268c [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 ca0fdd3 [Davies Liu] fix code style 9563a15 [Davies Liu] add imap back for python 2 0b1ec04 [Davies Liu] make python examples work with Python 3 d2fd566 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 a716d34 [Davies 
Liu] test with python 3.4 f1700e8 [Davies Liu] fix test in python3 671b1db [Davies Liu] fix test in python3 692ff47 [Davies Liu] fix flaky test 7b9699f [Davies Liu] invalidate import cache for Python 3.3+ 9c58497 [Davies Liu] fix kill worker 309bfbf [Davies Liu] keep compatibility 5707476 [Davies Liu] cleanup, fix hash of string in 3.3+ 8662d5b [Davies Liu] Merge branch 'master' of github.com:apache/spark into python3 f53e1f0 [Davies Liu] fix tests 70b6b73 [Davies Liu] compile ec2/spark_ec2.py in python 3 a39167e [Davies Liu] support customize class in __main__ 814c77b [Davies Liu] run unittests with python 3 7f4476e [Davies Liu] mllib tests passed d737924 [Davies Liu] pass ml tests 375ea17 [Davies Liu] SQL tests pass 6cc42a9 [Davies Liu] rename 431a8de [Davies Liu] streaming tests pass 78901a7 [Davies Liu] fix hash of serializer in Python 3 24b2f2e [Davies Liu] pass all RDD tests 35f48fe [Davies Liu] run future again 1eebac2 [Davies Liu] fix conflict in ec2/spark_ec2.py 6e3c21d [Davies Liu] make cloudpickle work with Python3 2fb2db3 [Josh Rosen] Guard more changes behind sys.version; still doesn't run 1aa5e8f [twneale] Turned out `pickle.DictionaryType is dict` == True, so swapped it out 7354371 [twneale] buffer --> memoryview I'm not super sure if this a valid change, but the 2.7 docs recommend using memoryview over buffer where possible, so hoping it'll work. b69ccdf [twneale] Uses the pure python pickle._Pickler instead of c-extension _pickle.Pickler. It appears pyspark 2.7 uses the pure python pickler as well, so this shouldn't degrade pickling performance (?). 
f40d925 [twneale] xrange --> range e104215 [twneale] Replaces 2.7 types.InstsanceType with 3.4 `object`....could be horribly wrong depending on how types.InstanceType is used elsewhere in the package--see http://bugs.python.org/issue8206 79de9d0 [twneale] Replaces python2.7 `file` with 3.4 _io.TextIOWrapper 2adb42d [Josh Rosen] Fix up some import differences between Python 2 and 3 854be27 [Josh Rosen] Run `futurize` on Python code: 7c5b4ce [Josh Rosen] Remove Python 3 check in shell.py.
410 lines
16 KiB
Python
410 lines
16 KiB
Python
#
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
# this work for additional information regarding copyright ownership.
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
# (the "License"); you may not use this file except in compliance with
|
|
# the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
"""
|
|
Python package for random data generation.
|
|
"""
|
|
|
|
from functools import wraps
|
|
|
|
from pyspark.mllib.common import callMLlibFunc
|
|
|
|
|
|
__all__ = ['RandomRDDs', ]
|
|
|
|
|
|
def toArray(f):
    """
    Decorator for RDD-returning generator functions: converts every
    Vector in the resulting RDD to a numpy array via ``Vector.toArray()``.
    """
    @wraps(f)
    def wrapper(sc, *args, **kwargs):
        # Call through to the wrapped generator, then map each vector
        # element of the returned RDD to its dense-array representation.
        return f(sc, *args, **kwargs).map(lambda v: v.toArray())
    return wrapper
|
|
|
|
|
|
class RandomRDDs(object):
    """
    Generator methods for creating RDDs comprised of i.i.d samples from
    some distribution.

    Each method delegates to the JVM-side random data generators via
    ``callMLlibFunc``; the ``*VectorRDD`` variants are wrapped with
    ``@toArray`` so callers receive RDDs of numpy arrays.
    """

    @staticmethod
    def uniformRDD(sc, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the
        uniform distribution U(0.0, 1.0).

        To transform the distribution in the generated RDD from U(0.0, 1.0)
        to U(a, b), use
        C{RandomRDDs.uniformRDD(sc, n, p, seed)\
          .map(lambda v: a + (b - a) * v)}

        :param sc: SparkContext used to create the RDD.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ `U(0.0, 1.0)`.

        >>> x = RandomRDDs.uniformRDD(sc, 100).collect()
        >>> len(x)
        100
        >>> max(x) <= 1.0 and min(x) >= 0.0
        True
        >>> RandomRDDs.uniformRDD(sc, 100, 4).getNumPartitions()
        4
        >>> parts = RandomRDDs.uniformRDD(sc, 100, seed=4).getNumPartitions()
        >>> parts == sc.defaultParallelism
        True
        """
        return callMLlibFunc("uniformRDD", sc._jsc, size, numPartitions, seed)

    @staticmethod
    def normalRDD(sc, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the standard normal
        distribution.

        To transform the distribution in the generated RDD from standard normal
        to some other normal N(mean, sigma^2), use
        C{RandomRDDs.normalRDD(sc, n, p, seed)\
          .map(lambda v: mean + sigma * v)}

        :param sc: SparkContext used to create the RDD.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).

        >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - 0.0) < 0.1
        True
        >>> abs(stats.stdev() - 1.0) < 0.1
        True
        """
        return callMLlibFunc("normalRDD", sc._jsc, size, numPartitions, seed)

    @staticmethod
    def logNormalRDD(sc, mean, std, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the log normal
        distribution with the input mean and standard deviation.

        :param sc: SparkContext used to create the RDD.
        :param mean: mean for the log Normal distribution
        :param std: std for the log Normal distribution
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ log N(mean, std).

        >>> from math import sqrt, exp
        >>> mean = 0.0
        >>> std = 1.0
        >>> expMean = exp(mean + 0.5 * std * std)
        >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
        >>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - expMean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(stats.stdev() - expStd) < 0.5
        True
        """
        return callMLlibFunc("logNormalRDD", sc._jsc, float(mean), float(std),
                             size, numPartitions, seed)

    @staticmethod
    def poissonRDD(sc, mean, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Poisson
        distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or lambda, for the Poisson distribution.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ Pois(mean).

        >>> mean = 100.0
        >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(stats.stdev() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("poissonRDD", sc._jsc, float(mean), size, numPartitions, seed)

    @staticmethod
    def exponentialRDD(sc, mean, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Exponential
        distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or 1 / lambda, for the Exponential distribution.
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ Exp(mean).

        >>> mean = 2.0
        >>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(stats.stdev() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("exponentialRDD", sc._jsc, float(mean), size, numPartitions, seed)

    @staticmethod
    def gammaRDD(sc, shape, scale, size, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of i.i.d. samples from the Gamma
        distribution with the input shape and scale.

        :param sc: SparkContext used to create the RDD.
        :param shape: shape (> 0) parameter for the Gamma distribution
        :param scale: scale (> 0) parameter for the Gamma distribution
        :param size: Size of the RDD.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of float comprised of i.i.d. samples ~ Gamma(shape, scale).

        >>> from math import sqrt
        >>> shape = 1.0
        >>> scale = 2.0
        >>> expMean = shape * scale
        >>> expStd = sqrt(shape * scale * scale)
        >>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2)
        >>> stats = x.stats()
        >>> stats.count()
        1000
        >>> abs(stats.mean() - expMean) < 0.5
        True
        >>> abs(stats.stdev() - expStd) < 0.5
        True
        """
        return callMLlibFunc("gammaRDD", sc._jsc, float(shape),
                             float(scale), size, numPartitions, seed)

    @staticmethod
    @toArray
    def uniformVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the uniform distribution U(0.0, 1.0).

        :param sc: SparkContext used to create the RDD.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD.
        :param seed: Seed for the RNG that generates the seed for the generator in each partition.
        :return: RDD of Vector with vectors containing i.i.d samples ~ `U(0.0, 1.0)`.

        >>> import numpy as np
        >>> mat = np.matrix(RandomRDDs.uniformVectorRDD(sc, 10, 10).collect())
        >>> mat.shape
        (10, 10)
        >>> mat.max() <= 1.0 and mat.min() >= 0.0
        True
        >>> RandomRDDs.uniformVectorRDD(sc, 10, 10, 4).getNumPartitions()
        4
        """
        return callMLlibFunc("uniformVectorRDD", sc._jsc, numRows, numCols, numPartitions, seed)

    @staticmethod
    @toArray
    def normalVectorRDD(sc, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the standard normal distribution.

        :param sc: SparkContext used to create the RDD.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.

        >>> import numpy as np
        >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1).collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - 0.0) < 0.1
        True
        >>> abs(mat.std() - 1.0) < 0.1
        True
        """
        return callMLlibFunc("normalVectorRDD", sc._jsc, numRows, numCols, numPartitions, seed)

    @staticmethod
    @toArray
    def logNormalVectorRDD(sc, mean, std, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the log normal distribution.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean of the log normal distribution
        :param std: Standard Deviation of the log normal distribution
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ log `N(mean, std)`.

        >>> import numpy as np
        >>> from math import sqrt, exp
        >>> mean = 0.0
        >>> std = 1.0
        >>> expMean = exp(mean + 0.5 * std * std)
        >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
        >>> m = RandomRDDs.logNormalVectorRDD(sc, mean, std, 100, 100, seed=1).collect()
        >>> mat = np.matrix(m)
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - expMean) < 0.1
        True
        >>> abs(mat.std() - expStd) < 0.1
        True
        """
        return callMLlibFunc("logNormalVectorRDD", sc._jsc, float(mean), float(std),
                             numRows, numCols, numPartitions, seed)

    @staticmethod
    @toArray
    def poissonVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Poisson distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or lambda, for the Poisson distribution.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`)
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ Pois(mean).

        >>> import numpy as np
        >>> mean = 100.0
        >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1)
        >>> mat = np.mat(rdd.collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(mat.std() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("poissonVectorRDD", sc._jsc, float(mean), numRows, numCols,
                             numPartitions, seed)

    @staticmethod
    @toArray
    def exponentialVectorRDD(sc, mean, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Exponential distribution with the input mean.

        :param sc: SparkContext used to create the RDD.
        :param mean: Mean, or 1 / lambda, for the Exponential distribution.
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`)
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ Exp(mean).

        >>> import numpy as np
        >>> mean = 0.5
        >>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1)
        >>> mat = np.mat(rdd.collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - mean) < 0.5
        True
        >>> from math import sqrt
        >>> abs(mat.std() - sqrt(mean)) < 0.5
        True
        """
        return callMLlibFunc("exponentialVectorRDD", sc._jsc, float(mean), numRows, numCols,
                             numPartitions, seed)

    @staticmethod
    @toArray
    def gammaVectorRDD(sc, shape, scale, numRows, numCols, numPartitions=None, seed=None):
        """
        Generates an RDD comprised of vectors containing i.i.d. samples drawn
        from the Gamma distribution.

        :param sc: SparkContext used to create the RDD.
        :param shape: Shape (> 0) of the Gamma distribution
        :param scale: Scale (> 0) of the Gamma distribution
        :param numRows: Number of Vectors in the RDD.
        :param numCols: Number of elements in each Vector.
        :param numPartitions: Number of partitions in the RDD (default: `sc.defaultParallelism`).
        :param seed: Random seed (default: a random long integer).
        :return: RDD of Vector with vectors containing i.i.d. samples ~ Gamma(shape, scale).

        >>> import numpy as np
        >>> from math import sqrt
        >>> shape = 1.0
        >>> scale = 2.0
        >>> expMean = shape * scale
        >>> expStd = sqrt(shape * scale * scale)
        >>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, 100, 100, seed=1).collect())
        >>> mat.shape
        (100, 100)
        >>> abs(mat.mean() - expMean) < 0.1
        True
        >>> abs(mat.std() - expStd) < 0.1
        True
        """
        return callMLlibFunc("gammaVectorRDD", sc._jsc, float(shape), float(scale),
                             numRows, numCols, numPartitions, seed)
|
|
|
|
|
|
def _test():
    """
    Run this module's doctests against a local SparkContext and exit
    with a non-zero status if any of them fail.
    """
    import doctest
    import sys
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[2]', 'PythonTest', batchSize=2)
    try:
        (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    finally:
        # Always release the JVM-backed context, even if testmod raises.
        globs['sc'].stop()
    if failure_count:
        # sys.exit is always available; the bare `exit` builtin is only
        # installed by the `site` module and is absent under `python -S`.
        sys.exit(-1)
|
|
|
|
|
|
# Run the doctest suite when this module is executed as a script.
if __name__ == "__main__":
    _test()
|