cf4122e4d4
This PR adds a new SparkR programming guide at the top-level. This will be useful for R users as our APIs don't directly match the Scala/Python APIs and as we need to explain SparkR without using RDDs as examples etc.
cc rxin davies pwendell
cc cafreeman -- Would be great if you could also take a look at this !
Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Closes #6490 from shivaram/sparkr-guide and squashes the following commits:
d5ff360 [Shivaram Venkataraman] Add a section on HiveContext, HQL queries
408dce5 [Shivaram Venkataraman] Fix link
dbb86e3 [Shivaram Venkataraman] Fix minor typo
9aff5e0 [Shivaram Venkataraman] Address comments, use dplyr-like syntax in example
d09703c [Shivaram Venkataraman] Fix default argument in read.df
ea816a1 [Shivaram Venkataraman] Add a new SparkR programming guide Also update write.df, read.df to handle defaults better
(cherry picked from commit 5f48e5c33b)
Signed-off-by: Davies Liu <davies@databricks.com>

---
layout: global
displayTitle: SparkR (R on Spark)
title: SparkR (R on Spark)
---

* This will become a table of contents (this text will be scraped).
{:toc}

# Overview

SparkR is an R package that provides a light-weight frontend to use Apache Spark from R.
In Spark {{site.SPARK_VERSION}}, SparkR provides a distributed data frame implementation that
supports operations like selection, filtering, aggregation etc. (similar to R data frames,
[dplyr](https://github.com/hadley/dplyr)) but on large datasets.

# SparkR DataFrames

A DataFrame is a distributed collection of data organized into named columns. It is conceptually
equivalent to a table in a relational database or a data frame in R, but with richer
optimizations under the hood. DataFrames can be constructed from a wide array of sources such as:
structured data files, tables in Hive, external databases, or existing local R data frames.

All of the examples on this page use sample data included in R or the Spark distribution and can be run using the `./bin/sparkR` shell.

## Starting Up: SparkContext, SQLContext

<div data-lang="r" markdown="1">
The entry point into SparkR is the `SparkContext` which connects your R program to a Spark cluster.
You can create a `SparkContext` using `sparkR.init` and pass in options such as the application name
etc. Further, to work with DataFrames we will need a `SQLContext`, which can be created from the
SparkContext. If you are working from the SparkR shell, the `SQLContext` and `SparkContext` should
already be created for you.

{% highlight r %}
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
{% endhighlight %}

</div>
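
As mentioned above, `sparkR.init` accepts options such as the application name. A minimal sketch of passing them is shown below; the `master` and `appName` argument names are assumptions here, so consult the SparkR API docs for the exact signature.

<div data-lang="r" markdown="1">
{% highlight r %}
# Sketch only: `master` and `appName` are assumed arguments of sparkR.init;
# check the SparkR API documentation for the authoritative signature.
sc <- sparkR.init(master = "local[2]", appName = "SparkR-example")
sqlContext <- sparkRSQL.init(sc)
{% endhighlight %}
</div>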

## Creating DataFrames
With a `SQLContext`, applications can create `DataFrame`s from a local R data frame, from a [Hive table](sql-programming-guide.html#hive-tables), or from other [data sources](sql-programming-guide.html#data-sources).

### From local data frames
The simplest way to create a data frame is to convert a local R data frame into a SparkR DataFrame. Specifically, we can use `createDataFrame` and pass in the local R data frame to create a SparkR DataFrame. As an example, the following creates a `DataFrame` based on the `faithful` dataset from R.

<div data-lang="r" markdown="1">
{% highlight r %}
df <- createDataFrame(sqlContext, faithful)

# Displays the first part of the DataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

{% endhighlight %}
</div>

### From Data Sources

SparkR supports operating on a variety of data sources through the `DataFrame` interface. This section describes the general methods for loading and saving data using Data Sources. You can check the Spark SQL programming guide for more [specific options](sql-programming-guide.html#manually-specifying-options) that are available for the built-in data sources.

The general method for creating DataFrames from data sources is `read.df`. This method takes in the `SQLContext`, the path of the file to load, and the type of data source. SparkR supports reading JSON and Parquet files natively, and through [Spark Packages](http://spark-packages.org/) you can find data source connectors for popular file formats like [CSV](http://spark-packages.org/package/databricks/spark-csv) and [Avro](http://spark-packages.org/package/databricks/spark-avro).

The example below shows how to use data sources with a JSON input file. Note that the file used here is _not_ a typical JSON file: each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail.

<div data-lang="r" markdown="1">

{% highlight r %}
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: integer (nullable = true)
#  |-- name: string (nullable = true)

{% endhighlight %}
</div>
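
As a hedged illustration of the CSV connector mentioned above, a CSV file could be loaded through the third-party `spark-csv` package roughly as sketched below. The `com.databricks.spark.csv` source name and the `header` option come from that package's documentation, the file path is hypothetical, and the package must first be added to the cluster (for example via `spark-submit`'s `--packages` flag).

<div data-lang="r" markdown="1">
{% highlight r %}
# Sketch only: assumes the spark-csv package is available on the cluster and
# that a CSV file with a header row exists at the hypothetical path below.
flights <- read.df(sqlContext, "./flights.csv",
                   source = "com.databricks.spark.csv", header = "true")
head(flights)
{% endhighlight %}
</div>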

The data sources API can also be used to save out DataFrames into multiple file formats. For example, we can save the DataFrame from the previous example
to a Parquet file using `write.df`.

<div data-lang="r" markdown="1">
{% highlight r %}
write.df(people, path="people.parquet", source="parquet", mode="overwrite")
{% endhighlight %}
</div>
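
Because Parquet is one of the natively supported formats, the file written above can be read back with `read.df`; a minimal sketch reusing the `people.parquet` path from the previous example:

<div data-lang="r" markdown="1">
{% highlight r %}
# Read the Parquet output written above back into a SparkR DataFrame.
peopleParquet <- read.df(sqlContext, "people.parquet", "parquet")
printSchema(peopleParquet)
{% endhighlight %}
</div>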

### From Hive tables

You can also create SparkR DataFrames from Hive tables. To do this we will need to create a HiveContext which can access tables in the Hive MetaStore. Note that Spark should have been built with [Hive support](building-spark.html#building-with-hive-and-jdbc-support). More details on the difference between SQLContext and HiveContext can be found in the [SQL programming guide](sql-programming-guide.html#starting-point-sqlcontext).

<div data-lang="r" markdown="1">
{% highlight r %}
# sc is an existing SparkContext.
hiveContext <- sparkRHive.init(sc)

sql(hiveContext, "CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql(hiveContext, "LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql(hiveContext, "FROM src SELECT key, value")

# results is now a DataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

{% endhighlight %}
</div>

## DataFrame Operations

SparkR DataFrames support a number of functions to do structured data processing.
Here we include some basic examples; a complete list can be found in the [API](api/R/index.html) docs:

### Selecting rows, columns

<div data-lang="r" markdown="1">
{% highlight r %}
# Create the DataFrame
df <- createDataFrame(sqlContext, faithful)

# Get basic information about the DataFrame
df
## DataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column names as strings
head(select(df, "eruptions"))

# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

{% endhighlight %}

</div>

### Grouping, Aggregation

SparkR data frames support a number of commonly used functions to aggregate data after grouping. For example, we can compute a histogram of the `waiting` time in the `faithful` dataset as shown below.

<div data-lang="r" markdown="1">
{% highlight r %}

# We use the `n` function to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      81    13
##2      60     6
##3      68     1

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

##  waiting count
##1      78    15
##2      83    14
##3      81    13

{% endhighlight %}
</div>
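
Counting with `n` is only one of the available aggregates. As a sketch, assuming the `avg` aggregate function exported by SparkR, the mean eruption length for each waiting time could be computed as:

<div data-lang="r" markdown="1">
{% highlight r %}
# Sketch only: `avg` is assumed to be among SparkR's aggregate functions.
# Compute the mean eruption duration for each distinct waiting time.
head(summarize(groupBy(df, df$waiting), avg_eruptions = avg(df$eruptions)))
{% endhighlight %}
</div>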

### Operating on Columns

SparkR also provides a number of functions that can be directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.

<div data-lang="r" markdown="1">
{% highlight r %}

# Convert waiting time from minutes to seconds.
# Note that we can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

{% endhighlight %}
</div>

## Running SQL Queries from SparkR
A SparkR DataFrame can also be registered as a temporary table in Spark SQL; registering a DataFrame as a table allows you to run SQL queries over its data.
The `sql` function enables applications to run SQL queries programmatically and returns the result as a `DataFrame`.

<div data-lang="r" markdown="1">
{% highlight r %}
# Load a JSON file
people <- read.df(sqlContext, "./examples/src/main/resources/people.json", "json")

# Register this DataFrame as a table.
registerTempTable(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql(sqlContext, "SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

{% endhighlight %}
</div>
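
If the query result needs to be used in the local R session, the distributed `DataFrame` can be converted into a local R data frame with `collect`; a minimal sketch continuing the example above:

<div data-lang="r" markdown="1">
{% highlight r %}
# Bring the (small) result back to the driver as a local R data frame.
localTeenagers <- collect(teenagers)
class(localTeenagers)
## [1] "data.frame"
{% endhighlight %}
</div>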