[SPARK-16403][EXAMPLES] Cleanup to remove unused imports, consistent style, minor fixes

## What changes were proposed in this pull request?

Cleanup of examples, mostly from PySpark-ML to fix minor issues:  unused imports, style consistency, pipeline_example is a duplicate, use future print funciton, and a spelling error.

* The "Pipeline Example" is duplicated by "Simple Text Classification Pipeline" in Scala, Python, and Java.

* "Estimator Transformer Param Example" is duplicated by "Simple Params Example" in Scala, Python and Java

* Synced random_forest_classifier_example.py with Scala by adding IndexToString label converted

* Synced train_validation_split.py (in Scala ModelSelectionViaTrainValidationExample) by adjusting data split, adding grid for intercept.

* RegexTokenizer was doing nothing in tokenizer_example.py and JavaTokenizerExample.java, synced with Scala version

## How was this patch tested?
local tests and run modified examples

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #14081 from BryanCutler/examples-cleanup-SPARK-16403.
This commit is contained in:
Bryan Cutler 2016-07-14 09:12:46 +01:00 committed by Sean Owen
parent 252d4f27f2
commit e3f8a03367
56 changed files with 142 additions and 646 deletions

View file

@ -60,7 +60,7 @@ public class JavaPipelineExample {
.setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01);
.setRegParam(0.001);
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
@ -71,7 +71,7 @@ public class JavaPipelineExample {
Dataset<Row> test = spark.createDataFrame(Arrays.asList(
new JavaDocument(4L, "spark i j k"),
new JavaDocument(5L, "l m n"),
new JavaDocument(6L, "mapreduce spark"),
new JavaDocument(6L, "spark hadoop spark"),
new JavaDocument(7L, "apache hadoop")
), JavaDocument.class);

View file

@ -1,113 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.ml;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* A simple example demonstrating ways to specify parameters for Estimators and Transformers.
* Run with
* {{{
* bin/run-example ml.JavaSimpleParamsExample
* }}}
*/
public class JavaSimpleParamsExample {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("JavaSimpleParamsExample")
.getOrCreate();
// Prepare training data.
// We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
// into DataFrames, where it uses the bean metadata to infer the schema.
List<LabeledPoint> localTraining = Lists.newArrayList(
new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
Dataset<Row> training =
spark.createDataFrame(localTraining, LabeledPoint.class);
// Create a LogisticRegression instance. This instance is an Estimator.
LogisticRegression lr = new LogisticRegression();
// Print out the parameters, documentation, and any default values.
System.out.println("LogisticRegression parameters:\n" + lr.explainParams() + "\n");
// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01);
// Learn a LogisticRegression model. This uses the parameters stored in lr.
LogisticRegressionModel model1 = lr.fit(training);
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
System.out.println("Model 1 was fit using parameters: " + model1.parent().extractParamMap());
// We may alternatively specify parameters using a ParamMap.
ParamMap paramMap = new ParamMap();
paramMap.put(lr.maxIter().w(20)); // Specify 1 Param.
paramMap.put(lr.maxIter(), 30); // This overwrites the original maxIter.
double[] thresholds = {0.5, 0.5};
paramMap.put(lr.regParam().w(0.1), lr.thresholds().w(thresholds)); // Specify multiple Params.
// One can also combine ParamMaps.
ParamMap paramMap2 = new ParamMap();
paramMap2.put(lr.probabilityCol().w("myProbability")); // Change output column name.
ParamMap paramMapCombined = paramMap.$plus$plus(paramMap2);
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
LogisticRegressionModel model2 = lr.fit(training, paramMapCombined);
System.out.println("Model 2 was fit using parameters: " + model2.parent().extractParamMap());
// Prepare test documents.
List<LabeledPoint> localTest = Lists.newArrayList(
new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
Dataset<Row> test = spark.createDataFrame(localTest, LabeledPoint.class);
// Make predictions on test documents using the Transformer.transform() method.
// LogisticRegressionModel.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
Dataset<Row> results = model2.transform(test);
Dataset<Row> rows = results.select("features", "label", "myProbability", "prediction");
for (Row r: rows.collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
spark.stop();
}
}

View file

@ -1,93 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples.ml;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
/**
* A simple text classification pipeline that recognizes "spark" from input text. It uses the Java
* bean classes {@link LabeledDocument} and {@link Document} defined in the Scala counterpart of
* this example {@link SimpleTextClassificationPipeline}. Run with
* <pre>
* bin/run-example ml.JavaSimpleTextClassificationPipeline
* </pre>
*/
public class JavaSimpleTextClassificationPipeline {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("JavaSimpleTextClassificationPipeline")
.getOrCreate();
// Prepare training documents, which are labeled.
List<LabeledDocument> localTraining = Lists.newArrayList(
new LabeledDocument(0L, "a b c d e spark", 1.0),
new LabeledDocument(1L, "b d", 0.0),
new LabeledDocument(2L, "spark f g h", 1.0),
new LabeledDocument(3L, "hadoop mapreduce", 0.0));
Dataset<Row> training =
spark.createDataFrame(localTraining, LabeledDocument.class);
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
Tokenizer tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words");
HashingTF hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol())
.setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001);
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
// Fit the pipeline to training documents.
PipelineModel model = pipeline.fit(training);
// Prepare test documents, which are unlabeled.
List<Document> localTest = Lists.newArrayList(
new Document(4L, "spark i j k"),
new Document(5L, "l m n"),
new Document(6L, "spark hadoop spark"),
new Document(7L, "apache hadoop"));
Dataset<Row> test = spark.createDataFrame(localTest, Document.class);
// Make predictions on test documents.
Dataset<Row> predictions = model.transform(test);
for (Row r: predictions.select("id", "text", "probability", "prediction").collectAsList()) {
System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
+ ", prediction=" + r.get(3));
}
spark.stop();
}
}

