[SPARK-6263] [MLLIB] Python MLlib API missing items: Utils
Implement the missing `MLUtils` API in PySpark: * appendBias * loadVectors. `kFold` is also missing; however, I am not sure whether `ClassTag` can be passed or restored through Python. Author: lewuathe <lewuathe@me.com> Closes #5707 from Lewuathe/SPARK-6263 and squashes the following commits: 16863ea [lewuathe] Merge master 3fc27e7 [lewuathe] Merge branch 'master' into SPARK-6263 6084e9c [lewuathe] Resolv conflict d2aa2a0 [lewuathe] Resolv conflict 9c329d8 [lewuathe] Fix efficiency 3a12a2d [lewuathe] Merge branch 'master' into SPARK-6263 1d4714b [lewuathe] Fix style b29e2bc [lewuathe] Remove scipy dependencies e32eb40 [lewuathe] Merge branch 'master' into SPARK-6263 25d3c9d [lewuathe] Remove unnecessary imports 7ec04db [lewuathe] Resolv conflict 1502d13 [lewuathe] Resolv conflict d6bd416 [lewuathe] Check existence of scipy.sparse 5d555b1 [lewuathe] Construct scipy.sparse matrix c345a44 [lewuathe] Merge branch 'master' into SPARK-6263 b8b5ef7 [lewuathe] Fix unnecessary sort method d254be7 [lewuathe] Merge branch 'master' into SPARK-6263 62a9c7e [lewuathe] Fix appendBias return type 454c73d [lewuathe] Merge branch 'master' into SPARK-6263 a353354 [lewuathe] Remove unnecessary appendBias implementation 44295c2 [lewuathe] Merge branch 'master' into SPARK-6263 64f72ad [lewuathe] Merge branch 'master' into SPARK-6263 c728046 [lewuathe] Fix style 2980569 [lewuathe] [SPARK-6263] Python MLlib API missing items: Utils
This commit is contained in:
parent
31b4a3d7f2
commit
184de91d15
|
@ -75,6 +75,15 @@ private[python] class PythonMLLibAPI extends Serializable {
|
|||
minPartitions: Int): JavaRDD[LabeledPoint] =
|
||||
MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions)
|
||||
|
||||
/**
 * Loads vectors that were saved with `RDD#saveAsTextFile` by delegating to
 * `MLUtils.loadVectors` on the SparkContext wrapped by `jsc`.
 *
 * @param jsc Java SparkContext whose underlying `sc` performs the load
 * @param path file or directory path in any Hadoop-supported file system URI
 * @return an RDD of the loaded vectors
 */
