[SPARK-15672][R][DOC] R programming guide update
## What changes were proposed in this pull request?

Guide for
- UDFs with dapply, dapplyCollect
- spark.lapply for running parallel R functions

## How was this patch tested?

build locally

<img width="654" alt="screen shot 2016-06-14 at 03 12 56" src="https://cloud.githubusercontent.com/assets/3419881/16039344/12a3b6a0-31de-11e6-8d77-fe23308075c0.png">

Author: Kai Jiang <jiangkai@gmail.com>

Closes #13660 from vectorijk/spark-15672-R-guide-update.
@@ -246,7 +246,7 @@ setCheckpointDir <- function(sc, dirName) {
#' \preformatted{
#'   train <- function(hyperparam) {
#'     library(MASS)
#'     model <- lm.ridge("y ~ x+z", data, lambda=hyperparam)
#'     model
#'   }
#' }
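The docstring above is only an outline (`data` and `model` are placeholders). A minimal runnable sketch of the same pattern, assuming a SparkR session has already been started and using an illustrative data frame and lambda grid:

{% highlight r %}
# Minimal runnable sketch of the hyperparameter-tuning pattern above;
# the data frame and lambda values are illustrative placeholders.
lambdas <- c(0.1, 1, 10)
train <- function(hyperparam) {
  library(MASS)  # loaded on the worker that runs this task
  data <- data.frame(y = rnorm(100), x = rnorm(100), z = rnorm(100))
  model <- lm.ridge(y ~ x + z, data, lambda = hyperparam)
  model
}
# Fit one ridge model per lambda in parallel; results are gathered on the driver
models <- spark.lapply(lambdas, train)
{% endhighlight %}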
@@ -255,6 +255,83 @@ head(df)
{% endhighlight %}
</div>

### Applying User-Defined Function
In SparkR, we support several kinds of User-Defined Functions:

#### Run a given function on a large dataset using `dapply` or `dapplyCollect`

##### dapply
Apply a function to each partition of a `SparkDataFrame`. The function should have only one parameter, to which the `data.frame` corresponding to a partition will be passed. The output of the function should be a `data.frame`. The schema specifies the row format of the resulting `SparkDataFrame`; it must match the output of the R function.
<div data-lang="r" markdown="1">
{% highlight r %}

# Convert waiting time from minutes to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300
{% endhighlight %}
</div>
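The output schema does not have to extend the input columns; it only has to describe the `data.frame` the function returns. A minimal sketch, assuming the same `df` as above, that collapses each partition into a single row holding its record count:

{% highlight r %}
# Each partition is reduced to one row; the schema describes exactly
# the data.frame returned by the UDF.
schema <- structType(structField("count", "integer"))
partitionCounts <- dapply(df, function(x) { data.frame(count = nrow(x)) }, schema)
head(collect(partitionCounts))
{% endhighlight %}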

##### dapplyCollect
Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back to R. The output of the function should be a `data.frame`, but no schema is required to be passed. Note that `dapplyCollect` can only be used if the output of the UDF run on all the partitions can fit in driver memory.
<div data-lang="r" markdown="1">
{% highlight r %}

# Convert waiting time from minutes to seconds.
# Note that we can apply UDF to DataFrame and return an R data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

{% endhighlight %}
</div>

#### Run local R functions distributed using `spark.lapply`

##### spark.lapply
Similar to `lapply` in native R, `spark.lapply` runs a function over a list of elements and distributes the computations with Spark. It applies a function to the elements of a list in a manner similar to `doParallel` or `lapply`. The results of all the computations should fit on a single machine. If that is not the case, you can do something like `df <- createDataFrame(list)` and then use `dapply`, as sketched after the example below.
<div data-lang="r" markdown="1">
{% highlight r %}

# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies the family of the generalized linear model.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of the models' summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

{% endhighlight %}
</div>
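The `createDataFrame`-plus-`dapply` alternative mentioned above can be sketched as follows; the input values, schema, and stand-in computation are illustrative:

{% highlight r %}
# When per-element results are too large to gather on the driver, distribute
# the inputs as a SparkDataFrame and transform them with dapply instead.
inputs <- createDataFrame(data.frame(lambda = c(0.1, 1, 10)))
schema <- structType(structField("lambda", "double"), structField("result", "double"))
results <- dapply(inputs, function(x) {
  # stand-in computation; a real job would compute something expensive per row
  data.frame(lambda = x$lambda, result = x$lambda^2)
}, schema)
head(results)
{% endhighlight %}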

## Running SQL Queries from SparkR
A SparkDataFrame can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data.
The `sql` function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.
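For example, assuming `df` is the `SparkDataFrame` used above, a query could look like this (the view name and predicate are illustrative):

{% highlight r %}
# Register the SparkDataFrame as a temporary view, then query it with SQL.
createOrReplaceTempView(df, "faithful_view")
longWaits <- sql("SELECT * FROM faithful_view WHERE waiting > 70")
head(longWaits)
{% endhighlight %}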