[SPARK-6267] [MLLIB] Python API for IsotonicRegression
https://issues.apache.org/jira/browse/SPARK-6267
Author: Yanbo Liang <ybliang8@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>
Closes #5890 from yanboliang/spark-6267 and squashes the following commits:
f20541d [Yanbo Liang] Merge pull request #3 from mengxr/SPARK-6267
7f202f9 [Xiangrui Meng] use Vector to have the best Python 2&3 compatibility
4bccfee [Yanbo Liang] fix doctest
ec09412 [Yanbo Liang] fix typos
8214bbb [Yanbo Liang] fix code style
5c8ebe5 [Yanbo Liang] Python API for IsotonicRegression
(cherry picked from commit 7b1457839b
)
Signed-off-by: Xiangrui Meng <meng@databricks.com>
This commit is contained in:
parent
8aa6681d5f
commit
384ac3c111
|
@ -282,6 +282,24 @@ private[python] class PythonMLLibAPI extends Serializable {
|
||||||
map(_.asInstanceOf[Object]).asJava
|
map(_.asInstanceOf[Object]).asJava
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java stub for Python mllib IsotonicRegression.run()
|
||||||
|
*/
|
||||||
|
def trainIsotonicRegressionModel(
|
||||||
|
data: JavaRDD[Vector],
|
||||||
|
isotonic: Boolean): JList[Object] = {
|
||||||
|
val isotonicRegressionAlg = new IsotonicRegression().setIsotonic(isotonic)
|
||||||
|
val input = data.rdd.map { x =>
|
||||||
|
(x(0), x(1), x(2))
|
||||||
|
}.persist(StorageLevel.MEMORY_AND_DISK)
|
||||||
|
try {
|
||||||
|
val model = isotonicRegressionAlg.run(input)
|
||||||
|
List[AnyRef](model.boundaryVector, model.predictionVector).asJava
|
||||||
|
} finally {
|
||||||
|
data.rdd.unpersist(blocking = false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java stub for Python mllib KMeans.run()
|
* Java stub for Python mllib KMeans.run()
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -21,18 +21,20 @@ import java.io.Serializable
|
||||||
import java.lang.{Double => JDouble}
|
import java.lang.{Double => JDouble}
|
||||||
import java.util.Arrays.binarySearch
|
import java.util.Arrays.binarySearch
|
||||||
|
|
||||||
|
import scala.collection.JavaConverters._
|
||||||
import scala.collection.mutable.ArrayBuffer
|
import scala.collection.mutable.ArrayBuffer
|
||||||
|
|
||||||
import org.json4s._
|
import org.json4s._
|
||||||
import org.json4s.JsonDSL._
|
import org.json4s.JsonDSL._
|
||||||
import org.json4s.jackson.JsonMethods._
|
import org.json4s.jackson.JsonMethods._
|
||||||
|
|
||||||
|
import org.apache.spark.SparkContext
|
||||||
import org.apache.spark.annotation.Experimental
|
import org.apache.spark.annotation.Experimental
|
||||||
import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
|
import org.apache.spark.api.java.{JavaDoubleRDD, JavaRDD}
|
||||||
|
import org.apache.spark.mllib.linalg.{Vector, Vectors}
|
||||||
import org.apache.spark.mllib.util.{Loader, Saveable}
|
import org.apache.spark.mllib.util.{Loader, Saveable}
|
||||||
import org.apache.spark.rdd.RDD
|
import org.apache.spark.rdd.RDD
|
||||||
import org.apache.spark.SparkContext
|
import org.apache.spark.sql.SQLContext
|
||||||
import org.apache.spark.sql.{DataFrame, SQLContext}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* :: Experimental ::
|
* :: Experimental ::
|
||||||
|
@ -57,6 +59,13 @@ class IsotonicRegressionModel (
|
||||||
assertOrdered(boundaries)
|
assertOrdered(boundaries)
|
||||||
assertOrdered(predictions)(predictionOrd)
|
assertOrdered(predictions)(predictionOrd)
|
||||||
|
|
||||||
|
/** A Java-friendly constructor that takes two Iterable parameters and one Boolean parameter. */
|
||||||
|
def this(boundaries: java.lang.Iterable[Double],
|
||||||
|
predictions: java.lang.Iterable[Double],
|
||||||
|
isotonic: java.lang.Boolean) = {
|
||||||
|
this(boundaries.asScala.toArray, predictions.asScala.toArray, isotonic)
|
||||||
|
}
|
||||||
|
|
||||||
/** Asserts the input array is monotone with the given ordering. */
|
/** Asserts the input array is monotone with the given ordering. */
|
||||||
private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
|
private def assertOrdered(xs: Array[Double])(implicit ord: Ordering[Double]): Unit = {
|
||||||
var i = 1
|
var i = 1
|
||||||
|
@ -132,6 +141,12 @@ class IsotonicRegressionModel (
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** A convenient method for boundaries called by the Python API. */
|
||||||
|
private[mllib] def boundaryVector: Vector = Vectors.dense(boundaries)
|
||||||
|
|
||||||
|
/** A convenient method for boundaries called by the Python API. */
|
||||||
|
private[mllib] def predictionVector: Vector = Vectors.dense(predictions)
|
||||||
|
|
||||||
override def save(sc: SparkContext, path: String): Unit = {
|
override def save(sc: SparkContext, path: String): Unit = {
|
||||||
IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic)
|
IsotonicRegressionModel.SaveLoadV1_0.save(sc, path, boundaries, predictions, isotonic)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,14 +18,16 @@
|
||||||
import numpy as np
|
import numpy as np
|
||||||
from numpy import array
|
from numpy import array
|
||||||
|
|
||||||
|
from pyspark import RDD
|
||||||
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
|
from pyspark.mllib.common import callMLlibFunc, _py2java, _java2py, inherit_doc
|
||||||
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
|
from pyspark.mllib.linalg import SparseVector, Vectors, _convert_to_vector
|
||||||
from pyspark.mllib.util import Saveable, Loader
|
from pyspark.mllib.util import Saveable, Loader
|
||||||
|
|
||||||
__all__ = ['LabeledPoint', 'LinearModel',
|
__all__ = ['LabeledPoint', 'LinearModel',
|
||||||
'LinearRegressionModel', 'LinearRegressionWithSGD',
|
'LinearRegressionModel', 'LinearRegressionWithSGD',
|
||||||
'RidgeRegressionModel', 'RidgeRegressionWithSGD',
|
'RidgeRegressionModel', 'RidgeRegressionWithSGD',
|
||||||
'LassoModel', 'LassoWithSGD']
|
'LassoModel', 'LassoWithSGD', 'IsotonicRegressionModel',
|
||||||
|
'IsotonicRegression']
|
||||||
|
|
||||||
|
|
||||||
class LabeledPoint(object):
|
class LabeledPoint(object):
|
||||||
|
@ -396,6 +398,73 @@ class RidgeRegressionWithSGD(object):
|
||||||
return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights)
|
return _regression_train_wrapper(train, RidgeRegressionModel, data, initialWeights)
|
||||||
|
|
||||||
|
|
||||||
|
class IsotonicRegressionModel(Saveable, Loader):
|
||||||
|
|
||||||
|
"""Regression model for isotonic regression.
|
||||||
|
|
||||||
|
>>> data = [(1, 0, 1), (2, 1, 1), (3, 2, 1), (1, 3, 1), (6, 4, 1), (17, 5, 1), (16, 6, 1)]
|
||||||
|
>>> irm = IsotonicRegression.train(sc.parallelize(data))
|
||||||
|
>>> irm.predict(3)
|
||||||
|
2.0
|
||||||
|
>>> irm.predict(5)
|
||||||
|
16.5
|
||||||
|
>>> irm.predict(sc.parallelize([3, 5])).collect()
|
||||||
|
[2.0, 16.5]
|
||||||
|
>>> import os, tempfile
|
||||||
|
>>> path = tempfile.mkdtemp()
|
||||||
|
>>> irm.save(sc, path)
|
||||||
|
>>> sameModel = IsotonicRegressionModel.load(sc, path)
|
||||||
|
>>> sameModel.predict(3)
|
||||||
|
2.0
|
||||||
|
>>> sameModel.predict(5)
|
||||||
|
16.5
|
||||||
|
>>> try:
|
||||||
|
... os.removedirs(path)
|
||||||
|
... except OSError:
|
||||||
|
... pass
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, boundaries, predictions, isotonic):
|
||||||
|
self.boundaries = boundaries
|
||||||
|
self.predictions = predictions
|
||||||
|
self.isotonic = isotonic
|
||||||
|
|
||||||
|
def predict(self, x):
|
||||||
|
if isinstance(x, RDD):
|
||||||
|
return x.map(lambda v: self.predict(v))
|
||||||
|
return np.interp(x, self.boundaries, self.predictions)
|
||||||
|
|
||||||
|
def save(self, sc, path):
|
||||||
|
java_boundaries = _py2java(sc, self.boundaries.tolist())
|
||||||
|
java_predictions = _py2java(sc, self.predictions.tolist())
|
||||||
|
java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel(
|
||||||
|
java_boundaries, java_predictions, self.isotonic)
|
||||||
|
java_model.save(sc._jsc.sc(), path)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def load(cls, sc, path):
|
||||||
|
java_model = sc._jvm.org.apache.spark.mllib.regression.IsotonicRegressionModel.load(
|
||||||
|
sc._jsc.sc(), path)
|
||||||
|
py_boundaries = _java2py(sc, java_model.boundaryVector()).toArray()
|
||||||
|
py_predictions = _java2py(sc, java_model.predictionVector()).toArray()
|
||||||
|
return IsotonicRegressionModel(py_boundaries, py_predictions, java_model.isotonic)
|
||||||
|
|
||||||
|
|
||||||
|
class IsotonicRegression(object):
|
||||||
|
"""
|
||||||
|
Run IsotonicRegression algorithm to obtain isotonic regression model.
|
||||||
|
|
||||||
|
:param data: RDD of (label, feature, weight) tuples.
|
||||||
|
:param isotonic: Whether this is isotonic or antitonic.
|
||||||
|
"""
|
||||||
|
@classmethod
|
||||||
|
def train(cls, data, isotonic=True):
|
||||||
|
"""Train a isotonic regression model on the given data."""
|
||||||
|
boundaries, predictions = callMLlibFunc("trainIsotonicRegressionModel",
|
||||||
|
data.map(_convert_to_vector), bool(isotonic))
|
||||||
|
return IsotonicRegressionModel(boundaries.toArray(), predictions.toArray(), isotonic)
|
||||||
|
|
||||||
|
|
||||||
def _test():
|
def _test():
|
||||||
import doctest
|
import doctest
|
||||||
from pyspark import SparkContext
|
from pyspark import SparkContext
|
||||||
|
|
Loading…
Reference in a new issue