From df58a95a33b739462dbe84e098839af2a8643d45 Mon Sep 17 00:00:00 2001 From: zero323 Date: Tue, 25 Apr 2017 22:00:45 -0700 Subject: [PATCH] [SPARK-20437][R] R wrappers for rollup and cube ## What changes were proposed in this pull request? - Add `rollup` and `cube` methods and corresponding generics. - Add short description to the vignette. ## How was this patch tested? - Existing unit tests. - Additional unit tests covering new features. - `check-cran.sh`. Author: zero323 Closes #17728 from zero323/SPARK-20437. --- R/pkg/NAMESPACE | 2 + R/pkg/R/DataFrame.R | 73 +++++++++++++++- R/pkg/R/generics.R | 8 ++ R/pkg/inst/tests/testthat/test_sparkSQL.R | 102 ++++++++++++++++++++++ R/pkg/vignettes/sparkr-vignettes.Rmd | 15 ++++ docs/sparkr.md | 30 +++++++ 6 files changed, 229 insertions(+), 1 deletion(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 95d5cc6d1c..2800461658 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -101,6 +101,7 @@ exportMethods("arrange", "createOrReplaceTempView", "crossJoin", "crosstab", + "cube", "dapply", "dapplyCollect", "describe", @@ -143,6 +144,7 @@ exportMethods("arrange", "registerTempTable", "rename", "repartition", + "rollup", "sample", "sample_frac", "sampleBy", diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 88a138fd8e..cd6f03a13d 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -1321,7 +1321,7 @@ setMethod("toRDD", #' Groups the SparkDataFrame using the specified columns, so we can run aggregation on them. #' #' @param x a SparkDataFrame. -#' @param ... variable(s) (character names(s) or Column(s)) to group on. +#' @param ... character name(s) or Column(s) to group on. #' @return A GroupedData. #' @family SparkDataFrame functions #' @aliases groupBy,SparkDataFrame-method @@ -1337,6 +1337,7 @@ setMethod("toRDD", #' agg(groupBy(df, "department", "gender"), salary="avg", "age" -> "max") #' } #' @note groupBy since 1.4.0 +#' @seealso \link{agg}, \link{cube}, \link{rollup} setMethod("groupBy", signature(x = "SparkDataFrame"), function(x, ...) { @@ -3642,3 +3643,73 @@ setMethod("checkpoint", df <- callJMethod(x@sdf, "checkpoint", as.logical(eager)) dataFrame(df) }) + +#' cube +#' +#' Create a multi-dimensional cube for the SparkDataFrame using the specified columns. +#' +#' If grouping expression is missing \code{cube} creates a single global aggregate and is equivalent to +#' direct application of \link{agg}. +#' +#' @param x a SparkDataFrame. +#' @param ... character name(s) or Column(s) to group on. +#' @return A GroupedData. +#' @family SparkDataFrame functions +#' @aliases cube,SparkDataFrame-method +#' @rdname cube +#' @name cube +#' @export +#' @examples +#' \dontrun{ +#' df <- createDataFrame(mtcars) +#' mean(cube(df, "cyl", "gear", "am"), "mpg") +#' +#' # Following calls are equivalent +#' agg(cube(carsDF), mean(carsDF$mpg)) +#' agg(carsDF, mean(carsDF$mpg)) +#' } +#' @note cube since 2.3.0 +#' @seealso \link{agg}, \link{groupBy}, \link{rollup} +setMethod("cube", + signature(x = "SparkDataFrame"), + function(x, ...) { + cols <- list(...) + jcol <- lapply(cols, function(x) if (class(x) == "Column") x@jc else column(x)@jc) + sgd <- callJMethod(x@sdf, "cube", jcol) + groupedData(sgd) + }) + +#' rollup +#' +#' Create a multi-dimensional rollup for the SparkDataFrame using the specified columns. +#' +#' If grouping expression is missing \code{rollup} creates a single global aggregate and is equivalent to +#' direct application of \link{agg}. +#' +#' @param x a SparkDataFrame. +#' @param ... character name(s) or Column(s) to group on. +#' @return A GroupedData. +#' @family SparkDataFrame functions +#' @aliases rollup,SparkDataFrame-method +#' @rdname rollup +#' @name rollup +#' @export +#' @examples +#'\dontrun{ +#' df <- createDataFrame(mtcars) +#' mean(rollup(df, "cyl", "gear", "am"), "mpg") +#' +#' # Following calls are equivalent +#' agg(rollup(carsDF), mean(carsDF$mpg)) +#' agg(carsDF, mean(carsDF$mpg)) +#' } +#' @note rollup since 2.3.0 +#' @seealso \link{agg}, \link{cube}, \link{groupBy} +setMethod("rollup", + signature(x = "SparkDataFrame"), + function(x, ...) { + cols <- list(...) + jcol <- lapply(cols, function(x) if (class(x) == "Column") x@jc else column(x)@jc) + sgd <- callJMethod(x@sdf, "rollup", jcol) + groupedData(sgd) + }) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 5e7a1c60c2..749ee9b54c 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -483,6 +483,10 @@ setGeneric("createOrReplaceTempView", # @export setGeneric("crossJoin", function(x, y) { standardGeneric("crossJoin") }) +#' @rdname cube +#' @export +setGeneric("cube", function(x, ...) { standardGeneric("cube") }) + #' @rdname dapply #' @export setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") }) @@ -631,6 +635,10 @@ setGeneric("sample", standardGeneric("sample") }) +#' @rdname rollup +#' @export +setGeneric("rollup", function(x, ...) { standardGeneric("rollup") }) + #' @rdname sample #' @export setGeneric("sample_frac", diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index c21ba2f1a1..2cef7191d4 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1816,6 +1816,108 @@ test_that("pivot GroupedData column", { expect_error(collect(sum(pivot(groupBy(df, "year"), "course", list("R", "R")), "earnings"))) }) +test_that("test multi-dimensional aggregations with cube and rollup", { + df <- createDataFrame(data.frame( + id = 1:6, + year = c(2016, 2016, 2016, 2017, 2017, 2017), + salary = c(10000, 15000, 20000, 22000, 32000, 21000), + department = c("management", "rnd", "sales", "management", "rnd", "sales") + )) + + actual_cube <- collect( + orderBy( + agg( + cube(df, "year", "department"), + expr("sum(salary) AS total_salary"), expr("avg(salary) AS average_salary") + ), + "year", "department" + ) + ) + + expected_cube <- data.frame( + year = c(rep(NA, 4), rep(2016, 4), rep(2017, 4)), + department = rep(c(NA, "management", "rnd", "sales"), times = 3), + total_salary = c( + 120000, # Total + 10000 + 22000, 15000 + 32000, 20000 + 21000, # Department only + 20000 + 15000 + 10000, # 2016 + 10000, 15000, 20000, # 2016 each department + 21000 + 32000 + 22000, # 2017 + 22000, 32000, 21000 # 2017 each department + ), + average_salary = c( + # Total + mean(c(20000, 15000, 10000, 21000, 32000, 22000)), + # Mean by department + mean(c(10000, 22000)), mean(c(15000, 32000)), mean(c(20000, 21000)), + mean(c(10000, 15000, 20000)), # 2016 + 10000, 15000, 20000, # 2016 each department + mean(c(21000, 32000, 22000)), # 2017 + 22000, 32000, 21000 # 2017 each department + ), + stringsAsFactors = FALSE + ) + + expect_equal(actual_cube, expected_cube) + + # cube should accept column objects + expect_equal( + count(sum(cube(df, df$year, df$department), "salary")), + 12 + ) + + # cube without columns should result in a single aggregate + expect_equal( + collect(agg(cube(df), expr("sum(salary) as total_salary"))), + data.frame(total_salary = 120000) + ) + + actual_rollup <- collect( + orderBy( + agg( + rollup(df, "year", "department"), + expr("sum(salary) AS total_salary"), expr("avg(salary) AS average_salary") + ), + "year", "department" + ) + ) + + expected_rollup <- data.frame( + year = c(NA, rep(2016, 4), rep(2017, 4)), + department = c(NA, rep(c(NA, "management", "rnd", "sales"), times = 2)), + total_salary = c( + 120000, # Total + 20000 + 15000 + 10000, # 2016 + 10000, 15000, 20000, # 2016 each department + 21000 + 32000 + 22000, # 2017 + 22000, 32000, 21000 # 2017 each department + ), + average_salary = c( + # Total + mean(c(20000, 15000, 10000, 21000, 32000, 22000)), + mean(c(10000, 15000, 20000)), # 2016 + 10000, 15000, 20000, # 2016 each department + mean(c(21000, 32000, 22000)), # 2017 + 22000, 32000, 21000 # 2017 each department + ), + stringsAsFactors = FALSE + ) + + expect_equal(actual_rollup, expected_rollup) + + # cube should accept column objects + expect_equal( + count(sum(rollup(df, df$year, df$department), "salary")), + 9 + ) + + # rollup without columns should result in a single aggregate + expect_equal( + collect(agg(rollup(df), expr("sum(salary) as total_salary"))), + data.frame(total_salary = 120000) + ) +}) + test_that("arrange() and orderBy() on a DataFrame", { df <- read.json(jsonPath) sorted <- arrange(df, df$age) diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd index f81dbab10b..4b9d6c3806 100644 --- a/R/pkg/vignettes/sparkr-vignettes.Rmd +++ b/R/pkg/vignettes/sparkr-vignettes.Rmd @@ -308,6 +308,21 @@ numCyl <- summarize(groupBy(carsDF, carsDF$cyl), count = n(carsDF$cyl)) head(numCyl) ``` +Use `cube` or `rollup` to compute subtotals across multiple dimensions. + +```{r} +mean(cube(carsDF, "cyl", "gear", "am"), "mpg") +``` + +generates groupings for {(`cyl`, `gear`, `am`), (`cyl`, `gear`), (`cyl`), ()}, while + +```{r} +mean(rollup(carsDF, "cyl", "gear", "am"), "mpg") +``` + +generates groupings for all possible combinations of grouping columns. + + #### Operating on Columns SparkR also provides a number of functions that can directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions. diff --git a/docs/sparkr.md b/docs/sparkr.md index a1a35a7757..e015ab260f 100644 --- a/docs/sparkr.md +++ b/docs/sparkr.md @@ -264,6 +264,36 @@ head(arrange(waiting_counts, desc(waiting_counts$count))) {% endhighlight %} +In addition to standard aggregations, SparkR supports [OLAP cube](https://en.wikipedia.org/wiki/OLAP_cube) operators `cube`: + +
+{% highlight r %} +head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg))) +## cyl disp gear avg(mpg) +##1 NA 140.8 4 22.8 +##2 4 75.7 4 30.4 +##3 8 400.0 3 19.2 +##4 8 318.0 3 15.5 +##5 NA 351.0 NA 15.8 +##6 NA 275.8 NA 16.3 +{% endhighlight %} +
+ +and `rollup`: + +
+{% highlight r %} +head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg))) +## cyl disp gear avg(mpg) +##1 4 75.7 4 30.4 +##2 8 400.0 3 19.2 +##3 8 318.0 3 15.5 +##4 4 78.7 NA 32.4 +##5 8 304.0 3 15.2 +##6 4 79.0 NA 27.3 +{% endhighlight %} +
+ ### Operating on Columns SparkR also provides a number of functions that can directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.