[SPARK-11395][SPARKR] Support over and window specification in SparkR.

This PR:
1. Implement WindowSpec S4 class.
2. Implement Window.partitionBy() and Window.orderBy() as utility functions to create WindowSpec objects.
3. Implement over() of Column class.

Author: Sun Rui <rui.sun@intel.com>
Author: Sun Rui <sunrui2016@gmail.com>

Closes #10094 from sun-rui/SPARK-11395.
This commit is contained in:
Sun Rui 2016-05-05 18:49:43 -07:00 committed by Shivaram Venkataraman
parent 7f5922aa4a
commit 157a49aa41
8 changed files with 364 additions and 7 deletions

View file

@ -26,6 +26,7 @@ Collate:
'pairRDD.R'
'DataFrame.R'
'SQLContext.R'
'WindowSpec.R'
'backend.R'
'broadcast.R'
'client.R'
@ -38,4 +39,5 @@ Collate:
'stats.R'
'types.R'
'utils.R'
'window.R'
RoxygenNote: 5.0.1

View file

@ -216,6 +216,7 @@ exportMethods("%in%",
"next_day",
"ntile",
"otherwise",
"over",
"percent_rank",
"pmod",
"quarter",
@ -315,3 +316,12 @@ export("structField",
"structType.jobj",
"structType.structField",
"print.structType")
exportClasses("WindowSpec")
export("partitionBy",
"rowsBetween",
"rangeBetween")
export("window.partitionBy",
"window.orderBy")

View file

@ -1749,8 +1749,8 @@ setMethod("arrange",
#' @export
setMethod("orderBy",
signature(x = "SparkDataFrame", col = "characterOrColumn"),
function(x, col) {
arrange(x, col)
function(x, col, ...) {
arrange(x, col, ...)
})
#' Filter

188
R/pkg/R/WindowSpec.R Normal file
View file

@ -0,0 +1,188 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# WindowSpec.R - WindowSpec class and methods implemented in S4 OO classes
#' @include generics.R jobj.R column.R
NULL
#' @title S4 class that represents a WindowSpec
#' @description WindowSpec can be created by using window.partitionBy()
#' or window.orderBy()
#' @rdname WindowSpec
#' @seealso \link{window.partitionBy}, \link{window.orderBy}
#'
#' @param sws A Java object reference to the backing Scala WindowSpec
#' @export
setClass("WindowSpec",
slots = list(sws = "jobj"))
setMethod("initialize", "WindowSpec", function(.Object, sws) {
.Object@sws <- sws
.Object
})
windowSpec <- function(sws) {
stopifnot(class(sws) == "jobj")
new("WindowSpec", sws)
}
#' @rdname show
setMethod("show", "WindowSpec",
function(object) {
cat("WindowSpec", callJMethod(object@sws, "toString"), "\n")
})
#' partitionBy
#'
#' Defines the partitioning columns in a WindowSpec.
#'
#' @param x a WindowSpec
#' @return a WindowSpec
#' @rdname partitionBy
#' @name partitionBy
#' @family windowspec_method
#' @export
#' @examples
#' \dontrun{
#' partitionBy(ws, "col1", "col2")
#' partitionBy(ws, df$col1, df$col2)
#' }
setMethod("partitionBy",
signature(x = "WindowSpec"),
function(x, col, ...) {
stopifnot (class(col) %in% c("character", "Column"))
if (class(col) == "character") {
windowSpec(callJMethod(x@sws, "partitionBy", col, list(...)))
} else {
jcols <- lapply(list(col, ...), function(c) {
c@jc
})
windowSpec(callJMethod(x@sws, "partitionBy", jcols))
}
})
#' orderBy
#'
#' Defines the ordering columns in a WindowSpec.
#'
#' @param x a WindowSpec
#' @return a WindowSpec
#' @rdname arrange
#' @name orderBy
#' @family windowspec_method
#' @export
#' @examples
#' \dontrun{
#' orderBy(ws, "col1", "col2")
#' orderBy(ws, df$col1, df$col2)
#' }
setMethod("orderBy",
signature(x = "WindowSpec", col = "character"),
function(x, col, ...) {
windowSpec(callJMethod(x@sws, "orderBy", col, list(...)))
})
#' @rdname arrange
#' @name orderBy
#' @export
setMethod("orderBy",
signature(x = "WindowSpec", col = "Column"),
function(x, col, ...) {
jcols <- lapply(list(col, ...), function(c) {
c@jc
})
windowSpec(callJMethod(x@sws, "orderBy", jcols))
})
#' rowsBetween
#'
#' Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
#'
#' Both `start` and `end` are relative positions from the current row. For example, "0" means
#' "current row", while "-1" means the row before the current row, and "5" means the fifth row
#' after the current row.
#'
#' @param x a WindowSpec
#' @param start boundary start, inclusive.
#' The frame is unbounded if this is the minimum long value.
#' @param end boundary end, inclusive.
#' The frame is unbounded if this is the maximum long value.
#' @return a WindowSpec
#' @rdname rowsBetween
#' @name rowsBetween
#' @family windowspec_method
#' @export
#' @examples
#' \dontrun{
#' rowsBetween(ws, 0, 3)
#' }
setMethod("rowsBetween",
signature(x = "WindowSpec", start = "numeric", end = "numeric"),
function(x, start, end) {
# "start" and "end" should be long, due to serde limitation,
# limit "start" and "end" as integer now
windowSpec(callJMethod(x@sws, "rowsBetween", as.integer(start), as.integer(end)))
})
#' rangeBetween
#'
#' Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive).
#'
#' Both `start` and `end` are relative from the current row. For example, "0" means "current row",
#' while "-1" means one off before the current row, and "5" means the five off after the
#' current row.
#' @param x a WindowSpec
#' @param start boundary start, inclusive.
#' The frame is unbounded if this is the minimum long value.
#' @param end boundary end, inclusive.
#' The frame is unbounded if this is the maximum long value.
#' @return a WindowSpec
#' @rdname rangeBetween
#' @name rangeBetween
#' @family windowspec_method
#' @export
#' @examples
#' \dontrun{
#' rangeBetween(ws, 0, 3)
#' }
setMethod("rangeBetween",
signature(x = "WindowSpec", start = "numeric", end = "numeric"),
function(x, start, end) {
# "start" and "end" should be long, due to serde limitation,
# limit "start" and "end" as integer now
windowSpec(callJMethod(x@sws, "rangeBetween", as.integer(start), as.integer(end)))
})
# Note that over is a method of Column class, but it is placed here to
# avoid Roxygen circular-dependency between class Column and WindowSpec.
#' over
#'
#' Define a windowing column.
#'
#' @rdname over
#' @name over
#' @family colum_func
#' @export
setMethod("over",
signature(x = "Column", window = "WindowSpec"),
function(x, window) {
column(callJMethod(x@jc, "over", window@sws))
})

View file

@ -339,9 +339,9 @@ setGeneric("join", function(x, y, ...) { standardGeneric("join") })
# @export
setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") })
# @rdname partitionBy
# @export
setGeneric("partitionBy", function(x, numPartitions, ...) { standardGeneric("partitionBy") })
#' @rdname partitionBy
#' @export
setGeneric("partitionBy", function(x, ...) { standardGeneric("partitionBy") })
# @rdname reduceByKey
# @seealso groupByKey
@ -533,7 +533,7 @@ setGeneric("mutate", function(.data, ...) {standardGeneric("mutate") })
#' @rdname arrange
#' @export
setGeneric("orderBy", function(x, col) { standardGeneric("orderBy") })
setGeneric("orderBy", function(x, col, ...) { standardGeneric("orderBy") })
#' @rdname schema
#' @export
@ -733,6 +733,27 @@ setGeneric("when", function(condition, value) { standardGeneric("when") })
#' @export
setGeneric("otherwise", function(x, value) { standardGeneric("otherwise") })
#' @rdname over
#' @export
setGeneric("over", function(x, window) { standardGeneric("over") })
###################### WindowSpec Methods ##########################
#' @rdname rowsBetween
#' @export
setGeneric("rowsBetween", function(x, start, end) { standardGeneric("rowsBetween") })
#' @rdname rangeBetween
#' @export
setGeneric("rangeBetween", function(x, start, end) { standardGeneric("rangeBetween") })
#' @rdname window.partitionBy
#' @export
setGeneric("window.partitionBy", function(col, ...) { standardGeneric("window.partitionBy") })
#' @rdname window.orderBy
#' @export
setGeneric("window.orderBy", function(col, ...) { standardGeneric("window.orderBy") })
###################### Expression Function Methods ##########################

View file

@ -205,8 +205,10 @@ setMethod("flatMapValues",
#' @aliases partitionBy,RDD,integer-method
#' @noRd
setMethod("partitionBy",
signature(x = "RDD", numPartitions = "numeric"),
signature(x = "RDD"),
function(x, numPartitions, partitionFunc = hashCode) {
stopifnot(is.numeric(numPartitions))
partitionFunc <- cleanClosure(partitionFunc)
serializedHashFuncBytes <- serialize(partitionFunc, connection = NULL)

98
R/pkg/R/window.R Normal file
View file

@ -0,0 +1,98 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# window.R - Utility functions for defining window in DataFrames
#' window.partitionBy
#'
#' Creates a WindowSpec with the partitioning defined.
#'
#' @rdname window.partitionBy
#' @name window.partitionBy
#' @export
#' @examples
#' \dontrun{
#' ws <- window.partitionBy("key1", "key2")
#' df1 <- select(df, over(lead("value", 1), ws))
#'
#' ws <- window.partitionBy(df$key1, df$key2)
#' df1 <- select(df, over(lead("value", 1), ws))
#' }
setMethod("window.partitionBy",
signature(col = "character"),
function(col, ...) {
windowSpec(
callJStatic("org.apache.spark.sql.expressions.Window",
"partitionBy",
col,
list(...)))
})
#' @rdname window.partitionBy
#' @name window.partitionBy
#' @export
setMethod("window.partitionBy",
signature(col = "Column"),
function(col, ...) {
jcols <- lapply(list(col, ...), function(c) {
c@jc
})
windowSpec(
callJStatic("org.apache.spark.sql.expressions.Window",
"partitionBy",
jcols))
})
#' window.orderBy
#'
#' Creates a WindowSpec with the ordering defined.
#'
#' @rdname window.orderBy
#' @name window.orderBy
#' @export
#' @examples
#' \dontrun{
#' ws <- window.orderBy("key1", "key2")
#' df1 <- select(df, over(lead("value", 1), ws))
#'
#' ws <- window.orderBy(df$key1, df$key2)
#' df1 <- select(df, over(lead("value", 1), ws))
#' }
setMethod("window.orderBy",
signature(col = "character"),
function(col, ...) {
windowSpec(
callJStatic("org.apache.spark.sql.expressions.Window",
"orderBy",
col,
list(...)))
})
#' @rdname window.orderBy
#' @name window.orderBy
#' @export
setMethod("window.orderBy",
signature(col = "Column"),
function(col, ...) {
jcols <- lapply(list(col, ...), function(c) {
c@jc
})
windowSpec(
callJStatic("org.apache.spark.sql.expressions.Window",
"orderBy",
jcols))
})

View file

@ -2118,6 +2118,42 @@ test_that("repartition by columns on DataFrame", {
expect_equal(nrow(df1), 2)
})
test_that("Window functions on a DataFrame", {
ssc <- callJMethod(sc, "sc")
hiveCtx <- tryCatch({
newJObject("org.apache.spark.sql.hive.test.TestHiveContext", ssc)
},
error = function(err) {
skip("Hive is not build with SparkSQL, skipped")
})
df <- createDataFrame(hiveCtx,
list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
schema = c("key", "value"))
ws <- orderBy(window.partitionBy("key"), "value")
result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
names(result) <- c("key", "value")
expected <- data.frame(key = c(1L, NA, 2L, NA),
value = c("1", NA, "2", NA),
stringsAsFactors = FALSE)
expect_equal(result, expected)
ws <- orderBy(window.partitionBy(df$key), df$value)
result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
names(result) <- c("key", "value")
expect_equal(result, expected)
ws <- partitionBy(window.orderBy("value"), "key")
result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
names(result) <- c("key", "value")
expect_equal(result, expected)
ws <- partitionBy(window.orderBy(df$value), df$key)
result <- collect(select(df, over(lead("key", 1), ws), over(lead("value", 1), ws)))
names(result) <- c("key", "value")
expect_equal(result, expected)
})
unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)