---
layout: global
title: Spark Programming Guide
---
At a high level, every Spark application consists of a *driver program* that runs the user's `main` function and executes various *parallel operations* on a cluster. The main abstraction Spark provides is a *distributed dataset*, which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. Distributed datasets are created by starting with a file in the Hadoop file system (HDFS) or an existing collection in the driver program and possibly transforming it. Users may also ask Spark to *cache* a dataset in memory, allowing it to be reused efficiently across parallel operations. Finally, distributed datasets automatically recover from node failures.
A second abstraction in Spark is *shared variables* that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: *broadcast variables*, which can be used to cache a value in memory on all nodes, and *accumulators*, which are variables that are only "added" to, such as counters and sums.
This guide shows each of these features and walks through some samples. It assumes some familiarity with Scala, especially with the syntax for [closures](http://www.scala-lang.org/node/133). Note that Spark can be run interactively through the `spark-shell` interpreter. You might want to do that to follow along!
# Linking with Spark
To write a Spark application, you will need to add both Spark and its dependencies to your CLASSPATH. The easiest way to do this is to run `sbt/sbt assembly` to build both Spark and its dependencies into one JAR (`core/target/spark-core-assembly-0.5.0.jar`), then add this to your CLASSPATH. Alternatively, you can publish Spark to the Maven cache on your machine using `sbt/sbt publish-local`. It will be an artifact called `spark-core` under the organization `org.spark-project`.
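If you manage your application with a build tool rather than a hand-maintained CLASSPATH, you can depend on the locally published artifact instead. The snippet below is a minimal sketch for an sbt build; the exact version string and dependency syntax depend on how the artifact was published, so treat it as illustrative:

    // build.sbt (sketch): assumes `sbt/sbt publish-local` has been run.
    // Depending on how the artifact was published you may need `%%` (a
    // Scala-version suffix) instead of `%`, or a different version string.
    libraryDependencies += "org.spark-project" % "spark-core" % "0.5.0"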
In addition, you'll need to import some Spark classes and implicit conversions. Add the following lines at the top of your program:
    import spark.SparkContext
    import SparkContext._
# Initializing Spark
The first thing a Spark program must do is to create a `SparkContext` object, which tells Spark how to access a cluster.
This is done through the following constructor:
    new SparkContext(master, jobName, [sparkHome], [jars])
The `master` parameter is a string specifying a [Mesos]({{HOME_PATH}}running-on-mesos.html) cluster to connect to, or a special "local" string to run in local mode, as described below. `jobName` is a name for your job, which will be shown in the Mesos web UI when running on a cluster. Finally, the last two parameters are needed to deploy your code to a cluster if running on Mesos, as described later.
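For example, here is a minimal sketch of creating a context for local mode. The job name is arbitrary, and `sparkHome` and `jars` are omitted because they are only needed when deploying to a Mesos cluster:

    import spark.SparkContext
    import SparkContext._

    // Run locally with two worker threads; "My Job" is just an illustrative name.
    val sc = new SparkContext("local[2]", "My Job")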
In the Spark interpreter, a special interpreter-aware SparkContext is already created for you, in the variable called `sc`. Making your own SparkContext will not work. You can set which master the context connects to using the `MASTER` environment variable. For example, run `MASTER=local[4] ./spark-shell` to run locally with four cores.
### Master Names

The master name can be in one of three formats:

| Master Name | Meaning |
| --- | --- |
| `local` | Run Spark locally with one worker thread (i.e. no parallelism at all). |
| `local[K]` | Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine). |
| `HOST:PORT` | Connect to a [Mesos]({{HOME_PATH}}running-on-mesos.html) master at the given host and port. |
### Transformations

| Transformation | Meaning |
| --- | --- |
| `map(func)` | Return a new distributed dataset formed by passing each element of the source through a function `func`. |
| `filter(func)` | Return a new dataset formed by selecting those elements of the source on which `func` returns true. |
| `flatMap(func)` | Similar to `map`, but each input item can be mapped to 0 or more output items (so `func` should return a `Seq` rather than a single item). |
| `sample(withReplacement, frac, seed)` | Sample a fraction `frac` of the data, with or without replacement, using a given random number seed. |
| `union(otherDataset)` | Return a new dataset that contains the union of the elements in the source dataset and the argument. |
| `groupByKey([numTasks])` | When called on a dataset of (K, V) pairs, returns a dataset of (K, Seq[V]) pairs. **Note:** by default, this uses only 8 parallel tasks to do the grouping; you can pass an optional `numTasks` argument to set a different number of tasks. |
| `reduceByKey(func, [numTasks])` | When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in `groupByKey`, the number of reduce tasks is configurable through an optional second argument. |
| `join(otherDataset, [numTasks])` | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. |
| `groupWith(otherDataset, [numTasks])` | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Seq[V], Seq[W]) tuples. This operation is also called `CoGroup` in other frameworks. |
| `cartesian(otherDataset)` | When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). |
| `sortByKey([ascendingOrder])` | When called on a dataset of (K, V) pairs where K implements `Ordered`, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified by the boolean `ascendingOrder` argument. |
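To make the table concrete, here is a small sketch that chains a few of these transformations; `sc` is assumed to be an existing SparkContext and the input path is only a placeholder:

    // Sketch: word count using flatMap, map and reduceByKey.
    val lines = sc.textFile("hdfs://...")                // dataset of lines (path is illustrative)
    val words = lines.flatMap(line => line.split(" "))   // one line maps to many words
    val pairs = words.map(word => (word, 1))             // (K, V) pairs
    val counts = pairs.reduceByKey((a, b) => a + b)      // aggregate counts per key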
### Actions
| Action | Meaning |
| --- | --- |
| `reduce(func)` | Aggregate the elements of the dataset using a function `func` (which takes two arguments and returns one). The function should be associative so that it can be computed correctly in parallel. |
| `collect()` | Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. |
| `count()` | Return the number of elements in the dataset. |
| `take(n)` | Return an array with the first `n` elements of the dataset. Note that this is currently not executed in parallel; instead, the driver program computes all the elements. |
| `first()` | Return the first element of the dataset (similar to `take(1)`). |
| `saveAsTextFile(path)` | Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call `toString` on each element to convert it to a line of text in the file. |
| `saveAsSequenceFile(path)` | Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is only available on RDDs of key-value pairs that either implement Hadoop's `Writable` interface or are implicitly convertible to `Writable` (Spark includes conversions for basic types like Int, Double, String, etc). |
| `foreach(func)` | Run a function `func` on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems. |
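As a sketch of how these actions are typically used (again assuming an existing `sc`; the output path is illustrative):

    val nums = sc.parallelize(Array(1, 2, 3, 4, 5))
    val sum = nums.reduce((a, b) => a + b)   // 15: aggregate with an associative function
    val n = nums.count()                     // 5
    val firstTwo = nums.take(2)              // Array(1, 2), computed at the driver
    nums.saveAsTextFile("counts-output")     // write text files to the given directory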
### Caching
Calling `cache()` on an RDD asks that it be stored in memory after the first time it is computed. Different partitions of the dataset will be stored on the different cluster nodes that computed them, making subsequent uses of the dataset faster. The cache is fault-tolerant -- if any partition of an RDD is lost, it will be recomputed using the transformations that originally created it.
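For example, here is a sketch of caching a dataset that several actions reuse; `sc` and the input path are assumed:

    // Cache the filtered dataset so later actions reuse the in-memory partitions.
    val errors = sc.textFile("hdfs://...").filter(line => line.contains("ERROR"))
    errors.cache()                           // request in-memory storage after first computation
    val totalErrors = errors.count()         // first action: computes and caches the partitions
    val mysqlErrors = errors.filter(line => line.contains("MySQL")).count()   // reuses the cache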
# Shared Variables
Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program. Supporting general, read-write shared variables across tasks would be inefficient. However, Spark does provide two limited types of *shared variables* for two common usage patterns: broadcast variables and accumulators.
## Broadcast Variables
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value` method. The interpreter session below shows this:
    scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
    broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

    scala> broadcastVar.value
    res0: Array[Int] = Array(1, 2, 3)
After the broadcast variable is created, it should be used instead of the value `v` in any functions run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object `v` should not be modified after it is broadcast in order to ensure that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped to a new node later).
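For instance, a sketch of referencing the broadcast wrapper's `value` inside a closure rather than capturing the original array (`sc` assumed):

    val lookup = sc.broadcast(Array(1, 2, 3))
    val data = sc.parallelize(Array(0, 1, 2))
    // Tasks read lookup.value, so the array is shipped to each node only once.
    val result = data.map(i => lookup.value(i)).collect()   // Array(1, 2, 3)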
## Accumulators
Accumulators are variables that are only "added" to through an associative operation and can therefore be efficiently supported in parallel. They can be used to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of type Int and Double, and programmers can add support for new types.
An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks running on the cluster can then add to it using the `+=` operator. However, they cannot read its value. Only the driver program can read the accumulator's value, using its `value` method.
The interpreter session below shows an accumulator being used to add up the elements of an array:
    scala> val accum = sc.accumulator(0)
    accum: spark.Accumulator[Int] = 0

    scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
    ...
    10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

    scala> accum.value
    res2: Int = 10
# Where to Go from Here
You can see some [example Spark programs](http://www.spark-project.org/examples.html) on the Spark website.
In addition, Spark includes several sample jobs in `examples/src/main/scala`. Some of them have both Spark versions and local (non-parallel) versions, allowing you to see what had to be changed to make the program run on a cluster. You can run them by passing the class name to the `run` script included in Spark -- for example, `./run spark.examples.SparkPi`. Each example program prints usage help when run without any arguments.