[Minor][SparkR] Minor refactor and removes redundancy related to cleanClosure.
1. Only use `cleanClosure` in the creation of RRDDs. Normally, users and developers do not need to call `cleanClosure` in their function definitions. 2. Remove redundant code (e.g. unnecessary wrapper functions) related to `cleanClosure`. Author: hlin09 <hlin09pu@gmail.com> Closes #5495 from hlin09/cleanClosureFix and squashes the following commits: 74ec303 [hlin09] Minor refactor and removes redundancy.
This commit is contained in:
parent
b45059d0d7
commit
0ba3fdd599
|
@ -85,7 +85,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
|
||||||
|
|
||||||
if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
|
if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
|
||||||
# This transformation is the first in its stage:
|
# This transformation is the first in its stage:
|
||||||
.Object@func <- func
|
.Object@func <- cleanClosure(func)
|
||||||
.Object@prev_jrdd <- getJRDD(prev)
|
.Object@prev_jrdd <- getJRDD(prev)
|
||||||
.Object@env$prev_serializedMode <- prev@env$serializedMode
|
.Object@env$prev_serializedMode <- prev@env$serializedMode
|
||||||
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
|
# NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
|
||||||
|
@ -94,7 +94,7 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val)
|
||||||
pipelinedFunc <- function(split, iterator) {
|
pipelinedFunc <- function(split, iterator) {
|
||||||
func(split, prev@func(split, iterator))
|
func(split, prev@func(split, iterator))
|
||||||
}
|
}
|
||||||
.Object@func <- pipelinedFunc
|
.Object@func <- cleanClosure(pipelinedFunc)
|
||||||
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
|
.Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
|
||||||
# Get the serialization mode of the parent RDD
|
# Get the serialization mode of the parent RDD
|
||||||
.Object@env$prev_serializedMode <- prev@env$prev_serializedMode
|
.Object@env$prev_serializedMode <- prev@env$prev_serializedMode
|
||||||
|
@ -144,17 +144,13 @@ setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
|
||||||
return(rdd@env$jrdd_val)
|
return(rdd@env$jrdd_val)
|
||||||
}
|
}
|
||||||
|
|
||||||
computeFunc <- function(split, part) {
|
|
||||||
rdd@func(split, part)
|
|
||||||
}
|
|
||||||
|
|
||||||
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
|
packageNamesArr <- serialize(.sparkREnv[[".packages"]],
|
||||||
connection = NULL)
|
connection = NULL)
|
||||||
|
|
||||||
broadcastArr <- lapply(ls(.broadcastNames),
|
broadcastArr <- lapply(ls(.broadcastNames),
|
||||||
function(name) { get(name, .broadcastNames) })
|
function(name) { get(name, .broadcastNames) })
|
||||||
|
|
||||||
serializedFuncArr <- serialize(computeFunc, connection = NULL)
|
serializedFuncArr <- serialize(rdd@func, connection = NULL)
|
||||||
|
|
||||||
prev_jrdd <- rdd@prev_jrdd
|
prev_jrdd <- rdd@prev_jrdd
|
||||||
|
|
||||||
|
@ -551,11 +547,7 @@ setMethod("mapPartitions",
|
||||||
setMethod("lapplyPartitionsWithIndex",
|
setMethod("lapplyPartitionsWithIndex",
|
||||||
signature(X = "RDD", FUN = "function"),
|
signature(X = "RDD", FUN = "function"),
|
||||||
function(X, FUN) {
|
function(X, FUN) {
|
||||||
FUN <- cleanClosure(FUN)
|
PipelinedRDD(X, FUN)
|
||||||
closureCapturingFunc <- function(split, part) {
|
|
||||||
FUN(split, part)
|
|
||||||
}
|
|
||||||
PipelinedRDD(X, closureCapturingFunc)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
#' @rdname lapplyPartitionsWithIndex
|
#' @rdname lapplyPartitionsWithIndex
|
||||||
|
|
|
@ -694,10 +694,6 @@ setMethod("cogroup",
|
||||||
for (i in 1:rddsLen) {
|
for (i in 1:rddsLen) {
|
||||||
rdds[[i]] <- lapply(rdds[[i]],
|
rdds[[i]] <- lapply(rdds[[i]],
|
||||||
function(x) { list(x[[1]], list(i, x[[2]])) })
|
function(x) { list(x[[1]], list(i, x[[2]])) })
|
||||||
# TODO(hao): As issue [SparkR-142] mentions, the right value of i
|
|
||||||
# will not be captured into UDF if getJRDD is not invoked.
|
|
||||||
# It should be resolved together with that issue.
|
|
||||||
getJRDD(rdds[[i]]) # Capture the closure.
|
|
||||||
}
|
}
|
||||||
union.rdd <- Reduce(unionRDD, rdds)
|
union.rdd <- Reduce(unionRDD, rdds)
|
||||||
group.func <- function(vlist) {
|
group.func <- function(vlist) {
|
||||||
|
|
Loading…
Reference in a new issue