View file

@ -47,7 +47,7 @@ public class JavaStopWordsRemoverExample {
.setOutputCol("filtered");
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("I", "saw", "the", "red", "baloon")),
RowFactory.create(Arrays.asList("I", "saw", "the", "red", "balloon")),
RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);

View file

@ -57,17 +57,24 @@ public class JavaTokenizerExample {
Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsDataFrame = tokenizer.transform(sentenceDataFrame);
for (Row r : wordsDataFrame.select("words", "label").takeAsList(3)) {
RegexTokenizer regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
for (Row r : tokenized.select("words", "label").takeAsList(3)) {
java.util.List<String> words = r.getList(0);
for (String word : words) System.out.print(word + " ");
System.out.println();
}
RegexTokenizer regexTokenizer = new RegexTokenizer()
.setInputCol("sentence")
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
Dataset<Row> regexTokenized = regexTokenizer.transform(sentenceDataFrame);
for (Row r : regexTokenized.select("words", "label").takeAsList(3)) {
java.util.List<String> words = r.getList(0);
for (String word : words) System.out.print(word + " ");
System.out.println();
}
// $example off$
spark.stop();
}

View file

@ -32,7 +32,7 @@ Run with:
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("PythonAFTSurvivalRegressionExample") \
.appName("AFTSurvivalRegressionExample") \
.getOrCreate()
# $example on$

View file

@ -31,7 +31,7 @@ Run with:
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("PythonBisectingKMeansExample")\
.appName("BisectingKMeansExample")\
.getOrCreate()
# $example on$

View file

@ -24,7 +24,7 @@ from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# $example off$
from pyspark.sql import Row, SparkSession
from pyspark.sql import SparkSession
"""
A simple example demonstrating model selection using CrossValidator.
@ -39,6 +39,7 @@ if __name__ == "__main__":
.builder\
.appName("CrossValidatorExample")\
.getOrCreate()
# $example on$
# Prepare training documents, which are labeled.
training = spark.createDataFrame([

View file

@ -34,15 +34,16 @@ if __name__ == "__main__":
if len(sys.argv) > 2:
print("Usage: dataframe_example.py <libsvm file>", file=sys.stderr)
exit(-1)
spark = SparkSession\
.builder\
.appName("DataFrameExample")\
.getOrCreate()
if len(sys.argv) == 2:
elif len(sys.argv) == 2:
input = sys.argv[1]
else:
input = "data/mllib/sample_libsvm_data.txt"
spark = SparkSession \
.builder \
.appName("DataFrameExample") \
.getOrCreate()
# Load input data
print("Loading LIBSVM file with UDT from " + input + ".")
df = spark.read.format("libsvm").load(input).cache()

View file

@ -31,7 +31,7 @@ from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("decision_tree_classification_example")\
.appName("DecisionTreeClassificationExample")\
.getOrCreate()
# $example on$

View file

@ -18,6 +18,7 @@
"""
Estimator Transformer Param Example.
"""
from __future__ import print_function
# $example on$
from pyspark.ml.linalg import Vectors
@ -42,7 +43,7 @@ if __name__ == "__main__":
# Create a LogisticRegression instance. This instance is an Estimator.
lr = LogisticRegression(maxIter=10, regParam=0.01)
# Print out the parameters, documentation, and any default values.
print "LogisticRegression parameters:\n" + lr.explainParams() + "\n"
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)
@ -51,8 +52,8 @@ if __name__ == "__main__":
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print "Model 1 was fit using parameters: "
print model1.extractParamMap()
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())
# We may alternatively specify parameters using a Python dictionary as a paramMap
paramMap = {lr.maxIter: 20}
@ -67,8 +68,8 @@ if __name__ == "__main__":
# Now learn a new model using the paramMapCombined parameters.
# paramMapCombined overrides all parameters set earlier via lr.set* methods.
model2 = lr.fit(training, paramMapCombined)
print "Model 2 was fit using parameters: "
print model2.extractParamMap()
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())
# Prepare test data
test = spark.createDataFrame([
@ -81,9 +82,12 @@ if __name__ == "__main__":
# Note that model2.transform() outputs a "myProbability" column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
prediction = model2.transform(test)
selected = prediction.select("features", "label", "myProbability", "prediction")
for row in selected.collect():
print row
result = prediction.select("features", "label", "myProbability", "prediction") \
.collect()
for row in result:
print("features=%s, label=%s -> prob=%s, prediction=%s"
% (row.features, row.label, row.myProbability, row.prediction))
# $example off$
spark.stop()

View file

@ -31,7 +31,7 @@ Run with:
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("PythonGuassianMixtureExample")\
.appName("GaussianMixtureExample")\
.getOrCreate()
# $example on$

View file

@ -31,7 +31,7 @@ from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("gradient_boosted_tree_classifier_example")\
.appName("GradientBoostedTreeClassifierExample")\
.getOrCreate()
# $example on$

View file

@ -31,7 +31,7 @@ from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("gradient_boosted_tree_regressor_example")\
.appName("GradientBoostedTreeRegressorExample")\
.getOrCreate()
# $example on$

View file

@ -21,7 +21,7 @@ Isotonic Regression Example.
from __future__ import print_function
# $example on$
from pyspark.ml.regression import IsotonicRegression, IsotonicRegressionModel
from pyspark.ml.regression import IsotonicRegression
# $example off$
from pyspark.sql import SparkSession
@ -30,11 +30,11 @@ An example demonstrating isotonic regression.
Run with:
bin/spark-submit examples/src/main/python/ml/isotonic_regression_example.py
"""
if __name__ == "__main__":
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("PythonIsotonicRegressionExample")\
.appName("IsotonicRegressionExample")\
.getOrCreate()
# $example on$

View file

@ -31,12 +31,10 @@ Run with:
This example requires NumPy (http://www.numpy.org/).
"""
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("PythonKMeansExample")\
.appName("KMeansExample")\
.getOrCreate()
# $example on$

View file

@ -23,16 +23,13 @@ from pyspark.ml.clustering import LDA
# $example off$
from pyspark.sql import SparkSession
"""
An example demonstrating LDA.
Run with:
bin/spark-submit examples/src/main/python/ml/lda_example.py
"""
if __name__ == "__main__":
# Creates a SparkSession
spark = SparkSession \
.builder \
.appName("LDAExample") \

View file

@ -31,18 +31,23 @@ if __name__ == "__main__":
# Load training data
data = spark.read.format("libsvm")\
.load("data/mllib/sample_multiclass_classification_data.txt")
# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
test = splits[1]
# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [4, 5, 4, 3]
# create the trainer and set its parameters
trainer = MultilayerPerceptronClassifier(maxIter=100, layers=layers, blockSize=128, seed=1234)
# train the model
model = trainer.fit(train)
# compute accuracy on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")

View file

@ -34,8 +34,10 @@ if __name__ == "__main__":
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["label", "words"])
ngram = NGram(inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
for ngrams_label in ngramDataFrame.select("ngrams", "label").take(3):
print(ngrams_label)
# $example off$

View file

@ -26,13 +26,14 @@ from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("naive_bayes_example")\
.appName("NaiveBayesExample")\
.getOrCreate()
# $example on$
# Load training data
data = spark.read.format("libsvm") \
.load("data/mllib/sample_libsvm_data.txt")
# Split the data into train and test
splits = data.randomSplit([0.6, 0.4], 1234)
train = splits[0]
@ -43,6 +44,7 @@ if __name__ == "__main__":
# train the model
model = nb.fit(train)
# compute accuracy on the test set
result = model.transform(test)
predictionAndLabels = result.select("prediction", "label")

View file

@ -30,11 +30,10 @@ Run with:
bin/spark-submit examples/src/main/python/ml/one_vs_rest_example.py
"""
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("PythonOneVsRestExample") \
.appName("OneVsRestExample") \
.getOrCreate()
# $example on$
@ -62,7 +61,7 @@ if __name__ == "__main__":
# compute the classification error on test data.
accuracy = evaluator.evaluate(predictions)
print("Test Error : " + str(1 - accuracy))
print("Test Error = %g" % (1.0 - accuracy))
# $example off$
spark.stop()

View file

@ -41,6 +41,7 @@ if __name__ == "__main__":
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)
encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()

View file

@ -34,8 +34,10 @@ if __name__ == "__main__":
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)
# $example off$

