[SPARK-14300][DOCS][MLLIB] Scala MLlib examples code merge and clean up

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-14300

Duplicated code was found in scala/examples/mllib; the following files are all deleted in this PR:

- DenseGaussianMixture.scala
- StreamingLinearRegression.scala

## Reasons for deletion

#### Deleted: mllib/DenseGaussianMixture.scala

- duplicate of mllib/GaussianMixtureExample

#### Deleted: mllib/StreamingLinearRegression.scala

- duplicate of mllib/StreamingLinearRegressionExample

When merging and cleaning up this code, be careful not to disturb the existing example on/off blocks.
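
For context, the on/off blocks are the `// $example on$` and `// $example off$` marker comments in the example sources: the docs build extracts whatever sits between them (via the `include_example` Jekyll tag) into the published pages, so moving or reordering them silently changes the rendered documentation. A minimal sketch of the convention (the `ExampleSkeleton` object is illustrative, not a file in the repo):

```scala
import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.mllib.linalg.Vectors
// $example off$

object ExampleSkeleton {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ExampleSkeleton"))
    // $example on$
    // Only the code between the on/off markers is pulled into the docs page.
    val v = Vectors.dense(1.0, 2.0, 3.0)
    println(v)
    // $example off$
    sc.stop()
  }
}
```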

## How was this patch tested?

Tested manually with `SKIP_API=1 jekyll` to make sure the docs still build correctly.

Author: Xin Ren <iamshrek@126.com>

Closes #12195 from keypointt/SPARK-14300.
Commit dcdda19785 (parent fb0a8a8dd7), authored by Xin Ren on 2016-10-26 13:33:23 -07:00 and committed by Joseph K. Bradley.
3 changed files with 19 additions and 148 deletions.

Deleted file: mllib/DenseGaussianMixture.scala

@@ -1,75 +0,0 @@
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.mllib

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.linalg.Vectors

/**
 * An example Gaussian Mixture Model EM app. Run with
 * {{{
 * ./bin/run-example mllib.DenseGaussianMixture <input> <k> <convergenceTol>
 * }}}
 * If you use it as a template to create your own app, please use `spark-submit` to submit your app.
 */
object DenseGaussianMixture {

  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      println("usage: DenseGmmEM <input file> <k> <convergenceTol> [maxIterations]")
    } else {
      val maxIterations = if (args.length > 3) args(3).toInt else 100
      run(args(0), args(1).toInt, args(2).toDouble, maxIterations)
    }
  }

  private def run(inputFile: String, k: Int, convergenceTol: Double, maxIterations: Int) {
    val conf = new SparkConf().setAppName("Gaussian Mixture Model EM example")
    val ctx = new SparkContext(conf)

    val data = ctx.textFile(inputFile).map { line =>
      Vectors.dense(line.trim.split(' ').map(_.toDouble))
    }.cache()

    val clusters = new GaussianMixture()
      .setK(k)
      .setConvergenceTol(convergenceTol)
      .setMaxIterations(maxIterations)
      .run(data)

    for (i <- 0 until clusters.k) {
      println("weight=%f\nmu=%s\nsigma=\n%s\n" format
        (clusters.weights(i), clusters.gaussians(i).mu, clusters.gaussians(i).sigma))
    }

    println("The membership value of each vector to all mixture components (first <= 100):")
    val membership = clusters.predictSoft(data)
    membership.take(100).foreach { x =>
      print(" " + x.mkString(","))
    }
    println()

    println("Cluster labels (first <= 100):")
    val clusterLabels = clusters.predict(data)
    clusterLabels.take(100).foreach { x =>
      print(" " + x)
    }
    println()
  }
}
// scalastyle:on println
```
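
For comparison, the retained mllib/GaussianMixtureExample walks through the same EM workflow. A condensed sketch of its shape, reconstructed from memory rather than copied from the repo (the input path and `setK(2)` value are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}
// $example on$
import org.apache.spark.mllib.clustering.GaussianMixture
import org.apache.spark.mllib.linalg.Vectors
// $example off$

object GaussianMixtureExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("GaussianMixtureExample"))
    // $example on$
    // Load space-separated dense vectors and cache them for the iterative EM fit.
    val parsedData = sc.textFile("data/mllib/gmm_data.txt")
      .map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
      .cache()

    // Fit a two-component Gaussian mixture with EM.
    val gmm = new GaussianMixture().setK(2).run(parsedData)

    // Print the weight and parameters of each fitted Gaussian.
    for (i <- 0 until gmm.k) {
      println("weight=%f\nmu=%s\nsigma=\n%s\n" format
        (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
    }
    // $example off$
    sc.stop()
  }
}
```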

Deleted file: mllib/StreamingLinearRegression.scala

@@ -1,73 +0,0 @@
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// scalastyle:off println
package org.apache.spark.examples.mllib

import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Train a linear regression model on one stream of data and make predictions
 * on another stream, where the data streams arrive as text files
 * into two different directories.
 *
 * The rows of the text files must be labeled data points in the form
 * `(y,[x1,x2,x3,...,xn])`
 * Where n is the number of features. n must be the same for train and test.
 *
 * Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>
 *
 * To run on your local machine using the two directories `trainingDir` and `testDir`,
 * with updates every 5 seconds, and 2 features per data point, call:
 *    $ bin/run-example mllib.StreamingLinearRegression trainingDir testDir 5 2
 *
 * As you add text files to `trainingDir` the model will continuously update.
 * Anytime you add text files to `testDir`, you'll see predictions from the current model.
 *
 */
object StreamingLinearRegression {

  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println(
        "Usage: StreamingLinearRegression <trainingDir> <testDir> <batchDuration> <numFeatures>")
      System.exit(1)
    }

    val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
    val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

    val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse)
    val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.zeros(args(3).toInt))

    model.trainOn(trainingData)
    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
// scalastyle:on println
```

Modified file: mllib/StreamingLinearRegressionExample.scala

@@ -26,6 +26,25 @@ import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
```scala
// $example off$
import org.apache.spark.streaming._

/**
 * Train a linear regression model on one stream of data and make predictions
 * on another stream, where the data streams arrive as text files
 * into two different directories.
 *
 * The rows of the text files must be labeled data points in the form
 * `(y,[x1,x2,x3,...,xn])`
 * Where n is the number of features. n must be the same for train and test.
 *
 * Usage: StreamingLinearRegressionExample <trainingDir> <testDir>
 *
 * To run on your local machine using the two directories `trainingDir` and `testDir`,
 * with updates every 5 seconds, and 2 features per data point, call:
 *    $ bin/run-example mllib.StreamingLinearRegressionExample trainingDir testDir
 *
 * As you add text files to `trainingDir` the model will continuously update.
 * Anytime you add text files to `testDir`, you'll see predictions from the current model.
 *
 */
object StreamingLinearRegressionExample {

  def main(args: Array[String]): Unit = {
```
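
The hunk is cut off here. For orientation, a sketch of how the retained example's body proceeds, reconstructed (not copied from the commit) and consistent with the doc comment above, which fixes the batch duration at 5 seconds and the feature count at 2:

```scala
    if (args.length != 2) {
      System.err.println("Usage: StreamingLinearRegressionExample <trainingDir> <testDir>")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("StreamingLinearRegressionExample")
    val ssc = new StreamingContext(conf, Seconds(5)) // batch duration per the doc comment

    // $example on$
    val trainingData = ssc.textFileStream(args(0)).map(LabeledPoint.parse).cache()
    val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

    // 2 features per data point, per the doc comment above.
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.zeros(2))

    model.trainOn(trainingData)
    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

    ssc.start()
    ssc.awaitTermination()
    // $example off$
```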