spark-instrumented-optimizer/docs/java-programming-guide.md
Prashant Sharma 181ec50307 [java8API] SPARK-964 Investigate the potential for using JDK 8 lambda expressions for the Java/Scala APIs
Author: Prashant Sharma <prashant.s@imaginea.com>
Author: Patrick Wendell <pwendell@gmail.com>

Closes #17 from ScrapCodes/java8-lambdas and squashes the following commits:

95850e6 [Patrick Wendell] Some doc improvements and build changes to the Java 8 patch.
85a954e [Prashant Sharma] Nit. import orderings.
673f7ac [Prashant Sharma] Added support for -java-home as well
80a13e8 [Prashant Sharma] Used fake class tag syntax
26eb3f6 [Prashant Sharma] Patrick's comments on PR.
35d8d79 [Prashant Sharma] Specified java 8 building in the docs
31d4cd6 [Prashant Sharma] Maven build to support -Pjava8-tests flag.
4ab87d3 [Prashant Sharma] Review feedback on the pr
c33dc2c [Prashant Sharma] SPARK-964, Java 8 API Support.
2014-03-03 22:31:30 -08:00


---
layout: global
title: Java Programming Guide
---

The Spark Java API exposes all the Spark features available in the Scala version to Java. To learn the basics of Spark, we recommend reading through the Scala programming guide first; it should be easy to follow even if you don't know Scala. This guide will show how to use the Spark features described there in Java.

The Spark Java API is defined in the org.apache.spark.api.java package, and includes a JavaSparkContext for initializing Spark and JavaRDD classes, which support the same methods as their Scala counterparts but take Java functions and return Java data and collection types. The main differences have to do with passing functions to RDD operations (e.g. map) and handling RDDs of different types, as discussed next.

Key Differences in the Java API

There are a few key differences between the Java and Scala APIs:

  • Before Java 8, Java does not support lambda expressions or first-class functions, so functions are passed using anonymous classes that implement the org.apache.spark.api.java.function.Function, Function2, etc. interfaces. Java 8 users can pass lambda expressions instead.
  • To maintain type safety, the Java API defines specialized Function and RDD classes for key-value pairs and doubles. For example, JavaPairRDD stores key-value pairs.
  • Some methods are named after the return type of the function passed to them. For example, mapToPair(...) and flatMapToPair(...) return a JavaPairRDD, while mapToDouble(...) and flatMapToDouble(...) return a JavaDoubleRDD.
  • RDD methods like collect() and countByKey() return Java collection types, such as java.util.List and java.util.Map.
  • Key-value pairs, which are simply written as (key, value) in Scala, are represented by the scala.Tuple2 class, and need to be created using new Tuple2<K, V>(key, value).

RDD Classes

Spark defines additional operations on RDDs of key-value pairs and doubles, such as reduceByKey, join, and stdev.

In the Scala API, these methods are automatically added using Scala's implicit conversions mechanism.

In the Java API, the extra methods are defined in the JavaPairRDD and JavaDoubleRDD classes. RDD methods like map have type-specific variants, such as mapToPair and mapToDouble, which take the specialized PairFunction and DoubleFunction classes and return RDDs of the appropriate types. Common methods like filter and sample are implemented by each specialized RDD class, so filtering a JavaPairRDD returns a new JavaPairRDD, and so on (this achieves the "same-result-type" principle used by the Scala collections framework).

Function Interfaces

The following table lists the function interfaces used by the Java API. Each interface has a single abstract method, call(), that must be implemented.

| Class | Function Type |
|---|---|
| Function<T, R> | T => R |
| DoubleFunction<T> | T => Double |
| PairFunction<T, K, V> | T => Tuple2<K, V> |
| FlatMapFunction<T, R> | T => Iterable<R> |
| DoubleFlatMapFunction<T> | T => Iterable<Double> |
| PairFlatMapFunction<T, K, V> | T => Iterable<Tuple2<K, V>> |
| Function2<T1, T2, R> | T1, T2 => R (function of two arguments) |
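
Because each interface has a single abstract method, it can be implemented by an anonymous class or, on Java 8, by a lambda expression. The following is a minimal sketch of both forms. So that it compiles without Spark on the classpath, it declares local stand-ins for Function and Function2; these stand-ins and the FunctionShapes class are hypothetical and only mirror the shapes listed in the table.

```java
public class FunctionShapes {
    // Hypothetical stand-ins that mirror the shapes listed in the table.
    // They are NOT Spark's classes; real code would import
    // org.apache.spark.api.java.function.Function and Function2 instead.
    interface Function<T, R> {
        R call(T t);
    }

    interface Function2<T1, T2, R> {
        R call(T1 t1, T2 t2);
    }

    // Anonymous class: works on every Java version.
    static final Function<String, Integer> LENGTH_ANONYMOUS =
        new Function<String, Integer>() {
            @Override
            public Integer call(String s) {
                return s.length();
            }
        };

    // Lambda expression: Java 8 and later.
    static final Function<String, Integer> LENGTH_LAMBDA = s -> s.length();

    // A two-argument function, the shape that Function2 describes.
    static final Function2<Integer, Integer, Integer> SUM = (a, b) -> a + b;

    public static void main(String[] args) {
        System.out.println(LENGTH_ANONYMOUS.call("spark")); // 5
        System.out.println(LENGTH_LAMBDA.call("spark"));    // 5
        System.out.println(SUM.call(2, 3));                 // 5
    }
}
```

The anonymous class and the lambda are interchangeable from the caller's point of view; the lambda form simply removes the boilerplate around call().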

Storage Levels

RDD storage level constants, such as MEMORY_AND_DISK, are declared in the org.apache.spark.api.java.StorageLevels class. To define your own storage level, you can use StorageLevels.create(...).

Other Features

The Java API supports other Spark features, including accumulators, broadcast variables, and caching.

Upgrading From Pre-1.0 Versions of Spark

In version 1.0 of Spark the Java API was refactored to better support Java 8 lambda expressions. Users upgrading from older versions of Spark should note the following changes:

  • All classes in org.apache.spark.api.java.function.* have been changed from abstract classes to interfaces. This means that concrete implementations of these Function classes need to use implements rather than extends.
  • Certain transformation functions now have multiple versions depending on the return type. In Spark core, the map functions (map, flatMap, mapPartitions) have type-specific versions, e.g. mapToPair and mapToDouble. Spark Streaming uses the same approach, e.g. transformToPair.
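
The first change can be sketched as follows. The Function interface declared here is a hypothetical local stand-in with the post-1.0 shape, used so the sketch compiles without Spark; WordLength and UpgradeSketch are likewise illustrative names, not part of Spark.

```java
public class UpgradeSketch {
    // Hypothetical stand-in with the post-1.0 shape (an interface, not an
    // abstract class). Real code would import
    // org.apache.spark.api.java.function.Function instead.
    interface Function<T, R> {
        R call(T t);
    }

    // Pre-1.0 code would have declared:
    //   class WordLength extends Function<String, Integer> { ... }
    // Against an interface that no longer compiles, so "extends"
    // becomes "implements". The body of call() is unchanged.
    static class WordLength implements Function<String, Integer> {
        @Override
        public Integer call(String s) {
            return s.length();
        }
    }

    public static void main(String[] args) {
        System.out.println(new WordLength().call("spark")); // 5
    }
}
```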

Example

As an example, we will implement word count using the Java API.

{% highlight java %}
import java.util.Arrays;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;

JavaSparkContext sc = new JavaSparkContext(...);
JavaRDD<String> lines = sc.textFile("hdfs://...");
JavaRDD<String> words = lines.flatMap(
  new FlatMapFunction<String, String>() {
    // Split each line into words on single spaces.
    public Iterable<String> call(String s) {
      return Arrays.asList(s.split(" "));
    }
  }
);
{% endhighlight %}

The word count program starts by creating a JavaSparkContext, which accepts the same parameters as its Scala counterpart. JavaSparkContext supports the same data loading methods as the regular SparkContext; here, textFile loads lines from text files stored in HDFS.

To split the lines into words, we use flatMap to split each line on spaces. flatMap is passed a FlatMapFunction that accepts a string and returns a java.lang.Iterable of strings.

Here, the FlatMapFunction was created inline; another option is to define a named class that implements FlatMapFunction and pass an instance to flatMap:

{% highlight java %}
class Split implements FlatMapFunction<String, String> {
  public Iterable<String> call(String s) {
    return Arrays.asList(s.split(" "));
  }
}
JavaRDD<String> words = lines.flatMap(new Split());
{% endhighlight %}

Java 8+ users can also write the above FlatMapFunction in a more concise way using a lambda expression:

{% highlight java %}
JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(s.split(" ")));
{% endhighlight %}

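In Java 8, this lambda syntax can replace any anonymous class that implements an interface with a single abstract method, which includes all of Spark's function interfaces.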

Continuing with the word count example, we map each word to a (word, 1) pair:

{% highlight java %}
import scala.Tuple2;

JavaPairRDD<String, Integer> ones = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) {
      return new Tuple2<String, Integer>(s, 1);
    }
  }
);
{% endhighlight %}

Note that mapToPair was passed a PairFunction<String, String, Integer> and returned a JavaPairRDD<String, Integer>.

To finish the word count program, we will use reduceByKey to count the occurrences of each word:

{% highlight java %}
JavaPairRDD<String, Integer> counts = ones.reduceByKey(
  new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer i1, Integer i2) {
      return i1 + i2;
    }
  }
);
{% endhighlight %}

Here, reduceByKey is passed a Function2, which implements a function with two arguments. The resulting JavaPairRDD contains (word, count) pairs.
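
To make the reduceByKey contract concrete, the sketch below reproduces it locally on a plain Java list: values that share a key are combined pairwise with the supplied two-argument function. This is only an illustration and not how Spark executes the operation, since Spark works on distributed, partitioned data. The Function2 interface is a hypothetical local stand-in, java.util.Map.Entry stands in for Tuple2, and the reduceByKey and ones helpers are illustrative names.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceByKeySketch {
    // Hypothetical stand-in for Spark's two-argument function interface.
    interface Function2<T1, T2, R> {
        R call(T1 t1, T2 t2);
    }

    // Local, single-threaded illustration of the reduceByKey contract:
    // values with the same key are merged using func.
    static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> pairs,
                                        Function2<V, V, V> func) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> pair : pairs) {
            K key = pair.getKey();
            if (result.containsKey(key)) {
                result.put(key, func.call(result.get(key), pair.getValue()));
            } else {
                result.put(key, pair.getValue());
            }
        }
        return result;
    }

    // Builds the (word, 1) pairs that the mapToPair step would produce.
    static List<Map.Entry<String, Integer>> ones(String... words) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String w : words) {
            pairs.add(new SimpleEntry<String, Integer>(w, 1));
        }
        return pairs;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
            reduceByKey(ones("to", "be", "or", "not", "to", "be"), (a, b) -> a + b);
        System.out.println(counts); // {to=2, be=2, or=1, not=1}
    }
}
```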

In this example, we explicitly showed each intermediate RDD. It is also possible to chain the RDD transformations, so the word count example could also be written as:

{% highlight java %}
JavaPairRDD<String, Integer> counts = lines.flatMap(
    ...
  ).mapToPair(
    ...
  ).reduceByKey(
    ...
  );
{% endhighlight %}

There is no performance difference between these approaches; the choice is just a matter of style.

Javadoc

We currently provide documentation for the Java API as Scaladoc, in the org.apache.spark.api.java package, because some of the classes are implemented in Scala. Note that the types and function definitions are shown in Scala syntax (for example, def reduce(func: Function2[T, T, T]): T instead of T reduce(Function2<T, T, T> func)). In addition, Java interfaces are shown with the Scala trait modifier. We hope to generate documentation with Java-style syntax in the future to avoid these quirks.

Where to Go from Here

Spark includes several sample programs using the Java API in examples/src/main/java. You can run them by passing the class name to the bin/run-example script included in Spark; for example:

./bin/run-example org.apache.spark.examples.JavaWordCount

Each example program prints usage help when run without any arguments.