[SPARK-7264][ML] Parallel lapply for sparkR

## What changes were proposed in this pull request?

This PR adds a new function to SparkR, `spark.lapply(sc, list, func)`, which implements a distributed version of `lapply` using Spark as a backend.

TODO:
 - [x] check documentation
 - [ ] check tests

Trivial example in SparkR:

```R
sc <- sparkR.init()
spark.lapply(sc, 1:5, function(x) { 2 * x })
```

Output:

```
[[1]]
[1] 2

[[2]]
[1] 4

[[3]]
[1] 6

[[4]]
[1] 8

[[5]]
[1] 10
```
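
This is conceptually the same as running base R's `lapply` locally (as the documentation added in the diff below notes); for comparison, the local equivalent is:

```R
# Local, non-distributed equivalent of the call above
lapply(1:5, function(x) { 2 * x })
```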

Here is a slightly more complex example that performs distributed training of multiple models. Under the hood, Spark broadcasts the dataset to the workers as part of the function's closure.

```R
library("MASS")
data(menarche)
families <- c("gaussian", "poisson")
train <- function(family) { glm(Menarche ~ Age, family = family, data = menarche) }
results <- spark.lapply(sc, families, train)
```
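
The R documentation added in this patch notes two limitations: lexically captured variables are shipped read-only inside the closure, and external packages must be loaded inside the closure. Here is a hedged sketch of that pattern (the `sc` handle, the ridge-regression formula, and the MASS `Boston` dataset are illustrative assumptions, not part of this patch):

```R
# Sketch only: external packages must be loaded inside the closure so they
# are available on the workers; plain values (here, lambdas) are captured
# from the enclosing scope and shipped as read-only parts of the closure.
lambdas <- c(0.01, 0.1, 1.0)
train.ridge <- function(lambda) {
  library(MASS)                                   # load inside the closure
  lm.ridge(medv ~ ., data = Boston, lambda = lambda)
}
ridge.models <- spark.lapply(sc, lambdas, train.ridge)
```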

## How was this patch tested?

This PR was tested with a new unit test in SparkR (see the added `test_that` block below). I am unfamiliar with R and SparkR, so any feedback on style, testing, etc. will be much appreciated.

cc falaki davies

Author: Timothy Hunter <timhunter@databricks.com>

Closes #12426 from thunterdb/7264.

@@ -295,6 +295,7 @@ export("as.DataFrame",
"read.json",
"read.parquet",
"read.text",
"spark.lapply",
"sql",
"str",
"tableToDF",

@@ -226,6 +226,48 @@ setCheckpointDir <- function(sc, dirName) {
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
}
#' @title Run a function over a list of elements, distributing the computations with Spark.
#'
#' @description
#' Applies a function to elements of a list, in a manner similar to doParallel or lapply.
#' The computations are distributed using Spark. It is conceptually the same as:
#'   lapply(list, func)
#'
#' Known limitations:
#' - variable scoping and capture: compared to R's rich support for variable resolution, the
#'   distributed nature of SparkR limits how variables are resolved at runtime. All the
#'   variables that are available through lexical scoping are embedded in the closure of the
#'   function and are available as read-only variables within the function. The environment
#'   variables should be stored into temporary variables outside the function, and not directly
#'   accessed within the function.
#'
#' - loading external packages: in order to use a package, you need to load it inside the
#'   closure. For example, if you rely on the MASS package, here is how you would use it:
#'\dontrun{
#' train <- function(hyperparam) {
#'   library(MASS)
#'   model <- lm.ridge(y ~ x + z, data, lambda = hyperparam)
#'   model
#' }
#'}
#'
#' @rdname spark.lapply
#' @param sc Spark Context to use
#' @param list the list of elements
#' @param func a function that takes one argument.
#' @return a list of results (the exact type being determined by the function)
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
#'}
spark.lapply <- function(sc, list, func) {
  # Distribute the elements of the list, one element per partition
  rdd <- parallelize(sc, list, length(list))
  # Apply the user function to each element on the workers
  results <- map(rdd, func)
  # Collect the results back to the driver as a local R list
  local <- collect(results)
  local
}

#' Set new log level
#'
#' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"

@@ -141,3 +141,9 @@ test_that("sparkJars sparkPackages as comma-separated strings", {
expect_that(processSparkJars(f), not(gives_warning()))
expect_match(processSparkJars(f), f)
})
test_that("spark.lapply should perform simple transforms", {
sc <- sparkR.init()
doubled <- spark.lapply(sc, 1:10, function(x) { 2 * x })
expect_equal(doubled, as.list(2 * 1:10))
})
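
For completeness, a minimal end-to-end usage sketch mirroring the new test (`sparkR.init` and `sparkR.stop` are the existing SparkR entry points, not added by this patch):

```R
library(SparkR)
sc <- sparkR.init()                                      # create a SparkContext
squares <- spark.lapply(sc, 1:4, function(x) { x * x })  # runs on the workers
print(squares)                                           # list(1, 4, 9, 16)
sparkR.stop()
```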