[SPARK-6991] [SPARKR] Adds support for zipPartitions.
Author: hlin09 <hlin09pu@gmail.com> Closes #5568 from hlin09/zipPartitions and squashes the following commits: 12c08a5 [hlin09] Fix comments d2d32db [hlin09] Merge branch 'master' into zipPartitions ec56d2f [hlin09] Fix test. 27655d3 [hlin09] Adds support for zipPartitions.
This commit is contained in:
parent
ef82bddc11
commit
ca9f4ebb8e
|
@ -71,6 +71,7 @@ exportMethods(
|
|||
"unpersist",
|
||||
"value",
|
||||
"values",
|
||||
"zipPartitions",
|
||||
"zipRDD",
|
||||
"zipWithIndex",
|
||||
"zipWithUniqueId"
|
||||
|
|
|
@ -1595,3 +1595,49 @@ setMethod("intersection",
|
|||
|
||||
keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
|
||||
})
|
||||
|
||||
#' Zips an RDD's partitions with one (or more) RDD(s).
|
||||
#' Same as zipPartitions in Spark.
|
||||
#'
|
||||
#' @param ... RDDs to be zipped.
|
||||
#' @param func A function to transform zipped partitions.
|
||||
#' @return A new RDD by applying a function to the zipped partitions.
|
||||
#' Assumes that all the RDDs have the *same number of partitions*, but
|
||||
#' does *not* require them to have the same number of elements in each partition.
|
||||
#' @examples
|
||||
#'\dontrun{
|
||||
#' sc <- sparkR.init()
|
||||
#' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
|
||||
#' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
|
||||
#' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
|
||||
#' collect(zipPartitions(rdd1, rdd2, rdd3,
|
||||
#' func = function(x, y, z) { list(list(x, y, z))} ))
|
||||
#' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
|
||||
#'}
|
||||
#' @rdname zipRDD
|
||||
#' @aliases zipPartitions,RDD
|
||||
setMethod("zipPartitions",
|
||||
"RDD",
|
||||
function(..., func) {
|
||||
rrdds <- list(...)
|
||||
if (length(rrdds) == 1) {
|
||||
return(rrdds[[1]])
|
||||
}
|
||||
nPart <- sapply(rrdds, numPartitions)
|
||||
if (length(unique(nPart)) != 1) {
|
||||
stop("Can only zipPartitions RDDs which have the same number of partitions.")
|
||||
}
|
||||
|
||||
rrdds <- lapply(rrdds, function(rdd) {
|
||||
mapPartitionsWithIndex(rdd, function(partIndex, part) {
|
||||
print(length(part))
|
||||
list(list(partIndex, part))
|
||||
})
|
||||
})
|
||||
union.rdd <- Reduce(unionRDD, rrdds)
|
||||
zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
|
||||
res <- mapPartitions(zipped.rdd, function(plist) {
|
||||
do.call(func, plist[[1]])
|
||||
})
|
||||
res
|
||||
})
|
||||
|
|
|
@ -217,6 +217,11 @@ setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })
|
|||
#' @export
|
||||
setGeneric("zipRDD", function(x, other) { standardGeneric("zipRDD") })
|
||||
|
||||
#' @rdname zipRDD
|
||||
#' @export
|
||||
setGeneric("zipPartitions", function(..., func) { standardGeneric("zipPartitions") },
|
||||
signature = "...")
|
||||
|
||||
#' @rdname zipWithIndex
|
||||
#' @seealso zipWithUniqueId
|
||||
#' @export
|
||||
|
|
|
@ -66,3 +66,36 @@ test_that("cogroup on two RDDs", {
|
|||
expect_equal(sortKeyValueList(actual),
|
||||
sortKeyValueList(expected))
|
||||
})
|
||||
|
||||
test_that("zipPartitions() on RDDs", {
|
||||
rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
|
||||
rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
|
||||
rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
|
||||
actual <- collect(zipPartitions(rdd1, rdd2, rdd3,
|
||||
func = function(x, y, z) { list(list(x, y, z))} ))
|
||||
expect_equal(actual,
|
||||
list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6))))
|
||||
|
||||
mockFile = c("Spark is pretty.", "Spark is awesome.")
|
||||
fileName <- tempfile(pattern="spark-test", fileext=".tmp")
|
||||
writeLines(mockFile, fileName)
|
||||
|
||||
rdd <- textFile(sc, fileName, 1)
|
||||
actual <- collect(zipPartitions(rdd, rdd,
|
||||
func = function(x, y) { list(paste(x, y, sep = "\n")) }))
|
||||
expected <- list(paste(mockFile, mockFile, sep = "\n"))
|
||||
expect_equal(actual, expected)
|
||||
|
||||
rdd1 <- parallelize(sc, 0:1, 1)
|
||||
actual <- collect(zipPartitions(rdd1, rdd,
|
||||
func = function(x, y) { list(x + nchar(y)) }))
|
||||
expected <- list(0:1 + nchar(mockFile))
|
||||
expect_equal(actual, expected)
|
||||
|
||||
rdd <- map(rdd, function(x) { x })
|
||||
actual <- collect(zipPartitions(rdd, rdd1,
|
||||
func = function(x, y) { list(y + nchar(x)) }))
|
||||
expect_equal(actual, expected)
|
||||
|
||||
unlink(fileName)
|
||||
})
|
||||
|
|
Loading…
Reference in a new issue