[SPARK-20437][R] R wrappers for rollup and cube

## What changes were proposed in this pull request?

- Add `rollup` and `cube` methods and corresponding generics.
- Add short description to the vignette.

## How was this patch tested?

- Existing unit tests.
- Additional unit tests covering new features.
- `check-cran.sh`.

Author: zero323 <zero323@users.noreply.github.com>

Closes #17728 from zero323/SPARK-20437.
This commit is contained in:
zero323 2017-04-25 22:00:45 -07:00 committed by Felix Cheung
parent 57e1da3946
commit df58a95a33
6 changed files with 229 additions and 1 deletions

View file

@ -101,6 +101,7 @@ exportMethods("arrange",
"createOrReplaceTempView",
"crossJoin",
"crosstab",
"cube",
"dapply",
"dapplyCollect",
"describe",
@ -143,6 +144,7 @@ exportMethods("arrange",
"registerTempTable",
"rename",
"repartition",
"rollup",
"sample",
"sample_frac",
"sampleBy",

View file

@ -1321,7 +1321,7 @@ setMethod("toRDD",
#' Groups the SparkDataFrame using the specified columns, so we can run aggregation on them.
#'
#' @param x a SparkDataFrame.
#' @param ... variable(s) (character names(s) or Column(s)) to group on.
#' @param ... character name(s) or Column(s) to group on.
#' @return A GroupedData.
#' @family SparkDataFrame functions
#' @aliases groupBy,SparkDataFrame-method
@ -1337,6 +1337,7 @@ setMethod("toRDD",
#' agg(groupBy(df, "department", "gender"), salary="avg", "age" -> "max")
#' }
#' @note groupBy since 1.4.0
#' @seealso \link{agg}, \link{cube}, \link{rollup}
setMethod("groupBy",
signature(x = "SparkDataFrame"),
function(x, ...) {
@ -3642,3 +3643,73 @@ setMethod("checkpoint",
df <- callJMethod(x@sdf, "checkpoint", as.logical(eager))
dataFrame(df)
})
#' cube
#'
#' Create a multi-dimensional cube for the SparkDataFrame using the specified columns.
#'
#' If grouping expression is missing \code{cube} creates a single global aggregate and is
#' equivalent to direct application of \link{agg}.
#'
#' @param x a SparkDataFrame.
#' @param ... character name(s) or Column(s) to group on.
#' @return A GroupedData.
#' @family SparkDataFrame functions
#' @aliases cube,SparkDataFrame-method
#' @rdname cube
#' @name cube
#' @export
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' mean(cube(df, "cyl", "gear", "am"), "mpg")
#'
#' # Following calls are equivalent
#' agg(cube(df), mean(df$mpg))
#' agg(df, mean(df$mpg))
#' }
#' @note cube since 2.3.0
#' @seealso \link{agg}, \link{groupBy}, \link{rollup}
setMethod("cube",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            cols <- list(...)
            # Accept both Column objects and character column names; extract the
            # underlying Java column reference for each. `inherits` (rather than
            # `class(col) == "Column"`) also matches potential Column subclasses,
            # and the lambda parameter is named `col` to avoid shadowing `x`.
            jcol <- lapply(cols, function(col) {
              if (inherits(col, "Column")) col@jc else column(col)@jc
            })
            sgd <- callJMethod(x@sdf, "cube", jcol)
            groupedData(sgd)
          })
#' rollup
#'
#' Create a multi-dimensional rollup for the SparkDataFrame using the specified columns.
#'
#' If grouping expression is missing \code{rollup} creates a single global aggregate and is
#' equivalent to direct application of \link{agg}.
#'
#' @param x a SparkDataFrame.
#' @param ... character name(s) or Column(s) to group on.
#' @return A GroupedData.
#' @family SparkDataFrame functions
#' @aliases rollup,SparkDataFrame-method
#' @rdname rollup
#' @name rollup
#' @export
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' mean(rollup(df, "cyl", "gear", "am"), "mpg")
#'
#' # Following calls are equivalent
#' agg(rollup(df), mean(df$mpg))
#' agg(df, mean(df$mpg))
#' }
#' @note rollup since 2.3.0
#' @seealso \link{agg}, \link{cube}, \link{groupBy}
setMethod("rollup",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            cols <- list(...)
            # Accept both Column objects and character column names; extract the
            # underlying Java column reference for each. `inherits` (rather than
            # `class(col) == "Column"`) also matches potential Column subclasses,
            # and the lambda parameter is named `col` to avoid shadowing `x`.
            jcol <- lapply(cols, function(col) {
              if (inherits(col, "Column")) col@jc else column(col)@jc
            })
            sgd <- callJMethod(x@sdf, "rollup", jcol)
            groupedData(sgd)
          })

View file

@ -483,6 +483,10 @@ setGeneric("createOrReplaceTempView",
# @export
setGeneric("crossJoin", function(x, y) { standardGeneric("crossJoin") })
#' @rdname cube
#' @export
setGeneric("cube", function(x, ...) {
  standardGeneric("cube")
})
#' @rdname dapply
#' @export
setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })
@ -631,6 +635,10 @@ setGeneric("sample",
standardGeneric("sample")
})
#' @rdname rollup
#' @export
setGeneric("rollup", function(x, ...) {
  standardGeneric("rollup")
})
#' @rdname sample
#' @export
setGeneric("sample_frac",

View file

@ -1816,6 +1816,108 @@ test_that("pivot GroupedData column", {
expect_error(collect(sum(pivot(groupBy(df, "year"), "course", list("R", "R")), "earnings")))
})
test_that("test multi-dimensional aggregations with cube and rollup", {
  df <- createDataFrame(data.frame(
    id = 1:6,
    year = c(2016, 2016, 2016, 2017, 2017, 2017),
    salary = c(10000, 15000, 20000, 22000, 32000, 21000),
    department = c("management", "rnd", "sales", "management", "rnd", "sales")
  ))

  # cube(year, department) aggregates over every subset of the grouping
  # columns: (), (year), (department), (year, department).
  actual_cube <- collect(
    orderBy(
      agg(
        cube(df, "year", "department"),
        expr("sum(salary) AS total_salary"), expr("avg(salary) AS average_salary")
      ),
      "year", "department"
    )
  )

  expected_cube <- data.frame(
    year = c(rep(NA, 4), rep(2016, 4), rep(2017, 4)),
    department = rep(c(NA, "management", "rnd", "sales"), times = 3),
    total_salary = c(
      120000, # Total
      10000 + 22000, 15000 + 32000, 20000 + 21000, # Department only
      20000 + 15000 + 10000, # 2016
      10000, 15000, 20000, # 2016 each department
      21000 + 32000 + 22000, # 2017
      22000, 32000, 21000 # 2017 each department
    ),
    average_salary = c(
      # Total
      mean(c(20000, 15000, 10000, 21000, 32000, 22000)),
      # Mean by department
      mean(c(10000, 22000)), mean(c(15000, 32000)), mean(c(20000, 21000)),
      mean(c(10000, 15000, 20000)), # 2016
      10000, 15000, 20000, # 2016 each department
      mean(c(21000, 32000, 22000)), # 2017
      22000, 32000, 21000 # 2017 each department
    ),
    stringsAsFactors = FALSE
  )

  expect_equal(actual_cube, expected_cube)

  # cube should accept column objects
  # (2 years + NA) x (3 departments + NA) = 12 grouping rows
  expect_equal(
    count(sum(cube(df, df$year, df$department), "salary")),
    12
  )

  # cube without columns should result in a single aggregate
  expect_equal(
    collect(agg(cube(df), expr("sum(salary) as total_salary"))),
    data.frame(total_salary = 120000)
  )

  # rollup(year, department) aggregates only over the hierarchical prefixes:
  # (), (year), (year, department) — no (department)-only grouping.
  actual_rollup <- collect(
    orderBy(
      agg(
        rollup(df, "year", "department"),
        expr("sum(salary) AS total_salary"), expr("avg(salary) AS average_salary")
      ),
      "year", "department"
    )
  )

  expected_rollup <- data.frame(
    year = c(NA, rep(2016, 4), rep(2017, 4)),
    department = c(NA, rep(c(NA, "management", "rnd", "sales"), times = 2)),
    total_salary = c(
      120000, # Total
      20000 + 15000 + 10000, # 2016
      10000, 15000, 20000, # 2016 each department
      21000 + 32000 + 22000, # 2017
      22000, 32000, 21000 # 2017 each department
    ),
    average_salary = c(
      # Total
      mean(c(20000, 15000, 10000, 21000, 32000, 22000)),
      mean(c(10000, 15000, 20000)), # 2016
      10000, 15000, 20000, # 2016 each department
      mean(c(21000, 32000, 22000)), # 2017
      22000, 32000, 21000 # 2017 each department
    ),
    stringsAsFactors = FALSE
  )

  expect_equal(actual_rollup, expected_rollup)

  # rollup should accept column objects
  # 1 grand total + 2 years x (3 departments + subtotal) = 9 grouping rows
  expect_equal(
    count(sum(rollup(df, df$year, df$department), "salary")),
    9
  )

  # rollup without columns should result in a single aggregate
  expect_equal(
    collect(agg(rollup(df), expr("sum(salary) as total_salary"))),
    data.frame(total_salary = 120000)
  )
})
test_that("arrange() and orderBy() on a DataFrame", {
df <- read.json(jsonPath)
sorted <- arrange(df, df$age)

View file

@ -308,6 +308,21 @@ numCyl <- summarize(groupBy(carsDF, carsDF$cyl), count = n(carsDF$cyl))
head(numCyl)
```
Use `cube` or `rollup` to compute subtotals across multiple dimensions.
```{r}
mean(cube(carsDF, "cyl", "gear", "am"), "mpg")
```
generates groupings for all possible combinations of the grouping columns, while
```{r}
mean(rollup(carsDF, "cyl", "gear", "am"), "mpg")
```
generates the hierarchical groupings {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}.
#### Operating on Columns
SparkR also provides a number of functions that can be applied directly to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.

View file

@ -264,6 +264,36 @@ head(arrange(waiting_counts, desc(waiting_counts$count)))
{% endhighlight %}
</div>
In addition to standard aggregations, SparkR supports [OLAP cube](https://en.wikipedia.org/wiki/OLAP_cube) operators `cube`:
<div data-lang="r" markdown="1">
{% highlight r %}
head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 NA 140.8 4 22.8
##2 4 75.7 4 30.4
##3 8 400.0 3 19.2
##4 8 318.0 3 15.5
##5 NA 351.0 NA 15.8
##6 NA 275.8 NA 16.3
{% endhighlight %}
</div>
and `rollup`:
<div data-lang="r" markdown="1">
{% highlight r %}
head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
## cyl disp gear avg(mpg)
##1 4 75.7 4 30.4
##2 8 400.0 3 19.2
##3 8 318.0 3 15.5
##4 4 78.7 NA 32.4
##5 8 304.0 3 15.2
##6 4 79.0 NA 27.3
{% endhighlight %}
</div>
### Operating on Columns
SparkR also provides a number of functions that can be applied directly to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.