View file

@ -38,12 +38,13 @@ if __name__ == "__main__":
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
(3L, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
@ -53,8 +54,9 @@ if __name__ == "__main__":
test = spark.createDataFrame([
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")], ["id", "text"])
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
], ["id", "text"])
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)

View file

@ -30,13 +30,15 @@ if __name__ == "__main__":
.getOrCreate()
# $example on$
df = spark\
.createDataFrame([(Vectors.dense([-2.0, 2.3]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([0.6, -1.1]),)],
["features"])
df = spark.createDataFrame([
(Vectors.dense([-2.0, 2.3]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([0.6, -1.1]),)
], ["features"])
px = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = px.transform(df)
for expanded in polyDF.select("polyFeatures").take(3):
print(expanded)
# $example off$

View file

@ -22,18 +22,22 @@ from pyspark.ml.feature import QuantileDiscretizer
# $example off$
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession.builder.appName("QuantileDiscretizerExample").getOrCreate()
spark = SparkSession\
.builder\
.appName("QuantileDiscretizerExample")\
.getOrCreate()
# $example on$
data = [(0, 18.0,), (1, 19.0,), (2, 8.0,), (3, 5.0,), (4, 2.2,)]
df = spark.createDataFrame(data, ["id", "hour"])
# $example off$
# Output of QuantileDiscretizer for such small datasets can depend on the number of
# partitions. Here we force a single partition to ensure consistent results.
# Note this is not necessary for normal use cases
df = df.repartition(1)
# $example on$
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")

View file

@ -23,7 +23,7 @@ from __future__ import print_function
# $example on$
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# $example off$
from pyspark.sql import SparkSession
@ -31,7 +31,7 @@ from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("random_forest_classifier_example")\
.appName("RandomForestClassifierExample")\
.getOrCreate()
# $example on$
@ -41,6 +41,7 @@ if __name__ == "__main__":
# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
@ -52,8 +53,12 @@ if __name__ == "__main__":
# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
labels=labelIndexer.labels)
# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
# Train model. This also runs the indexers.
model = pipeline.fit(trainingData)
@ -62,7 +67,7 @@ if __name__ == "__main__":
predictions = model.transform(testData)
# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)
predictions.select("predictedLabel", "label", "features").show(5)
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(

View file

@ -31,7 +31,7 @@ from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("random_forest_regressor_example")\
.appName("RandomForestRegressorExample")\
.getOrCreate()
# $example on$

View file

@ -34,10 +34,12 @@ if __name__ == "__main__":
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])
formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")
output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
# $example off$

