[SPARK-6824] Fill the docs for DataFrame API in SparkR

This patch also removes the RDD docs from being built as a part of roxygen just by the method to delete
" ' '" of " \#' ".

Author: hqzizania <qian.huang@intel.com>
Author: qhuang <qian.huang@intel.com>

Closes #5969 from hqzizania/R1 and squashes the following commits:

6d27696 [qhuang] fixes in NAMESPACE
eb4b095 [qhuang] remove more docs
6394579 [qhuang] remove RDD docs in generics.R
6813860 [hqzizania] Fill the docs for DataFrame API in SparkR
857220f [hqzizania] remove the pairRDD docs from being built as a part of roxygen
c045d64 [hqzizania] remove the RDD docs from being built as a part of roxygen

(cherry picked from commit 008a60dd37)
Signed-off-by: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
This commit is contained in:
hqzizania 2015-05-08 11:25:04 -07:00 committed by Shivaram Venkataraman
parent 75fed0ca44
commit 4f01f5b563
9 changed files with 1609 additions and 1608 deletions

View file

@ -15,11 +15,11 @@ Suggests:
Description: R frontend for Spark Description: R frontend for Spark
License: Apache License (== 2.0) License: Apache License (== 2.0)
Collate: Collate:
'schema.R'
'generics.R' 'generics.R'
'jobj.R' 'jobj.R'
'RDD.R' 'RDD.R'
'pairRDD.R' 'pairRDD.R'
'schema.R'
'column.R' 'column.R'
'group.R' 'group.R'
'DataFrame.R' 'DataFrame.R'

View file