def loadVectors(jsc: JavaSparkContext, path: String): RDD[Vector] = {
  MLUtils.loadVectors(jsc.sc, path)
}
|
||||
|
||||
private def trainRegressionModel(
|
||||
learner: GeneralizedLinearAlgorithm[_ <: GeneralizedLinearModel],
|
||||
data: JavaRDD[LabeledPoint],
|
||||
|
|
|
@ -54,6 +54,7 @@ from pyspark.mllib.feature import Word2Vec
|
|||
from pyspark.mllib.feature import IDF
|
||||
from pyspark.mllib.feature import StandardScaler, ElementwiseProduct
|
||||
from pyspark.mllib.util import LinearDataGenerator
|
||||
from pyspark.mllib.util import MLUtils
|
||||
from pyspark.serializers import PickleSerializer
|
||||
from pyspark.streaming import StreamingContext
|
||||
from pyspark.sql import SQLContext
|
||||
|
@ -1290,6 +1291,48 @@ class StreamingLinearRegressionWithTests(MLLibStreamingTestCase):
|
|||
self.assertTrue(mean_absolute_errors[1] - mean_absolute_errors[-1] > 2)
|
||||
|
||||
|
||||
class MLUtilsTests(MLlibTestCase):
    """Tests for the ``MLUtils.appendBias`` and ``MLUtils.loadVectors`` helpers."""

    def test_append_bias(self):
        """A plain list gains a trailing 1.0 and comes back as a DenseVector."""
        data = [2.0, 2.0, 2.0]
        ret = MLUtils.appendBias(data)
        self.assertEqual(ret[3], 1.0)
        self.assertEqual(type(ret), DenseVector)

    def test_append_bias_with_vector(self):
        """A dense vector gains a trailing 1.0 and stays a DenseVector."""
        data = Vectors.dense([2.0, 2.0, 2.0])
        ret = MLUtils.appendBias(data)
        self.assertEqual(ret[3], 1.0)
        self.assertEqual(type(ret), DenseVector)

    def test_append_bias_with_sp_vector(self):
        """A sparse vector gains the bias entry and stays a SparseVector."""
        data = Vectors.sparse(3, {0: 2.0, 2: 2.0})
        expected = Vectors.sparse(4, {0: 2.0, 2: 2.0, 3: 1.0})
        # Returned value must be SparseVector
        ret = MLUtils.appendBias(data)
        self.assertEqual(ret, expected)
        self.assertEqual(type(ret), SparseVector)

    def test_load_vectors(self):
        """Vectors written with saveAsTextFile are read back by loadVectors."""
        import shutil
        data = [
            [1.0, 2.0, 3.0],
            [1.0, 2.0, 3.0]
        ]
        temp_dir = tempfile.mkdtemp()
        load_vectors_path = os.path.join(temp_dir, "test_load_vectors")
        try:
            self.sc.parallelize(data).saveAsTextFile(load_vectors_path)
            ret_rdd = MLUtils.loadVectors(self.sc, load_vectors_path)
            ret = ret_rdd.collect()
            self.assertEqual(len(ret), 2)
            self.assertEqual(ret[0], DenseVector([1.0, 2.0, 3.0]))
            self.assertEqual(ret[1], DenseVector([1.0, 2.0, 3.0]))
        finally:
            # No bare ``except: self.fail()`` here: letting the original
            # exception or assertion propagate keeps its message and traceback
            # in the test report.
            # Remove the whole mkdtemp() directory (not only the sub-path) so
            # nothing leaks, and ignore errors so cleanup never masks the real
            # failure when the save step did not create the path.
            shutil.rmtree(temp_dir, ignore_errors=True)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
if not _have_scipy:
|
||||
print("NOTE: Skipping SciPy tests as it does not seem to be installed")
|
||||
|
|
|
@ -169,6 +169,28 @@ class MLUtils(object):
|
|||
minPartitions = minPartitions or min(sc.defaultParallelism, 2)
|
||||
return callMLlibFunc("loadLabeledPoints", sc, path, minPartitions)
|
||||
|
||||
@staticmethod
def appendBias(data):
    """
    Build a new vector equal to the input with one extra trailing
    entry of `1.0` (the bias term).

    The input is first passed through `_convert_to_vector`. A
    SparseVector input produces a SparseVector; anything else is
    expanded with `toArray()` and converted back with
    `_convert_to_vector`.
    """
    vector = _convert_to_vector(data)
    if not isinstance(vector, SparseVector):
        # Dense path: append the bias to the full array of values.
        return _convert_to_vector(np.append(vector.toArray(), 1.0))
    # Sparse path: the bias occupies the new last index, i.e. the old size.
    size = len(vector)
    biasedIndices = np.append(vector.indices, size)
    biasedValues = np.append(vector.values, 1.0)
    return SparseVector(size + 1, biasedIndices, biasedValues)
|
||||
|
||||
@staticmethod
def loadVectors(sc, path):
    """
    Read back vectors that were written with
    `RDD[Vector].saveAsTextFile`, using the default number of
    partitions.

    Both arguments are forwarded unchanged to the "loadVectors"
    MLlib call, and its result is returned as is.
    """
    loaded = callMLlibFunc("loadVectors", sc, path)
    return loaded
|
||||
|
||||
|
||||
class Saveable(object):
|
||||
"""
|
||||
|
|
Loading…
Reference in a new issue