View file

@ -1,95 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
import pprint
import sys
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import DenseVector
from pyspark.sql import Row, SparkSession
"""
A simple example demonstrating ways to specify parameters for Estimators and Transformers.
Run with:
bin/spark-submit examples/src/main/python/ml/simple_params_example.py
"""
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("SimpleParamsExample") \
.getOrCreate()
# prepare training data.
# We create an RDD of LabeledPoints and convert them into a DataFrame.
# A LabeledPoint is an Object with two fields named label and features
# and Spark SQL identifies these fields and creates the schema appropriately.
training = spark.createDataFrame([
Row(label=1.0, features=DenseVector([0.0, 1.1, 0.1])),
Row(label=0.0, features=DenseVector([2.0, 1.0, -1.0])),
Row(label=0.0, features=DenseVector([2.0, 1.3, 1.0])),
Row(label=1.0, features=DenseVector([0.0, 1.2, -0.5]))])
# Create a LogisticRegression instance with maxIter = 10.
# This instance is an Estimator.
lr = LogisticRegression(maxIter=10)
# Print out the parameters, documentation, and any default values.
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
# We may also set parameters using setter methods.
lr.setRegParam(0.01)
# Learn a LogisticRegression model. This uses the parameters stored in lr.
model1 = lr.fit(training)
# Since model1 is a Model (i.e., a Transformer produced by an Estimator),
# we can view the parameters it used during fit().
# This prints the parameter (name: value) pairs, where names are unique IDs for this
# LogisticRegression instance.
print("Model 1 was fit using parameters:\n")
pprint.pprint(model1.extractParamMap())
# We may alternatively specify parameters using a parameter map.
# paramMap overrides all lr parameters set earlier.
paramMap = {lr.maxIter: 20, lr.thresholds: [0.5, 0.5], lr.probabilityCol: "myProbability"}
# Now learn a new model using the new parameters.
model2 = lr.fit(training, paramMap)
print("Model 2 was fit using parameters:\n")
pprint.pprint(model2.extractParamMap())
# prepare test data.
test = spark.createDataFrame([
Row(label=1.0, features=DenseVector([-1.0, 1.5, 1.3])),
Row(label=0.0, features=DenseVector([3.0, 2.0, -0.1])),
Row(label=0.0, features=DenseVector([0.0, 2.2, -1.5]))])
# Make predictions on test data using the Transformer.transform() method.
# LogisticRegressionModel.transform will only use the 'features' column.
# Note that model2.transform() outputs a 'myProbability' column instead of the usual
# 'probability' column since we renamed the lr.probabilityCol parameter previously.
result = model2.transform(test) \
.select("features", "label", "myProbability", "prediction") \
.collect()
for row in result:
print("features=%s,label=%s -> prob=%s, prediction=%s"
% (row.features, row.label, row.myProbability, row.prediction))
spark.stop()