@ -26,7 +26,6 @@ exportMethods("cache",
"intersect", "intersect",
"isLocal", "isLocal",
"join", "join",
"length",
"limit", "limit",
"orderBy", "orderBy",
"names", "names",
@ -101,9 +100,6 @@ export("cacheTable",
"tables", "tables",
"uncacheTable") "uncacheTable")
export("sparkRSQL.init",
"sparkRHive.init")
export("structField", export("structField",
"structField.jobj", "structField.jobj",
"structField.character", "structField.character",

View file

@ -45,6 +45,9 @@ setMethod("initialize", "DataFrame", function(.Object, sdf, isCached) {
#' @rdname DataFrame #' @rdname DataFrame
#' @export #' @export
#'
#' @param sdf A Java object reference to the backing Scala DataFrame
#' @param isCached TRUE if the dataFrame is cached
dataFrame <- function(sdf, isCached = FALSE) { dataFrame <- function(sdf, isCached = FALSE) {
new("DataFrame", sdf, isCached) new("DataFrame", sdf, isCached)
} }
@ -244,7 +247,7 @@ setMethod("columns",
}) })
#' @rdname columns #' @rdname columns
#' @export #' @aliases names,DataFrame,function-method
setMethod("names", setMethod("names",
signature(x = "DataFrame"), signature(x = "DataFrame"),
function(x) { function(x) {
@ -399,23 +402,23 @@ setMethod("repartition",
dataFrame(sdf) dataFrame(sdf)
}) })
#' toJSON # toJSON
#' #
#' Convert the rows of a DataFrame into JSON objects and return an RDD where # Convert the rows of a DataFrame into JSON objects and return an RDD where
#' each element contains a JSON string. # each element contains a JSON string.
#' #
#' @param x A SparkSQL DataFrame #@param x A SparkSQL DataFrame
#' @return A StringRRDD of JSON objects # @return A StringRRDD of JSON objects
#' @rdname tojson # @rdname tojson
#' @export # @export
#' @examples # @examples
#'\dontrun{ #\dontrun{
#' sc <- sparkR.init() # sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc) # sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json" # path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path) # df <- jsonFile(sqlCtx, path)
#' newRDD <- toJSON(df) # newRDD <- toJSON(df)
#'} #}
setMethod("toJSON", setMethod("toJSON",
signature(x = "DataFrame"), signature(x = "DataFrame"),
function(x) { function(x) {
@ -578,8 +581,8 @@ setMethod("limit",
dataFrame(res) dataFrame(res)
}) })
# Take the first NUM rows of a DataFrame and return a the results as a data.frame #' Take the first NUM rows of a DataFrame and return a the results as a data.frame
#'
#' @rdname take #' @rdname take
#' @export #' @export
#' @examples #' @examples
@ -644,22 +647,22 @@ setMethod("first",
take(x, 1) take(x, 1)
}) })
#' toRDD() # toRDD()
#' #
#' Converts a Spark DataFrame to an RDD while preserving column names. # Converts a Spark DataFrame to an RDD while preserving column names.
#' #
#' @param x A Spark DataFrame # @param x A Spark DataFrame
#' #
#' @rdname DataFrame # @rdname DataFrame
#' @export # @export
#' @examples # @examples
#'\dontrun{ #\dontrun{
#' sc <- sparkR.init() # sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc) # sqlCtx <- sparkRSQL.init(sc)
#' path <- "path/to/file.json" # path <- "path/to/file.json"
#' df <- jsonFile(sqlCtx, path) # df <- jsonFile(sqlCtx, path)
#' rdd <- toRDD(df) # rdd <- toRDD(df)
#' } # }
setMethod("toRDD", setMethod("toRDD",
signature(x = "DataFrame"), signature(x = "DataFrame"),
function(x) { function(x) {
@ -706,6 +709,7 @@ setMethod("groupBy",
#' #'
#' Compute aggregates by specifying a list of columns #' Compute aggregates by specifying a list of columns
#' #'
#' @param x a DataFrame
#' @rdname DataFrame #' @rdname DataFrame
#' @export #' @export
setMethod("agg", setMethod("agg",
@ -721,7 +725,7 @@ setMethod("agg",
# the requested map function. # # the requested map function. #
################################################################################### ###################################################################################
#' @rdname lapply # @rdname lapply
setMethod("lapply", setMethod("lapply",
signature(X = "DataFrame", FUN = "function"), signature(X = "DataFrame", FUN = "function"),
function(X, FUN) { function(X, FUN) {
@ -729,14 +733,14 @@ setMethod("lapply",
lapply(rdd, FUN) lapply(rdd, FUN)
}) })
#' @rdname lapply # @rdname lapply
setMethod("map", setMethod("map",
signature(X = "DataFrame", FUN = "function"), signature(X = "DataFrame", FUN = "function"),
function(X, FUN) { function(X, FUN) {
lapply(X, FUN) lapply(X, FUN)
}) })
#' @rdname flatMap # @rdname flatMap
setMethod("flatMap", setMethod("flatMap",
signature(X = "DataFrame", FUN = "function"), signature(X = "DataFrame", FUN = "function"),
function(X, FUN) { function(X, FUN) {
@ -744,7 +748,7 @@ setMethod("flatMap",
flatMap(rdd, FUN) flatMap(rdd, FUN)
}) })
#' @rdname lapplyPartition # @rdname lapplyPartition
setMethod("lapplyPartition", setMethod("lapplyPartition",
signature(X = "DataFrame", FUN = "function"), signature(X = "DataFrame", FUN = "function"),
function(X, FUN) { function(X, FUN) {
@ -752,14 +756,14 @@ setMethod("lapplyPartition",
lapplyPartition(rdd, FUN) lapplyPartition(rdd, FUN)
}) })
#' @rdname lapplyPartition # @rdname lapplyPartition
setMethod("mapPartitions", setMethod("mapPartitions",
signature(X = "DataFrame", FUN = "function"), signature(X = "DataFrame", FUN = "function"),
function(X, FUN) { function(X, FUN) {
lapplyPartition(X, FUN) lapplyPartition(X, FUN)
}) })
#' @rdname foreach # @rdname foreach
setMethod("foreach", setMethod("foreach",
signature(x = "DataFrame", func = "function"), signature(x = "DataFrame", func = "function"),
function(x, func) { function(x, func) {
@ -767,7 +771,7 @@ setMethod("foreach",
foreach(rdd, func) foreach(rdd, func)
}) })
#' @rdname foreach # @rdname foreach
setMethod("foreachPartition", setMethod("foreachPartition",
signature(x = "DataFrame", func = "function"), signature(x = "DataFrame", func = "function"),
function(x, func) { function(x, func) {
@ -788,6 +792,7 @@ setMethod("$", signature(x = "DataFrame"),
getColumn(x, name) getColumn(x, name)
}) })
#' @rdname select
setMethod("$<-", signature(x = "DataFrame"), setMethod("$<-", signature(x = "DataFrame"),
function(x, name, value) { function(x, name, value) {
stopifnot(class(value) == "Column" || is.null(value)) stopifnot(class(value) == "Column" || is.null(value))
@ -1009,7 +1014,7 @@ setMethod("sortDF",
}) })
#' @rdname sortDF #' @rdname sortDF
#' @export #' @aliases orderBy,DataFrame,function-method
setMethod("orderBy", setMethod("orderBy",
signature(x = "DataFrame", col = "characterOrColumn"), signature(x = "DataFrame", col = "characterOrColumn"),
function(x, col) { function(x, col) {
@ -1046,7 +1051,7 @@ setMethod("filter",
}) })
#' @rdname filter #' @rdname filter
#' @export #' @aliases where,DataFrame,function-method
setMethod("where", setMethod("where",
signature(x = "DataFrame", condition = "characterOrColumn"), signature(x = "DataFrame", condition = "characterOrColumn"),
function(x, condition) { function(x, condition) {

File diff suppressed because it is too large Load diff

View file

@ -150,21 +150,21 @@ createDataFrame <- function(sqlCtx, data, schema = NULL, samplingRatio = 1.0) {
dataFrame(sdf) dataFrame(sdf)
} }
#' toDF # toDF
#' #
#' Converts an RDD to a DataFrame by infer the types. # Converts an RDD to a DataFrame by infer the types.
#' #
#' @param x An RDD # @param x An RDD
#' #
#' @rdname DataFrame # @rdname DataFrame
#' @export # @export
#' @examples # @examples
#'\dontrun{ #\dontrun{
#' sc <- sparkR.init() # sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc) # sqlCtx <- sparkRSQL.init(sc)
#' rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x))) # rdd <- lapply(parallelize(sc, 1:10), function(x) list(a=x, b=as.character(x)))
#' df <- toDF(rdd) # df <- toDF(rdd)
#' } # }
setGeneric("toDF", function(x, ...) { standardGeneric("toDF") }) setGeneric("toDF", function(x, ...) { standardGeneric("toDF") })
@ -207,23 +207,23 @@ jsonFile <- function(sqlCtx, path) {
} }
#' JSON RDD # JSON RDD
#' #
#' Loads an RDD storing one JSON object per string as a DataFrame. # Loads an RDD storing one JSON object per string as a DataFrame.
#' #
#' @param sqlCtx SQLContext to use # @param sqlCtx SQLContext to use
#' @param rdd An RDD of JSON string # @param rdd An RDD of JSON string
#' @param schema A StructType object to use as schema # @param schema A StructType object to use as schema
#' @param samplingRatio The ratio of simpling used to infer the schema # @param samplingRatio The ratio of simpling used to infer the schema
#' @return A DataFrame # @return A DataFrame
#' @export # @export
#' @examples # @examples
#'\dontrun{ #\dontrun{
#' sc <- sparkR.init() # sc <- sparkR.init()
#' sqlCtx <- sparkRSQL.init(sc) # sqlCtx <- sparkRSQL.init(sc)
#' rdd <- texFile(sc, "path/to/json") # rdd <- texFile(sc, "path/to/json")
#' df <- jsonRDD(sqlCtx, rdd) # df <- jsonRDD(sqlCtx, rdd)
#' } # }
# TODO: support schema # TODO: support schema
jsonRDD <- function(sqlCtx, rdd, schema = NULL, samplingRatio = 1.0) { jsonRDD <- function(sqlCtx, rdd, schema = NULL, samplingRatio = 1.0) {

View file

@ -23,21 +23,21 @@
.broadcastValues <- new.env() .broadcastValues <- new.env()
.broadcastIdToName <- new.env() .broadcastIdToName <- new.env()
#' @title S4 class that represents a Broadcast variable # @title S4 class that represents a Broadcast variable
#' @description Broadcast variables can be created using the broadcast # @description Broadcast variables can be created using the broadcast
#' function from a \code{SparkContext}. # function from a \code{SparkContext}.
#' @rdname broadcast-class # @rdname broadcast-class
#' @seealso broadcast # @seealso broadcast
#' #
#' @param id Id of the backing Spark broadcast variable # @param id Id of the backing Spark broadcast variable
#' @export # @export
setClass("Broadcast", slots = list(id = "character")) setClass("Broadcast", slots = list(id = "character"))
#' @rdname broadcast-class # @rdname broadcast-class
#' @param value Value of the broadcast variable # @param value Value of the broadcast variable
#' @param jBroadcastRef reference to the backing Java broadcast object # @param jBroadcastRef reference to the backing Java broadcast object
#' @param objName name of broadcasted object # @param objName name of broadcasted object
#' @export # @export
Broadcast <- function(id, value, jBroadcastRef, objName) { Broadcast <- function(id, value, jBroadcastRef, objName) {
.broadcastValues[[id]] <- value .broadcastValues[[id]] <- value
.broadcastNames[[as.character(objName)]] <- jBroadcastRef .broadcastNames[[as.character(objName)]] <- jBroadcastRef
@ -45,13 +45,13 @@ Broadcast <- function(id, value, jBroadcastRef, objName) {
new("Broadcast", id = id) new("Broadcast", id = id)
} }
#' @description # @description
#' \code{value} can be used to get the value of a broadcast variable inside # \code{value} can be used to get the value of a broadcast variable inside
#' a distributed function. # a distributed function.
#' #
#' @param bcast The broadcast variable to get # @param bcast The broadcast variable to get
#' @rdname broadcast # @rdname broadcast
#' @aliases value,Broadcast-method # @aliases value,Broadcast-method
setMethod("value", setMethod("value",
signature(bcast = "Broadcast"), signature(bcast = "Broadcast"),
function(bcast) { function(bcast) {
@ -62,24 +62,24 @@ setMethod("value",
} }
}) })
#' Internal function to set values of a broadcast variable. # Internal function to set values of a broadcast variable.
#' #
#' This function is used internally by Spark to set the value of a broadcast # This function is used internally by Spark to set the value of a broadcast
#' variable on workers. Not intended for use outside the package. # variable on workers. Not intended for use outside the package.
#' #
#' @rdname broadcast-internal # @rdname broadcast-internal
#' @seealso broadcast, value # @seealso broadcast, value
#' @param bcastId The id of broadcast variable to set # @param bcastId The id of broadcast variable to set
#' @param value The value to be set # @param value The value to be set
#' @export # @export
setBroadcastValue <- function(bcastId, value) { setBroadcastValue <- function(bcastId, value) {
bcastIdStr <- as.character(bcastId) bcastIdStr <- as.character(bcastId)
.broadcastValues[[bcastIdStr]] <- value .broadcastValues[[bcastIdStr]] <- value
} }
#' Helper function to clear the list of broadcast variables we know about # Helper function to clear the list of broadcast variables we know about
#' Should be called when the SparkR JVM backend is shutdown # Should be called when the SparkR JVM backend is shutdown
clearBroadcastVariables <- function() { clearBroadcastVariables <- function() {
bcasts <- ls(.broadcastNames) bcasts <- ls(.broadcastNames)
rm(list = bcasts, envir = .broadcastNames) rm(list = bcasts, envir = .broadcastNames)

View file

@ -25,27 +25,27 @@ getMinPartitions <- function(sc, minPartitions) {
as.integer(minPartitions) as.integer(minPartitions)
} }
#' Create an RDD from a text file. # Create an RDD from a text file.
#' #
#' This function reads a text file from HDFS, a local file system (available on all # This function reads a text file from HDFS, a local file system (available on all
#' nodes), or any Hadoop-supported file system URI, and creates an # nodes), or any Hadoop-supported file system URI, and creates an
#' RDD of strings from it. # RDD of strings from it.
#' #
#' @param sc SparkContext to use # @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed. # @param path Path of file to read. A vector of multiple paths is allowed.
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default # @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism. # value is chosen based on available parallelism.
#' @return RDD where each item is of type \code{character} # @return RDD where each item is of type \code{character}
#' @export # @export
#' @examples # @examples
#'\dontrun{ #\dontrun{
#' sc <- sparkR.init() # sc <- sparkR.init()
#' lines <- textFile(sc, "myfile.txt") # lines <- textFile(sc, "myfile.txt")
#'} #}
textFile <- function(sc, path, minPartitions = NULL) { textFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path # Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path)) path <- suppressWarnings(normalizePath(path))
#' Convert a string vector of paths to a string containing comma separated paths # Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",") path <- paste(path, collapse = ",")
jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions)) jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions))
@ -53,27 +53,27 @@ textFile <- function(sc, path, minPartitions = NULL) {
RDD(jrdd, "string") RDD(jrdd, "string")
} }
#' Load an RDD saved as a SequenceFile containing serialized objects. # Load an RDD saved as a SequenceFile containing serialized objects.
#' #
#' The file to be loaded should be one that was previously generated by calling # The file to be loaded should be one that was previously generated by calling
#' saveAsObjectFile() of the RDD class. # saveAsObjectFile() of the RDD class.
#' #
#' @param sc SparkContext to use # @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed. # @param path Path of file to read. A vector of multiple paths is allowed.
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default # @param minPartitions Minimum number of partitions to be created. If NULL, the default
#' value is chosen based on available parallelism. # value is chosen based on available parallelism.
#' @return RDD containing serialized R objects. # @return RDD containing serialized R objects.
#' @seealso saveAsObjectFile # @seealso saveAsObjectFile
#' @export # @export
#' @examples # @examples
#'\dontrun{ #\dontrun{
#' sc <- sparkR.init() # sc <- sparkR.init()
#' rdd <- objectFile(sc, "myfile") # rdd <- objectFile(sc, "myfile")
#'} #}
objectFile <- function(sc, path, minPartitions = NULL) { objectFile <- function(sc, path, minPartitions = NULL) {
# Allow the user to have a more flexible definiton of the text file path # Allow the user to have a more flexible definiton of the text file path
path <- suppressWarnings(normalizePath(path)) path <- suppressWarnings(normalizePath(path))
#' Convert a string vector of paths to a string containing comma separated paths # Convert a string vector of paths to a string containing comma separated paths
path <- paste(path, collapse = ",") path <- paste(path, collapse = ",")
jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions)) jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions))
@ -81,24 +81,24 @@ objectFile <- function(sc, path, minPartitions = NULL) {
RDD(jrdd, "byte") RDD(jrdd, "byte")
} }
#' Create an RDD from a homogeneous list or vector. # Create an RDD from a homogeneous list or vector.
#' #
#' This function creates an RDD from a local homogeneous list in R. The elements # This function creates an RDD from a local homogeneous list in R. The elements
#' in the list are split into \code{numSlices} slices and distributed to nodes # in the list are split into \code{numSlices} slices and distributed to nodes
#' in the cluster. # in the cluster.
#' #
#' @param sc SparkContext to use # @param sc SparkContext to use
#' @param coll collection to parallelize # @param coll collection to parallelize
#' @param numSlices number of partitions to create in the RDD # @param numSlices number of partitions to create in the RDD
#' @return an RDD created from this collection # @return an RDD created from this collection
#' @export # @export
#' @examples # @examples
#'\dontrun{ #\dontrun{
#' sc <- sparkR.init() # sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2) # rdd <- parallelize(sc, 1:10, 2)
#' # The RDD should contain 10 elements # # The RDD should contain 10 elements
#' length(rdd) # length(rdd)
#'} #}
parallelize <- function(sc, coll, numSlices = 1) { parallelize <- function(sc, coll, numSlices = 1) {
# TODO: bound/safeguard numSlices # TODO: bound/safeguard numSlices
# TODO: unit tests for if the split works for all primitives # TODO: unit tests for if the split works for all primitives
@ -133,33 +133,33 @@ parallelize <- function(sc, coll, numSlices = 1) {
RDD(jrdd, "byte") RDD(jrdd, "byte")
} }
#' Include this specified package on all workers # Include this specified package on all workers
#' #
#' This function can be used to include a package on all workers before the # This function can be used to include a package on all workers before the
#' user's code is executed. This is useful in scenarios where other R package # user's code is executed. This is useful in scenarios where other R package
#' functions are used in a function passed to functions like \code{lapply}. # functions are used in a function passed to functions like \code{lapply}.
#' NOTE: The package is assumed to be installed on every node in the Spark # NOTE: The package is assumed to be installed on every node in the Spark
#' cluster. # cluster.
#' #
#' @param sc SparkContext to use # @param sc SparkContext to use
#' @param pkg Package name # @param pkg Package name
#' #
#' @export # @export
#' @examples # @examples
#'\dontrun{ #\dontrun{
#' library(Matrix) # library(Matrix)
#' #
#' sc <- sparkR.init() # sc <- sparkR.init()
#' # Include the matrix library we will be using # # Include the matrix library we will be using
#' includePackage(sc, Matrix) # includePackage(sc, Matrix)
#' #
#' generateSparse <- function(x) { # generateSparse <- function(x) {
#' sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3)) # sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
#' } # }
#' #
#' rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse) # rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
#' collect(rdd) # collect(rdd)
#'} #}
includePackage <- function(sc, pkg) { includePackage <- function(sc, pkg) {
pkg <- as.character(substitute(pkg)) pkg <- as.character(substitute(pkg))
if (exists(".packages", .sparkREnv)) { if (exists(".packages", .sparkREnv)) {
@ -171,30 +171,30 @@ includePackage <- function(sc, pkg) {
.sparkREnv$.packages <- packages .sparkREnv$.packages <- packages
} }
#' @title Broadcast a variable to all workers # @title Broadcast a variable to all workers
#' #
#' @description # @description
#' Broadcast a read-only variable to the cluster, returning a \code{Broadcast} # Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
#' object for reading it in distributed functions. # object for reading it in distributed functions.
#' #
#' @param sc Spark Context to use # @param sc Spark Context to use
#' @param object Object to be broadcast # @param object Object to be broadcast
#' @export # @export
#' @examples # @examples
#'\dontrun{ #\dontrun{
#' sc <- sparkR.init() # sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:2, 2L) # rdd <- parallelize(sc, 1:2, 2L)
#' #
#' # Large Matrix object that we want to broadcast # # Large Matrix object that we want to broadcast
#' randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000)) # randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
#' randomMatBr <- broadcast(sc, randomMat) # randomMatBr <- broadcast(sc, randomMat)
#' #
#' # Use the broadcast variable inside the function # # Use the broadcast variable inside the function
#' useBroadcast <- function(x) { # useBroadcast <- function(x) {
#' sum(value(randomMatBr) * x) # sum(value(randomMatBr) * x)
#' } # }
#' sumRDD <- lapply(rdd, useBroadcast) # sumRDD <- lapply(rdd, useBroadcast)
#'} #}
broadcast <- function(sc, object) { broadcast <- function(sc, object) {
objName <- as.character(substitute(object)) objName <- as.character(substitute(object))
serializedObj <- serialize(object, connection = NULL) serializedObj <- serialize(object, connection = NULL)
@ -205,21 +205,21 @@ broadcast <- function(sc, object) {
Broadcast(id, object, jBroadcast, objName) Broadcast(id, object, jBroadcast, objName)
} }
#' @title Set the checkpoint directory # @title Set the checkpoint directory
#' #
#' Set the directory under which RDDs are going to be checkpointed. The # Set the directory under which RDDs are going to be checkpointed. The
#' directory must be a HDFS path if running on a cluster. # directory must be a HDFS path if running on a cluster.
#' #
#' @param sc Spark Context to use # @param sc Spark Context to use
#' @param dirName Directory path # @param dirName Directory path
#' @export # @export
#' @examples # @examples
#'\dontrun{ #\dontrun{
#' sc <- sparkR.init() # sc <- sparkR.init()
#' setCheckpointDir(sc, "~/checkpoint") # setCheckpointDir(sc, "~/checkpoint")
#' rdd <- parallelize(sc, 1:2, 2L) # rdd <- parallelize(sc, 1:2, 2L)
#' checkpoint(rdd) # checkpoint(rdd)
#'} #}
setCheckpointDir <- function(sc, dirName) { setCheckpointDir <- function(sc, dirName) {
invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName)))) invisible(callJMethod(sc, "setCheckpointDir", suppressWarnings(normalizePath(dirName))))
} }

View file

@ -17,353 +17,353 @@
############ RDD Actions and Transformations ############ ############ RDD Actions and Transformations ############
#' @rdname aggregateRDD # @rdname aggregateRDD
#' @seealso reduce # @seealso reduce
#' @export # @export
setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") }) setGeneric("aggregateRDD", function(x, zeroValue, seqOp, combOp) { standardGeneric("aggregateRDD") })
#' @rdname cache-methods # @rdname cache-methods
#' @export # @export
setGeneric("cache", function(x) { standardGeneric("cache") }) setGeneric("cache", function(x) { standardGeneric("cache") })
#' @rdname coalesce # @rdname coalesce
#' @seealso repartition # @seealso repartition
#' @export # @export
setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") }) setGeneric("coalesce", function(x, numPartitions, ...) { standardGeneric("coalesce") })
#' @rdname checkpoint-methods # @rdname checkpoint-methods
#' @export # @export
setGeneric("checkpoint", function(x) { standardGeneric("checkpoint") }) setGeneric("checkpoint", function(x) { standardGeneric("checkpoint") })
#' @rdname collect-methods # @rdname collect-methods
#' @export # @export
setGeneric("collect", function(x, ...) { standardGeneric("collect") }) setGeneric("collect", function(x, ...) { standardGeneric("collect") })
#' @rdname collect-methods # @rdname collect-methods
#' @export # @export
setGeneric("collectAsMap", function(x) { standardGeneric("collectAsMap") }) setGeneric("collectAsMap", function(x) { standardGeneric("collectAsMap") })
#' @rdname collect-methods # @rdname collect-methods
#' @export # @export
setGeneric("collectPartition", setGeneric("collectPartition",
function(x, partitionId) { function(x, partitionId) {
standardGeneric("collectPartition") standardGeneric("collectPartition")
}) })
#' @rdname count # @rdname count
#' @export # @export
setGeneric("count", function(x) { standardGeneric("count") }) setGeneric("count", function(x) { standardGeneric("count") })
#' @rdname countByValue # @rdname countByValue
#' @export # @export
setGeneric("countByValue", function(x) { standardGeneric("countByValue") }) setGeneric("countByValue", function(x) { standardGeneric("countByValue") })
#' @rdname distinct # @rdname distinct
#' @export # @export
setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") }) setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") })
#' @rdname filterRDD # @rdname filterRDD
#' @export # @export
setGeneric("filterRDD", function(x, f) { standardGeneric("filterRDD") }) setGeneric("filterRDD", function(x, f) { standardGeneric("filterRDD") })
#' @rdname first # @rdname first
#' @export # @export
setGeneric("first", function(x) { standardGeneric("first") }) setGeneric("first", function(x) { standardGeneric("first") })
#' @rdname flatMap # @rdname flatMap
#' @export # @export
setGeneric("flatMap", function(X, FUN) { standardGeneric("flatMap") }) setGeneric("flatMap", function(X, FUN) { standardGeneric("flatMap") })
#' @rdname fold # @rdname fold
#' @seealso reduce # @seealso reduce
#' @export # @export
setGeneric("fold", function(x, zeroValue, op) { standardGeneric("fold") }) setGeneric("fold", function(x, zeroValue, op) { standardGeneric("fold") })
#' @rdname foreach # @rdname foreach
#' @export # @export
setGeneric("foreach", function(x, func) { standardGeneric("foreach") }) setGeneric("foreach", function(x, func) { standardGeneric("foreach") })
#' @rdname foreach # @rdname foreach
#' @export # @export
setGeneric("foreachPartition", function(x, func) { standardGeneric("foreachPartition") }) setGeneric("foreachPartition", function(x, func) { standardGeneric("foreachPartition") })
# The jrdd accessor function. # The jrdd accessor function.
setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") }) setGeneric("getJRDD", function(rdd, ...) { standardGeneric("getJRDD") })
#' @rdname glom # @rdname glom
#' @export # @export
setGeneric("glom", function(x) { standardGeneric("glom") }) setGeneric("glom", function(x) { standardGeneric("glom") })
#' @rdname keyBy # @rdname keyBy
#' @export # @export
setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") }) setGeneric("keyBy", function(x, func) { standardGeneric("keyBy") })
#' @rdname lapplyPartition # @rdname lapplyPartition
#' @export # @export
setGeneric("lapplyPartition", function(X, FUN) { standardGeneric("lapplyPartition") }) setGeneric("lapplyPartition", function(X, FUN) { standardGeneric("lapplyPartition") })
#' @rdname lapplyPartitionsWithIndex # @rdname lapplyPartitionsWithIndex
#' @export # @export
setGeneric("lapplyPartitionsWithIndex", setGeneric("lapplyPartitionsWithIndex",
function(X, FUN) { function(X, FUN) {
standardGeneric("lapplyPartitionsWithIndex") standardGeneric("lapplyPartitionsWithIndex")
}) })
#' @rdname lapply # @rdname lapply
#' @export # @export
setGeneric("map", function(X, FUN) { standardGeneric("map") }) setGeneric("map", function(X, FUN) { standardGeneric("map") })
#' @rdname lapplyPartition # @rdname lapplyPartition
#' @export # @export
setGeneric("mapPartitions", function(X, FUN) { standardGeneric("mapPartitions") }) setGeneric("mapPartitions", function(X, FUN) { standardGeneric("mapPartitions") })
#' @rdname lapplyPartitionsWithIndex # @rdname lapplyPartitionsWithIndex
#' @export # @export
setGeneric("mapPartitionsWithIndex", setGeneric("mapPartitionsWithIndex",
function(X, FUN) { standardGeneric("mapPartitionsWithIndex") }) function(X, FUN) { standardGeneric("mapPartitionsWithIndex") })
#' @rdname maximum # @rdname maximum
#' @export # @export
setGeneric("maximum", function(x) { standardGeneric("maximum") }) setGeneric("maximum", function(x) { standardGeneric("maximum") })
#' @rdname minimum # @rdname minimum
#' @export # @export
setGeneric("minimum", function(x) { standardGeneric("minimum") }) setGeneric("minimum", function(x) { standardGeneric("minimum") })
#' @rdname sumRDD # @rdname sumRDD
#' @export # @export
setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") }) setGeneric("sumRDD", function(x) { standardGeneric("sumRDD") })
#' @rdname name # @rdname name
#' @export # @export
setGeneric("name", function(x) { standardGeneric("name") }) setGeneric("name", function(x) { standardGeneric("name") })
#' @rdname numPartitions # @rdname numPartitions
#' @export # @export
setGeneric("numPartitions", function(x) { standardGeneric("numPartitions") }) setGeneric("numPartitions", function(x) { standardGeneric("numPartitions") })
#' @rdname persist # @rdname persist
#' @export # @export
setGeneric("persist", function(x, newLevel) { standardGeneric("persist") }) setGeneric("persist", function(x, newLevel) { standardGeneric("persist") })
#' @rdname pipeRDD # @rdname pipeRDD
#' @export # @export
setGeneric("pipeRDD", function(x, command, env = list()) { standardGeneric("pipeRDD")}) setGeneric("pipeRDD", function(x, command, env = list()) { standardGeneric("pipeRDD")})
#' @rdname reduce # @rdname reduce
#' @export # @export
setGeneric("reduce", function(x, func) { standardGeneric("reduce") }) setGeneric("reduce", function(x, func) { standardGeneric("reduce") })
#' @rdname repartition # @rdname repartition
#' @seealso coalesce # @seealso coalesce
#' @export # @export
setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") }) setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") })
#' @rdname sampleRDD # @rdname sampleRDD
#' @export # @export
setGeneric("sampleRDD", setGeneric("sampleRDD",
function(x, withReplacement, fraction, seed) { function(x, withReplacement, fraction, seed) {
standardGeneric("sampleRDD") standardGeneric("sampleRDD")
}) })
#' @rdname saveAsObjectFile # @rdname saveAsObjectFile
#' @seealso objectFile # @seealso objectFile
#' @export # @export
setGeneric("saveAsObjectFile", function(x, path) { standardGeneric("saveAsObjectFile") }) setGeneric("saveAsObjectFile", function(x, path) { standardGeneric("saveAsObjectFile") })
#' @rdname saveAsTextFile # @rdname saveAsTextFile
#' @export # @export
setGeneric("saveAsTextFile", function(x, path) { standardGeneric("saveAsTextFile") }) setGeneric("saveAsTextFile", function(x, path) { standardGeneric("saveAsTextFile") })
#' @rdname setName # @rdname setName
#' @export # @export
setGeneric("setName", function(x, name) { standardGeneric("setName") }) setGeneric("setName", function(x, name) { standardGeneric("setName") })
#' @rdname sortBy # @rdname sortBy
#' @export # @export
setGeneric("sortBy", setGeneric("sortBy",
function(x, func, ascending = TRUE, numPartitions = 1) { function(x, func, ascending = TRUE, numPartitions = 1) {
standardGeneric("sortBy") standardGeneric("sortBy")
}) })
#' @rdname take # @rdname take
#' @export # @export
setGeneric("take", function(x, num) { standardGeneric("take") }) setGeneric("take", function(x, num) { standardGeneric("take") })
#' @rdname takeOrdered # @rdname takeOrdered
#' @export # @export
setGeneric("takeOrdered", function(x, num) { standardGeneric("takeOrdered") }) setGeneric("takeOrdered", function(x, num) { standardGeneric("takeOrdered") })
#' @rdname takeSample # @rdname takeSample
#' @export # @export
setGeneric("takeSample", setGeneric("takeSample",
function(x, withReplacement, num, seed) { function(x, withReplacement, num, seed) {
standardGeneric("takeSample") standardGeneric("takeSample")
}) })
#' @rdname top # @rdname top
#' @export # @export
setGeneric("top", function(x, num) { standardGeneric("top") }) setGeneric("top", function(x, num) { standardGeneric("top") })
#' @rdname unionRDD # @rdname unionRDD
#' @export # @export
setGeneric("unionRDD", function(x, y) { standardGeneric("unionRDD") }) setGeneric("unionRDD", function(x, y) { standardGeneric("unionRDD") })
#' @rdname unpersist-methods # @rdname unpersist-methods
#' @export # @export
setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") }) setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
#' @rdname zipRDD # @rdname zipRDD
#' @export # @export
setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") }) setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
#' @rdname zipRDD # @rdname zipRDD
#' @export # @export
setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") }, setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
signature = "...") signature = "...")
#' @rdname zipWithIndex # @rdname zipWithIndex
#' @seealso zipWithUniqueId # @seealso zipWithUniqueId
#' @export # @export
setGeneric("zipWithIndex", function(x) { standardGeneric("zipWithIndex") }) setGeneric("zipWithIndex", function(x) { standardGeneric("zipWithIndex") })
#' @rdname zipWithUniqueId # @rdname zipWithUniqueId
#' @seealso zipWithIndex # @seealso zipWithIndex
#' @export # @export
setGeneric("zipWithUniqueId", function(x) { standardGeneric("zipWithUniqueId") }) setGeneric("zipWithUniqueId", function(x) { standardGeneric("zipWithUniqueId") })
############ Binary Functions ############# ############ Binary Functions #############
#' @rdname cartesian # @rdname cartesian
#' @export # @export
setGeneric("cartesian", function(x, other) { standardGeneric("cartesian") }) setGeneric("cartesian", function(x, other) { standardGeneric("cartesian") })
#' @rdname countByKey # @rdname countByKey
#' @export # @export
setGeneric("countByKey", function(x) { standardGeneric("countByKey") }) setGeneric("countByKey", function(x) { standardGeneric("countByKey") })
#' @rdname flatMapValues # @rdname flatMapValues
#' @export # @export
setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") }) setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") })
#' @rdname intersection # @rdname intersection
#' @export # @export
setGeneric("intersection", function(x, other, numPartitions = 1) { setGeneric("intersection", function(x, other, numPartitions = 1) {
standardGeneric("intersection") }) standardGeneric("intersection") })
#' @rdname keys # @rdname keys
#' @export # @export
setGeneric("keys", function(x) { standardGeneric("keys") }) setGeneric("keys", function(x) { standardGeneric("keys") })
#' @rdname lookup # @rdname lookup
#' @export # @export
setGeneric("lookup", function(x, key) { standardGeneric("lookup") }) setGeneric("lookup", function(x, key) { standardGeneric("lookup") })
#' @rdname mapValues # @rdname mapValues
#' @export # @export
setGeneric("mapValues", function(X, FUN) { standardGeneric("mapValues") }) setGeneric("mapValues", function(X, FUN) { standardGeneric("mapValues") })
#' @rdname sampleByKey # @rdname sampleByKey
#' @export # @export
setGeneric("sampleByKey", setGeneric("sampleByKey",
function(x, withReplacement, fractions, seed) { function(x, withReplacement, fractions, seed) {
standardGeneric("sampleByKey") standardGeneric("sampleByKey")
}) })
#' @rdname values # @rdname values
#' @export # @export
setGeneric("values", function(x) { standardGeneric("values") }) setGeneric("values", function(x) { standardGeneric("values") })
############ Shuffle Functions ############ ############ Shuffle Functions ############
#' @rdname aggregateByKey # @rdname aggregateByKey
#' @seealso foldByKey, combineByKey # @seealso foldByKey, combineByKey
#' @export # @export
setGeneric("aggregateByKey", setGeneric("aggregateByKey",
function(x, zeroValue, seqOp, combOp, numPartitions) { function(x, zeroValue, seqOp, combOp, numPartitions) {
standardGeneric("aggregateByKey") standardGeneric("aggregateByKey")
}) })
#' @rdname cogroup # @rdname cogroup
#' @export # @export
setGeneric("cogroup", setGeneric("cogroup",
function(..., numPartitions) { function(..., numPartitions) {
standardGeneric("cogroup") standardGeneric("cogroup")
}, },
signature = "...") signature = "...")
#' @rdname combineByKey # @rdname combineByKey
#' @seealso groupByKey, reduceByKey # @seealso groupByKey, reduceByKey
#' @export # @export
setGeneric("combineByKey", setGeneric("combineByKey",
function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) { function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
standardGeneric("combineByKey") standardGeneric("combineByKey")
}) })
#' @rdname foldByKey # @rdname foldByKey
#' @seealso aggregateByKey, combineByKey # @seealso aggregateByKey, combineByKey
#' @export # @export
setGeneric("foldByKey", setGeneric("foldByKey",
function(x, zeroValue, func, numPartitions) { function(x, zeroValue, func, numPartitions) {
standardGeneric("foldByKey") standardGeneric("foldByKey")
}) })
#' @rdname join-methods # @rdname join-methods
#' @export # @export
setGeneric("fullOuterJoin", function(x, y, numPartitions) { standardGeneric("fullOuterJoin") }) setGeneric("fullOuterJoin", function(x, y, numPartitions) { standardGeneric("fullOuterJoin") })
#' @rdname groupByKey # @rdname groupByKey
#' @seealso reduceByKey # @seealso reduceByKey
#' @export # @export
setGeneric("groupByKey", function(x, numPartitions) { standardGeneric("groupByKey") }) setGeneric("groupByKey", function(x, numPartitions) { standardGeneric("groupByKey") })
#' @rdname join-methods # @rdname join-methods
#' @export # @export
setGeneric("join", function(x, y, ...) { standardGeneric("join") }) setGeneric("join", function(x, y, ...) { standardGeneric("join") })
#' @rdname join-methods # @rdname join-methods
#' @export # @export
setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") }) setGeneric("leftOuterJoin", function(x, y, numPartitions) { standardGeneric("leftOuterJoin") })
#' @rdname partitionBy # @rdname partitionBy
#' @export # @export
setGeneric("partitionBy", function(x, numPartitions, ...) { standardGeneric("partitionBy") }) setGeneric("partitionBy", function(x, numPartitions, ...) { standardGeneric("partitionBy") })
#' @rdname reduceByKey # @rdname reduceByKey
#' @seealso groupByKey # @seealso groupByKey
#' @export # @export
setGeneric("reduceByKey", function(x, combineFunc, numPartitions) { standardGeneric("reduceByKey")}) setGeneric("reduceByKey", function(x, combineFunc, numPartitions) { standardGeneric("reduceByKey")})
#' @rdname reduceByKeyLocally # @rdname reduceByKeyLocally
#' @seealso reduceByKey # @seealso reduceByKey
#' @export # @export
setGeneric("reduceByKeyLocally", setGeneric("reduceByKeyLocally",
function(x, combineFunc) { function(x, combineFunc) {
standardGeneric("reduceByKeyLocally") standardGeneric("reduceByKeyLocally")
}) })
#' @rdname join-methods # @rdname join-methods
#' @export # @export
setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("rightOuterJoin") }) setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("rightOuterJoin") })
#' @rdname sortByKey # @rdname sortByKey
#' @export # @export
setGeneric("sortByKey", setGeneric("sortByKey",
function(x, ascending = TRUE, numPartitions = 1) { function(x, ascending = TRUE, numPartitions = 1) {
standardGeneric("sortByKey") standardGeneric("sortByKey")
}) })
#' @rdname subtract # @rdname subtract
#' @export # @export
setGeneric("subtract", setGeneric("subtract",
function(x, other, numPartitions = 1) { function(x, other, numPartitions = 1) {
standardGeneric("subtract") standardGeneric("subtract")
}) })
#' @rdname subtractByKey # @rdname subtractByKey
#' @export # @export
setGeneric("subtractByKey", setGeneric("subtractByKey",
function(x, other, numPartitions = 1) { function(x, other, numPartitions = 1) {
standardGeneric("subtractByKey") standardGeneric("subtractByKey")
@ -372,8 +372,8 @@ setGeneric("subtractByKey",
################### Broadcast Variable Methods ################# ################### Broadcast Variable Methods #################
#' @rdname broadcast # @rdname broadcast
#' @export # @export
setGeneric("value", function(bcast) { standardGeneric("value") }) setGeneric("value", function(bcast) { standardGeneric("value") })
@ -477,8 +477,8 @@ setGeneric("showDF", function(x,...) { standardGeneric("showDF") })
#' @export #' @export
setGeneric("sortDF", function(x, col, ...) { standardGeneric("sortDF") }) setGeneric("sortDF", function(x, col, ...) { standardGeneric("sortDF") })
#' @rdname tojson # @rdname tojson
#' @export # @export
setGeneric("toJSON", function(x) { standardGeneric("toJSON") }) setGeneric("toJSON", function(x) { standardGeneric("toJSON") })
#' @rdname DataFrame #' @rdname DataFrame

File diff suppressed because it is too large Load diff