[SPARK-7227] [SPARKR] Support fillna / dropna in R DataFrame.

Author: Sun Rui <rui.sun@intel.com>

Closes #6183 from sun-rui/SPARK-7227 and squashes the following commits:

dd6f5b3 [Sun Rui] Rename readEnv() back to readMap(). Add alias na.omit() for dropna().
41cf725 [Sun Rui] [SPARK-7227][SPARKR] Support fillna / dropna in R DataFrame.

(cherry picked from commit 46576ab303)
Signed-off-by: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Authored by Sun Rui on 2015-05-31 15:01:21 -07:00; committed by Shivaram Venkataraman.
Parent: bab0fab68f
Commit: f1d4e7e311
6 changed files with 267 additions and 3 deletions
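
For orientation before the diffs, here is a minimal sketch of the API this commit adds to SparkR. The DataFrame df and its columns are hypothetical, not part of the commit:

# df is a SparkSQL DataFrame with columns "name", "age" and "height"
dropna(df)                              # drop rows containing any null
dropna(df, how = "all")                 # drop rows whose values are all null
dropna(df, minNonNulls = 2)             # keep rows with at least 2 non-null values
na.omit(df)                             # alias for dropna(df)
fillna(df, 50.6)                        # fill nulls in numeric columns with 50.6
fillna(df, list("age" = 20, "name" = "unknown"))  # per-column replacement values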

R/pkg/NAMESPACE

@@ -19,9 +19,11 @@ exportMethods("arrange",
               "count",
               "describe",
               "distinct",
+              "dropna",
               "dtypes",
               "except",
               "explain",
+              "fillna",
               "filter",
               "first",
               "group_by",

R/pkg/R/DataFrame.R

@@ -1429,3 +1429,128 @@ setMethod("describe",
             sdf <- callJMethod(x@sdf, "describe", listToSeq(colList))
             dataFrame(sdf)
           })
+
+#' dropna
+#'
+#' Returns a new DataFrame omitting rows with null values.
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param how "any" or "all".
+#'            if "any", drop a row if it contains any nulls.
+#'            if "all", drop a row only if all its values are null.
+#'            if minNonNulls is specified, how is ignored.
+#' @param minNonNulls If specified, drop rows that have less than
+#'                    minNonNulls non-null values.
+#'                    This overrides the how parameter.
+#' @param cols Optional list of column names to consider.
+#' @return A DataFrame
+#'
+#' @rdname nafunctions
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' dropna(df)
+#' }
+setMethod("dropna",
+          signature(x = "DataFrame"),
+          function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+            how <- match.arg(how)
+            if (is.null(cols)) {
+              cols <- columns(x)
+            }
+            if (is.null(minNonNulls)) {
+              minNonNulls <- if (how == "any") { length(cols) } else { 1 }
+            }
+
+            naFunctions <- callJMethod(x@sdf, "na")
+            sdf <- callJMethod(naFunctions, "drop",
+                               as.integer(minNonNulls), listToSeq(as.list(cols)))
+            dataFrame(sdf)
+          })
+
+#' @aliases dropna
+#' @export
+setMethod("na.omit",
+          signature(x = "DataFrame"),
+          function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+            dropna(x, how, minNonNulls, cols)
+          })
+
+#' fillna
+#'
+#' Replace null values.
+#'
+#' @param x A SparkSQL DataFrame.
+#' @param value Value to replace null values with.
+#'              Should be an integer, numeric, character or named list.
+#'              If the value is a named list, then cols is ignored and
+#'              value must be a mapping from column name (character) to
+#'              replacement value. The replacement value must be an
+#'              integer, numeric or character.
+#' @param cols Optional list of column names to consider.
+#'             Columns specified in cols that do not have a matching data
+#'             type are ignored. For example, if value is a character, and
+#'             cols contains a non-character column, then the non-character
+#'             column is simply ignored.
+#' @return A DataFrame
+#'
+#' @rdname nafunctions
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlCtx <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- jsonFile(sqlCtx, path)
+#' fillna(df, 1)
+#' fillna(df, list("age" = 20, "name" = "unknown"))
+#' }
setMethod("fillna",
+          signature(x = "DataFrame"),
+          function(x, value, cols = NULL) {
+            if (!(class(value) %in% c("integer", "numeric", "character", "list"))) {
+              stop("value should be an integer, numeric, character or named list.")
+            }
+
+            if (class(value) == "list") {
+              # Check column names in the named list
+              colNames <- names(value)
+              if (length(colNames) == 0 || !all(colNames != "")) {
+                stop("value should be a named list with each name being a column name.")
+              }
+
+              # Convert the named list to an environment to be passed to the JVM
+              valueMap <- new.env()
+              for (col in colNames) {
+                # Check that each item in the named list is of a valid type
+                v <- value[[col]]
+                if (!(class(v) %in% c("integer", "numeric", "character"))) {
+                  stop("Each item in value should be an integer, numeric or character.")
+                }
+                valueMap[[col]] <- v
+              }
+
+              # When value is a named list, the caller is expected not to pass in cols
+              if (!is.null(cols)) {
+                warning("When value is a named list, cols is ignored!")
+                cols <- NULL
+              }
+
+              value <- valueMap
+            } else if (is.integer(value)) {
+              # Cast an integer to a numeric
+              value <- as.numeric(value)
+            }
+
+            naFunctions <- callJMethod(x@sdf, "na")
+            sdf <- if (length(cols) == 0) {
+              callJMethod(naFunctions, "fill", value)
+            } else {
+              callJMethod(naFunctions, "fill", value, listToSeq(as.list(cols)))
+            }
+            dataFrame(sdf)
+          })
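
Note how dropna reduces `how` to a `minNonNulls` threshold: "any" becomes length(cols), "all" becomes 1, and an explicit minNonNulls takes precedence over how. A small sketch, assuming a hypothetical df with the three columns name, age and height:

# equivalent calls: a row survives only if all 3 columns are non-null
dropna(df, how = "any")
dropna(df, minNonNulls = 3)

# equivalent calls: a row survives if at least 1 column is non-null
dropna(df, how = "all")
dropna(df, minNonNulls = 1)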
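
fillna, for its part, casts an integer scalar to numeric before calling into the JVM, and converts a named list into an environment so it can be serialized as a map (see the serialize.R change below). A sketch with the same hypothetical df:

fillna(df, 50L)                       # same as fillna(df, 50): integer cast to numeric
fillna(df, "unknown", cols = "name")  # only consider the "name" column
fillna(df, list("age" = 50), cols = "age")  # cols is ignored, with a warning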

R/pkg/R/generics.R

@@ -396,6 +396,20 @@ setGeneric("columns", function(x) {standardGeneric("columns") })
 #' @export
 setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
 
+#' @rdname nafunctions
+#' @export
+setGeneric("dropna",
+           function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+             standardGeneric("dropna")
+           })
+
+#' @rdname nafunctions
+#' @export
+setGeneric("na.omit",
+           function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
+             standardGeneric("na.omit")
+           })
+
 #' @rdname schema
 #' @export
 setGeneric("dtypes", function(x) { standardGeneric("dtypes") })
@@ -408,6 +422,10 @@ setGeneric("explain", function(x, ...) { standardGeneric("explain") })
 #' @export
 setGeneric("except", function(x, y) { standardGeneric("except") })
 
+#' @rdname nafunctions
+#' @export
+setGeneric("fillna", function(x, value, cols = NULL) { standardGeneric("fillna") })
+
 #' @rdname filter
 #' @export
 setGeneric("filter", function(x, condition) { standardGeneric("filter") })

R/pkg/R/serialize.R

@@ -160,6 +160,14 @@ writeList <- function(con, arr) {
   }
 }
 
+# Used to pass arrays where the elements can be of different types
+writeGenericList <- function(con, list) {
+  writeInt(con, length(list))
+  for (elem in list) {
+    writeObject(con, elem)
+  }
+}
+
 # Used to pass in hash maps required on Java side.
 writeEnv <- function(con, env) {
   len <- length(env)
@@ -168,7 +176,7 @@ writeEnv <- function(con, env) {
   if (len > 0) {
     writeList(con, as.list(ls(env)))
     vals <- lapply(ls(env), function(x) { env[[x]] })
-    writeList(con, as.list(vals))
+    writeGenericList(con, as.list(vals))
   }
 }
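
The new writeGenericList exists because writeList, as its replacement here suggests, is meant for arrays whose elements share one type, while the environment built by fillna can map different columns to values of different types. writeGenericList routes every element through writeObject, so each value carries its own type tag. A sketch of the kind of payload this enables (the values are hypothetical):

valueMap <- new.env()
valueMap[["age"]] <- 50          # numeric value
valueMap[["name"]] <- "unknown"  # character value
# writeEnv(con, valueMap) now serializes the two values element by element,
# each prefixed with its own type, rather than as one uniformly-typed array.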

R/pkg/inst/tests/test_sparkSQL.R

@@ -32,6 +32,15 @@ jsonPath <- tempfile(pattern="sparkr-test", fileext=".tmp")
 parquetPath <- tempfile(pattern="sparkr-test", fileext=".parquet")
 writeLines(mockLines, jsonPath)
 
+# For testing nafunctions, like dropna(), fillna(), ...
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+                 "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+                 "{\"name\":\"David\",\"age\":60,\"height\":null}",
+                 "{\"name\":\"Amy\",\"age\":null,\"height\":null}",
+                 "{\"name\":null,\"age\":null,\"height\":null}")
+jsonPathNa <- tempfile(pattern="sparkr-test", fileext=".tmp")
+writeLines(mockLinesNa, jsonPathNa)
+
 test_that("infer types", {
   expect_equal(infer_type(1L), "integer")
   expect_equal(infer_type(1.0), "double")
@@ -765,5 +774,105 @@ test_that("describe() on a DataFrame", {
   expect_equal(collect(stats)[5, "age"], "30")
 })
 
+test_that("dropna() on a DataFrame", {
+  df <- jsonFile(sqlContext, jsonPathNa)
+  rows <- collect(df)
+
+  # drop with columns
+
+  expected <- rows[!is.na(rows$name),]
+  actual <- collect(dropna(df, cols = "name"))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[!is.na(rows$age),]
+  actual <- collect(dropna(df, cols = "age"))
+  row.names(expected) <- row.names(actual)
+  # identical() on the two data frames does not work here; reason unknown.
+  # Use identical() on each column as a workaround.
+  expect_true(identical(expected$age, actual$age))
+  expect_true(identical(expected$height, actual$height))
+  expect_true(identical(expected$name, actual$name))
+
+  expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
+  actual <- collect(dropna(df, cols = c("age", "height")))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+  actual <- collect(dropna(df))
+  expect_true(identical(expected, actual))
+
+  # drop with how
+
+  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+  actual <- collect(dropna(df))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[!is.na(rows$age) | !is.na(rows$height) | !is.na(rows$name),]
+  actual <- collect(dropna(df, "all"))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
+  actual <- collect(dropna(df, "any"))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
+  actual <- collect(dropna(df, "any", cols = c("age", "height")))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[!is.na(rows$age) | !is.na(rows$height),]
+  actual <- collect(dropna(df, "all", cols = c("age", "height")))
+  expect_true(identical(expected, actual))
+
+  # drop with threshold
+
+  expected <- rows[as.integer(!is.na(rows$age)) + as.integer(!is.na(rows$height)) >= 2,]
+  actual <- collect(dropna(df, minNonNulls = 2, cols = c("age", "height")))
+  expect_true(identical(expected, actual))
+
+  expected <- rows[as.integer(!is.na(rows$age)) +
+                   as.integer(!is.na(rows$height)) +
+                   as.integer(!is.na(rows$name)) >= 3,]
+  actual <- collect(dropna(df, minNonNulls = 3, cols = c("name", "age", "height")))
+  expect_true(identical(expected, actual))
+})
+
+test_that("fillna() on a DataFrame", {
+  df <- jsonFile(sqlContext, jsonPathNa)
+  rows <- collect(df)
+
+  # fill with value
+
+  expected <- rows
+  expected$age[is.na(expected$age)] <- 50
+  expected$height[is.na(expected$height)] <- 50.6
+  actual <- collect(fillna(df, 50.6))
+  expect_true(identical(expected, actual))
+
+  expected <- rows
+  expected$name[is.na(expected$name)] <- "unknown"
+  actual <- collect(fillna(df, "unknown"))
+  expect_true(identical(expected, actual))
+
+  expected <- rows
+  expected$age[is.na(expected$age)] <- 50
+  actual <- collect(fillna(df, 50.6, "age"))
+  expect_true(identical(expected, actual))
+
+  expected <- rows
+  expected$name[is.na(expected$name)] <- "unknown"
+  actual <- collect(fillna(df, "unknown", c("age", "name")))
+  expect_true(identical(expected, actual))
+
+  # fill with named list
+
+  expected <- rows
+  expected$age[is.na(expected$age)] <- 50
+  expected$height[is.na(expected$height)] <- 50.6
+  expected$name[is.na(expected$name)] <- "unknown"
+  actual <- collect(fillna(df, list("age" = 50, "height" = 50.6, "name" = "unknown")))
+  expect_true(identical(expected, actual))
+})
+
 unlink(parquetPath)
 unlink(jsonPath)
+unlink(jsonPathNa)

core/src/main/scala/org/apache/spark/api/r/SerDe.scala

@@ -157,9 +157,11 @@ private[spark] object SerDe {
       val keysLen = readInt(in)
       val keys = (0 until keysLen).map(_ => readTypedObject(in, keysType))
 
-      val valuesType = readObjectType(in)
       val valuesLen = readInt(in)
-      val values = (0 until valuesLen).map(_ => readTypedObject(in, valuesType))
+      val values = (0 until valuesLen).map(_ => {
+        val valueType = readObjectType(in)
+        readTypedObject(in, valueType)
+      })
       mapAsJavaMap(keys.zip(values).toMap)
     } else {
       new java.util.HashMap[Object, Object]()
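
The Scala side mirrors the R change: instead of reading one type tag for the whole values array, the map deserializer now reads a type tag per value, so a map sent from R may mix value types. End to end, this is what makes the mixed-type named-list form of fillna work (df is the hypothetical DataFrame from the sketches above):

# serialized as a map whose values are a Double and a String
fillna(df, list("height" = 50.6, "name" = "unknown"))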