[SPARK-8548] [SPARKR] Remove the trailing whitespaces from the SparkR files
[[SPARK-8548] Remove the trailing whitespaces from the SparkR files - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-8548)
- This is the result of `lint-r`
https://gist.github.com/yu-iskw/0019b37a2c1167f33986
Author: Yu ISHIKAWA <yuu.ishikawa@gmail.com>
Closes #6945 from yu-iskw/SPARK-8548 and squashes the following commits:
0bd567a [Yu ISHIKAWA] [SPARK-8548][SparkR] Remove the trailing whitespaces from the SparkR files
(cherry picked from commit 44fa7df64d
)
Signed-off-by: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
This commit is contained in:
parent
d73900a903
commit
250179485b
|
@ -38,7 +38,7 @@ setClass("DataFrame",
|
||||||
setMethod("initialize", "DataFrame", function(.Object, sdf, isCached) {
|
setMethod("initialize", "DataFrame", function(.Object, sdf, isCached) {
|
||||||
.Object@env <- new.env()
|
.Object@env <- new.env()
|
||||||
.Object@env$isCached <- isCached
|
.Object@env$isCached <- isCached
|
||||||
|
|
||||||
.Object@sdf <- sdf
|
.Object@sdf <- sdf
|
||||||
.Object
|
.Object
|
||||||
})
|
})
|
||||||
|
@ -55,11 +55,11 @@ dataFrame <- function(sdf, isCached = FALSE) {
|
||||||
############################ DataFrame Methods ##############################################
|
############################ DataFrame Methods ##############################################
|
||||||
|
|
||||||
#' Print Schema of a DataFrame
|
#' Print Schema of a DataFrame
|
||||||
#'
|
#'
|
||||||
#' Prints out the schema in tree format
|
#' Prints out the schema in tree format
|
||||||
#'
|
#'
|
||||||
#' @param x A SparkSQL DataFrame
|
#' @param x A SparkSQL DataFrame
|
||||||
#'
|
#'
|
||||||
#' @rdname printSchema
|
#' @rdname printSchema
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
|
@ -78,11 +78,11 @@ setMethod("printSchema",
|
||||||
})
|
})
|
||||||
|
|
||||||
#' Get schema object
|
#' Get schema object
|
||||||
#'
|
#'
|
||||||
#' Returns the schema of this DataFrame as a structType object.
|
#' Returns the schema of this DataFrame as a structType object.
|
||||||
#'
|
#'
|
||||||
#' @param x A SparkSQL DataFrame
|
#' @param x A SparkSQL DataFrame
|
||||||
#'
|
#'
|
||||||
#' @rdname schema
|
#' @rdname schema
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
|
@ -100,9 +100,9 @@ setMethod("schema",
|
||||||
})
|
})
|
||||||
|
|
||||||
#' Explain
|
#' Explain
|
||||||
#'
|
#'
|
||||||
#' Print the logical and physical Catalyst plans to the console for debugging.
|
#' Print the logical and physical Catalyst plans to the console for debugging.
|
||||||
#'
|
#'
|
||||||
#' @param x A SparkSQL DataFrame
|
#' @param x A SparkSQL DataFrame
|
||||||
#' @param extended Logical. If extended is False, explain() only prints the physical plan.
|
#' @param extended Logical. If extended is False, explain() only prints the physical plan.
|
||||||
#' @rdname explain
|
#' @rdname explain
|
||||||
|
@ -200,11 +200,11 @@ setMethod("show", "DataFrame",
|
||||||
})
|
})
|
||||||
|
|
||||||
#' DataTypes
|
#' DataTypes
|
||||||
#'
|
#'
|
||||||
#' Return all column names and their data types as a list
|
#' Return all column names and their data types as a list
|
||||||
#'
|
#'
|
||||||
#' @param x A SparkSQL DataFrame
|
#' @param x A SparkSQL DataFrame
|
||||||
#'
|
#'
|
||||||
#' @rdname dtypes
|
#' @rdname dtypes
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
|
@ -224,11 +224,11 @@ setMethod("dtypes",
|
||||||
})
|
})
|
||||||
|
|
||||||
#' Column names
|
#' Column names
|
||||||
#'
|
#'
|
||||||
#' Return all column names as a list
|
#' Return all column names as a list
|
||||||
#'
|
#'
|
||||||
#' @param x A SparkSQL DataFrame
|
#' @param x A SparkSQL DataFrame
|
||||||
#'
|
#'
|
||||||
#' @rdname columns
|
#' @rdname columns
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
|
@ -256,12 +256,12 @@ setMethod("names",
|
||||||
})
|
})
|
||||||
|
|
||||||
#' Register Temporary Table
|
#' Register Temporary Table
|
||||||
#'
|
#'
|
||||||
#' Registers a DataFrame as a Temporary Table in the SQLContext
|
#' Registers a DataFrame as a Temporary Table in the SQLContext
|
||||||
#'
|
#'
|
||||||
#' @param x A SparkSQL DataFrame
|
#' @param x A SparkSQL DataFrame
|
||||||
#' @param tableName A character vector containing the name of the table
|
#' @param tableName A character vector containing the name of the table
|
||||||
#'
|
#'
|
||||||
#' @rdname registerTempTable
|
#' @rdname registerTempTable
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
|
@ -306,11 +306,11 @@ setMethod("insertInto",
|
||||||
})
|
})
|
||||||
|
|
||||||
#' Cache
|
#' Cache
|
||||||
#'
|
#'
|
||||||
#' Persist with the default storage level (MEMORY_ONLY).
|
#' Persist with the default storage level (MEMORY_ONLY).
|
||||||
#'
|
#'
|
||||||
#' @param x A SparkSQL DataFrame
|
#' @param x A SparkSQL DataFrame
|
||||||
#'
|
#'
|
||||||
#' @rdname cache-methods
|
#' @rdname cache-methods
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
|
@ -400,7 +400,7 @@ setMethod("repartition",
|
||||||
signature(x = "DataFrame", numPartitions = "numeric"),
|
signature(x = "DataFrame", numPartitions = "numeric"),
|
||||||
function(x, numPartitions) {
|
function(x, numPartitions) {
|
||||||
sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
|
sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
|
||||||
dataFrame(sdf)
|
dataFrame(sdf)
|
||||||
})
|
})
|
||||||
|
|
||||||
# toJSON
|
# toJSON
|
||||||
|
@ -489,7 +489,7 @@ setMethod("distinct",
|
||||||
#' sqlContext <- sparkRSQL.init(sc)
|
#' sqlContext <- sparkRSQL.init(sc)
|
||||||
#' path <- "path/to/file.json"
|
#' path <- "path/to/file.json"
|
||||||
#' df <- jsonFile(sqlContext, path)
|
#' df <- jsonFile(sqlContext, path)
|
||||||
#' collect(sample(df, FALSE, 0.5))
|
#' collect(sample(df, FALSE, 0.5))
|
||||||
#' collect(sample(df, TRUE, 0.5))
|
#' collect(sample(df, TRUE, 0.5))
|
||||||
#'}
|
#'}
|
||||||
setMethod("sample",
|
setMethod("sample",
|
||||||
|
@ -513,11 +513,11 @@ setMethod("sample_frac",
|
||||||
})
|
})
|
||||||
|
|
||||||
#' Count
|
#' Count
|
||||||
#'
|
#'
|
||||||
#' Returns the number of rows in a DataFrame
|
#' Returns the number of rows in a DataFrame
|
||||||
#'
|
#'
|
||||||
#' @param x A SparkSQL DataFrame
|
#' @param x A SparkSQL DataFrame
|
||||||
#'
|
#'
|
||||||
#' @rdname count
|
#' @rdname count
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
|
@ -568,13 +568,13 @@ setMethod("collect",
|
||||||
})
|
})
|
||||||
|
|
||||||
#' Limit
|
#' Limit
|
||||||
#'
|
#'
|
||||||
#' Limit the resulting DataFrame to the number of rows specified.
|
#' Limit the resulting DataFrame to the number of rows specified.
|
||||||
#'
|
#'
|
||||||
#' @param x A SparkSQL DataFrame
|
#' @param x A SparkSQL DataFrame
|
||||||
#' @param num The number of rows to return
|
#' @param num The number of rows to return
|
||||||
#' @return A new DataFrame containing the number of rows specified.
|
#' @return A new DataFrame containing the number of rows specified.
|
||||||
#'
|
#'
|
||||||
#' @rdname limit
|
#' @rdname limit
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
|
@ -593,7 +593,7 @@ setMethod("limit",
|
||||||
})
|
})
|
||||||
|
|
||||||
#' Take the first NUM rows of a DataFrame and return a the results as a data.frame
|
#' Take the first NUM rows of a DataFrame and return a the results as a data.frame
|
||||||
#'
|
#'
|
||||||
#' @rdname take
|
#' @rdname take
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
|
@ -613,8 +613,8 @@ setMethod("take",
|
||||||
|
|
||||||
#' Head
|
#' Head
|
||||||
#'
|
#'
|
||||||
#' Return the first NUM rows of a DataFrame as a data.frame. If NUM is NULL,
|
#' Return the first NUM rows of a DataFrame as a data.frame. If NUM is NULL,
|
||||||
#' then head() returns the first 6 rows in keeping with the current data.frame
|
#' then head() returns the first 6 rows in keeping with the current data.frame
|
||||||
#' convention in R.
|
#' convention in R.
|
||||||
#'
|
#'
|
||||||
#' @param x A SparkSQL DataFrame
|
#' @param x A SparkSQL DataFrame
|
||||||
|
@ -659,11 +659,11 @@ setMethod("first",
|
||||||
})
|
})
|
||||||
|
|
||||||
# toRDD()
|
# toRDD()
|
||||||
#
|
#
|
||||||
# Converts a Spark DataFrame to an RDD while preserving column names.
|
# Converts a Spark DataFrame to an RDD while preserving column names.
|
||||||
#
|
#
|
||||||
# @param x A Spark DataFrame
|
# @param x A Spark DataFrame
|
||||||
#
|
#
|
||||||
# @rdname DataFrame
|
# @rdname DataFrame
|
||||||
# @export
|
# @export
|
||||||
# @examples
|
# @examples
|
||||||
|
@ -1167,7 +1167,7 @@ setMethod("where",
|
||||||
#'
|
#'
|
||||||
#' @param x A Spark DataFrame
|
#' @param x A Spark DataFrame
|
||||||
#' @param y A Spark DataFrame
|
#' @param y A Spark DataFrame
|
||||||
#' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
|
#' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
|
||||||
#' Column expression. If joinExpr is omitted, join() wil perform a Cartesian join
|
#' Column expression. If joinExpr is omitted, join() wil perform a Cartesian join
|
||||||
#' @param joinType The type of join to perform. The following join types are available:
|
#' @param joinType The type of join to perform. The following join types are available:
|
||||||
#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner".
|
#' 'inner', 'outer', 'left_outer', 'right_outer', 'semijoin'. The default joinType is "inner".
|
||||||
|
@ -1303,7 +1303,7 @@ setMethod("except",
|
||||||
#' @param source A name for external data source
|
#' @param source A name for external data source
|
||||||
#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
|
#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
|
||||||
#'
|
#'
|
||||||
#' @rdname write.df
|
#' @rdname write.df
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
#'\dontrun{
|
#'\dontrun{
|
||||||
|
@ -1401,7 +1401,7 @@ setMethod("saveAsTable",
|
||||||
#' @param col A string of name
|
#' @param col A string of name
|
||||||
#' @param ... Additional expressions
|
#' @param ... Additional expressions
|
||||||
#' @return A DataFrame
|
#' @return A DataFrame
|
||||||
#' @rdname describe
|
#' @rdname describe
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
#'\dontrun{
|
#'\dontrun{
|
||||||
|
@ -1444,7 +1444,7 @@ setMethod("describe",
|
||||||
#' This overwrites the how parameter.
|
#' This overwrites the how parameter.
|
||||||
#' @param cols Optional list of column names to consider.
|
#' @param cols Optional list of column names to consider.
|
||||||
#' @return A DataFrame
|
#' @return A DataFrame
|
||||||
#'
|
#'
|
||||||
#' @rdname nafunctions
|
#' @rdname nafunctions
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
|
@ -1465,7 +1465,7 @@ setMethod("dropna",
|
||||||
if (is.null(minNonNulls)) {
|
if (is.null(minNonNulls)) {
|
||||||
minNonNulls <- if (how == "any") { length(cols) } else { 1 }
|
minNonNulls <- if (how == "any") { length(cols) } else { 1 }
|
||||||
}
|
}
|
||||||
|
|
||||||
naFunctions <- callJMethod(x@sdf, "na")
|
naFunctions <- callJMethod(x@sdf, "na")
|
||||||
sdf <- callJMethod(naFunctions, "drop",
|
sdf <- callJMethod(naFunctions, "drop",
|
||||||
as.integer(minNonNulls), listToSeq(as.list(cols)))
|
as.integer(minNonNulls), listToSeq(as.list(cols)))
|
||||||
|
@ -1488,16 +1488,16 @@ setMethod("na.omit",
|
||||||
#' @param value Value to replace null values with.
|
#' @param value Value to replace null values with.
|
||||||
#' Should be an integer, numeric, character or named list.
|
#' Should be an integer, numeric, character or named list.
|
||||||
#' If the value is a named list, then cols is ignored and
|
#' If the value is a named list, then cols is ignored and
|
||||||
#' value must be a mapping from column name (character) to
|
#' value must be a mapping from column name (character) to
|
||||||
#' replacement value. The replacement value must be an
|
#' replacement value. The replacement value must be an
|
||||||
#' integer, numeric or character.
|
#' integer, numeric or character.
|
||||||
#' @param cols optional list of column names to consider.
|
#' @param cols optional list of column names to consider.
|
||||||
#' Columns specified in cols that do not have matching data
|
#' Columns specified in cols that do not have matching data
|
||||||
#' type are ignored. For example, if value is a character, and
|
#' type are ignored. For example, if value is a character, and
|
||||||
#' subset contains a non-character column, then the non-character
|
#' subset contains a non-character column, then the non-character
|
||||||
#' column is simply ignored.
|
#' column is simply ignored.
|
||||||
#' @return A DataFrame
|
#' @return A DataFrame
|
||||||
#'
|
#'
|
||||||
#' @rdname nafunctions
|
#' @rdname nafunctions
|
||||||
#' @export
|
#' @export
|
||||||
#' @examples
|
#' @examples
|
||||||
|
@ -1515,14 +1515,14 @@ setMethod("fillna",
|
||||||
if (!(class(value) %in% c("integer", "numeric", "character", "list"))) {
|
if (!(class(value) %in% c("integer", "numeric", "character", "list"))) {
|
||||||
stop("value should be an integer, numeric, charactor or named list.")
|
stop("value should be an integer, numeric, charactor or named list.")
|
||||||
}
|
}
|
||||||
|
|
||||||
if (class(value) == "list") {
|
if (class(value) == "list") {
|
||||||
# Check column names in the named list
|
# Check column names in the named list
|
||||||
colNames <- names(value)
|
colNames <- names(value)
|
||||||
if (length(colNames) == 0 || !all(colNames != "")) {
|
if (length(colNames) == 0 || !all(colNames != "")) {
|
||||||
stop("value should be an a named list with each name being a column name.")
|
stop("value should be an a named list with each name being a column name.")
|
||||||
}
|
}
|
||||||
|
|
||||||
# Convert to the named list to an environment to be passed to JVM
|
# Convert to the named list to an environment to be passed to JVM
|
||||||
valueMap <- new.env()
|
valueMap <- new.env()
|
||||||
for (col in colNames) {
|
for (col in colNames) {
|
||||||
|
@ -1533,19 +1533,19 @@ setMethod("fillna",
|
||||||
}
|
}
|
||||||
valueMap[[col]] <- v
|
valueMap[[col]] <- v
|
||||||
}
|
}
|
||||||
|
|
||||||
# When value is a named list, caller is expected not to pass in cols
|
# When value is a named list, caller is expected not to pass in cols
|
||||||
if (!is.null(cols)) {
|
if (!is.null(cols)) {
|
||||||
warning("When value is a named list, cols is ignored!")
|
warning("When value is a named list, cols is ignored!")
|
||||||
cols <- NULL
|
cols <- NULL
|
||||||
}
|
}
|
||||||
|
|
||||||
value <- valueMap
|
value <- valueMap
|
||||||
} else if (is.integer(value)) {
|
} else if (is.integer(value)) {
|
||||||
# Cast an integer to a numeric
|
# Cast an integer to a numeric
|
||||||
value <- as.numeric(value)
|
value <- as.numeric(value)
|
||||||
}
|
}
|
||||||
|
|
||||||
naFunctions <- callJMethod(x@sdf, "na")
|
naFunctions <- callJMethod(x@sdf, "na")
|
||||||
sdf <- if (length(cols) == 0) {
|
sdf <- if (length(cols) == 0) {
|
||||||
callJMethod(naFunctions, "fill", value)
|
callJMethod(naFunctions, "fill", value)
|
||||||
|
|
|
@ -48,7 +48,7 @@ setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
|
||||||
# byte: The RDD stores data serialized in R.
|
# byte: The RDD stores data serialized in R.
|
||||||
# string: The RDD stores data as strings.
|
# string: The RDD stores data as strings.
|
||||||
# row: The RDD stores the serialized rows of a DataFrame.
|
# row: The RDD stores the serialized rows of a DataFrame.
|
||||||
|
|
||||||
# We use an environment to store mutable states inside an RDD object.
|
# We use an environment to store mutable states inside an RDD object.
|
||||||
# Note that R's call-by-value semantics makes modifying slots inside an
|
# Note that R's call-by-value semantics makes modifying slots inside an
|
||||||
# object (passed as an argument into a function, such as cache()) difficult:
|
# object (passed as an argument into a function, such as cache()) difficult:
|
||||||
|
@ -363,7 +363,7 @@ setMethod("collectPartition",
|
||||||
|
|
||||||
# @description
|
# @description
|
||||||
# \code{collectAsMap} returns a named list as a map that contains all of the elements
|
# \code{collectAsMap} returns a named list as a map that contains all of the elements
|
||||||
# in a key-value pair RDD.
|
# in a key-value pair RDD.
|
||||||
# @examples
|
# @examples
|
||||||
#\dontrun{
|
#\dontrun{
|
||||||
# sc <- sparkR.init()
|
# sc <- sparkR.init()
|
||||||
|
@ -666,7 +666,7 @@ setMethod("minimum",
|
||||||
# rdd <- parallelize(sc, 1:10)
|
# rdd <- parallelize(sc, 1:10)
|
||||||
# sumRDD(rdd) # 55
|
# sumRDD(rdd) # 55
|
||||||
#}
|
#}
|
||||||
# @rdname sumRDD
|
# @rdname sumRDD
|
||||||
# @aliases sumRDD,RDD
|
# @aliases sumRDD,RDD
|
||||||
setMethod("sumRDD",
|
setMethod("sumRDD",
|
||||||
signature(x = "RDD"),
|
signature(x = "RDD"),
|
||||||
|
@ -1090,11 +1090,11 @@ setMethod("sortBy",
|
||||||
# Return:
|
# Return:
|
||||||
# A list of the first N elements from the RDD in the specified order.
|
# A list of the first N elements from the RDD in the specified order.
|
||||||
#
|
#
|
||||||
takeOrderedElem <- function(x, num, ascending = TRUE) {
|
takeOrderedElem <- function(x, num, ascending = TRUE) {
|
||||||
if (num <= 0L) {
|
if (num <= 0L) {
|
||||||
return(list())
|
return(list())
|
||||||
}
|
}
|
||||||
|
|
||||||
partitionFunc <- function(part) {
|
partitionFunc <- function(part) {
|
||||||
if (num < length(part)) {
|
if (num < length(part)) {
|
||||||
# R limitation: order works only on primitive types!
|
# R limitation: order works only on primitive types!
|
||||||
|
@ -1152,7 +1152,7 @@ takeOrderedElem <- function(x, num, ascending = TRUE) {
|
||||||
# @aliases takeOrdered,RDD,RDD-method
|
# @aliases takeOrdered,RDD,RDD-method
|
||||||
setMethod("takeOrdered",
|
setMethod("takeOrdered",
|
||||||
signature(x = "RDD", num = "integer"),
|
signature(x = "RDD", num = "integer"),
|
||||||
function(x, num) {
|
function(x, num) {
|
||||||
takeOrderedElem(x, num)
|
takeOrderedElem(x, num)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -1173,7 +1173,7 @@ setMethod("takeOrdered",
|
||||||
# @aliases top,RDD,RDD-method
|
# @aliases top,RDD,RDD-method
|
||||||
setMethod("top",
|
setMethod("top",
|
||||||
signature(x = "RDD", num = "integer"),
|
signature(x = "RDD", num = "integer"),
|
||||||
function(x, num) {
|
function(x, num) {
|
||||||
takeOrderedElem(x, num, FALSE)
|
takeOrderedElem(x, num, FALSE)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -1181,7 +1181,7 @@ setMethod("top",
|
||||||
#
|
#
|
||||||
# Aggregate the elements of each partition, and then the results for all the
|
# Aggregate the elements of each partition, and then the results for all the
|
||||||
# partitions, using a given associative function and a neutral "zero value".
|
# partitions, using a given associative function and a neutral "zero value".
|
||||||
#
|
#
|
||||||
# @param x An RDD.
|
# @param x An RDD.
|
||||||
# @param zeroValue A neutral "zero value".
|
# @param zeroValue A neutral "zero value".
|
||||||
# @param op An associative function for the folding operation.
|
# @param op An associative function for the folding operation.
|
||||||
|
@ -1207,7 +1207,7 @@ setMethod("fold",
|
||||||
#
|
#
|
||||||
# Aggregate the elements of each partition, and then the results for all the
|
# Aggregate the elements of each partition, and then the results for all the
|
||||||
# partitions, using given combine functions and a neutral "zero value".
|
# partitions, using given combine functions and a neutral "zero value".
|
||||||
#
|
#
|
||||||
# @param x An RDD.
|
# @param x An RDD.
|
||||||
# @param zeroValue A neutral "zero value".
|
# @param zeroValue A neutral "zero value".
|
||||||
# @param seqOp A function to aggregate the RDD elements. It may return a different
|
# @param seqOp A function to aggregate the RDD elements. It may return a different
|
||||||
|
@ -1230,11 +1230,11 @@ setMethod("fold",
|
||||||
# @aliases aggregateRDD,RDD,RDD-method
|
# @aliases aggregateRDD,RDD,RDD-method
|
||||||
setMethod("aggregateRDD",
|
setMethod("aggregateRDD",
|
||||||
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
|
signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
|
||||||
function(x, zeroValue, seqOp, combOp) {
|
function(x, zeroValue, seqOp, combOp) {
|
||||||
partitionFunc <- function(part) {
|
partitionFunc <- function(part) {
|
||||||
Reduce(seqOp, part, zeroValue)
|
Reduce(seqOp, part, zeroValue)
|
||||||
}
|
}
|
||||||
|
|
||||||
partitionList <- collect(lapplyPartition(x, partitionFunc),
|
partitionList <- collect(lapplyPartition(x, partitionFunc),
|
||||||
flatten = FALSE)
|
flatten = FALSE)
|
||||||
Reduce(combOp, partitionList, zeroValue)
|
Reduce(combOp, partitionList, zeroValue)
|
||||||
|
@ -1330,7 +1330,7 @@ setMethod("setName",
|
||||||
#\dontrun{
|
#\dontrun{
|
||||||
# sc <- sparkR.init()
|
# sc <- sparkR.init()
|
||||||
# rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
|
# rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
|
||||||
# collect(zipWithUniqueId(rdd))
|
# collect(zipWithUniqueId(rdd))
|
||||||
# # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
|
# # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
|
||||||
#}
|
#}
|
||||||
# @rdname zipWithUniqueId
|
# @rdname zipWithUniqueId
|
||||||
|
@ -1426,7 +1426,7 @@ setMethod("glom",
|
||||||
partitionFunc <- function(part) {
|
partitionFunc <- function(part) {
|
||||||
list(part)
|
list(part)
|
||||||
}
|
}
|
||||||
|
|
||||||
lapplyPartition(x, partitionFunc)
|
lapplyPartition(x, partitionFunc)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -1498,16 +1498,16 @@ setMethod("zipRDD",
|
||||||
# The jrdd's elements are of scala Tuple2 type. The serialized
|
# The jrdd's elements are of scala Tuple2 type. The serialized
|
||||||
# flag here is used for the elements inside the tuples.
|
# flag here is used for the elements inside the tuples.
|
||||||
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))
|
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))
|
||||||
|
|
||||||
mergePartitions(rdd, TRUE)
|
mergePartitions(rdd, TRUE)
|
||||||
})
|
})
|
||||||
|
|
||||||
# Cartesian product of this RDD and another one.
|
# Cartesian product of this RDD and another one.
|
||||||
#
|
#
|
||||||
# Return the Cartesian product of this RDD and another one,
|
# Return the Cartesian product of this RDD and another one,
|
||||||
# that is, the RDD of all pairs of elements (a, b) where a
|
# that is, the RDD of all pairs of elements (a, b) where a
|
||||||
# is in this and b is in other.
|
# is in this and b is in other.
|
||||||
#
|
#
|
||||||
# @param x An RDD.
|
# @param x An RDD.
|
||||||
# @param other An RDD.
|
# @param other An RDD.
|
||||||
# @return A new RDD which is the Cartesian product of these two RDDs.
|
# @return A new RDD which is the Cartesian product of these two RDDs.
|
||||||
|
@ -1515,7 +1515,7 @@ setMethod("zipRDD",
|
||||||
#\dontrun{
|
#\dontrun{
|
||||||
# sc <- sparkR.init()
|
# sc <- sparkR.init()
|
||||||
# rdd <- parallelize(sc, 1:2)
|
# rdd <- parallelize(sc, 1:2)
|
||||||
# sortByKey(cartesian(rdd, rdd))
|
# sortByKey(cartesian(rdd, rdd))
|
||||||
# # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
|
# # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
|
||||||
#}
|
#}
|
||||||
# @rdname cartesian
|
# @rdname cartesian
|
||||||
|
@ -1528,7 +1528,7 @@ setMethod("cartesian",
|
||||||
# The jrdd's elements are of scala Tuple2 type. The serialized
|
# The jrdd's elements are of scala Tuple2 type. The serialized
|
||||||
# flag here is used for the elements inside the tuples.
|
# flag here is used for the elements inside the tuples.
|
||||||
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))
|
rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))
|
||||||
|
|
||||||
mergePartitions(rdd, FALSE)
|
mergePartitions(rdd, FALSE)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -1598,11 +1598,11 @@ setMethod("intersection",
|
||||||
|
|
||||||
# Zips an RDD's partitions with one (or more) RDD(s).
|
# Zips an RDD's partitions with one (or more) RDD(s).
|
||||||
# Same as zipPartitions in Spark.
|
# Same as zipPartitions in Spark.
|
||||||
#
|
#
|
||||||
# @param ... RDDs to be zipped.
|
# @param ... RDDs to be zipped.
|
||||||
# @param func A function to transform zipped partitions.
|
# @param func A function to transform zipped partitions.
|
||||||
# @return A new RDD by applying a function to the zipped partitions.
|
# @return A new RDD by applying a function to the zipped partitions.
|
||||||
# Assumes that all the RDDs have the *same number of partitions*, but
|
# Assumes that all the RDDs have the *same number of partitions*, but
|
||||||
# does *not* require them to have the same number of elements in each partition.
|
# does *not* require them to have the same number of elements in each partition.
|
||||||
# @examples
|
# @examples
|
||||||
#\dontrun{
|
#\dontrun{
|
||||||
|
@ -1610,7 +1610,7 @@ setMethod("intersection",
|
||||||
# rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
|
# rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
|
||||||
# rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
|
# rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
|
||||||
# rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
|
# rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
|
||||||
# collect(zipPartitions(rdd1, rdd2, rdd3,
|
# collect(zipPartitions(rdd1, rdd2, rdd3,
|
||||||
# func = function(x, y, z) { list(list(x, y, z))} ))
|
# func = function(x, y, z) { list(list(x, y, z))} ))
|
||||||
# # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
|
# # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
|
||||||
#}
|
#}
|
||||||
|
@ -1627,7 +1627,7 @@ setMethod("zipPartitions",
|
||||||
if (length(unique(nPart)) != 1) {
|
if (length(unique(nPart)) != 1) {
|
||||||
stop("Can only zipPartitions RDDs which have the same number of partitions.")
|
stop("Can only zipPartitions RDDs which have the same number of partitions.")
|
||||||
}
|
}
|
||||||
|
|
||||||
rrdds <- lapply(rrdds, function(rdd) {
|
rrdds <- lapply(rrdds, function(rdd) {
|
||||||
mapPartitionsWithIndex(rdd, function(partIndex, part) {
|
mapPartitionsWithIndex(rdd, function(partIndex, part) {
|
||||||
print(length(part))
|
print(length(part))
|
||||||
|
|
|
@ -182,7 +182,7 @@ setMethod("toDF", signature(x = "RDD"),
|
||||||
|
|
||||||
#' Create a DataFrame from a JSON file.
|
#' Create a DataFrame from a JSON file.
|
||||||
#'
|
#'
|
||||||
#' Loads a JSON file (one object per line), returning the result as a DataFrame
|
#' Loads a JSON file (one object per line), returning the result as a DataFrame
|
||||||
#' It goes through the entire dataset once to determine the schema.
|
#' It goes through the entire dataset once to determine the schema.
|
||||||
#'
|
#'
|
||||||
#' @param sqlContext SQLContext to use
|
#' @param sqlContext SQLContext to use
|
||||||
|
@ -238,7 +238,7 @@ jsonRDD <- function(sqlContext, rdd, schema = NULL, samplingRatio = 1.0) {
|
||||||
|
|
||||||
|
|
||||||
#' Create a DataFrame from a Parquet file.
|
#' Create a DataFrame from a Parquet file.
|
||||||
#'
|
#'
|
||||||
#' Loads a Parquet file, returning the result as a DataFrame.
|
#' Loads a Parquet file, returning the result as a DataFrame.
|
||||||
#'
|
#'
|
||||||
#' @param sqlContext SQLContext to use
|
#' @param sqlContext SQLContext to use
|
||||||
|
@ -278,7 +278,7 @@ sql <- function(sqlContext, sqlQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
#' Create a DataFrame from a SparkSQL Table
|
#' Create a DataFrame from a SparkSQL Table
|
||||||
#'
|
#'
|
||||||
#' Returns the specified Table as a DataFrame. The Table must have already been registered
|
#' Returns the specified Table as a DataFrame. The Table must have already been registered
|
||||||
#' in the SQLContext.
|
#' in the SQLContext.
|
||||||
#'
|
#'
|
||||||
|
@ -298,7 +298,7 @@ sql <- function(sqlContext, sqlQuery) {
|
||||||
|
|
||||||
table <- function(sqlContext, tableName) {
|
table <- function(sqlContext, tableName) {
|
||||||
sdf <- callJMethod(sqlContext, "table", tableName)
|
sdf <- callJMethod(sqlContext, "table", tableName)
|
||||||
dataFrame(sdf)
|
dataFrame(sdf)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -352,7 +352,7 @@ tableNames <- function(sqlContext, databaseName = NULL) {
|
||||||
|
|
||||||
|
|
||||||
#' Cache Table
|
#' Cache Table
|
||||||
#'
|
#'
|
||||||
#' Caches the specified table in-memory.
|
#' Caches the specified table in-memory.
|
||||||
#'
|
#'
|
||||||
#' @param sqlContext SQLContext to use
|
#' @param sqlContext SQLContext to use
|
||||||
|
@ -370,11 +370,11 @@ tableNames <- function(sqlContext, databaseName = NULL) {
|
||||||
#' }
|
#' }
|
||||||
|
|
||||||
cacheTable <- function(sqlContext, tableName) {
|
cacheTable <- function(sqlContext, tableName) {
|
||||||
callJMethod(sqlContext, "cacheTable", tableName)
|
callJMethod(sqlContext, "cacheTable", tableName)
|
||||||
}
|
}
|
||||||
|
|
||||||
#' Uncache Table
|
#' Uncache Table
|
||||||
#'
|
#'
|
||||||
#' Removes the specified table from the in-memory cache.
|
#' Removes the specified table from the in-memory cache.
|
||||||
#'
|
#'
|
||||||
#' @param sqlContext SQLContext to use
|
#' @param sqlContext SQLContext to use
|
||||||
|
|
|
@ -27,9 +27,9 @@
|
||||||
# @description Broadcast variables can be created using the broadcast
|
# @description Broadcast variables can be created using the broadcast
|
||||||
# function from a \code{SparkContext}.
|
# function from a \code{SparkContext}.
|
||||||
# @rdname broadcast-class
|
# @rdname broadcast-class
|
||||||
# @seealso broadcast
|
# @seealso broadcast
|
||||||
#
|
#
|
||||||
# @param id Id of the backing Spark broadcast variable
|
# @param id Id of the backing Spark broadcast variable
|
||||||
# @export
|
# @export
|
||||||
setClass("Broadcast", slots = list(id = "character"))
|
setClass("Broadcast", slots = list(id = "character"))
|
||||||
|
|
||||||
|
@ -68,7 +68,7 @@ setMethod("value",
|
||||||
# variable on workers. Not intended for use outside the package.
|
# variable on workers. Not intended for use outside the package.
|
||||||
#
|
#
|
||||||
# @rdname broadcast-internal
|
# @rdname broadcast-internal
|
||||||
# @seealso broadcast, value
|
# @seealso broadcast, value
|
||||||
|
|
||||||
# @param bcastId The id of broadcast variable to set
|
# @param bcastId The id of broadcast variable to set
|
||||||
# @param value The value to be set
|
# @param value The value to be set
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
# Utility functions to deserialize objects from Java.
|
# Utility functions to deserialize objects from Java.
|
||||||
|
|
||||||
# Type mapping from Java to R
|
# Type mapping from Java to R
|
||||||
#
|
#
|
||||||
# void -> NULL
|
# void -> NULL
|
||||||
# Int -> integer
|
# Int -> integer
|
||||||
# String -> character
|
# String -> character
|
||||||
|
|
|
@ -130,7 +130,7 @@ setGeneric("maximum", function(x) { standardGeneric("maximum") })
|
||||||
# @export
|
# @export
|
||||||
setGeneric("minimum", function(x) { standardGeneric("minimum") })
|
setGeneric("minimum", function(x) { standardGeneric("minimum") })
|
||||||
|
|
||||||
# @rdname sumRDD
|
# @rdname sumRDD
|
||||||
# @export
|
# @export
|
||||||
setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
|
setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
|
||||||
|
|
||||||
|
@ -219,7 +219,7 @@ setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
|
||||||
|
|
||||||
# @rdname zipRDD
|
# @rdname zipRDD
|
||||||
# @export
|
# @export
|
||||||
setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
|
setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
|
||||||
signature = "...")
|
signature = "...")
|
||||||
|
|
||||||
# @rdname zipWithIndex
|
# @rdname zipWithIndex
|
||||||
|
@ -364,7 +364,7 @@ setGeneric("subtract",
|
||||||
|
|
||||||
# @rdname subtractByKey
|
# @rdname subtractByKey
|
||||||
# @export
|
# @export
|
||||||
setGeneric("subtractByKey",
|
setGeneric("subtractByKey",
|
||||||
function(x, other, numPartitions = 1) {
|
function(x, other, numPartitions = 1) {
|
||||||
standardGeneric("subtractByKey")
|
standardGeneric("subtractByKey")
|
||||||
})
|
})
|
||||||
|
@ -399,15 +399,15 @@ setGeneric("describe", function(x, col, ...) { standardGeneric("describe") })
|
||||||
#' @rdname nafunctions
|
#' @rdname nafunctions
|
||||||
#' @export
|
#' @export
|
||||||
setGeneric("dropna",
|
setGeneric("dropna",
|
||||||
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
|
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
|
||||||
standardGeneric("dropna")
|
standardGeneric("dropna")
|
||||||
})
|
})
|
||||||
|
|
||||||
#' @rdname nafunctions
|
#' @rdname nafunctions
|
||||||
#' @export
|
#' @export
|
||||||
setGeneric("na.omit",
|
setGeneric("na.omit",
|
||||||
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
|
function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
|
||||||
standardGeneric("na.omit")
|
standardGeneric("na.omit")
|
||||||
})
|
})
|
||||||
|
|
||||||
#' @rdname schema
|
#' @rdname schema
|
||||||
|
@ -656,4 +656,3 @@ setGeneric("toRadians", function(x) { standardGeneric("toRadians") })
|
||||||
#' @rdname column
|
#' @rdname column
|
||||||
#' @export
|
#' @export
|
||||||
setGeneric("upper", function(x) { standardGeneric("upper") })
|
setGeneric("upper", function(x) { standardGeneric("upper") })
|
||||||
|
|
||||||
|
|
|
@ -136,4 +136,3 @@ createMethods <- function() {
|
||||||
}
|
}
|
||||||
|
|
||||||
createMethods()
|
createMethods()
|
||||||
|
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#
|
#
|
||||||
|
|
||||||
# References to objects that exist on the JVM backend
|
# References to objects that exist on the JVM backend
|
||||||
# are maintained using the jobj.
|
# are maintained using the jobj.
|
||||||
|
|
||||||
#' @include generics.R
|
#' @include generics.R
|
||||||
NULL
|
NULL
|
||||||
|
|
|
@ -784,7 +784,7 @@ setMethod("sortByKey",
|
||||||
newRDD <- partitionBy(x, numPartitions, rangePartitionFunc)
|
newRDD <- partitionBy(x, numPartitions, rangePartitionFunc)
|
||||||
lapplyPartition(newRDD, partitionFunc)
|
lapplyPartition(newRDD, partitionFunc)
|
||||||
})
|
})
|
||||||
|
|
||||||
# Subtract a pair RDD with another pair RDD.
|
# Subtract a pair RDD with another pair RDD.
|
||||||
#
|
#
|
||||||
# Return an RDD with the pairs from x whose keys are not in other.
|
# Return an RDD with the pairs from x whose keys are not in other.
|
||||||
|
@ -820,7 +820,7 @@ setMethod("subtractByKey",
|
||||||
})
|
})
|
||||||
|
|
||||||
# Return a subset of this RDD sampled by key.
|
# Return a subset of this RDD sampled by key.
|
||||||
#
|
#
|
||||||
# @description
|
# @description
|
||||||
# \code{sampleByKey} Create a sample of this RDD using variable sampling rates
|
# \code{sampleByKey} Create a sample of this RDD using variable sampling rates
|
||||||
# for different keys as specified by fractions, a key to sampling rate map.
|
# for different keys as specified by fractions, a key to sampling rate map.
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
#' structType
|
#' structType
|
||||||
#'
|
#'
|
||||||
#' Create a structType object that contains the metadata for a DataFrame. Intended for
|
#' Create a structType object that contains the metadata for a DataFrame. Intended for
|
||||||
#' use with createDataFrame and toDF.
|
#' use with createDataFrame and toDF.
|
||||||
#'
|
#'
|
||||||
#' @param x a structField object (created with the field() function)
|
#' @param x a structField object (created with the field() function)
|
||||||
|
|
|
@ -175,7 +175,7 @@ writeGenericList <- function(con, list) {
|
||||||
writeObject(con, elem)
|
writeObject(con, elem)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Used to pass in hash maps required on Java side.
|
# Used to pass in hash maps required on Java side.
|
||||||
writeEnv <- function(con, env) {
|
writeEnv <- function(con, env) {
|
||||||
len <- length(env)
|
len <- length(env)
|
||||||
|
|
|
@ -43,7 +43,7 @@ sparkR.stop <- function() {
|
||||||
callJMethod(sc, "stop")
|
callJMethod(sc, "stop")
|
||||||
rm(".sparkRjsc", envir = env)
|
rm(".sparkRjsc", envir = env)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (exists(".backendLaunched", envir = env)) {
|
if (exists(".backendLaunched", envir = env)) {
|
||||||
callJStatic("SparkRHandler", "stopBackend")
|
callJStatic("SparkRHandler", "stopBackend")
|
||||||
}
|
}
|
||||||
|
@ -174,7 +174,7 @@ sparkR.init <- function(
|
||||||
for (varname in names(sparkEnvir)) {
|
for (varname in names(sparkEnvir)) {
|
||||||
sparkEnvirMap[[varname]] <- sparkEnvir[[varname]]
|
sparkEnvirMap[[varname]] <- sparkEnvir[[varname]]
|
||||||
}
|
}
|
||||||
|
|
||||||
sparkExecutorEnvMap <- new.env()
|
sparkExecutorEnvMap <- new.env()
|
||||||
if (!any(names(sparkExecutorEnv) == "LD_LIBRARY_PATH")) {
|
if (!any(names(sparkExecutorEnv) == "LD_LIBRARY_PATH")) {
|
||||||
sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <- paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
|
sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <- paste0("$LD_LIBRARY_PATH:",Sys.getenv("LD_LIBRARY_PATH"))
|
||||||
|
@ -214,7 +214,7 @@ sparkR.init <- function(
|
||||||
|
|
||||||
#' Initialize a new SQLContext.
|
#' Initialize a new SQLContext.
|
||||||
#'
|
#'
|
||||||
#' This function creates a SparkContext from an existing JavaSparkContext and
|
#' This function creates a SparkContext from an existing JavaSparkContext and
|
||||||
#' then uses it to initialize a new SQLContext
|
#' then uses it to initialize a new SQLContext
|
||||||
#'
|
#'
|
||||||
#' @param jsc The existing JavaSparkContext created with SparkR.init()
|
#' @param jsc The existing JavaSparkContext created with SparkR.init()
|
||||||
|
|
|
@ -368,21 +368,21 @@ listToSeq <- function(l) {
|
||||||
}
|
}
|
||||||
|
|
||||||
# Utility function to recursively traverse the Abstract Syntax Tree (AST) of a
|
# Utility function to recursively traverse the Abstract Syntax Tree (AST) of a
|
||||||
# user defined function (UDF), and to examine variables in the UDF to decide
|
# user defined function (UDF), and to examine variables in the UDF to decide
|
||||||
# if their values should be included in the new function environment.
|
# if their values should be included in the new function environment.
|
||||||
# param
|
# param
|
||||||
# node The current AST node in the traversal.
|
# node The current AST node in the traversal.
|
||||||
# oldEnv The original function environment.
|
# oldEnv The original function environment.
|
||||||
# defVars An Accumulator of variables names defined in the function's calling environment,
|
# defVars An Accumulator of variables names defined in the function's calling environment,
|
||||||
# including function argument and local variable names.
|
# including function argument and local variable names.
|
||||||
# checkedFunc An environment of function objects examined during cleanClosure. It can
|
# checkedFunc An environment of function objects examined during cleanClosure. It can
|
||||||
# be considered as a "name"-to-"list of functions" mapping.
|
# be considered as a "name"-to-"list of functions" mapping.
|
||||||
# newEnv A new function environment to store necessary function dependencies, an output argument.
|
# newEnv A new function environment to store necessary function dependencies, an output argument.
|
||||||
processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
|
processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
|
||||||
nodeLen <- length(node)
|
nodeLen <- length(node)
|
||||||
|
|
||||||
if (nodeLen > 1 && typeof(node) == "language") {
|
if (nodeLen > 1 && typeof(node) == "language") {
|
||||||
# Recursive case: current AST node is an internal node, check for its children.
|
# Recursive case: current AST node is an internal node, check for its children.
|
||||||
if (length(node[[1]]) > 1) {
|
if (length(node[[1]]) > 1) {
|
||||||
for (i in 1:nodeLen) {
|
for (i in 1:nodeLen) {
|
||||||
processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
|
processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
|
||||||
|
@ -393,7 +393,7 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
|
||||||
for (i in 2:nodeLen) {
|
for (i in 2:nodeLen) {
|
||||||
processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
|
processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
|
||||||
}
|
}
|
||||||
} else if (nodeChar == "<-" || nodeChar == "=" ||
|
} else if (nodeChar == "<-" || nodeChar == "=" ||
|
||||||
nodeChar == "<<-") { # Assignment Ops.
|
nodeChar == "<<-") { # Assignment Ops.
|
||||||
defVar <- node[[2]]
|
defVar <- node[[2]]
|
||||||
if (length(defVar) == 1 && typeof(defVar) == "symbol") {
|
if (length(defVar) == 1 && typeof(defVar) == "symbol") {
|
||||||
|
@ -422,21 +422,21 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (nodeLen == 1 &&
|
} else if (nodeLen == 1 &&
|
||||||
(typeof(node) == "symbol" || typeof(node) == "language")) {
|
(typeof(node) == "symbol" || typeof(node) == "language")) {
|
||||||
# Base case: current AST node is a leaf node and a symbol or a function call.
|
# Base case: current AST node is a leaf node and a symbol or a function call.
|
||||||
nodeChar <- as.character(node)
|
nodeChar <- as.character(node)
|
||||||
if (!nodeChar %in% defVars$data) { # Not a function parameter or local variable.
|
if (!nodeChar %in% defVars$data) { # Not a function parameter or local variable.
|
||||||
func.env <- oldEnv
|
func.env <- oldEnv
|
||||||
topEnv <- parent.env(.GlobalEnv)
|
topEnv <- parent.env(.GlobalEnv)
|
||||||
# Search in function environment, and function's enclosing environments
|
# Search in function environment, and function's enclosing environments
|
||||||
# up to global environment. There is no need to look into package environments
|
# up to global environment. There is no need to look into package environments
|
||||||
# above the global or namespace environment that is not SparkR below the global,
|
# above the global or namespace environment that is not SparkR below the global,
|
||||||
# as they are assumed to be loaded on workers.
|
# as they are assumed to be loaded on workers.
|
||||||
while (!identical(func.env, topEnv)) {
|
while (!identical(func.env, topEnv)) {
|
||||||
# Namespaces other than "SparkR" will not be searched.
|
# Namespaces other than "SparkR" will not be searched.
|
||||||
if (!isNamespace(func.env) ||
|
if (!isNamespace(func.env) ||
|
||||||
(getNamespaceName(func.env) == "SparkR" &&
|
(getNamespaceName(func.env) == "SparkR" &&
|
||||||
!(nodeChar %in% getNamespaceExports("SparkR")))) { # Only include SparkR internals.
|
!(nodeChar %in% getNamespaceExports("SparkR")))) { # Only include SparkR internals.
|
||||||
# Set parameter 'inherits' to FALSE since we do not need to search in
|
# Set parameter 'inherits' to FALSE since we do not need to search in
|
||||||
# attached package environments.
|
# attached package environments.
|
||||||
|
@ -444,7 +444,7 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
|
||||||
error = function(e) { FALSE })) {
|
error = function(e) { FALSE })) {
|
||||||
obj <- get(nodeChar, envir = func.env, inherits = FALSE)
|
obj <- get(nodeChar, envir = func.env, inherits = FALSE)
|
||||||
if (is.function(obj)) { # If the node is a function call.
|
if (is.function(obj)) { # If the node is a function call.
|
||||||
funcList <- mget(nodeChar, envir = checkedFuncs, inherits = F,
|
funcList <- mget(nodeChar, envir = checkedFuncs, inherits = F,
|
||||||
ifnotfound = list(list(NULL)))[[1]]
|
ifnotfound = list(list(NULL)))[[1]]
|
||||||
found <- sapply(funcList, function(func) {
|
found <- sapply(funcList, function(func) {
|
||||||
ifelse(identical(func, obj), TRUE, FALSE)
|
ifelse(identical(func, obj), TRUE, FALSE)
|
||||||
|
@ -453,7 +453,7 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
# Function has not been examined, record it and recursively clean its closure.
|
# Function has not been examined, record it and recursively clean its closure.
|
||||||
assign(nodeChar,
|
assign(nodeChar,
|
||||||
if (is.null(funcList[[1]])) {
|
if (is.null(funcList[[1]])) {
|
||||||
list(obj)
|
list(obj)
|
||||||
} else {
|
} else {
|
||||||
|
@ -466,7 +466,7 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Continue to search in enclosure.
|
# Continue to search in enclosure.
|
||||||
func.env <- parent.env(func.env)
|
func.env <- parent.env(func.env)
|
||||||
}
|
}
|
||||||
|
@ -474,8 +474,8 @@ processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Utility function to get user defined function (UDF) dependencies (closure).
|
# Utility function to get user defined function (UDF) dependencies (closure).
|
||||||
# More specifically, this function captures the values of free variables defined
|
# More specifically, this function captures the values of free variables defined
|
||||||
# outside a UDF, and stores them in the function's environment.
|
# outside a UDF, and stores them in the function's environment.
|
||||||
# param
|
# param
|
||||||
# func A function whose closure needs to be captured.
|
# func A function whose closure needs to be captured.
|
||||||
|
@ -488,7 +488,7 @@ cleanClosure <- function(func, checkedFuncs = new.env()) {
|
||||||
newEnv <- new.env(parent = .GlobalEnv)
|
newEnv <- new.env(parent = .GlobalEnv)
|
||||||
func.body <- body(func)
|
func.body <- body(func)
|
||||||
oldEnv <- environment(func)
|
oldEnv <- environment(func)
|
||||||
# defVars is an Accumulator of variables names defined in the function's calling
|
# defVars is an Accumulator of variables names defined in the function's calling
|
||||||
# environment. First, function's arguments are added to defVars.
|
# environment. First, function's arguments are added to defVars.
|
||||||
defVars <- initAccumulator()
|
defVars <- initAccumulator()
|
||||||
argNames <- names(as.list(args(func)))
|
argNames <- names(as.list(args(func)))
|
||||||
|
@ -509,15 +509,15 @@ cleanClosure <- function(func, checkedFuncs = new.env()) {
|
||||||
# return value
|
# return value
|
||||||
# A list of two result RDDs.
|
# A list of two result RDDs.
|
||||||
appendPartitionLengths <- function(x, other) {
|
appendPartitionLengths <- function(x, other) {
|
||||||
if (getSerializedMode(x) != getSerializedMode(other) ||
|
if (getSerializedMode(x) != getSerializedMode(other) ||
|
||||||
getSerializedMode(x) == "byte") {
|
getSerializedMode(x) == "byte") {
|
||||||
# Append the number of elements in each partition to that partition so that we can later
|
# Append the number of elements in each partition to that partition so that we can later
|
||||||
# know the boundary of elements from x and other.
|
# know the boundary of elements from x and other.
|
||||||
#
|
#
|
||||||
# Note that this appending also serves the purpose of reserialization, because even if
|
# Note that this appending also serves the purpose of reserialization, because even if
|
||||||
# any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
|
# any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
|
||||||
# as a single byte array. For example, partitions of an RDD generated from partitionBy()
|
# as a single byte array. For example, partitions of an RDD generated from partitionBy()
|
||||||
# may be encoded as multiple byte arrays.
|
# may be encoded as multiple byte arrays.
|
||||||
appendLength <- function(part) {
|
appendLength <- function(part) {
|
||||||
len <- length(part)
|
len <- length(part)
|
||||||
part[[len + 1]] <- len + 1
|
part[[len + 1]] <- len + 1
|
||||||
|
@ -544,23 +544,23 @@ mergePartitions <- function(rdd, zip) {
|
||||||
lengthOfValues <- part[[len]]
|
lengthOfValues <- part[[len]]
|
||||||
lengthOfKeys <- part[[len - lengthOfValues]]
|
lengthOfKeys <- part[[len - lengthOfValues]]
|
||||||
stopifnot(len == lengthOfKeys + lengthOfValues)
|
stopifnot(len == lengthOfKeys + lengthOfValues)
|
||||||
|
|
||||||
# For zip operation, check if corresponding partitions of both RDDs have the same number of elements.
|
# For zip operation, check if corresponding partitions of both RDDs have the same number of elements.
|
||||||
if (zip && lengthOfKeys != lengthOfValues) {
|
if (zip && lengthOfKeys != lengthOfValues) {
|
||||||
stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
|
stop("Can only zip RDDs with same number of elements in each pair of corresponding partitions.")
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lengthOfKeys > 1) {
|
if (lengthOfKeys > 1) {
|
||||||
keys <- part[1 : (lengthOfKeys - 1)]
|
keys <- part[1 : (lengthOfKeys - 1)]
|
||||||
} else {
|
} else {
|
||||||
keys <- list()
|
keys <- list()
|
||||||
}
|
}
|
||||||
if (lengthOfValues > 1) {
|
if (lengthOfValues > 1) {
|
||||||
values <- part[(lengthOfKeys + 1) : (len - 1)]
|
values <- part[(lengthOfKeys + 1) : (len - 1)]
|
||||||
} else {
|
} else {
|
||||||
values <- list()
|
values <- list()
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!zip) {
|
if (!zip) {
|
||||||
return(mergeCompactLists(keys, values))
|
return(mergeCompactLists(keys, values))
|
||||||
}
|
}
|
||||||
|
@ -578,6 +578,6 @@ mergePartitions <- function(rdd, zip) {
|
||||||
part
|
part
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
PipelinedRDD(rdd, partitionFunc)
|
PipelinedRDD(rdd, partitionFunc)
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,4 +18,3 @@
|
||||||
.onLoad <- function(libname, pkgname) {
|
.onLoad <- function(libname, pkgname) {
|
||||||
sparkR.onLoad(libname, pkgname)
|
sparkR.onLoad(libname, pkgname)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -59,15 +59,15 @@ test_that("saveAsObjectFile()/objectFile() following RDD transformations works",
|
||||||
wordCount <- lapply(words, function(word) { list(word, 1L) })
|
wordCount <- lapply(words, function(word) { list(word, 1L) })
|
||||||
|
|
||||||
counts <- reduceByKey(wordCount, "+", 2L)
|
counts <- reduceByKey(wordCount, "+", 2L)
|
||||||
|
|
||||||
saveAsObjectFile(counts, fileName2)
|
saveAsObjectFile(counts, fileName2)
|
||||||
counts <- objectFile(sc, fileName2)
|
counts <- objectFile(sc, fileName2)
|
||||||
|
|
||||||
output <- collect(counts)
|
output <- collect(counts)
|
||||||
expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
|
expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1),
|
||||||
list("is", 2))
|
list("is", 2))
|
||||||
expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
|
expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
|
||||||
|
|
||||||
unlink(fileName1)
|
unlink(fileName1)
|
||||||
unlink(fileName2, recursive = TRUE)
|
unlink(fileName2, recursive = TRUE)
|
||||||
})
|
})
|
||||||
|
@ -87,4 +87,3 @@ test_that("saveAsObjectFile()/objectFile() works with multiple paths", {
|
||||||
unlink(fileName1, recursive = TRUE)
|
unlink(fileName1, recursive = TRUE)
|
||||||
unlink(fileName2, recursive = TRUE)
|
unlink(fileName2, recursive = TRUE)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ mockFile <- c("Spark is pretty.", "Spark is awesome.")
|
||||||
test_that("union on two RDDs", {
|
test_that("union on two RDDs", {
|
||||||
actual <- collect(unionRDD(rdd, rdd))
|
actual <- collect(unionRDD(rdd, rdd))
|
||||||
expect_equal(actual, as.list(rep(nums, 2)))
|
expect_equal(actual, as.list(rep(nums, 2)))
|
||||||
|
|
||||||
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
|
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
|
||||||
writeLines(mockFile, fileName)
|
writeLines(mockFile, fileName)
|
||||||
|
|
||||||
|
@ -52,14 +52,14 @@ test_that("union on two RDDs", {
|
||||||
test_that("cogroup on two RDDs", {
|
test_that("cogroup on two RDDs", {
|
||||||
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
|
rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
|
||||||
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
|
rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
|
||||||
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
|
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
|
||||||
actual <- collect(cogroup.rdd)
|
actual <- collect(cogroup.rdd)
|
||||||
expect_equal(actual,
|
expect_equal(actual,
|
||||||
list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))
|
list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list()))))
|
||||||
|
|
||||||
rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
|
rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4)))
|
||||||
rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
|
rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3)))
|
||||||
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
|
cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L)
|
||||||
actual <- collect(cogroup.rdd)
|
actual <- collect(cogroup.rdd)
|
||||||
|
|
||||||
expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
|
expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3))))
|
||||||
|
@ -71,31 +71,31 @@ test_that("zipPartitions() on RDDs", {
|
||||||
rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
|
rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
|
||||||
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
|
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
|
||||||
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
|
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
|
||||||
actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
|
actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
|
||||||
func = function(x, y, z) { list(list(x, y, z))} ))
|
func = function(x, y, z) { list(list(x, y, z))} ))
|
||||||
expect_equal(actual,
|
expect_equal(actual,
|
||||||
list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
|
list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
|
||||||
|
|
||||||
mockFile = c("Spark is pretty.", "Spark is awesome.")
|
mockFile = c("Spark is pretty.", "Spark is awesome.")
|
||||||
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
|
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
|
||||||
writeLines(mockFile, fileName)
|
writeLines(mockFile, fileName)
|
||||||
|
|
||||||
rdd <- textFile(sc, fileName, 1)
|
rdd <- textFile(sc, fileName, 1)
|
||||||
actual <- collect(zipPartitions(rdd, rdd,
|
actual <- collect(zipPartitions(rdd, rdd,
|
||||||
func = function(x, y) { list(paste(x, y, sep = "\n")) }))
|
func = function(x, y) { list(paste(x, y, sep = "\n")) }))
|
||||||
expected <- list(paste(mockFile, mockFile, sep = "\n"))
|
expected <- list(paste(mockFile, mockFile, sep = "\n"))
|
||||||
expect_equal(actual, expected)
|
expect_equal(actual, expected)
|
||||||
|
|
||||||
rdd1 <- parallelize(sc, 0:1, 1)
|
rdd1 <- parallelize(sc, 0:1, 1)
|
||||||
actual <- collect(zipPartitions(rdd1, rdd,
|
actual <- collect(zipPartitions(rdd1, rdd,
|
||||||
func = function(x, y) { list(x + nchar(y)) }))
|
func = function(x, y) { list(x + nchar(y)) }))
|
||||||
expected <- list(0:1 + nchar(mockFile))
|
expected <- list(0:1 + nchar(mockFile))
|
||||||
expect_equal(actual, expected)
|
expect_equal(actual, expected)
|
||||||
|
|
||||||
rdd <- map(rdd, function(x) { x })
|
rdd <- map(rdd, function(x) { x })
|
||||||
actual <- collect(zipPartitions(rdd, rdd1,
|
actual <- collect(zipPartitions(rdd, rdd1,
|
||||||
func = function(x, y) { list(y + nchar(x)) }))
|
func = function(x, y) { list(y + nchar(x)) }))
|
||||||
expect_equal(actual, expected)
|
expect_equal(actual, expected)
|
||||||
|
|
||||||
unlink(fileName)
|
unlink(fileName)
|
||||||
})
|
})
|
||||||
|
|
|
@ -477,7 +477,7 @@ test_that("cartesian() on RDDs", {
|
||||||
list(1, 1), list(1, 2), list(1, 3),
|
list(1, 1), list(1, 2), list(1, 3),
|
||||||
list(2, 1), list(2, 2), list(2, 3),
|
list(2, 1), list(2, 2), list(2, 3),
|
||||||
list(3, 1), list(3, 2), list(3, 3)))
|
list(3, 1), list(3, 2), list(3, 3)))
|
||||||
|
|
||||||
# test case where one RDD is empty
|
# test case where one RDD is empty
|
||||||
emptyRdd <- parallelize(sc, list())
|
emptyRdd <- parallelize(sc, list())
|
||||||
actual <- collect(cartesian(rdd, emptyRdd))
|
actual <- collect(cartesian(rdd, emptyRdd))
|
||||||
|
@ -486,7 +486,7 @@ test_that("cartesian() on RDDs", {
|
||||||
mockFile = c("Spark is pretty.", "Spark is awesome.")
|
mockFile = c("Spark is pretty.", "Spark is awesome.")
|
||||||
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
|
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
|
||||||
writeLines(mockFile, fileName)
|
writeLines(mockFile, fileName)
|
||||||
|
|
||||||
rdd <- textFile(sc, fileName)
|
rdd <- textFile(sc, fileName)
|
||||||
actual <- collect(cartesian(rdd, rdd))
|
actual <- collect(cartesian(rdd, rdd))
|
||||||
expected <- list(
|
expected <- list(
|
||||||
|
@ -495,7 +495,7 @@ test_that("cartesian() on RDDs", {
|
||||||
list("Spark is pretty.", "Spark is pretty."),
|
list("Spark is pretty.", "Spark is pretty."),
|
||||||
list("Spark is pretty.", "Spark is awesome."))
|
list("Spark is pretty.", "Spark is awesome."))
|
||||||
expect_equal(sortKeyValueList(actual), expected)
|
expect_equal(sortKeyValueList(actual), expected)
|
||||||
|
|
||||||
rdd1 <- parallelize(sc, 0:1)
|
rdd1 <- parallelize(sc, 0:1)
|
||||||
actual <- collect(cartesian(rdd1, rdd))
|
actual <- collect(cartesian(rdd1, rdd))
|
||||||
expect_equal(sortKeyValueList(actual),
|
expect_equal(sortKeyValueList(actual),
|
||||||
|
@ -504,11 +504,11 @@ test_that("cartesian() on RDDs", {
|
||||||
list(0, "Spark is awesome."),
|
list(0, "Spark is awesome."),
|
||||||
list(1, "Spark is pretty."),
|
list(1, "Spark is pretty."),
|
||||||
list(1, "Spark is awesome.")))
|
list(1, "Spark is awesome.")))
|
||||||
|
|
||||||
rdd1 <- map(rdd, function(x) { x })
|
rdd1 <- map(rdd, function(x) { x })
|
||||||
actual <- collect(cartesian(rdd, rdd1))
|
actual <- collect(cartesian(rdd, rdd1))
|
||||||
expect_equal(sortKeyValueList(actual), expected)
|
expect_equal(sortKeyValueList(actual), expected)
|
||||||
|
|
||||||
unlink(fileName)
|
unlink(fileName)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -760,7 +760,7 @@ test_that("collectAsMap() on a pairwise RDD", {
|
||||||
})
|
})
|
||||||
|
|
||||||
test_that("show()", {
|
test_that("show()", {
|
||||||
rdd <- parallelize(sc, list(1:10))
|
rdd <- parallelize(sc, list(1:10))
|
||||||
expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
|
expect_output(show(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -106,39 +106,39 @@ test_that("aggregateByKey", {
|
||||||
zeroValue <- list(0, 0)
|
zeroValue <- list(0, 0)
|
||||||
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
|
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
|
||||||
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
|
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
|
||||||
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
|
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
|
||||||
|
|
||||||
actual <- collect(aggregatedRDD)
|
actual <- collect(aggregatedRDD)
|
||||||
|
|
||||||
expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
|
expected <- list(list(1, list(3, 2)), list(2, list(7, 2)))
|
||||||
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
|
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
|
||||||
|
|
||||||
# test aggregateByKey for string keys
|
# test aggregateByKey for string keys
|
||||||
rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
|
rdd <- parallelize(sc, list(list("a", 1), list("a", 2), list("b", 3), list("b", 4)))
|
||||||
|
|
||||||
zeroValue <- list(0, 0)
|
zeroValue <- list(0, 0)
|
||||||
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
|
seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
|
||||||
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
|
combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
|
||||||
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
|
aggregatedRDD <- aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
|
||||||
|
|
||||||
actual <- collect(aggregatedRDD)
|
actual <- collect(aggregatedRDD)
|
||||||
|
|
||||||
expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
|
expected <- list(list("a", list(3, 2)), list("b", list(7, 2)))
|
||||||
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
|
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
|
||||||
})
|
})
|
||||||
|
|
||||||
test_that("foldByKey", {
|
test_that("foldByKey", {
|
||||||
# test foldByKey for int keys
|
# test foldByKey for int keys
|
||||||
folded <- foldByKey(intRdd, 0, "+", 2L)
|
folded <- foldByKey(intRdd, 0, "+", 2L)
|
||||||
|
|
||||||
actual <- collect(folded)
|
actual <- collect(folded)
|
||||||
|
|
||||||
expected <- list(list(2L, 101), list(1L, 199))
|
expected <- list(list(2L, 101), list(1L, 199))
|
||||||
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
|
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
|
||||||
|
|
||||||
# test foldByKey for double keys
|
# test foldByKey for double keys
|
||||||
folded <- foldByKey(doubleRdd, 0, "+", 2L)
|
folded <- foldByKey(doubleRdd, 0, "+", 2L)
|
||||||
|
|
||||||
actual <- collect(folded)
|
actual <- collect(folded)
|
||||||
|
|
||||||
expected <- list(list(1.5, 199), list(2.5, 101))
|
expected <- list(list(1.5, 199), list(2.5, 101))
|
||||||
|
@ -146,15 +146,15 @@ test_that("foldByKey", {
|
||||||
|
|
||||||
# test foldByKey for string keys
|
# test foldByKey for string keys
|
||||||
stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
|
stringKeyPairs <- list(list("a", -1), list("b", 100), list("b", 1), list("a", 200))
|
||||||
|
|
||||||
stringKeyRDD <- parallelize(sc, stringKeyPairs)
|
stringKeyRDD <- parallelize(sc, stringKeyPairs)
|
||||||
folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
|
folded <- foldByKey(stringKeyRDD, 0, "+", 2L)
|
||||||
|
|
||||||
actual <- collect(folded)
|
actual <- collect(folded)
|
||||||
|
|
||||||
expected <- list(list("b", 101), list("a", 199))
|
expected <- list(list("b", 101), list("a", 199))
|
||||||
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
|
expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))
|
||||||
|
|
||||||
# test foldByKey for empty pair RDD
|
# test foldByKey for empty pair RDD
|
||||||
rdd <- parallelize(sc, list())
|
rdd <- parallelize(sc, list())
|
||||||
folded <- foldByKey(rdd, 0, "+", 2L)
|
folded <- foldByKey(rdd, 0, "+", 2L)
|
||||||
|
|
|
@ -67,7 +67,7 @@ test_that("structType and structField", {
|
||||||
expect_true(inherits(testField, "structField"))
|
expect_true(inherits(testField, "structField"))
|
||||||
expect_true(testField$name() == "a")
|
expect_true(testField$name() == "a")
|
||||||
expect_true(testField$nullable())
|
expect_true(testField$nullable())
|
||||||
|
|
||||||
testSchema <- structType(testField, structField("b", "integer"))
|
testSchema <- structType(testField, structField("b", "integer"))
|
||||||
expect_true(inherits(testSchema, "structType"))
|
expect_true(inherits(testSchema, "structType"))
|
||||||
expect_true(inherits(testSchema$fields()[[2]], "structField"))
|
expect_true(inherits(testSchema$fields()[[2]], "structField"))
|
||||||
|
@ -598,7 +598,7 @@ test_that("column functions", {
|
||||||
c3 <- lower(c) + upper(c) + first(c) + last(c)
|
c3 <- lower(c) + upper(c) + first(c) + last(c)
|
||||||
c4 <- approxCountDistinct(c) + countDistinct(c) + cast(c, "string")
|
c4 <- approxCountDistinct(c) + countDistinct(c) + cast(c, "string")
|
||||||
c5 <- n(c) + n_distinct(c)
|
c5 <- n(c) + n_distinct(c)
|
||||||
c5 <- acos(c) + asin(c) + atan(c) + cbrt(c)
|
c5 <- acos(c) + asin(c) + atan(c) + cbrt(c)
|
||||||
c6 <- ceiling(c) + cos(c) + cosh(c) + exp(c) + expm1(c)
|
c6 <- ceiling(c) + cos(c) + cosh(c) + exp(c) + expm1(c)
|
||||||
c7 <- floor(c) + log(c) + log10(c) + log1p(c) + rint(c)
|
c7 <- floor(c) + log(c) + log10(c) + log1p(c) + rint(c)
|
||||||
c8 <- sign(c) + sin(c) + sinh(c) + tan(c) + tanh(c)
|
c8 <- sign(c) + sin(c) + sinh(c) + tan(c) + tanh(c)
|
||||||
|
@ -829,7 +829,7 @@ test_that("dropna() on a DataFrame", {
|
||||||
rows <- collect(df)
|
rows <- collect(df)
|
||||||
|
|
||||||
# drop with columns
|
# drop with columns
|
||||||
|
|
||||||
expected <- rows[!is.na(rows$name),]
|
expected <- rows[!is.na(rows$name),]
|
||||||
actual <- collect(dropna(df, cols = "name"))
|
actual <- collect(dropna(df, cols = "name"))
|
||||||
expect_true(identical(expected, actual))
|
expect_true(identical(expected, actual))
|
||||||
|
@ -842,7 +842,7 @@ test_that("dropna() on a DataFrame", {
|
||||||
expect_true(identical(expected$age, actual$age))
|
expect_true(identical(expected$age, actual$age))
|
||||||
expect_true(identical(expected$height, actual$height))
|
expect_true(identical(expected$height, actual$height))
|
||||||
expect_true(identical(expected$name, actual$name))
|
expect_true(identical(expected$name, actual$name))
|
||||||
|
|
||||||
expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
|
expected <- rows[!is.na(rows$age) & !is.na(rows$height),]
|
||||||
actual <- collect(dropna(df, cols = c("age", "height")))
|
actual <- collect(dropna(df, cols = c("age", "height")))
|
||||||
expect_true(identical(expected, actual))
|
expect_true(identical(expected, actual))
|
||||||
|
@ -850,7 +850,7 @@ test_that("dropna() on a DataFrame", {
|
||||||
expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
|
expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
|
||||||
actual <- collect(dropna(df))
|
actual <- collect(dropna(df))
|
||||||
expect_true(identical(expected, actual))
|
expect_true(identical(expected, actual))
|
||||||
|
|
||||||
# drop with how
|
# drop with how
|
||||||
|
|
||||||
expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
|
expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
|
||||||
|
@ -860,7 +860,7 @@ test_that("dropna() on a DataFrame", {
|
||||||
expected <- rows[!is.na(rows$age) | !is.na(rows$height) | !is.na(rows$name),]
|
expected <- rows[!is.na(rows$age) | !is.na(rows$height) | !is.na(rows$name),]
|
||||||
actual <- collect(dropna(df, "all"))
|
actual <- collect(dropna(df, "all"))
|
||||||
expect_true(identical(expected, actual))
|
expect_true(identical(expected, actual))
|
||||||
|
|
||||||
expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
|
expected <- rows[!is.na(rows$age) & !is.na(rows$height) & !is.na(rows$name),]
|
||||||
actual <- collect(dropna(df, "any"))
|
actual <- collect(dropna(df, "any"))
|
||||||
expect_true(identical(expected, actual))
|
expect_true(identical(expected, actual))
|
||||||
|
@ -872,14 +872,14 @@ test_that("dropna() on a DataFrame", {
|
||||||
expected <- rows[!is.na(rows$age) | !is.na(rows$height),]
|
expected <- rows[!is.na(rows$age) | !is.na(rows$height),]
|
||||||
actual <- collect(dropna(df, "all", cols = c("age", "height")))
|
actual <- collect(dropna(df, "all", cols = c("age", "height")))
|
||||||
expect_true(identical(expected, actual))
|
expect_true(identical(expected, actual))
|
||||||
|
|
||||||
# drop with threshold
|
# drop with threshold
|
||||||
|
|
||||||
expected <- rows[as.integer(!is.na(rows$age)) + as.integer(!is.na(rows$height)) >= 2,]
|
expected <- rows[as.integer(!is.na(rows$age)) + as.integer(!is.na(rows$height)) >= 2,]
|
||||||
actual <- collect(dropna(df, minNonNulls = 2, cols = c("age", "height")))
|
actual <- collect(dropna(df, minNonNulls = 2, cols = c("age", "height")))
|
||||||
expect_true(identical(expected, actual))
|
expect_true(identical(expected, actual))
|
||||||
|
|
||||||
expected <- rows[as.integer(!is.na(rows$age)) +
|
expected <- rows[as.integer(!is.na(rows$age)) +
|
||||||
as.integer(!is.na(rows$height)) +
|
as.integer(!is.na(rows$height)) +
|
||||||
as.integer(!is.na(rows$name)) >= 3,]
|
as.integer(!is.na(rows$name)) >= 3,]
|
||||||
actual <- collect(dropna(df, minNonNulls = 3, cols = c("name", "age", "height")))
|
actual <- collect(dropna(df, minNonNulls = 3, cols = c("name", "age", "height")))
|
||||||
|
@ -889,9 +889,9 @@ test_that("dropna() on a DataFrame", {
|
||||||
test_that("fillna() on a DataFrame", {
|
test_that("fillna() on a DataFrame", {
|
||||||
df <- jsonFile(sqlContext, jsonPathNa)
|
df <- jsonFile(sqlContext, jsonPathNa)
|
||||||
rows <- collect(df)
|
rows <- collect(df)
|
||||||
|
|
||||||
# fill with value
|
# fill with value
|
||||||
|
|
||||||
expected <- rows
|
expected <- rows
|
||||||
expected$age[is.na(expected$age)] <- 50
|
expected$age[is.na(expected$age)] <- 50
|
||||||
expected$height[is.na(expected$height)] <- 50.6
|
expected$height[is.na(expected$height)] <- 50.6
|
||||||
|
@ -912,7 +912,7 @@ test_that("fillna() on a DataFrame", {
|
||||||
expected$name[is.na(expected$name)] <- "unknown"
|
expected$name[is.na(expected$name)] <- "unknown"
|
||||||
actual <- collect(fillna(df, "unknown", c("age", "name")))
|
actual <- collect(fillna(df, "unknown", c("age", "name")))
|
||||||
expect_true(identical(expected, actual))
|
expect_true(identical(expected, actual))
|
||||||
|
|
||||||
# fill with named list
|
# fill with named list
|
||||||
|
|
||||||
expected <- rows
|
expected <- rows
|
||||||
|
@ -920,7 +920,7 @@ test_that("fillna() on a DataFrame", {
|
||||||
expected$height[is.na(expected$height)] <- 50.6
|
expected$height[is.na(expected$height)] <- 50.6
|
||||||
expected$name[is.na(expected$name)] <- "unknown"
|
expected$name[is.na(expected$name)] <- "unknown"
|
||||||
actual <- collect(fillna(df, list("age" = 50, "height" = 50.6, "name" = "unknown")))
|
actual <- collect(fillna(df, list("age" = 50, "height" = 50.6, "name" = "unknown")))
|
||||||
expect_true(identical(expected, actual))
|
expect_true(identical(expected, actual))
|
||||||
})
|
})
|
||||||
|
|
||||||
unlink(parquetPath)
|
unlink(parquetPath)
|
||||||
|
|
|
@ -64,4 +64,3 @@ test_that("take() gives back the original elements in correct count and order",
|
||||||
expect_true(length(take(numListRDD, 0)) == 0)
|
expect_true(length(take(numListRDD, 0)) == 0)
|
||||||
expect_true(length(take(numVectorRDD, 0)) == 0)
|
expect_true(length(take(numVectorRDD, 0)) == 0)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,7 @@ test_that("textFile() word count works as expected", {
|
||||||
expected <- list(list("pretty.", 1), list("is", 2), list("awesome.", 1),
|
expected <- list(list("pretty.", 1), list("is", 2), list("awesome.", 1),
|
||||||
list("Spark", 2))
|
list("Spark", 2))
|
||||||
expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
|
expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
|
||||||
|
|
||||||
unlink(fileName)
|
unlink(fileName)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -115,13 +115,13 @@ test_that("textFile() and saveAsTextFile() word count works as expected", {
|
||||||
|
|
||||||
saveAsTextFile(counts, fileName2)
|
saveAsTextFile(counts, fileName2)
|
||||||
rdd <- textFile(sc, fileName2)
|
rdd <- textFile(sc, fileName2)
|
||||||
|
|
||||||
output <- collect(rdd)
|
output <- collect(rdd)
|
||||||
expected <- list(list("awesome.", 1), list("Spark", 2),
|
expected <- list(list("awesome.", 1), list("Spark", 2),
|
||||||
list("pretty.", 1), list("is", 2))
|
list("pretty.", 1), list("is", 2))
|
||||||
expectedStr <- lapply(expected, function(x) { toString(x) })
|
expectedStr <- lapply(expected, function(x) { toString(x) })
|
||||||
expect_equal(sortKeyValueList(output), sortKeyValueList(expectedStr))
|
expect_equal(sortKeyValueList(output), sortKeyValueList(expectedStr))
|
||||||
|
|
||||||
unlink(fileName1)
|
unlink(fileName1)
|
||||||
unlink(fileName2)
|
unlink(fileName2)
|
||||||
})
|
})
|
||||||
|
@ -159,4 +159,3 @@ test_that("Pipelined operations on RDDs created using textFile", {
|
||||||
|
|
||||||
unlink(fileName)
|
unlink(fileName)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -43,13 +43,13 @@ test_that("serializeToBytes on RDD", {
|
||||||
mockFile <- c("Spark is pretty.", "Spark is awesome.")
|
mockFile <- c("Spark is pretty.", "Spark is awesome.")
|
||||||
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
|
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
|
||||||
writeLines(mockFile, fileName)
|
writeLines(mockFile, fileName)
|
||||||
|
|
||||||
text.rdd <- textFile(sc, fileName)
|
text.rdd <- textFile(sc, fileName)
|
||||||
expect_true(getSerializedMode(text.rdd) == "string")
|
expect_true(getSerializedMode(text.rdd) == "string")
|
||||||
ser.rdd <- serializeToBytes(text.rdd)
|
ser.rdd <- serializeToBytes(text.rdd)
|
||||||
expect_equal(collect(ser.rdd), as.list(mockFile))
|
expect_equal(collect(ser.rdd), as.list(mockFile))
|
||||||
expect_true(getSerializedMode(ser.rdd) == "byte")
|
expect_true(getSerializedMode(ser.rdd) == "byte")
|
||||||
|
|
||||||
unlink(fileName)
|
unlink(fileName)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -64,7 +64,7 @@ test_that("cleanClosure on R functions", {
|
||||||
expect_equal(actual, y)
|
expect_equal(actual, y)
|
||||||
actual <- get("g", envir = env, inherits = FALSE)
|
actual <- get("g", envir = env, inherits = FALSE)
|
||||||
expect_equal(actual, g)
|
expect_equal(actual, g)
|
||||||
|
|
||||||
# Test for nested enclosures and package variables.
|
# Test for nested enclosures and package variables.
|
||||||
env2 <- new.env()
|
env2 <- new.env()
|
||||||
funcEnv <- new.env(parent = env2)
|
funcEnv <- new.env(parent = env2)
|
||||||
|
@ -106,7 +106,7 @@ test_that("cleanClosure on R functions", {
|
||||||
expect_equal(length(ls(env)), 1)
|
expect_equal(length(ls(env)), 1)
|
||||||
actual <- get("y", envir = env, inherits = FALSE)
|
actual <- get("y", envir = env, inherits = FALSE)
|
||||||
expect_equal(actual, y)
|
expect_equal(actual, y)
|
||||||
|
|
||||||
# Test for function (and variable) definitions.
|
# Test for function (and variable) definitions.
|
||||||
f <- function(x) {
|
f <- function(x) {
|
||||||
g <- function(y) { y * 2 }
|
g <- function(y) { y * 2 }
|
||||||
|
@ -115,7 +115,7 @@ test_that("cleanClosure on R functions", {
|
||||||
newF <- cleanClosure(f)
|
newF <- cleanClosure(f)
|
||||||
env <- environment(newF)
|
env <- environment(newF)
|
||||||
expect_equal(length(ls(env)), 0) # "y" and "g" should not be included.
|
expect_equal(length(ls(env)), 0) # "y" and "g" should not be included.
|
||||||
|
|
||||||
# Test for overriding variables in base namespace (Issue: SparkR-196).
|
# Test for overriding variables in base namespace (Issue: SparkR-196).
|
||||||
nums <- as.list(1:10)
|
nums <- as.list(1:10)
|
||||||
rdd <- parallelize(sc, nums, 2L)
|
rdd <- parallelize(sc, nums, 2L)
|
||||||
|
@ -128,7 +128,7 @@ test_that("cleanClosure on R functions", {
|
||||||
actual <- collect(lapply(rdd, f))
|
actual <- collect(lapply(rdd, f))
|
||||||
expected <- as.list(c(rep(FALSE, 4), rep(TRUE, 6)))
|
expected <- as.list(c(rep(FALSE, 4), rep(TRUE, 6)))
|
||||||
expect_equal(actual, expected)
|
expect_equal(actual, expected)
|
||||||
|
|
||||||
# Test for broadcast variables.
|
# Test for broadcast variables.
|
||||||
a <- matrix(nrow=10, ncol=10, data=rnorm(100))
|
a <- matrix(nrow=10, ncol=10, data=rnorm(100))
|
||||||
aBroadcast <- broadcast(sc, a)
|
aBroadcast <- broadcast(sc, a)
|
||||||
|
|
Loading…
Reference in a new issue