[SPARK-19925][SPARKR] Fix SparkR spark.getSparkFiles fails when it was called on executors.
## What changes were proposed in this pull request? SparkR ```spark.getSparkFiles``` fails when it was called on executors, see details at [SPARK-19925](https://issues.apache.org/jira/browse/SPARK-19925). ## How was this patch tested? Add unit tests, and verify this fix at standalone and yarn cluster. Author: Yanbo Liang <ybliang8@gmail.com> Closes #17274 from yanboliang/spark-19925.
This commit is contained in:
parent
c1e87e384d
commit
478fbc866f
|
@ -330,7 +330,13 @@ spark.addFile <- function(path, recursive = FALSE) {
|
||||||
#'}
|
#'}
|
||||||
#' @note spark.getSparkFilesRootDirectory since 2.1.0
|
#' @note spark.getSparkFilesRootDirectory since 2.1.0
|
||||||
spark.getSparkFilesRootDirectory <- function() {
|
spark.getSparkFilesRootDirectory <- function() {
|
||||||
|
if (Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") == "") {
|
||||||
|
# Running on driver.
|
||||||
callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
|
callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
|
||||||
|
} else {
|
||||||
|
# Running on worker.
|
||||||
|
Sys.getenv("SPARKR_SPARKFILES_ROOT_DIR")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#' Get the absolute path of a file added through spark.addFile.
|
#' Get the absolute path of a file added through spark.addFile.
|
||||||
|
@ -345,7 +351,13 @@ spark.getSparkFilesRootDirectory <- function() {
|
||||||
#'}
|
#'}
|
||||||
#' @note spark.getSparkFiles since 2.1.0
|
#' @note spark.getSparkFiles since 2.1.0
|
||||||
spark.getSparkFiles <- function(fileName) {
|
spark.getSparkFiles <- function(fileName) {
|
||||||
|
if (Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") == "") {
|
||||||
|
# Running on driver.
|
||||||
callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
|
callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
|
||||||
|
} else {
|
||||||
|
# Running on worker.
|
||||||
|
file.path(spark.getSparkFilesRootDirectory(), as.character(fileName))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#' Run a function over a list of elements, distributing the computations with Spark
|
#' Run a function over a list of elements, distributing the computations with Spark
|
||||||
|
|
|
@ -177,6 +177,13 @@ test_that("add and get file to be downloaded with Spark job on every node", {
|
||||||
spark.addFile(path)
|
spark.addFile(path)
|
||||||
download_path <- spark.getSparkFiles(filename)
|
download_path <- spark.getSparkFiles(filename)
|
||||||
expect_equal(readLines(download_path), words)
|
expect_equal(readLines(download_path), words)
|
||||||
|
|
||||||
|
# Test spark.getSparkFiles works well on executors.
|
||||||
|
seq <- seq(from = 1, to = 10, length.out = 5)
|
||||||
|
f <- function(seq) { spark.getSparkFiles(filename) }
|
||||||
|
results <- spark.lapply(seq, f)
|
||||||
|
for (i in 1:5) { expect_equal(basename(results[[i]]), filename) }
|
||||||
|
|
||||||
unlink(path)
|
unlink(path)
|
||||||
|
|
||||||
# Test add directory recursively.
|
# Test add directory recursively.
|
||||||
|
|
|
@ -347,6 +347,8 @@ private[r] object RRunner {
|
||||||
pb.environment().put("SPARKR_RLIBDIR", rLibDir.mkString(","))
|
pb.environment().put("SPARKR_RLIBDIR", rLibDir.mkString(","))
|
||||||
pb.environment().put("SPARKR_WORKER_PORT", port.toString)
|
pb.environment().put("SPARKR_WORKER_PORT", port.toString)
|
||||||
pb.environment().put("SPARKR_BACKEND_CONNECTION_TIMEOUT", rConnectionTimeout.toString)
|
pb.environment().put("SPARKR_BACKEND_CONNECTION_TIMEOUT", rConnectionTimeout.toString)
|
||||||
|
pb.environment().put("SPARKR_SPARKFILES_ROOT_DIR", SparkFiles.getRootDirectory())
|
||||||
|
pb.environment().put("SPARKR_IS_RUNNING_ON_WORKER", "TRUE")
|
||||||
pb.redirectErrorStream(true) // redirect stderr into stdout
|
pb.redirectErrorStream(true) // redirect stderr into stdout
|
||||||
val proc = pb.start()
|
val proc = pb.start()
|
||||||
val errThread = startStdoutThread(proc)
|
val errThread = startStdoutThread(proc)
|
||||||
|
|
Loading…
Reference in a new issue