layout | title |
---|---|
global | Java Programming Guide |
The Spark Java API exposes all the Spark features available in the Scala version to Java. To learn the basics of Spark, we recommend reading through the Scala programming guide first; it should be easy to follow even if you don't know Scala. This guide will show how to use the Spark features described there in Java.
The Spark Java API is defined in the `org.apache.spark.api.java` package, and includes a `JavaSparkContext` for initializing Spark and `JavaRDD` classes, which support the same methods as their Scala counterparts but take Java functions and return Java data and collection types. The main differences have to do with passing functions to RDD operations (e.g. `map`) and handling RDDs of different types, as discussed next.
Key Differences in the Java API
There are a few key differences between the Java and Scala APIs:
- Java versions before 8 do not support anonymous or first-class functions, so functions are passed using anonymous classes that implement the `org.apache.spark.api.java.function.Function`, `Function2`, etc. interfaces.
- To maintain type safety, the Java API defines specialized Function and RDD classes for key-value pairs and doubles. For example, `JavaPairRDD` stores key-value pairs.
- Some methods are defined on the basis of the passed function's return type. For example, `mapToPair()` returns `JavaPairRDD`, and `mapToDouble()` returns `JavaDoubleRDD`.
- RDD methods like `collect()` and `countByKey()` return Java collection types, such as `java.util.List` and `java.util.Map`.
- Key-value pairs, which are simply written as `(key, value)` in Scala, are represented by the `scala.Tuple2` class, and need to be created using `new Tuple2<K, V>(key, value)`.
RDD Classes
Spark defines additional operations on RDDs of key-value pairs and doubles, such as `reduceByKey`, `join`, and `stdev`.
In the Scala API, these methods are automatically added using Scala's implicit conversions mechanism.
In the Java API, the extra methods are defined in the `JavaPairRDD` and `JavaDoubleRDD` classes. RDD methods like `map` have type-specific variants, such as `mapToPair` and `mapToDouble`, which take the specialized `PairFunction` and `DoubleFunction` interfaces and return RDDs of the appropriate types. Common methods like `filter` and `sample` are implemented by each specialized RDD class, so filtering a `JavaPairRDD` returns a new `JavaPairRDD`, etc. (this achieves the "same-result-type" principle used by the Scala collections framework).
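The typing idea above can be sketched without Spark. The following is a minimal in-memory illustration, not Spark code: `MiniRDD` and `MiniPairRDD` are hypothetical stand-ins for `JavaRDD` and `JavaPairRDD`, and `java.util.Map.Entry` stands in for `scala.Tuple2`. It only shows how a method name such as `mapToPair` can select the result class, and how each class can implement `filter` so that the result keeps its own type.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Predicate;

class TypedRddSketch {

    /** Hypothetical in-memory stand-in for JavaRDD<T>; not Spark code. */
    static class MiniRDD<T> {
        final List<T> items;

        MiniRDD(List<T> items) { this.items = items; }

        /** Filtering keeps the element type, so the result is again a MiniRDD<T>. */
        MiniRDD<T> filter(Predicate<T> keep) {
            List<T> out = new ArrayList<>();
            for (T item : items) {
                if (keep.test(item)) out.add(item);
            }
            return new MiniRDD<>(out);
        }

        /** The method name, rather than an overload of map, selects the pair-typed result. */
        <K, V> MiniPairRDD<K, V> mapToPair(Function<T, Map.Entry<K, V>> f) {
            List<Map.Entry<K, V>> out = new ArrayList<>();
            for (T item : items) out.add(f.apply(item));
            return new MiniPairRDD<>(out);
        }
    }

    /** Hypothetical stand-in for JavaPairRDD<K, V>, which adds key-based operations. */
    static class MiniPairRDD<K, V> {
        final List<Map.Entry<K, V>> pairs;

        MiniPairRDD(List<Map.Entry<K, V>> pairs) { this.pairs = pairs; }

        /** Same operation as MiniRDD.filter, but the result stays pair-typed. */
        MiniPairRDD<K, V> filter(Predicate<Map.Entry<K, V>> keep) {
            List<Map.Entry<K, V>> out = new ArrayList<>();
            for (Map.Entry<K, V> p : pairs) {
                if (keep.test(p)) out.add(p);
            }
            return new MiniPairRDD<>(out);
        }

        /** Only defined on the pair class: merges the values that share a key. */
        MiniPairRDD<K, V> reduceByKey(BinaryOperator<V> merge) {
            Map<K, V> merged = new LinkedHashMap<>();
            for (Map.Entry<K, V> p : pairs) {
                merged.merge(p.getKey(), p.getValue(), merge);
            }
            List<Map.Entry<K, V>> out = new ArrayList<>(merged.entrySet());
            return new MiniPairRDD<>(out);
        }

        /** Copies the pairs into a map, keeping first-seen key order. */
        Map<K, V> collectAsMap() {
            Map<K, V> out = new LinkedHashMap<>();
            for (Map.Entry<K, V> p : pairs) out.put(p.getKey(), p.getValue());
            return out;
        }
    }

    /** Counts words, then keeps only the words seen more than twice. */
    static Map<String, Integer> demo() {
        MiniRDD<String> words = new MiniRDD<>(Arrays.asList("a", "b", "a", "", "b", "a"));
        return words
            .filter(w -> !w.isEmpty())                               // still MiniRDD<String>
            .mapToPair(w -> new SimpleEntry<String, Integer>(w, 1))  // now MiniPairRDD<String, Integer>
            .reduceByKey((x, y) -> x + y)                            // only compiles on the pair type
            .filter(p -> p.getValue() > 2)                           // still MiniPairRDD<String, Integer>
            .collectAsMap();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Calling `reduceByKey` directly on a `MiniRDD<String>` would not compile, which is the kind of type safety that separate RDD classes are meant to provide.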
Function Interfaces
The following table lists the function interfaces used by the Java API, located in the `org.apache.spark.api.java.function` package. Each interface has a single abstract method, `call()`.
Class | Function Type |
---|---|
Function<T, R> | T => R |
DoubleFunction<T> | T => Double |
PairFunction<T, K, V> | T => Tuple2<K, V> |
FlatMapFunction<T, R> | T => Iterable<R> |
DoubleFlatMapFunction<T> | T => Iterable<Double> |
PairFlatMapFunction<T, K, V> | T => Iterable<Tuple2<K, V>> |
Function2<T1, T2, R> | T1, T2 => R (function of two arguments) |
Storage Levels
RDD storage level constants, such as `MEMORY_AND_DISK`, are declared in the `org.apache.spark.api.java.StorageLevels` class. To define your own storage level, you can use `StorageLevels.create(...)`.
Other Features
The Java API supports other Spark features, including accumulators, broadcast variables, and caching.
Upgrading From Pre-1.0 Versions of Spark
In version 1.0 of Spark the Java API was refactored to better support Java 8 lambda expressions. Users upgrading from older versions of Spark should note the following changes:
- All `org.apache.spark.api.java.function.*` classes have been changed from abstract classes to interfaces. This means that concrete implementations of these `Function` classes will need to use `implements` rather than `extends`.
- Certain transformation functions now have multiple versions depending on the return type. In Spark core, the map functions (`map`, `flatMap`, and `mapPartitions`) have type-specific versions, e.g. `mapToPair` and `mapToDouble`. Spark Streaming also uses the same approach, e.g. `transformToPair`.
Example
As an example, we will implement word count using the Java API.
{% highlight java %}
import java.util.Arrays;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;

JavaSparkContext jsc = new JavaSparkContext(...);
JavaRDD<String> lines = jsc.textFile("hdfs://...");
JavaRDD<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    @Override public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));
    }
  }
);
{% endhighlight %}
The word count program starts by creating a `JavaSparkContext`, which accepts the same parameters as its Scala counterpart. `JavaSparkContext` supports the same data loading methods as the regular `SparkContext`; here, `textFile` loads lines from text files stored in HDFS.
To split the lines into words, we use `flatMap` to split each line on whitespace. `flatMap` is passed a `FlatMapFunction` that accepts a string and returns a `java.lang.Iterable` of strings.
Here, the `FlatMapFunction` was created inline; another option is to implement `FlatMapFunction` in a named class and pass an instance to `flatMap`:
{% highlight java %}
// FlatMapFunction is an interface, so a named class implements it.
class Split implements FlatMapFunction<String, String> {
  @Override public Iterable<String> call(String s) {
    return Arrays.asList(s.split(" "));
  }
}
JavaRDD<String> words = lines.flatMap(new Split());
{% endhighlight %}
Java 8+ users can also write the above `FlatMapFunction` in a more concise way using a lambda expression:
{% highlight java %}
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")));
{% endhighlight %}
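In Java 8, this lambda syntax can replace any anonymous class that implements an interface with a single abstract method, which includes all of the function interfaces listed above.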
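To see why a lambda and an anonymous class are interchangeable here, the sketch below runs both forms against the same interface without needing Spark on the classpath. The nested `FlatMapFunction` is a hypothetical local stand-in that only mirrors the shape described in the table above (one abstract method, `call()`); it is not the Spark interface itself.

```java
import java.util.Arrays;

class LambdaSketch {

    /** Hypothetical stand-in for Spark's FlatMapFunction<T, R>: one abstract method, call(). */
    interface FlatMapFunction<T, R> {
        Iterable<R> call(T t);
    }

    /** Anonymous-class form, as used in the word count example. */
    static final FlatMapFunction<String, String> SPLIT_ANON =
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) {
                return Arrays.asList(s.split(" "));
            }
        };

    /** Lambda form (Java 8+): the compiler matches the lambda to the single method call(). */
    static final FlatMapFunction<String, String> SPLIT_LAMBDA =
        s -> Arrays.asList(s.split(" "));

    public static void main(String[] args) {
        // Both objects implement the same interface and produce the same words.
        System.out.println(SPLIT_ANON.call("to be or not to be"));
        System.out.println(SPLIT_LAMBDA.call("to be or not to be"));
    }
}
```

Because the conversion depends only on the interface having a single abstract method, the same substitution should apply to `Function`, `Function2`, `PairFunction`, and the other interfaces in the table.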
Continuing with the word count example, we map each word to a `(word, 1)` pair:
{% highlight java %}
import scala.Tuple2;

JavaPairRDD<String, Integer> ones = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    @Override public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  }
);
{% endhighlight %}
Note that `mapToPair` was passed a `PairFunction<String, String, Integer>` and returned a `JavaPairRDD<String, Integer>`.
To finish the word count program, we will use `reduceByKey` to count the occurrences of each word:
{% highlight java %}
JavaPairRDD<String, Integer> counts = ones.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    @Override public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  }
);
{% endhighlight %}
Here, `reduceByKey` is passed a `Function2`, which implements a function with two arguments. The resulting `JavaPairRDD` contains `(word, count)` pairs.
In this example, we explicitly showed each intermediate RDD. It is also possible to chain the RDD transformations, so the word count example could also be written as:
{% highlight java %}
JavaPairRDD<String, Integer> counts = lines.flatMap(
    ...
  ).mapToPair(
    ...
  ).reduceByKey(
    ...
  );
{% endhighlight %}
There is no performance difference between these approaches; the choice is just a matter of style.
API Docs
API documentation for Spark in Java is available in Javadoc format.
Where to Go from Here
Spark includes several sample programs using the Java API in `examples/src/main/java`. You can run them by passing the class name to the `bin/run-example` script included in Spark; for example:

    ./bin/run-example org.apache.spark.examples.JavaWordCount

Each example program prints usage help when run without any arguments.