[WIP] SPARK-1430: Support sparse data in Python MLlib

This PR adds a SparseVector class in PySpark and updates all the regression, classification and clustering algorithms and models to support sparse data, matching what MLlib already supports in Scala. I chose to add this class because SciPy is quite difficult to install in many environments (more so than NumPy), but I plan to add support for SciPy sparse vectors later too, and to make the methods work transparently on objects of either type.

On the Scala side, we keep Python sparse vectors sparse and pass them to MLlib. We always return dense vectors from our models.
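As a rough usage sketch of what this enables (assuming a live SparkContext `sc`; the data values here are made up for illustration):

```python
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD

# A point with 10 features, only two of which are non-zero.
sparse_point = LabeledPoint(1.0, SparseVector(10, {2: 1.0, 7: 3.5}))
dense_point = LabeledPoint(0.0, [0.0] * 10)  # plain lists and NumPy arrays still work

model = LogisticRegressionWithSGD.train(sc.parallelize([sparse_point, dense_point]))
model.predict(SparseVector(10, {2: 1.0}))    # models accept sparse or dense inputs
```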

Some to-do items left:
- [x] Support SciPy's scipy.sparse matrix objects when SciPy is available. We can easily add a function to convert these to our own SparseVector (see the conversion sketch after this list).
- [x] MLlib currently uses a vector with one extra column on the left to represent what we call LabeledPoint in Scala. Do we really want this? It may get annoying once you deal with sparse data, since you must add or subtract 1 from each feature index when training. We can remove this API in 1.0 and use tuples for labeling.
- [x] Explain how to use these in the Python MLlib docs.
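For the SciPy case, the conversion is essentially what the `_convert_vector` helper in `pyspark/mllib/_common.py` does; here is a rough sketch (assuming SciPy is installed; the function name is just for illustration):

```python
import scipy.sparse
from pyspark.mllib.linalg import SparseVector

def scipy_column_to_sparse_vector(mat):
    # Accepts any scipy.sparse matrix shaped as a single column and returns an
    # MLlib SparseVector carrying the same indices and values.
    assert scipy.sparse.issparse(mat) and mat.shape[1] == 1, "expected a sparse column vector"
    csc = mat.tocsc()
    return SparseVector(mat.shape[0], csc.indices, csc.data)
```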

CC @mengxr, @joshrosen

Author: Matei Zaharia <matei@databricks.com>

Closes #341 from mateiz/py-ml-update and squashes the following commits:

d52e763 [Matei Zaharia] Remove no-longer-needed slice code and handle review comments
ea5a25a [Matei Zaharia] Fix remaining uses of copyto() after merge
b9f97a3 [Matei Zaharia] Fix test
1e1bd0f [Matei Zaharia] Add MLlib logistic regression example in Python
88bc01f [Matei Zaharia] Clean up inheritance of LinearModel in Python, and expose its parametrs
37ab747 [Matei Zaharia] Fix some examples and docs due to changes in MLlib API
da0f27e [Matei Zaharia] Added a MLlib K-means example and updated docs to discuss sparse data
c48e85a [Matei Zaharia] Added some tests for passing lists as input, and added mllib/tests.py to run-tests script.
a07ba10 [Matei Zaharia] Fix some typos and calculation of initial weights
74eefe7 [Matei Zaharia] Added LabeledPoint class in Python
889dde8 [Matei Zaharia] Support scipy.sparse matrices in all our algorithms and models
ab244d1 [Matei Zaharia] Allow SparseVectors to be initialized using a dict
a5d6426 [Matei Zaharia] Add linalg.py to run-tests script
0e7a3d8 [Matei Zaharia] Keep vectors sparse in Java when reading LabeledPoints
eaee759 [Matei Zaharia] Update regression, classification and clustering models for sparse data
2abbb44 [Matei Zaharia] Further work to get linear models working with sparse data
154f45d [Matei Zaharia] Update docs, name some magic values
881fef7 [Matei Zaharia] Added a sparse vector in Python and made Java-Python format more compact
Authored by Matei Zaharia on 2014-04-15 20:33:24 -07:00, committed by Patrick Wendell (commit 63ca581d9c, parent 8517911efb).
18 changed files with 1371 additions and 217 deletions


@ -356,16 +356,17 @@ error.
import org.apache.spark.SparkContext
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data file
val data = sc.textFile("mllib/data/sample_svm_data.txt")
val parsedData = data.map { line =>
val parts = line.split(' ')
LabeledPoint(parts(0).toDouble, parts.tail.map(x => x.toDouble).toArray)
val parts = line.split(' ').map(_.toDouble)
LabeledPoint(parts(0), Vectors.dense(parts.tail))
}
// Run training algorithm to build the model
val numIterations = 20
val numIterations = 100
val model = SVMWithSGD.train(parsedData, numIterations)
// Evaluate model on training examples and compute training error
@ -401,21 +402,22 @@ val modelL1 = svmAlg.run(parsedData)
The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint.
The example then uses LinearRegressionWithSGD to build a simple linear model to predict label
values. We compute the Mean Squared Error at the end to evaluate
[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit)
[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).
{% highlight scala %}
import org.apache.spark.mllib.regression.LinearRegressionWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, parts(1).split(' ').map(x => x.toDouble).toArray)
LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}
// Building the model
val numIterations = 20
val numIterations = 100
val model = LinearRegressionWithSGD.train(parsedData, numIterations)
// Evaluate model on training examples and compute training error
@ -423,7 +425,7 @@ val valuesAndPreds = parsedData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val MSE = valuesAndPreds.map{ case(v, p) => math.pow((v - p), 2)}.reduce(_ + _)/valuesAndPreds.count
val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.reduce(_ + _) / valuesAndPreds.count
println("training Mean Squared Error = " + MSE)
{% endhighlight %}
@ -518,18 +520,22 @@ and make predictions with the resulting model to compute the training error.
{% highlight python %}
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
from numpy import array
# Load and parse the data
def parsePoint(line):
values = [float(x) for x in line.split(' ')]
return LabeledPoint(values[0], values[1:])
data = sc.textFile("mllib/data/sample_svm_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
model = LogisticRegressionWithSGD.train(parsedData)
parsedData = data.map(parsePoint)
# Build the model
labelsAndPreds = parsedData.map(lambda point: (int(point.item(0)),
model.predict(point.take(range(1, point.size)))))
model = LogisticRegressionWithSGD.train(parsedData)
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda (v, p): v != p).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))
{% endhighlight %}
@ -538,22 +544,25 @@ print("Training Error = " + str(trainErr))
The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint.
The example then uses LinearRegressionWithSGD to build a simple linear model to predict label
values. We compute the Mean Squared Error at the end to evaluate
[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit)
[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).
{% highlight python %}
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from numpy import array
# Load and parse the data
def parsePoint(line):
values = [float(x) for x in line.replace(',', ' ').split(' ')]
return LabeledPoint(values[0], values[1:])
data = sc.textFile("mllib/data/ridge-data/lpsa.data")
parsedData = data.map(lambda line: array([float(x) for x in line.replace(',', ' ').split(' ')]))
parsedData = data.map(parsePoint)
# Build the model
model = LinearRegressionWithSGD.train(parsedData)
# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda point: (point.item(0),
model.predict(point.take(range(1, point.size)))))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y)/valuesAndPreds.count()
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
{% endhighlight %}
{% endhighlight %}


@ -48,14 +48,15 @@ optimal *k* is usually one where there is an "elbow" in the WSSSE graph.
{% highlight scala %}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
val data = sc.textFile("kmeans_data.txt")
val parsedData = data.map( _.split(' ').map(_.toDouble))
val data = sc.textFile("data/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
// Cluster the data into two classes using KMeans
val numIterations = 20
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
@ -85,12 +86,12 @@ from numpy import array
from math import sqrt
# Load and parse the data
data = sc.textFile("kmeans_data.txt")
data = sc.textFile("data/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
clusters = KMeans.train(parsedData, 2, maxIterations=10,
runs=30, initialization_mode="random")
runs=10, initialization_mode="random")
# Evaluate clustering by computing Within Set Sum of Squared Errors
def error(point):


@ -7,8 +7,9 @@ title: Machine Learning Library (MLlib)
MLlib is a Spark implementation of some common machine learning (ML)
functionality, as well as associated tests and data generators. MLlib
currently supports four common types of machine learning problem settings,
namely, binary classification, regression, clustering and collaborative
filtering, as well as an underlying gradient descent optimization primitive.
namely classification, regression, clustering and collaborative filtering,
as well as an underlying gradient descent optimization primitive and several
linear algebra methods.
# Available Methods
The following links provide a detailed explanation of the methods and usage examples for each of them:
@ -32,6 +33,28 @@ The following links provide a detailed explanation of the methods and usage exam
* Singular Value Decomposition
* Principal Component Analysis
# Data Types
Most MLlib algorithms operate on RDDs containing vectors. In Java and Scala, the
[Vector](api/mllib/index.html#org.apache.spark.mllib.linalg.Vector) class is used to
represent vectors. You can create either dense or sparse vectors using the
[Vectors](api/mllib/index.html#org.apache.spark.mllib.linalg.Vectors$) factory.
In Python, MLlib can take the following vector types:
* [NumPy](http://www.numpy.org) arrays
* Standard Python lists (e.g. `[1, 2, 3]`)
* The MLlib [SparseVector](api/pyspark/pyspark.mllib.linalg.SparseVector-class.html) class
* [SciPy sparse matrices](http://docs.scipy.org/doc/scipy/reference/sparse.html)
For efficiency, we recommend using NumPy arrays over lists, and, for sparse data, either the
[CSC format](http://docs.scipy.org/doc/scipy/reference/generated/scipy.sparse.csc_matrix.html#scipy.sparse.csc_matrix)
for SciPy matrices or MLlib's own SparseVector class.
Several other simple data types are used throughout the library, e.g. the LabeledPoint
class ([Java/Scala](api/mllib/index.html#org.apache.spark.mllib.regression.LabeledPoint),
[Python](api/pyspark/pyspark.mllib.regression.LabeledPoint-class.html)) for labeled data.
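As an illustrative sketch (assuming NumPy and SciPy are installed), the accepted Python vector types look like this:

{% highlight python %}
import numpy as np
import scipy.sparse as sps
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

dense = np.array([1.0, 0.0, 3.0])            # NumPy array (preferred over a plain list)
as_list = [1.0, 0.0, 3.0]                    # standard Python list, also accepted
sparse = SparseVector(3, {0: 1.0, 2: 3.0})   # MLlib's sparse type: size plus {index: value}
scipy_col = sps.csc_matrix(np.array([[1.0], [0.0], [3.0]]))  # SciPy column vector in CSC format

# Labeled data wraps any of these feature representations in a LabeledPoint.
point = LabeledPoint(1.0, sparse)
{% endhighlight %}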
# Dependencies
MLlib uses the [jblas](https://github.com/mikiobraun/jblas) linear algebra library, which itself
depends on native Fortran routines. You may need to install the


@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.{SparseVector, Vector, Vectors}
import org.apache.spark.mllib.recommendation._
import org.apache.spark.mllib.regression._
import org.apache.spark.rdd.RDD
@ -31,56 +31,112 @@ import org.apache.spark.rdd.RDD
/**
* :: DeveloperApi ::
* The Java stubs necessary for the Python mllib bindings.
*
* See python/pyspark/mllib/_common.py for the mutually agreed upon data format.
*/
@DeveloperApi
class PythonMLLibAPI extends Serializable {
private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
val packetLength = bytes.length
if (packetLength < 16) {
throw new IllegalArgumentException("Byte array too short.")
}
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
val magic = bb.getLong()
if (magic != 1) {
private val DENSE_VECTOR_MAGIC: Byte = 1
private val SPARSE_VECTOR_MAGIC: Byte = 2
private val DENSE_MATRIX_MAGIC: Byte = 3
private val LABELED_POINT_MAGIC: Byte = 4
private def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
require(bytes.length - offset >= 5, "Byte array too short")
val magic = bytes(offset)
if (magic == DENSE_VECTOR_MAGIC) {
deserializeDenseVector(bytes, offset)
} else if (magic == SPARSE_VECTOR_MAGIC) {
deserializeSparseVector(bytes, offset)
} else {
throw new IllegalArgumentException("Magic " + magic + " is wrong.")
}
val length = bb.getLong()
if (packetLength != 16 + 8 * length) {
throw new IllegalArgumentException("Length " + length + " is wrong.")
}
}
private def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
val packetLength = bytes.length - offset
require(packetLength >= 5, "Byte array too short")
val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
bb.order(ByteOrder.nativeOrder())
val magic = bb.get()
require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
val length = bb.getInt()
require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
val db = bb.asDoubleBuffer()
val ans = new Array[Double](length.toInt)
db.get(ans)
ans
Vectors.dense(ans)
}
private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
private def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
val packetLength = bytes.length - offset
require(packetLength >= 9, "Byte array too short")
val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
bb.order(ByteOrder.nativeOrder())
val magic = bb.get()
require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
val size = bb.getInt()
val nonZeros = bb.getInt()
require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
val ib = bb.asIntBuffer()
val indices = new Array[Int](nonZeros)
ib.get(indices)
bb.position(bb.position() + 4 * nonZeros)
val db = bb.asDoubleBuffer()
val values = new Array[Double](nonZeros)
db.get(values)
Vectors.sparse(size, indices, values)
}
private def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
val len = doubles.length
val bytes = new Array[Byte](16 + 8 * len)
val bytes = new Array[Byte](5 + 8 * len)
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
bb.putLong(1)
bb.putLong(len)
bb.put(DENSE_VECTOR_MAGIC)
bb.putInt(len)
val db = bb.asDoubleBuffer()
db.put(doubles)
bytes
}
private def serializeSparseVector(vector: SparseVector): Array[Byte] = {
val nonZeros = vector.indices.length
val bytes = new Array[Byte](9 + 12 * nonZeros)
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
bb.put(SPARSE_VECTOR_MAGIC)
bb.putInt(vector.size)
bb.putInt(nonZeros)
val ib = bb.asIntBuffer()
ib.put(vector.indices)
bb.position(bb.position() + 4 * nonZeros)
val db = bb.asDoubleBuffer()
db.put(vector.values)
bytes
}
private def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
case s: SparseVector =>
serializeSparseVector(s)
case _ =>
serializeDenseVector(vector.toArray)
}
private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
val packetLength = bytes.length
if (packetLength < 24) {
if (packetLength < 9) {
throw new IllegalArgumentException("Byte array too short.")
}
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
val magic = bb.getLong()
if (magic != 2) {
val magic = bb.get()
if (magic != DENSE_MATRIX_MAGIC) {
throw new IllegalArgumentException("Magic " + magic + " is wrong.")
}
val rows = bb.getLong()
val cols = bb.getLong()
if (packetLength != 24 + 8 * rows * cols) {
val rows = bb.getInt()
val cols = bb.getInt()
if (packetLength != 9 + 8 * rows * cols) {
throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
}
val db = bb.asDoubleBuffer()
@ -98,12 +154,12 @@ class PythonMLLibAPI extends Serializable {
if (rows > 0) {
cols = doubles(0).length
}
val bytes = new Array[Byte](24 + 8 * rows * cols)
val bytes = new Array[Byte](9 + 8 * rows * cols)
val bb = ByteBuffer.wrap(bytes)
bb.order(ByteOrder.nativeOrder())
bb.putLong(2)
bb.putLong(rows)
bb.putLong(cols)
bb.put(DENSE_MATRIX_MAGIC)
bb.putInt(rows)
bb.putInt(cols)
val db = bb.asDoubleBuffer()
for (i <- 0 until rows) {
db.put(doubles(i))
@ -111,18 +167,27 @@ class PythonMLLibAPI extends Serializable {
bytes
}
private def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
require(bytes.length >= 9, "Byte array too short")
val magic = bytes(0)
if (magic != LABELED_POINT_MAGIC) {
throw new IllegalArgumentException("Magic " + magic + " is wrong.")
}
val labelBytes = ByteBuffer.wrap(bytes, 1, 8)
labelBytes.order(ByteOrder.nativeOrder())
val label = labelBytes.asDoubleBuffer().get(0)
LabeledPoint(label, deserializeDoubleVector(bytes, 9))
}
private def trainRegressionModel(
trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
dataBytesJRDD: JavaRDD[Array[Byte]],
initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val data = dataBytesJRDD.rdd.map(deserializeLabeledPoint)
val initialWeights = deserializeDoubleVector(initialWeightsBA)
val model = trainFunc(data, initialWeights)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleVector(model.weights.toArray))
ret.add(serializeDoubleVector(model.weights))
ret.add(model.intercept: java.lang.Double)
ret
}
@ -143,7 +208,7 @@ class PythonMLLibAPI extends Serializable {
numIterations,
stepSize,
miniBatchFraction,
Vectors.dense(initialWeights)),
initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@ -166,7 +231,7 @@ class PythonMLLibAPI extends Serializable {
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@ -189,7 +254,7 @@ class PythonMLLibAPI extends Serializable {
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@ -212,7 +277,7 @@ class PythonMLLibAPI extends Serializable {
stepSize,
regParam,
miniBatchFraction,
Vectors.dense(initialWeights)),
initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@ -233,7 +298,7 @@ class PythonMLLibAPI extends Serializable {
numIterations,
stepSize,
miniBatchFraction,
Vectors.dense(initialWeights)),
initialWeights),
dataBytesJRDD,
initialWeightsBA)
}
@ -244,14 +309,11 @@ class PythonMLLibAPI extends Serializable {
def trainNaiveBayes(
dataBytesJRDD: JavaRDD[Array[Byte]],
lambda: Double): java.util.List[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
val x = deserializeDoubleVector(xBytes)
LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
})
val data = dataBytesJRDD.rdd.map(deserializeLabeledPoint)
val model = NaiveBayes.train(data, lambda)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleVector(model.labels))
ret.add(serializeDoubleVector(model.pi))
ret.add(serializeDoubleVector(Vectors.dense(model.labels)))
ret.add(serializeDoubleVector(Vectors.dense(model.pi)))
ret.add(serializeDoubleMatrix(model.theta))
ret
}
@ -265,7 +327,7 @@ class PythonMLLibAPI extends Serializable {
maxIterations: Int,
runs: Int,
initializationMode: String): java.util.List[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
val data = dataBytesJRDD.rdd.map(bytes => deserializeDoubleVector(bytes))
val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleMatrix(model.clusterCenters.map(_.toArray)))


@ -130,9 +130,11 @@ object Vectors {
private[mllib] def fromBreeze(breezeVector: BV[Double]): Vector = {
breezeVector match {
case v: BDV[Double] =>
require(v.offset == 0, s"Do not support non-zero offset ${v.offset}.")
require(v.stride == 1, s"Do not support stride other than 1, but got ${v.stride}.")
new DenseVector(v.data)
if (v.offset == 0 && v.stride == 1) {
new DenseVector(v.data)
} else {
new DenseVector(v.toArray) // Can't use underlying array directly, so make a new one
}
case v: BSV[Double] =>
new SparseVector(v.length, v.index, v.data)
case v: BV[_] =>


@ -82,4 +82,22 @@ class VectorsSuite extends FunSuite {
assert(v.## != another.##)
}
}
test("indexing dense vectors") {
val vec = Vectors.dense(1.0, 2.0, 3.0, 4.0)
assert(vec(0) === 1.0)
assert(vec(3) === 4.0)
}
test("indexing sparse vectors") {
val vec = Vectors.sparse(7, Array(0, 2, 4, 6), Array(1.0, 2.0, 3.0, 4.0))
assert(vec(0) === 1.0)
assert(vec(1) === 0.0)
assert(vec(2) === 2.0)
assert(vec(3) === 0.0)
assert(vec(6) === 4.0)
val vec2 = Vectors.sparse(8, Array(0, 2, 4, 6), Array(1.0, 2.0, 3.0, 4.0))
assert(vec2(6) === 4.0)
assert(vec2(7) === 0.0)
}
}


@ -33,5 +33,6 @@ target: docs/
private: no
exclude: pyspark.cloudpickle pyspark.worker pyspark.join
pyspark.java_gateway pyspark.examples pyspark.shell pyspark.test
pyspark.java_gateway pyspark.examples pyspark.shell pyspark.tests
pyspark.rddsampler pyspark.daemon pyspark.mllib._common
pyspark.mllib.tests


@ -16,8 +16,13 @@
#
"""
This example requires numpy (http://www.numpy.org/)
The K-means algorithm written from scratch against PySpark. In practice,
one may prefer to use the KMeans algorithm in MLlib, as shown in
python/examples/mllib/kmeans.py.
This example requires NumPy (http://www.numpy.org/).
"""
import sys
import numpy as np
@ -49,9 +54,7 @@ if __name__ == "__main__":
K = int(sys.argv[3])
convergeDist = float(sys.argv[4])
# TODO: change this after we port takeSample()
#kPoints = data.takeSample(False, K, 34)
kPoints = data.take(K)
kPoints = data.takeSample(False, K, 1)
tempDist = 1.0
while tempDist > convergeDist:


@ -16,9 +16,13 @@
#
"""
A logistic regression implementation that uses NumPy (http://www.numpy.org) to act on batches
of input data using efficient matrix operations.
A logistic regression implementation that uses NumPy (http://www.numpy.org)
to act on batches of input data using efficient matrix operations.
In practice, one may prefer to use the LogisticRegression algorithm in
MLlib, as shown in python/examples/mllib/logistic_regression.py.
"""
from collections import namedtuple
from math import exp
from os.path import realpath

python/examples/mllib/kmeans.py (new executable file)

@ -0,0 +1,44 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A K-means clustering program using MLlib.
This example requires NumPy (http://www.numpy.org/).
"""
import sys
import numpy as np
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans
def parseVector(line):
return np.array([float(x) for x in line.split(' ')])
if __name__ == "__main__":
if len(sys.argv) < 4:
print >> sys.stderr, "Usage: kmeans <master> <file> <k>"
exit(-1)
sc = SparkContext(sys.argv[1], "KMeans")
lines = sc.textFile(sys.argv[2])
data = lines.map(parseVector)
k = int(sys.argv[3])
model = KMeans.train(data, k)
print "Final centers: " + str(model.clusterCenters)


@ -0,0 +1,50 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Logistic regression using MLlib.
This example requires NumPy (http://www.numpy.org/).
"""
from math import exp
import sys
import numpy as np
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithSGD
# Parse a line of text into an MLlib LabeledPoint object
def parsePoint(line):
values = [float(s) for s in line.split(' ')]
if values[0] == -1: # Convert -1 labels to 0 for MLlib
values[0] = 0
return LabeledPoint(values[0], values[1:])
if __name__ == "__main__":
if len(sys.argv) != 4:
print >> sys.stderr, "Usage: logistic_regression <master> <file> <iters>"
exit(-1)
sc = SparkContext(sys.argv[1], "PythonLR")
points = sc.textFile(sys.argv[2]).map(parsePoint)
iterations = int(sys.argv[3])
model = LogisticRegressionWithSGD.train(points, iterations)
print "Final weights: " + str(model.weights)
print "Final intercept: " + str(model.intercept)


@ -15,38 +15,86 @@
# limitations under the License.
#
from numpy import ndarray, float64, int64, int32, ones, array_equal, array, dot, shape, complex, issubdtype
from pyspark import SparkContext, RDD
import numpy as np
from pyspark.serializers import Serializer
import struct
import numpy
from numpy import ndarray, float64, int64, int32, array_equal, array
from pyspark import SparkContext, RDD
from pyspark.mllib.linalg import SparseVector
from pyspark.serializers import Serializer
# Double vector format:
"""
Common utilities shared throughout MLlib, primarily for dealing with
different data types. These include:
- Serialization utilities to / from byte arrays that Java can handle
- Serializers for other data types, like ALS Rating objects
- Common methods for linear models
- Methods to deal with the different vector types we support, such as
SparseVector and scipy.sparse matrices.
"""
# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
_have_scipy = False
_scipy_issparse = None
try:
import scipy.sparse
_have_scipy = True
_scipy_issparse = scipy.sparse.issparse
except:
# No SciPy in environment, but that's okay
pass
# Serialization functions to and from Scala. These use the following formats, understood
# by the PythonMLLibAPI class in Scala:
#
# [8-byte 1] [8-byte length] [length*8 bytes of data]
# Dense double vector format:
#
# [1-byte 1] [4-byte length] [length*8 bytes of data]
#
# Sparse double vector format:
#
# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] [nonzeros*8 bytes of values]
#
# Double matrix format:
#
# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
# [1-byte 3] [4-byte rows] [4-byte cols] [rows*cols*8 bytes of data]
#
# LabeledPoint format:
#
# [1-byte 4] [8-byte label] [dense or sparse vector]
#
# This is all in machine-endian. That means that the Java interpreter and the
# Python interpreter must agree on what endian the machine is.
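As a rough sketch of the dense-vector layout described above (not part of this patch; the function name is just for illustration), the serializer simply packs the magic byte, the int32 length, and the float64 payload in native byte order:

import struct
import numpy as np

def example_pack_dense_vector(values):
    # [1-byte magic 1] [4-byte int32 length] [length * 8 bytes of float64 data], native order
    arr = np.asarray(values, dtype=np.float64)
    return bytearray(struct.pack("=Bi", 1, len(arr)) + arr.tobytes())

# A 3-element vector packs into 5 + 3 * 8 = 29 bytes.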
def _deserialize_byte_array(shape, ba, offset):
"""Wrapper around ndarray aliasing hack.
DENSE_VECTOR_MAGIC = 1
SPARSE_VECTOR_MAGIC = 2
DENSE_MATRIX_MAGIC = 3
LABELED_POINT_MAGIC = 4
def _deserialize_numpy_array(shape, ba, offset, dtype=float64):
"""
Deserialize a numpy array of the given type from an offset in
bytearray ba, assigning it the given shape.
>>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
>>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
>>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
True
>>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
>>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
>>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0))
True
>>> x = array([1, 2, 3], dtype=int32)
>>> array_equal(x, _deserialize_numpy_array(x.shape, x.data, 0, dtype=int32))
True
"""
ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
order='C')
ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype=dtype, order='C')
return ar.copy()
def _serialize_double_vector(v):
"""Serialize a double vector into a mutually understood format.
@ -55,160 +103,231 @@ def _serialize_double_vector(v):
>>> array_equal(y, array([1.0, 2.0, 3.0]))
True
"""
if type(v) != ndarray:
v = _convert_vector(v)
if type(v) == ndarray:
return _serialize_dense_vector(v)
elif type(v) == SparseVector:
return _serialize_sparse_vector(v)
else:
raise TypeError("_serialize_double_vector called on a %s; "
"wanted ndarray" % type(v))
"""complex is only datatype that can't be converted to float64"""
if issubdtype(v.dtype, complex):
raise TypeError("_serialize_double_vector called on a %s; "
"wanted ndarray" % type(v))
if v.dtype != float64:
v = v.astype(float64)
"wanted ndarray or SparseVector" % type(v))
def _serialize_dense_vector(v):
"""Serialize a dense vector given as a NumPy array."""
if v.ndim != 1:
raise TypeError("_serialize_double_vector called on a %ddarray; "
"wanted a 1darray" % v.ndim)
if v.dtype != float64:
if numpy.issubdtype(v.dtype, numpy.complex):
raise TypeError("_serialize_double_vector called on an ndarray of %s; "
"wanted ndarray of float64" % v.dtype)
v = v.astype(float64)
length = v.shape[0]
ba = bytearray(16 + 8*length)
header = ndarray(shape=[2], buffer=ba, dtype="int64")
header[0] = 1
header[1] = length
arr_mid = ndarray(shape=[length], buffer=ba, offset=16, dtype="float64")
arr_mid[...] = v
ba = bytearray(5 + 8 * length)
ba[0] = DENSE_VECTOR_MAGIC
length_bytes = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)
length_bytes[0] = length
_copyto(v, buffer=ba, offset=5, shape=[length], dtype=float64)
return ba
def _serialize_sparse_vector(v):
"""Serialize a pyspark.mllib.linalg.SparseVector."""
nonzeros = len(v.indices)
ba = bytearray(9 + 12 * nonzeros)
ba[0] = SPARSE_VECTOR_MAGIC
header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
header[0] = v.size
header[1] = nonzeros
_copyto(v.indices, buffer=ba, offset=9, shape=[nonzeros], dtype=int32)
values_offset = 9 + 4 * nonzeros
_copyto(v.values, buffer=ba, offset=values_offset, shape=[nonzeros], dtype=float64)
return ba
def _deserialize_double_vector(ba):
"""Deserialize a double vector from a mutually understood format.
>>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
>>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
True
>>> s = SparseVector(4, [1, 3], [3.0, 5.5])
>>> s == _deserialize_double_vector(_serialize_double_vector(s))
True
"""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_vector called on a %s; "
"wanted bytearray" % type(ba))
if len(ba) < 16:
if len(ba) < 5:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
"which is too short" % len(ba))
if (len(ba) & 7) != 0:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
"which is not a multiple of 8" % len(ba))
header = ndarray(shape=[2], buffer=ba, dtype="int64")
if header[0] != 1:
if ba[0] == DENSE_VECTOR_MAGIC:
return _deserialize_dense_vector(ba)
elif ba[0] == SPARSE_VECTOR_MAGIC:
return _deserialize_sparse_vector(ba)
else:
raise TypeError("_deserialize_double_vector called on bytearray "
"with wrong magic")
length = header[1]
if len(ba) != 8*length + 16:
raise TypeError("_deserialize_double_vector called on bytearray "
def _deserialize_dense_vector(ba):
"""Deserialize a dense vector into a numpy array."""
if len(ba) < 5:
raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
"which is too short" % len(ba))
length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
if len(ba) != 8 * length + 5:
raise TypeError("_deserialize_dense_vector called on bytearray "
"with wrong length")
return _deserialize_byte_array([length], ba, 16)
return _deserialize_numpy_array([length], ba, 5)
def _deserialize_sparse_vector(ba):
"""Deserialize a sparse vector into a MLlib SparseVector object."""
if len(ba) < 9:
raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
"which is too short" % len(ba))
header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
size = header[0]
nonzeros = header[1]
if len(ba) != 9 + 12 * nonzeros:
raise TypeError("_deserialize_sparse_vector called on bytearray "
"with wrong length")
indices = _deserialize_numpy_array([nonzeros], ba, 9, dtype=int32)
values = _deserialize_numpy_array([nonzeros], ba, 9 + 4 * nonzeros, dtype=float64)
return SparseVector(int(size), indices, values)
def _serialize_double_matrix(m):
"""Serialize a double matrix into a mutually understood format."""
if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
if (type(m) == ndarray and m.ndim == 2):
if m.dtype != float64:
if numpy.issubdtype(m.dtype, numpy.complex):
raise TypeError("_serialize_double_matrix called on an ndarray of %s; "
"wanted ndarray of float64" % m.dtype)
m = m.astype(float64)
rows = m.shape[0]
cols = m.shape[1]
ba = bytearray(24 + 8 * rows * cols)
header = ndarray(shape=[3], buffer=ba, dtype="int64")
header[0] = 2
header[1] = rows
header[2] = cols
arr_mid = ndarray(shape=[rows, cols], buffer=ba, offset=24,
dtype="float64", order='C')
arr_mid[...] = m
ba = bytearray(9 + 8 * rows * cols)
ba[0] = DENSE_MATRIX_MAGIC
lengths = ndarray(shape=[3], buffer=ba, offset=1, dtype=int32)
lengths[0] = rows
lengths[1] = cols
_copyto(m, buffer=ba, offset=9, shape=[rows, cols], dtype=float64)
return ba
else:
raise TypeError("_serialize_double_matrix called on a "
"non-double-matrix")
def _deserialize_double_matrix(ba):
"""Deserialize a double matrix from a mutually understood format."""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_matrix called on a %s; "
"wanted bytearray" % type(ba))
if len(ba) < 24:
if len(ba) < 9:
raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
"which is too short" % len(ba))
if (len(ba) & 7) != 0:
raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
"which is not a multiple of 8" % len(ba))
header = ndarray(shape=[3], buffer=ba, dtype="int64")
if (header[0] != 2):
if ba[0] != DENSE_MATRIX_MAGIC:
raise TypeError("_deserialize_double_matrix called on bytearray "
"with wrong magic")
rows = header[1]
cols = header[2]
if (len(ba) != 8*rows*cols + 24):
lengths = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
rows = lengths[0]
cols = lengths[1]
if (len(ba) != 8 * rows * cols + 9):
raise TypeError("_deserialize_double_matrix called on bytearray "
"with wrong length")
return _deserialize_byte_array([rows, cols], ba, 24)
return _deserialize_numpy_array([rows, cols], ba, 9)
def _serialize_labeled_point(p):
"""Serialize a LabeledPoint with a features vector of any type."""
from pyspark.mllib.regression import LabeledPoint
serialized_features = _serialize_double_vector(p.features)
header = bytearray(9)
header[0] = LABELED_POINT_MAGIC
header_float = ndarray(shape=[1], buffer=header, offset=1, dtype=float64)
header_float[0] = p.label
return header + serialized_features
def _copyto(array, buffer, offset, shape, dtype):
"""
Copy the contents of a vector to a destination bytearray at the
given offset.
TODO: In the future this could use numpy.copyto on NumPy 1.7+, but
we should benchmark that to see whether it provides a benefit.
"""
temp_array = ndarray(shape=shape, buffer=buffer, offset=offset, dtype=dtype, order='C')
temp_array[...] = array
def _get_unmangled_rdd(data, serializer):
dataBytes = data.map(serializer)
dataBytes._bypass_serializer = True
dataBytes.cache() # TODO: users should unpersist() this later!
return dataBytes
# Map a pickled Python RDD of Python dense or sparse vectors to a Java RDD of
# _serialized_double_vectors
def _get_unmangled_double_vector_rdd(data):
return _get_unmangled_rdd(data, _serialize_double_vector)
# Map a pickled Python RDD of LabeledPoint to a Java RDD of _serialized_labeled_points
def _get_unmangled_labeled_point_rdd(data):
return _get_unmangled_rdd(data, _serialize_labeled_point)
# Common functions for dealing with and training linear models
def _linear_predictor_typecheck(x, coeffs):
"""Check that x is a one-dimensional vector of the right shape.
This is a temporary hackaround until I actually implement bulk predict."""
"""
Check that x is a one-dimensional vector of the right shape.
This is a temporary hackaround until we actually implement bulk predict.
"""
x = _convert_vector(x)
if type(x) == ndarray:
if x.ndim == 1:
if x.shape == coeffs.shape:
pass
else:
if x.shape != coeffs.shape:
raise RuntimeError("Got array of %d elements; wanted %d"
% (shape(x)[0], shape(coeffs)[0]))
% (numpy.shape(x)[0], coeffs.shape[0]))
else:
raise RuntimeError("Bulk predict not yet supported.")
elif type(x) == SparseVector:
if x.size != coeffs.shape[0]:
raise RuntimeError("Got sparse vector of size %d; wanted %d"
% (x.size, coeffs.shape[0]))
elif (type(x) == RDD):
raise RuntimeError("Bulk predict not yet supported.")
else:
raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
def _get_unmangled_rdd(data, serializer):
dataBytes = data.map(serializer)
dataBytes._bypass_serializer = True
dataBytes.cache()
return dataBytes
# Map a pickled Python RDD of numpy double vectors to a Java RDD of
# _serialized_double_vectors
def _get_unmangled_double_vector_rdd(data):
return _get_unmangled_rdd(data, _serialize_double_vector)
class LinearModel(object):
"""Something that has a vector of coefficients and an intercept."""
def __init__(self, coeff, intercept):
self._coeff = coeff
self._intercept = intercept
class LinearRegressionModelBase(LinearModel):
"""A linear regression model.
>>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
>>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
True
"""
def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
_linear_predictor_typecheck(x, self._coeff)
return dot(self._coeff, x) + self._intercept
# If we weren't given initial weights, take a zero vector of the appropriate
# length.
def _get_initial_weights(initial_weights, data):
if initial_weights is None:
initial_weights = data.first()
if type(initial_weights) != ndarray:
raise TypeError("At least one data element has type "
+ type(initial_weights).__name__ + " which is not ndarray")
if initial_weights.ndim != 1:
raise TypeError("At least one data element has "
+ initial_weights.ndim + " dimensions, which is not 1")
initial_weights = ones([initial_weights.shape[0] - 1])
initial_weights = _convert_vector(data.first().features)
if type(initial_weights) == ndarray:
if initial_weights.ndim != 1:
raise TypeError("At least one data element has "
+ initial_weights.ndim + " dimensions, which is not 1")
initial_weights = numpy.zeros([initial_weights.shape[0]])
elif type(initial_weights) == SparseVector:
initial_weights = numpy.zeros([initial_weights.size])
return initial_weights
# train_func should take two parameters, namely data and initial_weights, and
# return the result of a call to the appropriate JVM stub.
# _regression_train_wrapper is responsible for setup and error checking.
def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
initial_weights = _get_initial_weights(initial_weights, data)
dataBytes = _get_unmangled_double_vector_rdd(data)
dataBytes = _get_unmangled_labeled_point_rdd(data)
ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
if len(ans) != 2:
raise RuntimeError("JVM call result had unexpected length")
@ -220,6 +339,9 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
+ type(ans[0]).__name__ + " which is not float")
return klass(_deserialize_double_vector(ans[0]), ans[1])
# Functions for serializing ALS Rating objects and tuples
def _serialize_rating(r):
ba = bytearray(16)
intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
@ -227,11 +349,12 @@ def _serialize_rating(r):
intpart[0], intpart[1], doublepart[0] = r
return ba
class RatingDeserializer(Serializer):
def loads(self, stream):
length = struct.unpack("!i", stream.read(4))[0]
ba = stream.read(length)
res = ndarray(shape=(3, ), buffer=ba, dtype="float64", offset=4)
res = ndarray(shape=(3, ), buffer=ba, dtype=float64, offset=4)
return int(res[0]), int(res[1]), res[2]
def load_stream(self, stream):
@ -243,12 +366,86 @@ class RatingDeserializer(Serializer):
except EOFError:
return
def _serialize_tuple(t):
ba = bytearray(8)
intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
intpart[0], intpart[1] = t
return ba
# Vector math functions that support all of our vector types
def _convert_vector(vec):
"""
Convert a vector to a format we support internally. This does
the following:
* For dense NumPy vectors (ndarray), returns them as is
* For our SparseVector class, returns that as is
* For Python lists, converts them to NumPy vectors
* For scipy.sparse.*_matrix column vectors, converts them to
our own SparseVector type.
This should be called before passing any data to our algorithms
or attempting to serialize it to Java.
"""
if type(vec) == ndarray or type(vec) == SparseVector:
return vec
elif type(vec) == list:
return array(vec, dtype=float64)
elif _have_scipy:
if _scipy_issparse(vec):
assert vec.shape[1] == 1, "Expected column vector"
csc = vec.tocsc()
return SparseVector(vec.shape[0], csc.indices, csc.data)
raise TypeError("Expected NumPy array, SparseVector, or scipy.sparse matrix")
def _squared_distance(v1, v2):
"""
Squared distance of two NumPy or sparse vectors.
>>> dense1 = array([1., 2.])
>>> sparse1 = SparseVector(2, [0, 1], [1., 2.])
>>> dense2 = array([2., 1.])
>>> sparse2 = SparseVector(2, [0, 1], [2., 1.])
>>> _squared_distance(dense1, dense2)
2.0
>>> _squared_distance(dense1, sparse2)
2.0
>>> _squared_distance(sparse1, dense2)
2.0
>>> _squared_distance(sparse1, sparse2)
2.0
"""
v1 = _convert_vector(v1)
v2 = _convert_vector(v2)
if type(v1) == ndarray and type(v2) == ndarray:
diff = v1 - v2
return diff.dot(diff)
elif type(v1) == ndarray:
return v2.squared_distance(v1)
else:
return v1.squared_distance(v2)
def _dot(vec, target):
"""
Compute the dot product of a vector of the types we support
(Numpy array, list, SparseVector, or SciPy sparse) and a target
NumPy array that is either 1- or 2-dimensional. Equivalent to
calling numpy.dot of the two vectors, but for SciPy ones, we
have to transpose them because they're column vectors.
"""
if type(vec) == ndarray or type(vec) == SparseVector:
return vec.dot(target)
elif type(vec) == list:
return _convert_vector(vec).dot(target)
else:
return vec.transpose().dot(target)[0]
def _test():
import doctest
globs = globals().copy()
@ -259,5 +456,6 @@ def _test():
if failure_count:
exit(-1)
if __name__ == "__main__":
_test()


@ -17,30 +17,55 @@
import numpy
from numpy import array, dot, shape
from numpy import array, shape
from pyspark import SparkContext
from pyspark.mllib._common import \
_get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
LinearModel, _linear_predictor_typecheck
_linear_predictor_typecheck, _get_unmangled_labeled_point_rdd
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log
class LogisticRegressionModel(LinearModel):
"""A linear binary classification model derived from logistic regression.
>>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(1.0, [1.0]),
... LabeledPoint(1.0, [2.0]),
... LabeledPoint(1.0, [3.0])
... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
>>> lrm.predict(array([1.0])) > 0
True
>>> lrm.predict(array([0.0])) <= 0
True
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>> lrm = LogisticRegressionWithSGD.train(sc.parallelize(sparse_data))
>>> lrm.predict(array([0.0, 1.0])) > 0
True
>>> lrm.predict(array([0.0, 0.0])) <= 0
True
>>> lrm.predict(SparseVector(2, {1: 1.0})) > 0
True
>>> lrm.predict(SparseVector(2, {1: 0.0})) <= 0
True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
margin = dot(x, self._coeff) + self._intercept
margin = _dot(x, self._coeff) + self._intercept
prob = 1/(1 + exp(-margin))
return 1 if prob > 0.5 else 0
class LogisticRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
@ -55,14 +80,30 @@ class LogisticRegressionWithSGD(object):
class SVMModel(LinearModel):
"""A support vector machine.
>>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(1.0, [1.0]),
... LabeledPoint(1.0, [2.0]),
... LabeledPoint(1.0, [3.0])
... ]
>>> svm = SVMWithSGD.train(sc.parallelize(data))
>>> svm.predict(array([1.0])) > 0
True
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
... LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
... ]
>>> svm = SVMWithSGD.train(sc.parallelize(sparse_data))
>>> svm.predict(SparseVector(2, {1: 1.0})) > 0
True
>>> svm.predict(SparseVector(2, {1: 0.0})) <= 0
True
"""
def predict(self, x):
_linear_predictor_typecheck(x, self._coeff)
margin = dot(x, self._coeff) + self._intercept
margin = _dot(x, self._coeff) + self._intercept
return 1 if margin >= 0 else 0
class SVMWithSGD(object):
@ -84,12 +125,26 @@ class NaiveBayesModel(object):
- pi: vector of logs of class priors (dimension C)
- theta: matrix of logs of class conditional probabilities (CxD)
>>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
>>> data = [
... LabeledPoint(0.0, [0.0, 0.0]),
... LabeledPoint(0.0, [0.0, 1.0]),
... LabeledPoint(1.0, [1.0, 0.0]),
... ]
>>> model = NaiveBayes.train(sc.parallelize(data))
>>> model.predict(array([0.0, 1.0]))
0.0
>>> model.predict(array([1.0, 0.0]))
1.0
>>> sparse_data = [
... LabeledPoint(0.0, SparseVector(2, {1: 0.0})),
... LabeledPoint(0.0, SparseVector(2, {1: 1.0})),
... LabeledPoint(1.0, SparseVector(2, {0: 1.0}))
... ]
>>> model = NaiveBayes.train(sc.parallelize(sparse_data))
>>> model.predict(SparseVector(2, {1: 1.0}))
0.0
>>> model.predict(SparseVector(2, {0: 1.0}))
1.0
"""
def __init__(self, labels, pi, theta):
@ -99,7 +154,7 @@ class NaiveBayesModel(object):
def predict(self, x):
"""Return the most likely class for a data vector x"""
return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
return self.labels[numpy.argmax(self.pi + _dot(x, self.theta))]
class NaiveBayes(object):
@classmethod
@ -119,7 +174,7 @@ class NaiveBayes(object):
@param lambda_: The smoothing parameter
"""
sc = data.context
dataBytes = _get_unmangled_double_vector_rdd(data)
dataBytes = _get_unmangled_labeled_point_rdd(data)
ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
return NaiveBayesModel(
_deserialize_double_vector(ans[0]),


@ -19,37 +19,61 @@ from numpy import array, dot
from math import sqrt
from pyspark import SparkContext
from pyspark.mllib._common import \
_get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_get_unmangled_rdd, _get_unmangled_double_vector_rdd, _squared_distance, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper
from pyspark.mllib.linalg import SparseVector
class KMeansModel(object):
"""A clustering model derived from the k-means method.
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
>>> clusters = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
>>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
>>> model = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
>>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
True
>>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
>>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
True
>>> clusters = KMeans.train(sc.parallelize(data), 2)
>>> model = KMeans.train(sc.parallelize(data), 2)
>>> sparse_data = [
... SparseVector(3, {1: 1.0}),
... SparseVector(3, {1: 1.1}),
... SparseVector(3, {2: 1.0}),
... SparseVector(3, {2: 1.1})
... ]
>>> model = KMeans.train(sc.parallelize(sparse_data), 2, initializationMode="k-means||")
>>> model.predict(array([0., 1., 0.])) == model.predict(array([0, 1.1, 0.]))
True
>>> model.predict(array([0., 0., 1.])) == model.predict(array([0, 0, 1.1]))
True
>>> model.predict(sparse_data[0]) == model.predict(sparse_data[1])
True
>>> model.predict(sparse_data[2]) == model.predict(sparse_data[3])
True
>>> type(model.clusterCenters)
<type 'list'>
"""
def __init__(self, centers_):
self.centers = centers_
def __init__(self, centers):
self.centers = centers
@property
def clusterCenters(self):
"""Get the cluster centers, represented as a list of NumPy arrays."""
return self.centers
def predict(self, x):
"""Find the cluster to which x belongs in this model."""
best = 0
best_distance = 1e75
for i in range(0, self.centers.shape[0]):
diff = x - self.centers[i]
distance = sqrt(dot(diff, diff))
best_distance = float("inf")
for i in range(0, len(self.centers)):
distance = _squared_distance(x, self.centers[i])
if distance < best_distance:
best = i
best_distance = distance
return best
class KMeans(object):
@classmethod
def train(cls, data, k, maxIterations=100, runs=1,
@ -64,7 +88,9 @@ class KMeans(object):
elif type(ans[0]) != bytearray:
raise RuntimeError("JVM call result had first element of type "
+ type(ans[0]) + " which is not bytearray")
return KMeansModel(_deserialize_double_matrix(ans[0]))
matrix = _deserialize_double_matrix(ans[0])
return KMeansModel([row for row in matrix])
def _test():
import doctest
@ -76,5 +102,6 @@ def _test():
if failure_count:
exit(-1)
if __name__ == "__main__":
_test()


@ -0,0 +1,245 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
MLlib utilities for linear algebra. For dense vectors, MLlib
uses the NumPy C{array} type, so you can simply pass NumPy arrays
around. For sparse vectors, users can construct a L{SparseVector}
object from MLlib or pass SciPy C{scipy.sparse} column vectors if
SciPy is available in their environment.
"""
from numpy import array, array_equal, ndarray, float64, int32
class SparseVector(object):
"""
A simple sparse vector class for passing data to MLlib. Users may
alternatively pass SciPy's {scipy.sparse} data types.
"""
def __init__(self, size, *args):
"""
Create a sparse vector, using either a dictionary, a list of
(index, value) pairs, or two separate arrays of indices and
values (sorted by index).
@param size: Size of the vector.
@param args: Non-zero entries, as a dictionary, list of tuples,
or two sorted lists containing indices and values.
>>> print SparseVector(4, {1: 1.0, 3: 5.5})
[1: 1.0, 3: 5.5]
>>> print SparseVector(4, [(1, 1.0), (3, 5.5)])
[1: 1.0, 3: 5.5]
>>> print SparseVector(4, [1, 3], [1.0, 5.5])
[1: 1.0, 3: 5.5]
"""
assert type(size) == int, "first argument must be an int"
self.size = size
assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
if len(args) == 1:
pairs = args[0]
if type(pairs) == dict:
pairs = pairs.items()
pairs = sorted(pairs)
self.indices = array([p[0] for p in pairs], dtype=int32)
self.values = array([p[1] for p in pairs], dtype=float64)
else:
assert len(args[0]) == len(args[1]), "index and value arrays not same length"
self.indices = array(args[0], dtype=int32)
self.values = array(args[1], dtype=float64)
for i in xrange(len(self.indices) - 1):
if self.indices[i] >= self.indices[i + 1]:
raise TypeError("indices array must be sorted")
def dot(self, other):
"""
Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.dot(a)
25.0
>>> a.dot(array([1., 2., 3., 4.]))
22.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.dot(b)
0.0
>>> a.dot(array([[1, 1], [2, 2], [3, 3], [4, 4]]))
array([ 22., 22.])
"""
if type(other) == ndarray:
if other.ndim == 1:
result = 0.0
for i in xrange(len(self.indices)):
result += self.values[i] * other[self.indices[i]]
return result
elif other.ndim == 2:
results = [self.dot(other[:,i]) for i in xrange(other.shape[1])]
return array(results)
else:
raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
else:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
if self.indices[i] == other.indices[j]:
result += self.values[i] * other.values[j]
i += 1
j += 1
elif self.indices[i] < other.indices[j]:
i += 1
else:
j += 1
return result
def squared_distance(self, other):
"""
Squared distance from a SparseVector or 1-dimensional NumPy array.
>>> a = SparseVector(4, [1, 3], [3.0, 4.0])
>>> a.squared_distance(a)
0.0
>>> a.squared_distance(array([1., 2., 3., 4.]))
11.0
>>> b = SparseVector(4, [2, 4], [1.0, 2.0])
>>> a.squared_distance(b)
30.0
>>> b.squared_distance(a)
30.0
"""
if type(other) == ndarray:
if other.ndim == 1:
result = 0.0
j = 0 # index into our own array
for i in xrange(other.shape[0]):
if j < len(self.indices) and self.indices[j] == i:
diff = self.values[j] - other[i]
result += diff * diff
j += 1
else:
result += other[i] * other[i]
return result
else:
raise Exception("Cannot call squared_distance with %d-dimensional array" %
other.ndim)
else:
result = 0.0
i, j = 0, 0
while i < len(self.indices) and j < len(other.indices):
if self.indices[i] == other.indices[j]:
diff = self.values[i] - other.values[j]
result += diff * diff
i += 1
j += 1
elif self.indices[i] < other.indices[j]:
result += self.values[i] * self.values[i]
i += 1
else:
result += other.values[j] * other.values[j]
j += 1
while i < len(self.indices):
result += self.values[i] * self.values[i]
i += 1
while j < len(other.indices):
result += other.values[j] * other.values[j]
j += 1
return result
def __str__(self):
inds = self.indices
vals = self.values
entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
return "[" + entries + "]"
def __repr__(self):
inds = self.indices
vals = self.values
entries = ", ".join(["{0}: {1}".format(inds[i], vals[i]) for i in xrange(len(inds))])
return "SparseVector({0}, {{{1}}})".format(self.size, entries)
def __eq__(self, other):
"""
Test SparseVectors for equality.
>>> v1 = SparseVector(4, [(1, 1.0), (3, 5.5)])
>>> v2 = SparseVector(4, [(1, 1.0), (3, 5.5)])
>>> v1 == v2
True
>>> v1 != v2
False
"""
return (isinstance(other, self.__class__)
and other.size == self.size
and array_equal(other.indices, self.indices)
and array_equal(other.values, self.values))
def __ne__(self, other):
return not self.__eq__(other)
class Vectors(object):
"""
Factory methods for working with vectors. Note that dense vectors
are simply represented as NumPy array objects, so there is no need
to convert them for use in MLlib. For sparse vectors, the factory
methods in this class create an MLlib-compatible type, or users
can pass in SciPy's C{scipy.sparse} column vectors.
"""
@staticmethod
def sparse(size, *args):
"""
Create a sparse vector, using either a dictionary, a list of
(index, value) pairs, or two separate arrays of indices and
values (sorted by index).
@param size: Size of the vector.
@param args: Non-zero entries, as a dictionary, list of tuples,
or two sorted lists containing indices and values.
>>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
[1: 1.0, 3: 5.5]
>>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
[1: 1.0, 3: 5.5]
>>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
[1: 1.0, 3: 5.5]
"""
return SparseVector(size, *args)
@staticmethod
def dense(elements):
"""
Create a dense vector of 64-bit floats from a Python list. Always
returns a NumPy array.
>>> Vectors.dense([1, 2, 3])
array([ 1., 2., 3.])
"""
return array(elements, dtype=float64)
def _test():
import doctest
(failure_count, test_count) = doctest.testmod(optionflags=doctest.ELLIPSIS)
if failure_count:
exit(-1)
if __name__ == "__main__":
_test()
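A minimal usage sketch of the new linalg API (illustrative only, not part of the patch), assuming the module above is on the Python path:

from pyspark.mllib.linalg import Vectors

# Three equivalent ways to build the same sparse vector of size 4
a = Vectors.sparse(4, {1: 1.0, 3: 5.5})
b = Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
c = Vectors.sparse(4, [1, 3], [1.0, 5.5])
assert a == b == c

# Dense vectors are plain NumPy arrays
d = Vectors.dense([0.0, 1.0, 0.0, 2.0])

# Dot products and squared distances work against dense arrays and other
# sparse vectors
print a.dot(d)               # 1.0*1.0 + 5.5*2.0 = 12.0
print a.squared_distance(d)  # (1.0-1.0)**2 + (5.5-2.0)**2 = 12.25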

View file

@@ -15,41 +15,98 @@
# limitations under the License.
#
from numpy import array, dot
from numpy import array, ndarray
from pyspark import SparkContext
from pyspark.mllib._common import \
_get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_dot, _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
_serialize_double_matrix, _deserialize_double_matrix, \
_serialize_double_vector, _deserialize_double_vector, \
_get_initial_weights, _serialize_rating, _regression_train_wrapper, \
_linear_predictor_typecheck
_linear_predictor_typecheck, _have_scipy, _scipy_issparse
from pyspark.mllib.linalg import SparseVector
class LabeledPoint(object):
"""
The features and label of a data point.
@param label: Label for this data point.
@param features: Vector of features for this point (NumPy array, list,
pyspark.mllib.linalg.SparseVector, or scipy.sparse column matrix)
"""
def __init__(self, label, features):
self.label = label
if (type(features) == ndarray or type(features) == SparseVector
or (_have_scipy and _scipy_issparse(features))):
self.features = features
elif type(features) == list:
self.features = array(features)
else:
raise TypeError("Expected NumPy array, list, SparseVector, or scipy.sparse matrix")
class LinearModel(object):
"""Something that has a vector of coefficients and an intercept."""
def __init__(self, coeff, intercept):
self._coeff = coeff
"""A linear model that has a vector of coefficients and an intercept."""
def __init__(self, weights, intercept):
self._coeff = weights
self._intercept = intercept
@property
def weights(self):
return self._coeff
@property
def intercept(self):
return self._intercept
class LinearRegressionModelBase(LinearModel):
"""A linear regression model.
>>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
>>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
True
>>> abs(lrmb.predict(SparseVector(2, {0: -1.03, 1: 7.777})) - 14.624) < 1e-6
True
"""
def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
_linear_predictor_typecheck(x, self._coeff)
return dot(self._coeff, x) + self._intercept
return _dot(x, self._coeff) + self._intercept
class LinearRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit.
>>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
>>> from pyspark.mllib.regression import LabeledPoint
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(1.0, [1.0]),
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> data = [
... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LinearRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
"""
class LinearRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
@@ -61,14 +118,39 @@ class LinearRegressionWithSGD(object):
d._jrdd, iterations, step, miniBatchFraction, i),
LinearRegressionModel, data, initialWeights)
class LassoModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_1 penalty term.
>>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
>>> from pyspark.mllib.regression import LabeledPoint
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(1.0, [1.0]),
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> data = [
... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = LassoWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
"""
class LassoWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -80,14 +162,39 @@ class LassoWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
LassoModel, data, initialWeights)
class RidgeRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit with an
l_2 penalty term.
>>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
>>> from pyspark.mllib.regression import LabeledPoint
>>> data = [
... LabeledPoint(0.0, [0.0]),
... LabeledPoint(1.0, [1.0]),
... LabeledPoint(3.0, [2.0]),
... LabeledPoint(2.0, [3.0])
... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(array([1.0])) - 1) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
>>> data = [
... LabeledPoint(0.0, SparseVector(1, {0: 0.0})),
... LabeledPoint(1.0, SparseVector(1, {0: 1.0})),
... LabeledPoint(3.0, SparseVector(1, {0: 2.0})),
... LabeledPoint(2.0, SparseVector(1, {0: 3.0}))
... ]
>>> lrm = RidgeRegressionWithSGD.train(sc.parallelize(data), initialWeights=array([1.0]))
>>> abs(lrm.predict(array([0.0])) - 0) < 0.5
True
>>> abs(lrm.predict(SparseVector(1, {0: 1.0})) - 1) < 0.5
True
"""
class RidgeRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
@@ -99,6 +206,7 @@ class RidgeRegressionWithSGD(object):
iterations, step, regParam, miniBatchFraction, i),
RidgeRegressionModel, data, initialWeights)
def _test():
import doctest
globs = globals().copy()
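Putting the pieces together, a short end-to-end sketch (illustrative only, not part of the patch) of training on sparse LabeledPoints and reading back the weights and intercept properties added above; it assumes a running SparkContext named sc:

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD

# y ~ 2 * x, with the single feature stored sparsely
points = [LabeledPoint(2.0 * i, SparseVector(1, {0: float(i)})) for i in range(4)]
model = LinearRegressionWithSGD.train(sc.parallelize(points), iterations=100)

print model.weights                             # coefficient vector (NumPy array)
print model.intercept                           # scalar intercept
print model.predict(SparseVector(1, {0: 3.0}))  # roughly 6.0 once training converges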

View file

@@ -0,0 +1,302 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Fuller unit tests for Python MLlib.
"""
from numpy import array, array_equal
import unittest
from pyspark.mllib._common import _convert_vector, _serialize_double_vector, \
_deserialize_double_vector, _dot, _squared_distance
from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint
from pyspark.tests import PySparkTestCase
_have_scipy = False
try:
import scipy.sparse
_have_scipy = True
except ImportError:
# No SciPy, but that's okay, we'll skip those tests
pass
class VectorTests(unittest.TestCase):
def test_serialize(self):
sv = SparseVector(4, {1: 1, 3: 2})
dv = array([1., 2., 3., 4.])
lst = [1, 2, 3, 4]
self.assertTrue(sv is _convert_vector(sv))
self.assertTrue(dv is _convert_vector(dv))
self.assertTrue(array_equal(dv, _convert_vector(lst)))
self.assertEquals(sv,
_deserialize_double_vector(_serialize_double_vector(sv)))
self.assertTrue(array_equal(dv,
_deserialize_double_vector(_serialize_double_vector(dv))))
self.assertTrue(array_equal(dv,
_deserialize_double_vector(_serialize_double_vector(lst))))
def test_dot(self):
sv = SparseVector(4, {1: 1, 3: 2})
dv = array([1., 2., 3., 4.])
lst = [1, 2, 3, 4]
mat = array([[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.]])
self.assertEquals(10.0, _dot(sv, dv))
self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(sv, mat)))
self.assertEquals(30.0, _dot(dv, dv))
self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(dv, mat)))
self.assertEquals(30.0, _dot(lst, dv))
self.assertTrue(array_equal(array([10., 20., 30., 40.]), _dot(lst, mat)))
def test_squared_distance(self):
sv = SparseVector(4, {1: 1, 3: 2})
dv = array([1., 2., 3., 4.])
lst = [4, 3, 2, 1]
self.assertEquals(15.0, _squared_distance(sv, dv))
self.assertEquals(25.0, _squared_distance(sv, lst))
self.assertEquals(20.0, _squared_distance(dv, lst))
self.assertEquals(15.0, _squared_distance(dv, sv))
self.assertEquals(25.0, _squared_distance(lst, sv))
self.assertEquals(20.0, _squared_distance(lst, dv))
self.assertEquals(0.0, _squared_distance(sv, sv))
self.assertEquals(0.0, _squared_distance(dv, dv))
self.assertEquals(0.0, _squared_distance(lst, lst))
class ListTests(PySparkTestCase):
"""
Test MLlib algorithms on plain lists, to make sure they're passed through
as NumPy arrays.
"""
def test_clustering(self):
from pyspark.mllib.clustering import KMeans
data = [
[0, 1.1],
[0, 1.2],
[1.1, 0],
[1.2, 0],
]
clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
def test_classification(self):
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
data = [
LabeledPoint(0.0, [1, 0]),
LabeledPoint(1.0, [0, 1]),
LabeledPoint(0.0, [2, 0]),
LabeledPoint(1.0, [0, 2])
]
rdd = self.sc.parallelize(data)
features = [p.features.tolist() for p in data]
lr_model = LogisticRegressionWithSGD.train(rdd)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
self.assertTrue(lr_model.predict(features[2]) <= 0)
self.assertTrue(lr_model.predict(features[3]) > 0)
svm_model = SVMWithSGD.train(rdd)
self.assertTrue(svm_model.predict(features[0]) <= 0)
self.assertTrue(svm_model.predict(features[1]) > 0)
self.assertTrue(svm_model.predict(features[2]) <= 0)
self.assertTrue(svm_model.predict(features[3]) > 0)
nb_model = NaiveBayes.train(rdd)
self.assertTrue(nb_model.predict(features[0]) <= 0)
self.assertTrue(nb_model.predict(features[1]) > 0)
self.assertTrue(nb_model.predict(features[2]) <= 0)
self.assertTrue(nb_model.predict(features[3]) > 0)
def test_regression(self):
from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
RidgeRegressionWithSGD
data = [
LabeledPoint(-1.0, [0, -1]),
LabeledPoint(1.0, [0, 1]),
LabeledPoint(-1.0, [0, -2]),
LabeledPoint(1.0, [0, 2])
]
rdd = self.sc.parallelize(data)
features = [p.features.tolist() for p in data]
lr_model = LinearRegressionWithSGD.train(rdd)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
self.assertTrue(lr_model.predict(features[2]) <= 0)
self.assertTrue(lr_model.predict(features[3]) > 0)
lasso_model = LassoWithSGD.train(rdd)
self.assertTrue(lasso_model.predict(features[0]) <= 0)
self.assertTrue(lasso_model.predict(features[1]) > 0)
self.assertTrue(lasso_model.predict(features[2]) <= 0)
self.assertTrue(lasso_model.predict(features[3]) > 0)
rr_model = RidgeRegressionWithSGD.train(rdd)
self.assertTrue(rr_model.predict(features[0]) <= 0)
self.assertTrue(rr_model.predict(features[1]) > 0)
self.assertTrue(rr_model.predict(features[2]) <= 0)
self.assertTrue(rr_model.predict(features[3]) > 0)
@unittest.skipIf(not _have_scipy, "SciPy not installed")
class SciPyTests(PySparkTestCase):
"""
Test both vector operations and MLlib algorithms with SciPy sparse matrices,
if SciPy is available.
"""
def test_serialize(self):
from scipy.sparse import lil_matrix
lil = lil_matrix((4, 1))
lil[1, 0] = 1
lil[3, 0] = 2
sv = SparseVector(4, {1: 1, 3: 2})
self.assertEquals(sv, _convert_vector(lil))
self.assertEquals(sv, _convert_vector(lil.tocsc()))
self.assertEquals(sv, _convert_vector(lil.tocoo()))
self.assertEquals(sv, _convert_vector(lil.tocsr()))
self.assertEquals(sv, _convert_vector(lil.todok()))
self.assertEquals(sv,
_deserialize_double_vector(_serialize_double_vector(lil)))
self.assertEquals(sv,
_deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
self.assertEquals(sv,
_deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
self.assertEquals(sv,
_deserialize_double_vector(_serialize_double_vector(lil.todok())))
def test_dot(self):
from scipy.sparse import lil_matrix
lil = lil_matrix((4, 1))
lil[1, 0] = 1
lil[3, 0] = 2
dv = array([1., 2., 3., 4.])
sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
mat = array([[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.],
[1., 2., 3., 4.]])
self.assertEquals(10.0, _dot(lil, dv))
self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
def test_squared_distance(self):
from scipy.sparse import lil_matrix
lil = lil_matrix((4, 1))
lil[1, 0] = 3
lil[3, 0] = 2
dv = array([1., 2., 3., 4.])
sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
self.assertEquals(15.0, _squared_distance(lil, dv))
self.assertEquals(15.0, _squared_distance(lil, sv))
self.assertEquals(15.0, _squared_distance(dv, lil))
self.assertEquals(15.0, _squared_distance(sv, lil))
def scipy_matrix(self, size, values):
"""Create a column SciPy matrix from a dictionary of values"""
from scipy.sparse import lil_matrix
lil = lil_matrix((size, 1))
for key, value in values.items():
lil[key, 0] = value
return lil
def test_clustering(self):
from pyspark.mllib.clustering import KMeans
data = [
self.scipy_matrix(3, {1: 1.0}),
self.scipy_matrix(3, {1: 1.1}),
self.scipy_matrix(3, {2: 1.0}),
self.scipy_matrix(3, {2: 1.1})
]
clusters = KMeans.train(self.sc.parallelize(data), 2, initializationMode="k-means||")
self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1]))
self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3]))
def test_classification(self):
from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes
data = [
LabeledPoint(0.0, self.scipy_matrix(2, {0: 1.0})),
LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
LabeledPoint(0.0, self.scipy_matrix(2, {0: 2.0})),
LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
]
rdd = self.sc.parallelize(data)
features = [p.features for p in data]
lr_model = LogisticRegressionWithSGD.train(rdd)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
self.assertTrue(lr_model.predict(features[2]) <= 0)
self.assertTrue(lr_model.predict(features[3]) > 0)
svm_model = SVMWithSGD.train(rdd)
self.assertTrue(svm_model.predict(features[0]) <= 0)
self.assertTrue(svm_model.predict(features[1]) > 0)
self.assertTrue(svm_model.predict(features[2]) <= 0)
self.assertTrue(svm_model.predict(features[3]) > 0)
nb_model = NaiveBayes.train(rdd)
self.assertTrue(nb_model.predict(features[0]) <= 0)
self.assertTrue(nb_model.predict(features[1]) > 0)
self.assertTrue(nb_model.predict(features[2]) <= 0)
self.assertTrue(nb_model.predict(features[3]) > 0)
def test_regression(self):
from pyspark.mllib.regression import LinearRegressionWithSGD, LassoWithSGD, \
RidgeRegressionWithSGD
data = [
LabeledPoint(-1.0, self.scipy_matrix(2, {1: -1.0})),
LabeledPoint(1.0, self.scipy_matrix(2, {1: 1.0})),
LabeledPoint(-1.0, self.scipy_matrix(2, {1: -2.0})),
LabeledPoint(1.0, self.scipy_matrix(2, {1: 2.0}))
]
rdd = self.sc.parallelize(data)
features = [p.features for p in data]
lr_model = LinearRegressionWithSGD.train(rdd)
self.assertTrue(lr_model.predict(features[0]) <= 0)
self.assertTrue(lr_model.predict(features[1]) > 0)
self.assertTrue(lr_model.predict(features[2]) <= 0)
self.assertTrue(lr_model.predict(features[3]) > 0)
lasso_model = LassoWithSGD.train(rdd)
self.assertTrue(lasso_model.predict(features[0]) <= 0)
self.assertTrue(lasso_model.predict(features[1]) > 0)
self.assertTrue(lasso_model.predict(features[2]) <= 0)
self.assertTrue(lasso_model.predict(features[3]) > 0)
rr_model = RidgeRegressionWithSGD.train(rdd)
self.assertTrue(rr_model.predict(features[0]) <= 0)
self.assertTrue(rr_model.predict(features[1]) > 0)
self.assertTrue(rr_model.predict(features[2]) <= 0)
self.assertTrue(rr_model.predict(features[3]) > 0)
if __name__ == "__main__":
if not _have_scipy:
print "NOTE: Skipping SciPy tests as it does not seem to be installed"
unittest.main()
if not _have_scipy:
print "NOTE: SciPy tests were skipped as it does not seem to be installed"

View file

@@ -34,7 +34,7 @@ rm -rf metastore warehouse
function run_test() {
SPARK_TESTING=0 $FWDIR/bin/pyspark $1 2>&1 | tee -a unit-tests.log
FAILED=$((PIPESTATUS[0]||$FAILED))
# Fail and exit on the first test failure.
if [[ $FAILED != 0 ]]; then
cat unit-tests.log | grep -v "^[0-9][0-9]*" # filter all lines starting with a number.
@@ -57,8 +57,10 @@ run_test "pyspark/tests.py"
run_test "pyspark/mllib/_common.py"
run_test "pyspark/mllib/classification.py"
run_test "pyspark/mllib/clustering.py"
run_test "pyspark/mllib/linalg.py"
run_test "pyspark/mllib/recommendation.py"
run_test "pyspark/mllib/regression.py"
run_test "pyspark/mllib/tests.py"
if [[ $FAILED == 0 ]]; then
echo -en "\033[32m" # Green