View file

@ -1,72 +0,0 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import print_function
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SparkSession
"""
A simple text classification pipeline that recognizes "spark" from
input text. This is to show how to create and configure a Spark ML
pipeline in Python. Run with:
bin/spark-submit examples/src/main/python/ml/simple_text_classification_pipeline.py
"""
if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("SimpleTextClassificationPipeline")\
.getOrCreate()
# Prepare training documents, which are labeled.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(numFeatures=1000, inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# Fit the pipeline to training documents.
model = pipeline.fit(training)
# Prepare test documents, which are unlabeled.
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])
# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
print(row)
spark.stop()

View file

@ -30,7 +30,7 @@ if __name__ == "__main__":
# $example on$
sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "baloon"]),
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["label", "raw"])

View file

@ -32,6 +32,7 @@ if __name__ == "__main__":
df = spark.createDataFrame(
[(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
["id", "category"])
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

View file

@ -34,8 +34,10 @@ if __name__ == "__main__":
(0, "I wish Java could use case classes"),
(1, "Logistic regression models are neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
@ -43,6 +45,7 @@ if __name__ == "__main__":
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
for features_label in rescaledData.select("features", "label").take(3):
print(features_label)
# $example off$

View file

@ -34,12 +34,19 @@ if __name__ == "__main__":
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["label", "sentence"])
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsDataFrame = tokenizer.transform(sentenceDataFrame)
for words_label in wordsDataFrame.select("words", "label").take(3):
print(words_label)
regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps(False)
tokenized = tokenizer.transform(sentenceDataFrame)
for words_label in tokenized.select("words", "label").take(3):
print(words_label)
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
for words_label in regexTokenized.select("words", "label").take(3):
print(words_label)
# $example off$
spark.stop()

View file

@ -35,18 +35,21 @@ if __name__ == "__main__":
.builder\
.appName("TrainValidationSplit")\
.getOrCreate()
# $example on$
# Prepare training and test data.
data = spark.read.format("libsvm")\
.load("data/mllib/sample_linear_regression_data.txt")
train, test = data.randomSplit([0.7, 0.3])
lr = LinearRegression(maxIter=10, regParam=0.1)
train, test = data.randomSplit([0.9, 0.1], seed=12345)
lr = LinearRegression(maxIter=10)
# We use a ParamGridBuilder to construct a grid of parameters to search over.
# TrainValidationSplit will try all combinations of values and determine best model using
# the evaluator.
paramGrid = ParamGridBuilder()\
.addGrid(lr.regParam, [0.1, 0.01]) \
.addGrid(lr.fitIntercept, [False, True])\
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
.build()
@ -60,6 +63,7 @@ if __name__ == "__main__":
# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)
# Make predictions on test data. model is the model with combination of parameters
# that performed best.
prediction = model.transform(test)

View file

@ -33,9 +33,11 @@ if __name__ == "__main__":
dataset = spark.createDataFrame(
[(0, 18, 1.0, Vectors.dense([0.0, 10.0, 0.5]), 1.0)],
["id", "hour", "mobile", "userFeatures", "clicked"])
assembler = VectorAssembler(
inputCols=["hour", "mobile", "userFeatures"],
outputCol="features")
output = assembler.transform(dataset)
print(output.select("features", "clicked").first())
# $example off$

View file

@ -30,6 +30,7 @@ if __name__ == "__main__":
# $example on$
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
indexer = VectorIndexer(inputCol="features", outputCol="indexed", maxCategories=10)
indexerModel = indexer.fit(data)

View file

@ -35,9 +35,11 @@ if __name__ == "__main__":
("I wish Java could use case classes".split(" "), ),
("Logistic regression models are neat".split(" "), )
], ["text"])
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)
for feature in result.select("result").take(3):
print(feature)

