[SPARK-16112][SPARKR] Programming guide for gapply/gapplyCollect
## What changes were proposed in this pull request?

Updates the programming guide for `spark.gapply`/`spark.gapplyCollect`. Similar to the other examples, I used the `faithful` dataset to demonstrate gapply's functionality. Please let me know if you prefer another example.

## How was this patch tested?

Existing test cases in R.

Author: Narine Kokhlikyan <narine@slice.com>

Closes #14090 from NarineK/gapplyProgGuide.
This commit is contained in:
parent 5ec0d692b0
commit 4167304836

138 docs/sparkr.md
@@ -272,11 +272,11 @@ In SparkR, we support several kinds of User-Defined Functions:
##### dapply
Apply a function to each partition of a `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` should have only one parameter, to which a `data.frame` corresponding to each partition will be passed. The output of the function should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the [data types](#data-type-mapping-between-r-and-spark) of the returned value.

<div data-lang="r" markdown="1">
{% highlight r %}

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
@@ -295,8 +295,8 @@ head(collect(df1))
##### dapplyCollect
Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of the function should be a `data.frame`, but a schema is not required to be passed. Note that `dapplyCollect` can fail if the output of the UDF run on all partitions cannot be pulled to the driver and fit in driver memory.

<div data-lang="r" markdown="1">
{% highlight r %}

@@ -316,6 +316,136 @@ head(ldf, 3)

{% endhighlight %}
</div>
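The diff context above elides the bodies of the `dapply` and `dapplyCollect` examples. As a rough sketch only (it assumes an active SparkR session and a `df` created from `faithful`; the `waiting_secs` column name is illustrative, not taken from this diff), the calls typically look like:

```r
library(SparkR)

# Assumed setup, not part of this diff: start a session and create df.
# sparkR.session()
df <- createDataFrame(faithful)

# dapply: the schema must describe every column of the UDF's output.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { cbind(x, waiting_secs = x$waiting * 60) }, schema)
head(collect(df1))

# dapplyCollect: no schema argument; the result comes back as an R data.frame.
ldf <- dapplyCollect(df, function(x) { cbind(x, waiting_secs = x$waiting * 60) })
head(ldf, 3)
```

The `dapply` variant stays distributed until `collect` is called, while `dapplyCollect` pulls everything to the driver in one step.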
#### Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect`
##### gapply
Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: a grouping key and an R `data.frame` corresponding to that key. The groups are chosen from the `SparkDataFrame`'s column(s). The output of the function should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must represent the R function's output schema on the basis of Spark data types. The column names of the returned `data.frame` are set by the user. Below is the data type mapping between R and Spark.

#### Data type mapping between R and Spark
<table class="table">
<tr><th>R</th><th>Spark</th></tr>
<tr>
  <td>byte</td>
  <td>byte</td>
</tr>
<tr>
  <td>integer</td>
  <td>integer</td>
</tr>
<tr>
  <td>float</td>
  <td>float</td>
</tr>
<tr>
  <td>double</td>
  <td>double</td>
</tr>
<tr>
  <td>numeric</td>
  <td>double</td>
</tr>
<tr>
  <td>character</td>
  <td>string</td>
</tr>
<tr>
  <td>string</td>
  <td>string</td>
</tr>
<tr>
  <td>binary</td>
  <td>binary</td>
</tr>
<tr>
  <td>raw</td>
  <td>binary</td>
</tr>
<tr>
  <td>logical</td>
  <td>boolean</td>
</tr>
<tr>
  <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html">POSIXct</a></td>
  <td>timestamp</td>
</tr>
<tr>
  <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html">POSIXlt</a></td>
  <td>timestamp</td>
</tr>
<tr>
  <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/Dates.html">Date</a></td>
  <td>date</td>
</tr>
<tr>
  <td>array</td>
  <td>array</td>
</tr>
<tr>
  <td>list</td>
  <td>array</td>
</tr>
<tr>
  <td>env</td>
  <td>map</td>
</tr>
</table>
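As a quick illustration of how this mapping is used when declaring a schema, the sketch below mixes several rows of the table (the field names are illustrative and assume the SparkR package is loaded):

```r
library(SparkR)

# Each structField pairs a column name with a Spark type from the table above:
# R numeric -> "double", character -> "string", logical -> "boolean", Date -> "date".
schema <- structType(
  structField("amount", "double"),
  structField("label", "string"),
  structField("flag", "boolean"),
  structField("day", "date"))
```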
<div data-lang="r" markdown="1">
{% highlight r %}

# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

##    waiting max_eruption
##1      64        5.100
##2      69        5.067
##3      71        5.033
##4      87        5.000
##5      63        4.933
##6      89        4.900
{% endhighlight %}
</div>
##### gapplyCollect
Like `gapply`, applies a function to each group of a `SparkDataFrame` and collects the result back to an R `data.frame`. The output of the function should be a `data.frame`, but a schema is not required to be passed. Note that `gapplyCollect` can fail if the output of the UDF run on all groups cannot be pulled to the driver and fit in driver memory.

<div data-lang="r" markdown="1">
{% highlight r %}

# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

##    waiting max_eruption
##1      64        5.100
##2      69        5.067
##3      71        5.033
##4      87        5.000
##5      63        4.933
##6      89        4.900

{% endhighlight %}
</div>
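Because `faithful` is a built-in R dataset, the grouped aggregation above can be sanity-checked locally with base R before distributing it (no Spark needed; `aggregate` here plays the role of the grouped UDF):

```r
# Local, single-machine equivalent of the gapply/gapplyCollect examples:
# group faithful by waiting and take the max eruption time per group.
agg <- aggregate(eruptions ~ waiting, data = faithful, FUN = max)
colnames(agg) <- c("waiting", "max_eruption")
head(agg[order(agg$max_eruption, decreasing = TRUE), ])
```

Agreement between this local result and the distributed one is a quick way to validate the UDF's logic.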
#### Run local R functions distributed using `spark.lapply`
##### spark.lapply