[SPARK-25117][R] Add EXEPT ALL and INTERSECT ALL support in R

## What changes were proposed in this pull request?
[SPARK-21274](https://issues.apache.org/jira/browse/SPARK-21274) added support for EXCEPT ALL and INTERSECT ALL. This PR adds the support in R.

## How was this patch tested?
Added test in test_sparkSQL.R

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #22107 from dilipbiswal/SPARK-25117.
This commit is contained in:
Dilip Biswal 2018-08-17 00:04:04 -07:00 committed by Felix Cheung
parent c1ffb3c10a
commit 162326c0ee
4 changed files with 85 additions and 1 deletions

View file

@ -117,6 +117,7 @@ exportMethods("arrange",
"dropna", "dropna",
"dtypes", "dtypes",
"except", "except",
"exceptAll",
"explain", "explain",
"fillna", "fillna",
"filter", "filter",
@ -131,6 +132,7 @@ exportMethods("arrange",
"hint", "hint",
"insertInto", "insertInto",
"intersect", "intersect",
"intersectAll",
"isLocal", "isLocal",
"isStreaming", "isStreaming",
"join", "join",

View file

@ -2848,6 +2848,35 @@ setMethod("intersect",
dataFrame(intersected) dataFrame(intersected)
}) })
#' intersectAll
#'
#' Return a new SparkDataFrame containing rows in both this SparkDataFrame
#' and another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{INTERSECT ALL} in SQL. Also as standard in
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the intersect all operation.
#' @family SparkDataFrame functions
#' @aliases intersectAll,SparkDataFrame,SparkDataFrame-method
#' @rdname intersectAll
#' @name intersectAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' intersectAllDF <- intersectAll(df1, df2)
#' }
#' @note intersectAll since 2.4.0
setMethod("intersectAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
intersected <- callJMethod(x@sdf, "intersectAll", y@sdf)
dataFrame(intersected)
})
#' except #' except
#' #'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame #' Return a new SparkDataFrame containing rows in this SparkDataFrame
@ -2867,7 +2896,6 @@ setMethod("intersect",
#' df2 <- read.json(path2) #' df2 <- read.json(path2)
#' exceptDF <- except(df, df2) #' exceptDF <- except(df, df2)
#' } #' }
#' @rdname except
#' @note except since 1.4.0 #' @note except since 1.4.0
setMethod("except", setMethod("except",
signature(x = "SparkDataFrame", y = "SparkDataFrame"), signature(x = "SparkDataFrame", y = "SparkDataFrame"),
@ -2876,6 +2904,35 @@ setMethod("except",
dataFrame(excepted) dataFrame(excepted)
}) })
#' exceptAll
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
#' but not in another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{EXCEPT ALL} in SQL. Also as standard in
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the except all operation.
#' @family SparkDataFrame functions
#' @aliases exceptAll,SparkDataFrame,SparkDataFrame-method
#' @rdname exceptAll
#' @name exceptAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' exceptAllDF <- exceptAll(df1, df2)
#' }
#' @note exceptAll since 2.4.0
setMethod("exceptAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
function(x, y) {
excepted <- callJMethod(x@sdf, "exceptAll", y@sdf)
dataFrame(excepted)
})
#' Save the contents of SparkDataFrame to a data source. #' Save the contents of SparkDataFrame to a data source.
#' #'
#' The data source is specified by the \code{source} and a set of options (...). #' The data source is specified by the \code{source} and a set of options (...).

View file

@ -471,6 +471,9 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
#' @rdname except #' @rdname except
setGeneric("except", function(x, y) { standardGeneric("except") }) setGeneric("except", function(x, y) { standardGeneric("except") })
#' @rdname exceptAll
setGeneric("exceptAll", function(x, y) { standardGeneric("exceptAll") })
#' @rdname nafunctions #' @rdname nafunctions
setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") }) setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })
@ -495,6 +498,9 @@ setGeneric("insertInto", function(x, tableName, ...) { standardGeneric("insertIn
#' @rdname intersect #' @rdname intersect
setGeneric("intersect", function(x, y) { standardGeneric("intersect") }) setGeneric("intersect", function(x, y) { standardGeneric("intersect") })
#' @rdname intersectAll
setGeneric("intersectAll", function(x, y) { standardGeneric("intersectAll") })
#' @rdname isLocal #' @rdname isLocal
setGeneric("isLocal", function(x) { standardGeneric("isLocal") }) setGeneric("isLocal", function(x) { standardGeneric("isLocal") })

View file

@ -2482,6 +2482,25 @@ test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataF
unlink(jsonPath2) unlink(jsonPath2)
}) })
test_that("intersectAll() and exceptAll()", {
df1 <- createDataFrame(list(list("a", 1), list("a", 1), list("a", 1),
list("a", 1), list("b", 3), list("c", 4)),
schema = c("a", "b"))
df2 <- createDataFrame(list(list("a", 1), list("a", 1), list("b", 3)), schema = c("a", "b"))
intersectAllExpected <- data.frame("a" = c("a", "a", "b"), "b" = c(1, 1, 3),
stringsAsFactors = FALSE)
exceptAllExpected <- data.frame("a" = c("a", "a", "c"), "b" = c(1, 1, 4),
stringsAsFactors = FALSE)
intersectAllDf <- arrange(intersectAll(df1, df2), df1$a)
expect_is(intersectAllDf, "SparkDataFrame")
exceptAllDf <- arrange(exceptAll(df1, df2), df1$a)
expect_is(exceptAllDf, "SparkDataFrame")
intersectAllActual <- collect(intersectAllDf)
expect_identical(intersectAllActual, intersectAllExpected)
exceptAllActual <- collect(exceptAllDf)
expect_identical(exceptAllActual, exceptAllExpected)
})
test_that("withColumn() and withColumnRenamed()", { test_that("withColumn() and withColumnRenamed()", {
df <- read.json(jsonPath) df <- read.json(jsonPath)
newDF <- withColumn(df, "newAge", df$age + 2) newDF <- withColumn(df, "newAge", df$age + 2)