View file

@ -22,7 +22,6 @@
To run this example use
`$ bin/spark-submit examples/src/main/python/streaming/queue_stream.py
"""
import sys
import time
from pyspark import SparkContext

View file

@ -18,7 +18,6 @@
// scalastyle:off println
package org.apache.spark.examples
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
/**

View file

@ -33,8 +33,10 @@ import org.apache.spark.sql.SparkSession
*/
object GaussianMixtureExample {
def main(args: Array[String]): Unit = {
// Creates a SparkSession
val spark = SparkSession.builder.appName(s"${this.getClass.getSimpleName}").getOrCreate()
val spark = SparkSession
.builder
.appName(s"${this.getClass.getSimpleName}")
.getOrCreate()
// $example on$
// Loads data

View file

@ -33,8 +33,6 @@ import org.apache.spark.sql.SparkSession
object IsotonicRegressionExample {
def main(args: Array[String]): Unit = {
// Creates a SparkSession.
val spark = SparkSession
.builder
.appName(s"${this.getClass.getSimpleName}")

View file

@ -34,7 +34,6 @@ import org.apache.spark.sql.SparkSession
object KMeansExample {
def main(args: Array[String]): Unit = {
// Creates a SparkSession.
val spark = SparkSession
.builder
.appName(s"${this.getClass.getSimpleName}")

View file

@ -46,6 +46,7 @@ object ModelSelectionViaTrainValidationSplitExample {
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)
val lr = new LinearRegression()
.setMaxIter(10)
// We use a ParamGridBuilder to construct a grid of parameters to search over.
// TrainValidationSplit will try all combinations of values and determine best model using

View file

@ -39,27 +39,33 @@ object MultilayerPerceptronClassifierExample {
// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm")
.load("data/mllib/sample_multiclass_classification_data.txt")
// Split the data into train and test
val splits = data.randomSplit(Array(0.6, 0.4), seed = 1234L)
val train = splits(0)
val test = splits(1)
// specify layers for the neural network:
// input layer of size 4 (features), two intermediate of size 5 and 4
// and output of size 3 (classes)
val layers = Array[Int](4, 5, 4, 3)
// create the trainer and set its parameters
val trainer = new MultilayerPerceptronClassifier()
.setLayers(layers)
.setBlockSize(128)
.setSeed(1234L)
.setMaxIter(100)
// train the model
val model = trainer.fit(train)
// compute accuracy on the test set
val result = model.transform(test)
val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator()
.setMetricName("accuracy")
println("Accuracy: " + evaluator.evaluate(predictionAndLabels))
// $example off$

View file

@ -30,6 +30,7 @@ object NaiveBayesExample {
.builder
.appName("NaiveBayesExample")
.getOrCreate()
// $example on$
// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

View file

@ -54,7 +54,7 @@ object PipelineExample {
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
@ -74,7 +74,7 @@ object PipelineExample {
val test = spark.createDataFrame(Seq(
(4L, "spark i j k"),
(5L, "l m n"),
(6L, "mapreduce spark"),
(6L, "spark hadoop spark"),
(7L, "apache hadoop")
)).toDF("id", "text")

View file

@ -28,16 +28,16 @@ object QuantileDiscretizerExample {
.builder
.appName("QuantileDiscretizerExample")
.getOrCreate()
import spark.implicits._
// $example on$
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
var df = spark.createDataFrame(data).toDF("id", "hour")
val df = spark.createDataFrame(data).toDF("id", "hour")
// $example off$
// Output of QuantileDiscretizer for such small datasets can depend on the number of
// partitions. Here we force a single partition to ensure consistent results.
// Note this is not necessary for normal use cases
.repartition(1)
// $example on$
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")

View file

@ -36,10 +36,12 @@ object RFormulaExample {
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")
val formula = new RFormula()
.setFormula("clicked ~ country + hour")
.setFeaturesCol("features")
.setLabelCol("label")
val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()
// $example off$

View file

@ -1,104 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.ml
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.{Row, SparkSession}
/**
* A simple example demonstrating ways to specify parameters for Estimators and Transformers.
* Run with
* {{{
* bin/run-example ml.SimpleParamsExample
* }}}
*/
object SimpleParamsExample {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("SimpleParamsExample")
.getOrCreate()
import spark.implicits._
// Prepare training data.
// We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of case classes
// into DataFrames, where it uses the case class metadata to infer the schema.
val training = spark.createDataFrame(Seq(
LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5))))
// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
// We may set parameters using setter methods.
lr.setMaxIter(10)
.setRegParam(0.01)
// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap())
// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
paramMap.put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.
paramMap.put(lr.regParam -> 0.1, lr.thresholds -> Array(0.5, 0.5)) // Specify multiple Params.
// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name
val paramMapCombined = paramMap ++ paramMap2
// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training.toDF(), paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap())
// Prepare test data.
val test = spark.createDataFrame(Seq(
LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))
// Make predictions on test data using the Transformer.transform() method.
// LogisticRegressionModel.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
println(s"($features, $label) -> prob=$prob, prediction=$prediction")
}
spark.stop()
}
}
// scalastyle:on println

View file

@ -1,93 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// scalastyle:off println
package org.apache.spark.examples.ml
import scala.beans.BeanInfo
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.{Row, SparkSession}
@BeanInfo
case class LabeledDocument(id: Long, text: String, label: Double)
@BeanInfo
case class Document(id: Long, text: String)
/**
* A simple text classification pipeline that recognizes "spark" from input text. This is to show
* how to create and configure an ML pipeline. Run with
* {{{
* bin/run-example ml.SimpleTextClassificationPipeline
* }}}
*/
object SimpleTextClassificationPipeline {
def main(args: Array[String]) {
val spark = SparkSession
.builder
.appName("SimpleTextClassificationPipeline")
.getOrCreate()
import spark.implicits._
// Prepare training documents, which are labeled.
val training = spark.createDataFrame(Seq(
LabeledDocument(0L, "a b c d e spark", 1.0),
LabeledDocument(1L, "b d", 0.0),
LabeledDocument(2L, "spark f g h", 1.0),
LabeledDocument(3L, "hadoop mapreduce", 0.0)))
// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
.setInputCol("text")
.setOutputCol("words")
val hashingTF = new HashingTF()
.setNumFeatures(1000)
.setInputCol(tokenizer.getOutputCol)
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))
// Fit the pipeline to training documents.
val model = pipeline.fit(training.toDF())
// Prepare test documents, which are unlabeled.
val test = spark.createDataFrame(Seq(
Document(4L, "spark i j k"),
Document(5L, "l m n"),
Document(6L, "spark hadoop spark"),
Document(7L, "apache hadoop")))
// Make predictions on test documents.
model.transform(test.toDF())
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
println(s"($id, $text) --> prob=$prob, prediction=$prediction")
}
spark.stop()
}
}
// scalastyle:on println

View file

@ -36,7 +36,7 @@ object StopWordsRemoverExample {
.setOutputCol("filtered")
val dataSet = spark.createDataFrame(Seq(
(0, Seq("I", "saw", "the", "red", "baloon")),
(0, Seq("I", "saw", "the", "red", "balloon")),
(1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

View file

@ -40,13 +40,16 @@ object TfIdfExample {
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)
val hashingTF = new HashingTF()
.setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("features", "label").take(3).foreach(println)
// $example off$

View file

@ -45,6 +45,7 @@ object TokenizerExample {
val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("words", "label").take(3).foreach(println)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("words", "label").take(3).foreach(println)
// $example off$

View file

@ -45,6 +45,7 @@ object Word2VecExample {
.setVectorSize(3)
.setMinCount(0)
val model = word2Vec.fit(documentDF)
val result = model.transform(documentDF)
result.select("result").take(3).foreach(println)
// $example off$