From 66b2046462c0e93b2ca167728eba9f4d13a5a67c Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Fri, 30 Nov 2018 10:29:30 +0800
Subject: [PATCH] [SPARK-25446][R] Add schema_of_json() and schema_of_csv() to R

## What changes were proposed in this pull request?

This PR proposes to expose `schema_of_json` and `schema_of_csv` on the R side.

**`schema_of_json`**:

```r
json <- '{"name":"Bob"}'
df <- sql("SELECT * FROM range(1)")
head(select(df, schema_of_json(json)))
```

```
  schema_of_json({"name":"Bob"})
1            struct<name:string>
```

**`schema_of_csv`**:

```r
csv <- "Amsterdam,2018"
df <- sql("SELECT * FROM range(1)")
head(select(df, schema_of_csv(csv)))
```

```
  schema_of_csv(Amsterdam,2018)
1   struct<_c0:string,_c1:int>
```

## How was this patch tested?

Manually tested, unit tests added, documentation manually built and verified.

Closes #22939 from HyukjinKwon/SPARK-25446.

Authored-by: hyukjinkwon
Signed-off-by: hyukjinkwon
---
 R/pkg/NAMESPACE                       |  2 +
 R/pkg/R/functions.R                   | 77 ++++++++++++++++++++++++---
 R/pkg/R/generics.R                    |  8 +++
 R/pkg/tests/fulltests/test_sparkSQL.R | 16 +++++-
 4 files changed, 94 insertions(+), 9 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index cdeafdd90c..1f8ba0bcf1 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -351,6 +351,8 @@ exportMethods("%<=>%",
               "row_number",
               "rpad",
               "rtrim",
+              "schema_of_csv",
+              "schema_of_json",
               "second",
               "sha1",
               "sha2",
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index f72645a257..f568a931ae 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -205,11 +205,18 @@ NULL
 #'          also supported for the schema.
 #'          \item \code{from_csv}: a DDL-formatted string
 #'          }
-#' @param ... additional argument(s). In \code{to_json}, \code{to_csv} and \code{from_json},
-#'            this contains additional named properties to control how it is converted, accepts
-#'            the same options as the JSON/CSV data source. Additionally \code{to_json} supports
-#'            the "pretty" option which enables pretty JSON generation. In \code{arrays_zip},
-#'            this contains additional Columns of arrays to be merged.
+#' @param ... additional argument(s).
+#'          \itemize{
+#'          \item \code{to_json}, \code{from_json} and \code{schema_of_json}: this contains
+#'          additional named properties to control how it is converted and accepts the
+#'          same options as the JSON data source.
+#'          \item \code{to_json}: it supports the "pretty" option which enables pretty
+#'          JSON generation.
+#'          \item \code{to_csv}, \code{from_csv} and \code{schema_of_csv}: this contains
+#'          additional named properties to control how it is converted and accepts the
+#'          same options as the CSV data source.
+#'          \item \code{arrays_zip}, this contains additional Columns of arrays to be merged.
+#'          }
 #' @name column_collection_functions
 #' @rdname column_collection_functions
 #' @family collection functions
@@ -1771,12 +1778,16 @@ setMethod("to_date",
 #' df2 <- mutate(df2, people_json = to_json(df2$people))
 #'
 #' # Converts a map into a JSON object
-#' df2 <- sql("SELECT map('name', 'Bob')) as people")
+#' df2 <- sql("SELECT map('name', 'Bob') as people")
 #' df2 <- mutate(df2, people_json = to_json(df2$people))
 #'
 #' # Converts an array of maps into a JSON array
 #' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
-#' df2 <- mutate(df2, people_json = to_json(df2$people))}
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts a map into a pretty JSON object
+#' df2 <- sql("SELECT map('name', 'Bob') as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people, pretty = TRUE))}
 #' @note to_json since 2.2.0
 setMethod("to_json", signature(x = "Column"),
           function(x, ...) {
@@ -2285,6 +2296,32 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
             column(jc)
           })
 
+#' @details
+#' \code{schema_of_json}: Parses a JSON string and infers its schema in DDL format.
+#'
+#' @rdname column_collection_functions
+#' @aliases schema_of_json schema_of_json,characterOrColumn-method
+#' @examples
+#'
+#' \dontrun{
+#' json <- "{\"name\":\"Bob\"}"
+#' df <- sql("SELECT * FROM range(1)")
+#' head(select(df, schema_of_json(json)))}
+#' @note schema_of_json since 3.0.0
+setMethod("schema_of_json", signature(x = "characterOrColumn"),
+          function(x, ...) {
+            if (class(x) == "character") {
+              col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
+            } else {
+              col <- x@jc
+            }
+            options <- varargsToStrEnv(...)
+            jc <- callJStatic("org.apache.spark.sql.functions",
+                              "schema_of_json",
+                              col, options)
+            column(jc)
+          })
+
 #' @details
 #' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType}
 #' with the specified \code{schema}.
@@ -2315,6 +2352,32 @@ setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
             column(jc)
           })
 
+#' @details
+#' \code{schema_of_csv}: Parses a CSV string and infers its schema in DDL format.
+#'
+#' @rdname column_collection_functions
+#' @aliases schema_of_csv schema_of_csv,characterOrColumn-method
+#' @examples
+#'
+#' \dontrun{
+#' csv <- "Amsterdam,2018"
+#' df <- sql("SELECT * FROM range(1)")
+#' head(select(df, schema_of_csv(csv)))}
+#' @note schema_of_csv since 3.0.0
+setMethod("schema_of_csv", signature(x = "characterOrColumn"),
+          function(x, ...) {
+            if (class(x) == "character") {
+              col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
+            } else {
+              col <- x@jc
+            }
+            options <- varargsToStrEnv(...)
+            jc <- callJStatic("org.apache.spark.sql.functions",
+                              "schema_of_csv",
+                              col, options)
+            column(jc)
+          })
+
 #' @details
 #' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT
 #' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index b2ca6e6217..9d8c24c686 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1206,6 +1206,14 @@ setGeneric("rpad", function(x, len, pad) { standardGeneric("rpad") })
 #' @name NULL
 setGeneric("rtrim", function(x, trimString) { standardGeneric("rtrim") })
 
+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("schema_of_csv", function(x, ...) { standardGeneric("schema_of_csv") })
+
+#' @rdname column_collection_functions
+#' @name NULL
+setGeneric("schema_of_json", function(x, ...) { standardGeneric("schema_of_json") })
+
 #' @rdname column_aggregate_functions
 #' @name NULL
 setGeneric("sd", function(x, na.rm = FALSE) { standardGeneric("sd") })
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 77a29c9eca..0d5118c127 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1620,14 +1620,20 @@ test_that("column functions", {
   expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
   expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)
 
-  # Test from_csv()
+  # Test from_csv(), schema_of_csv()
   df <- as.DataFrame(list(list("col" = "1")))
   c <- collect(select(df, alias(from_csv(df$col, "a INT"), "csv")))
   expect_equal(c[[1]][[1]]$a, 1)
   c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
   expect_equal(c[[1]][[1]]$a, 1)
 
-  # Test to_json(), from_json()
+  df <- as.DataFrame(list(list("col" = "1")))
+  c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
+  expect_equal(c[[1]], "struct<_c0:string,_c1:int>")
+  c <- collect(select(df, schema_of_csv(lit("Amsterdam,2018"))))
+  expect_equal(c[[1]], "struct<_c0:string,_c1:int>")
+
+  # Test to_json(), from_json(), schema_of_json()
   df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
   j <- collect(select(df, alias(to_json(df$people), "json")))
   expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
@@ -1654,6 +1660,12 @@
     expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
   }
 
+  df <- as.DataFrame(list(list("col" = "1")))
+  c <- collect(select(df, schema_of_json('{"name":"Bob"}')))
+  expect_equal(c[[1]], "struct<name:string>")
+  c <- collect(select(df, schema_of_json(lit('{"name":"Bob"}'))))
+  expect_equal(c[[1]], "struct<name:string>")
+
   # Test to_json() supports arrays of primitive types and arrays
   df <- sql("SELECT array(19, 42, 70) as age")
   j <- collect(select(df, alias(to_json(df$age), "json")))
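For readers who want to try the new functions, the short sketch below is not part of the patch; it mirrors what the added tests and examples do. It assumes an active SparkR session (`sparkR.session()`), and the sample JSON/CSV literals and the `json_schema`/`csv_schema` aliases are made up for illustration.

```r
library(SparkR)
sparkR.session()

# schema_of_json()/schema_of_csv() return a Column, so run them through
# select() + collect() to pull the inferred schema back into R as a string.
df <- sql("SELECT * FROM range(1)")
inferred <- collect(select(df,
                           alias(schema_of_json('{"name":"Bob"}'), "json_schema"),
                           alias(schema_of_csv(lit("Amsterdam,2018")), "csv_schema")))

inferred$json_schema  # "struct<name:string>"
inferred$csv_schema   # "struct<_c0:string,_c1:int>"
```

Because the result is a DDL-formatted string, it can then be handed back to `from_json`/`from_csv`, which accept a character schema per the documentation touched in this patch, instead of writing the schema by hand.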