[SPARK-17838][SPARKR] Check named arguments for options and use formatted R friendly message from JVM exception message
## What changes were proposed in this pull request? This PR proposes to - show R-friendly error messages instead of raw JVM exception messages. As `read.json`, `read.text`, `read.orc`, `read.parquet` and `read.jdbc` are executed in the same path as `read.df`, and `write.json`, `write.text`, `write.orc`, `write.parquet` and `write.jdbc` share the same path as `write.df`, it seems safe to call `handledCallJMethod` to handle JVM messages. - prevent the `zero-length variable name` error and print the ignored options in a warning message. **Before** ``` r > read.json("path", a = 1, 2, 3, "a") Error in env[[name]] <- value : zero-length variable name ``` ``` r > read.json("arbitrary_path") Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : org.apache.spark.sql.AnalysisException: Path does not exist: file:/...; at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398) ... > read.orc("arbitrary_path") Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : org.apache.spark.sql.AnalysisException: Path does not exist: file:/...; at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398) ... > read.text("arbitrary_path") Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : org.apache.spark.sql.AnalysisException: Path does not exist: file:/...; at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398) ... > read.parquet("arbitrary_path") Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : org.apache.spark.sql.AnalysisException: Path does not exist: file:/...; at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398) ... ``` ``` r > write.json(df, "existing_path") Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : org.apache.spark.sql.AnalysisException: path file:/... 
already exists.; at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68) > write.orc(df, "existing_path") Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : org.apache.spark.sql.AnalysisException: path file:/... already exists.; at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68) > write.text(df, "existing_path") Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : org.apache.spark.sql.AnalysisException: path file:/... already exists.; at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68) > write.parquet(df, "existing_path") Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) : org.apache.spark.sql.AnalysisException: path file:/... already exists.; at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68) ``` **After** ``` r read.json("arbitrary_path", a = 1, 2, 3, "a") Unnamed arguments ignored: 2, 3, a. ``` ``` r > read.json("arbitrary_path") Error in json : analysis error - Path does not exist: file:/... > read.orc("arbitrary_path") Error in orc : analysis error - Path does not exist: file:/... > read.text("arbitrary_path") Error in text : analysis error - Path does not exist: file:/... > read.parquet("arbitrary_path") Error in parquet : analysis error - Path does not exist: file:/... ``` ``` r > write.json(df, "existing_path") Error in json : analysis error - path file:/... already exists.; > write.orc(df, "existing_path") Error in orc : analysis error - path file:/... already exists.; > write.text(df, "existing_path") Error in text : analysis error - path file:/... already exists.; > write.parquet(df, "existing_path") Error in parquet : analysis error - path file:/... already exists.; ``` ## How was this patch tested? 
Unit tests in `test_utils.R` and `test_sparkSQL.R`. Author: hyukjinkwon <gurwls223@gmail.com> Closes #15608 from HyukjinKwon/SPARK-17838.
This commit is contained in:
parent
ad4832a9fa
commit
1ecfafa086
|
@ -788,7 +788,7 @@ setMethod("write.json",
|
|||
function(x, path, mode = "error", ...) {
|
||||
write <- callJMethod(x@sdf, "write")
|
||||
write <- setWriteOptions(write, mode = mode, ...)
|
||||
invisible(callJMethod(write, "json", path))
|
||||
invisible(handledCallJMethod(write, "json", path))
|
||||
})
|
||||
|
||||
#' Save the contents of SparkDataFrame as an ORC file, preserving the schema.
|
||||
|
@ -819,7 +819,7 @@ setMethod("write.orc",
|
|||
function(x, path, mode = "error", ...) {
|
||||
write <- callJMethod(x@sdf, "write")
|
||||
write <- setWriteOptions(write, mode = mode, ...)
|
||||
invisible(callJMethod(write, "orc", path))
|
||||
invisible(handledCallJMethod(write, "orc", path))
|
||||
})
|
||||
|
||||
#' Save the contents of SparkDataFrame as a Parquet file, preserving the schema.
|
||||
|
@ -851,7 +851,7 @@ setMethod("write.parquet",
|
|||
function(x, path, mode = "error", ...) {
|
||||
write <- callJMethod(x@sdf, "write")
|
||||
write <- setWriteOptions(write, mode = mode, ...)
|
||||
invisible(callJMethod(write, "parquet", path))
|
||||
invisible(handledCallJMethod(write, "parquet", path))
|
||||
})
|
||||
|
||||
#' @rdname write.parquet
|
||||
|
@ -895,7 +895,7 @@ setMethod("write.text",
|
|||
function(x, path, mode = "error", ...) {
|
||||
write <- callJMethod(x@sdf, "write")
|
||||
write <- setWriteOptions(write, mode = mode, ...)
|
||||
invisible(callJMethod(write, "text", path))
|
||||
invisible(handledCallJMethod(write, "text", path))
|
||||
})
|
||||
|
||||
#' Distinct
|
||||
|
@ -3342,7 +3342,7 @@ setMethod("write.jdbc",
|
|||
jprops <- varargsToJProperties(...)
|
||||
write <- callJMethod(x@sdf, "write")
|
||||
write <- callJMethod(write, "mode", jmode)
|
||||
invisible(callJMethod(write, "jdbc", url, tableName, jprops))
|
||||
invisible(handledCallJMethod(write, "jdbc", url, tableName, jprops))
|
||||
})
|
||||
|
||||
#' randomSplit
|
||||
|
|
|
@ -350,7 +350,7 @@ read.json.default <- function(path, ...) {
|
|||
paths <- as.list(suppressWarnings(normalizePath(path)))
|
||||
read <- callJMethod(sparkSession, "read")
|
||||
read <- callJMethod(read, "options", options)
|
||||
sdf <- callJMethod(read, "json", paths)
|
||||
sdf <- handledCallJMethod(read, "json", paths)
|
||||
dataFrame(sdf)
|
||||
}
|
||||
|
||||
|
@ -422,7 +422,7 @@ read.orc <- function(path, ...) {
|
|||
path <- suppressWarnings(normalizePath(path))
|
||||
read <- callJMethod(sparkSession, "read")
|
||||
read <- callJMethod(read, "options", options)
|
||||
sdf <- callJMethod(read, "orc", path)
|
||||
sdf <- handledCallJMethod(read, "orc", path)
|
||||
dataFrame(sdf)
|
||||
}
|
||||
|
||||
|
@ -444,7 +444,7 @@ read.parquet.default <- function(path, ...) {
|
|||
paths <- as.list(suppressWarnings(normalizePath(path)))
|
||||
read <- callJMethod(sparkSession, "read")
|
||||
read <- callJMethod(read, "options", options)
|
||||
sdf <- callJMethod(read, "parquet", paths)
|
||||
sdf <- handledCallJMethod(read, "parquet", paths)
|
||||
dataFrame(sdf)
|
||||
}
|
||||
|
||||
|
@ -496,7 +496,7 @@ read.text.default <- function(path, ...) {
|
|||
paths <- as.list(suppressWarnings(normalizePath(path)))
|
||||
read <- callJMethod(sparkSession, "read")
|
||||
read <- callJMethod(read, "options", options)
|
||||
sdf <- callJMethod(read, "text", paths)
|
||||
sdf <- handledCallJMethod(read, "text", paths)
|
||||
dataFrame(sdf)
|
||||
}
|
||||
|
||||
|
@ -914,12 +914,13 @@ read.jdbc <- function(url, tableName,
|
|||
} else {
|
||||
numPartitions <- numToInt(numPartitions)
|
||||
}
|
||||
sdf <- callJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
|
||||
sdf <- handledCallJMethod(read, "jdbc", url, tableName, as.character(partitionColumn),
|
||||
numToInt(lowerBound), numToInt(upperBound), numPartitions, jprops)
|
||||
} else if (length(predicates) > 0) {
|
||||
sdf <- callJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)), jprops)
|
||||
sdf <- handledCallJMethod(read, "jdbc", url, tableName, as.list(as.character(predicates)),
|
||||
jprops)
|
||||
} else {
|
||||
sdf <- callJMethod(read, "jdbc", url, tableName, jprops)
|
||||
sdf <- handledCallJMethod(read, "jdbc", url, tableName, jprops)
|
||||
}
|
||||
dataFrame(sdf)
|
||||
}
|
||||
|
|
|
@ -338,12 +338,25 @@ varargsToEnv <- function(...) {
|
|||
# into string.
|
||||
varargsToStrEnv <- function(...) {
|
||||
pairs <- list(...)
|
||||
nameList <- names(pairs)
|
||||
env <- new.env()
|
||||
for (name in names(pairs)) {
|
||||
ignoredNames <- list()
|
||||
|
||||
if (is.null(nameList)) {
|
||||
# When none of the arguments are named, names(...) returns NULL.
|
||||
ignoredNames <- pairs
|
||||
} else {
|
||||
for (i in seq_along(pairs)) {
|
||||
name <- nameList[i]
|
||||
value <- pairs[i]
|
||||
if (identical(name, "")) {
|
||||
# When only some of the arguments are named, the name of an unnamed one is "".
|
||||
ignoredNames <- append(ignoredNames, value)
|
||||
} else {
|
||||
value <- pairs[[name]]
|
||||
if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
|
||||
stop(paste0("Unsupported type for ", name, " : ", class(value),
|
||||
". Supported types are logical, numeric, character and NULL."))
|
||||
". Supported types are logical, numeric, character and NULL."), call. = FALSE)
|
||||
}
|
||||
if (is.logical(value)) {
|
||||
env[[name]] <- tolower(as.character(value))
|
||||
|
@ -353,6 +366,13 @@ varargsToStrEnv <- function(...) {
|
|||
env[[name]] <- as.character(value)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (length(ignoredNames) != 0) {
|
||||
warning(paste0("Unnamed arguments ignored: ", paste(ignoredNames, collapse = ", "), "."),
|
||||
call. = FALSE)
|
||||
}
|
||||
env
|
||||
}
|
||||
|
||||
|
|
|
@ -2660,6 +2660,14 @@ test_that("Call DataFrameWriter.save() API in Java without path and check argume
|
|||
# DataFrameWriter.save() without path.
|
||||
expect_error(write.df(df, source = "csv"),
|
||||
"Error in save : illegal argument - 'path' is not specified")
|
||||
expect_error(write.json(df, jsonPath),
|
||||
"Error in json : analysis error - path file:.*already exists")
|
||||
expect_error(write.text(df, jsonPath),
|
||||
"Error in text : analysis error - path file:.*already exists")
|
||||
expect_error(write.orc(df, jsonPath),
|
||||
"Error in orc : analysis error - path file:.*already exists")
|
||||
expect_error(write.parquet(df, jsonPath),
|
||||
"Error in parquet : analysis error - path file:.*already exists")
|
||||
|
||||
# Arguments checking in R side.
|
||||
expect_error(write.df(df, "data.tmp", source = c(1, 2)),
|
||||
|
@ -2679,6 +2687,11 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume
|
|||
paste("Error in loadDF : analysis error - Unable to infer schema for JSON at .",
|
||||
"It must be specified manually"))
|
||||
expect_error(read.df("arbitrary_path"), "Error in loadDF : analysis error - Path does not exist")
|
||||
expect_error(read.json("arbitrary_path"), "Error in json : analysis error - Path does not exist")
|
||||
expect_error(read.text("arbitrary_path"), "Error in text : analysis error - Path does not exist")
|
||||
expect_error(read.orc("arbitrary_path"), "Error in orc : analysis error - Path does not exist")
|
||||
expect_error(read.parquet("arbitrary_path"),
|
||||
"Error in parquet : analysis error - Path does not exist")
|
||||
|
||||
# Arguments checking in R side.
|
||||
expect_error(read.df(path = c(3)),
|
||||
|
@ -2686,6 +2699,9 @@ test_that("Call DataFrameWriter.load() API in Java without path and check argume
|
|||
expect_error(read.df(jsonPath, source = c(1, 2)),
|
||||
paste("source should be character, NULL or omitted. It is the datasource specified",
|
||||
"in 'spark.sql.sources.default' configuration by default."))
|
||||
|
||||
expect_warning(read.json(jsonPath, a = 1, 2, 3, "a"),
|
||||
"Unnamed arguments ignored: 2, 3, a.")
|
||||
})
|
||||
|
||||
unlink(parquetPath)
|
||||
|
|
|
@ -224,6 +224,8 @@ test_that("varargsToStrEnv", {
|
|||
expect_error(varargsToStrEnv(a = list(1, "a")),
|
||||
paste0("Unsupported type for a : list. Supported types are logical, ",
|
||||
"numeric, character and NULL."))
|
||||
expect_warning(varargsToStrEnv(a = 1, 2, 3, 4), "Unnamed arguments ignored: 2, 3, 4.")
|
||||
expect_warning(varargsToStrEnv(1, 2, 3, 4), "Unnamed arguments ignored: 1, 2, 3, 4.")
|
||||
})
|
||||
|
||||
sparkR.session.stop()
|
||||
|
|
Loading…
Reference in a new issue