[SPARK-25446][R] Add schema_of_json() and schema_of_csv() to R

## What changes were proposed in this pull request?

This PR proposes to expose `schema_of_json` and `schema_of_csv` at R side.

**`schema_of_json`**:

```r
json <- '{"name":"Bob"}'
df <- sql("SELECT * FROM range(1)")
head(select(df, schema_of_json(json)))
```

```
  schema_of_json({"name":"Bob"})
1            struct<name:string>
```

**`schema_of_csv`**:

```r
csv <- "Amsterdam,2018"
df <- sql("SELECT * FROM range(1)")
head(select(df, schema_of_csv(csv)))
```

```
  schema_of_csv(Amsterdam,2018)
1    struct<_c0:string,_c1:int>
```

## How was this patch tested?

Manually tested, unit tests added, documentation manually built and verified.

Closes #22939 from HyukjinKwon/SPARK-25446.

Authored-by: hyukjinkwon <gurwls223@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
This commit is contained in:
hyukjinkwon 2018-11-30 10:29:30 +08:00
parent 0166c7373e
commit 66b2046462
4 changed files with 94 additions and 9 deletions

View file

@ -351,6 +351,8 @@ exportMethods("%<=>%",
"row_number",
"rpad",
"rtrim",
"schema_of_csv",
"schema_of_json",
"second",
"sha1",
"sha2",

View file

@ -205,11 +205,18 @@ NULL
#' also supported for the schema.
#' \item \code{from_csv}: a DDL-formatted string
#' }
#' @param ... additional argument(s). In \code{to_json}, \code{to_csv} and \code{from_json},
#' this contains additional named properties to control how it is converted, accepts
#' the same options as the JSON/CSV data source. Additionally \code{to_json} supports
#' the "pretty" option which enables pretty JSON generation. In \code{arrays_zip},
#' this contains additional Columns of arrays to be merged.
#' @param ... additional argument(s).
#' \itemize{
#' \item \code{to_json}, \code{from_json} and \code{schema_of_json}: this contains
#' additional named properties to control how it is converted and accepts the
#' same options as the JSON data source.
#' \item \code{to_json}: it supports the "pretty" option which enables pretty
#' JSON generation.
#' \item \code{to_csv}, \code{from_csv} and \code{schema_of_csv}: this contains
#' additional named properties to control how it is converted and accepts the
#' same options as the CSV data source.
#' \item \code{arrays_zip}, this contains additional Columns of arrays to be merged.
#' }
#' @name column_collection_functions
#' @rdname column_collection_functions
#' @family collection functions
@ -1771,12 +1778,16 @@ setMethod("to_date",
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts a map into a JSON object
#' df2 <- sql("SELECT map('name', 'Bob')) as people")
#' df2 <- sql("SELECT map('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts an array of maps into a JSON array
#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))}
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts a map into a pretty JSON object
#' df2 <- sql("SELECT map('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people, pretty = TRUE))}
#' @note to_json since 2.2.0
setMethod("to_json", signature(x = "Column"),
function(x, ...) {
@ -2285,6 +2296,32 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
column(jc)
})
#' @details
#' \code{schema_of_json}: Parses a JSON string and infers its schema in DDL format.
#'
#' @rdname column_collection_functions
#' @aliases schema_of_json schema_of_json,characterOrColumn-method
#' @examples
#'
#' \dontrun{
#' json <- "{\"name\":\"Bob\"}"
#' df <- sql("SELECT * FROM range(1)")
#' head(select(df, schema_of_json(json)))}
#' @note schema_of_json since 3.0.0
setMethod("schema_of_json", signature(x = "characterOrColumn"),
function(x, ...) {
if (class(x) == "character") {
col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
} else {
col <- x@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"schema_of_json",
col, options)
column(jc)
})
#' @details
#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType}
#' with the specified \code{schema}.
@ -2315,6 +2352,32 @@ setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
column(jc)
})
#' @details
#' \code{schema_of_csv}: Parses a CSV string and infers its schema in DDL format.
#'
#' @rdname column_collection_functions
#' @aliases schema_of_csv schema_of_csv,characterOrColumn-method
#' @examples
#'
#' \dontrun{
#' csv <- "Amsterdam,2018"
#' df <- sql("SELECT * FROM range(1)")
#' head(select(df, schema_of_csv(csv)))}
#' @note schema_of_csv since 3.0.0
setMethod("schema_of_csv", signature(x = "characterOrColumn"),
function(x, ...) {
if (class(x) == "character") {
col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
} else {
col <- x@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"schema_of_csv",
col, options)
column(jc)
})
#' @details
#' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT
#' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a

View file

@ -1206,6 +1206,14 @@ setGeneric("rpad", function(x, len, pad) { standardGeneric("rpad") })
#' @name NULL
setGeneric("rtrim", function(x, trimString) { standardGeneric("rtrim") })
#' @rdname column_collection_functions
#' @name NULL
setGeneric("schema_of_csv", function(x, ...) { standardGeneric("schema_of_csv") })
#' @rdname column_collection_functions
#' @name NULL
setGeneric("schema_of_json", function(x, ...) { standardGeneric("schema_of_json") })
#' @rdname column_aggregate_functions
#' @name NULL
setGeneric("sd", function(x, na.rm = FALSE) { standardGeneric("sd") })

View file

@ -1620,14 +1620,20 @@ test_that("column functions", {
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)
# Test from_csv()
# Test from_csv(), schema_of_csv()
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, alias(from_csv(df$col, "a INT"), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
# Test to_json(), from_json()
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
expect_equal(c[[1]], "struct<_c0:string,_c1:int>")
c <- collect(select(df, schema_of_csv(lit("Amsterdam,2018"))))
expect_equal(c[[1]], "struct<_c0:string,_c1:int>")
# Test to_json(), from_json(), schema_of_json()
df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
j <- collect(select(df, alias(to_json(df$people), "json")))
expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
@ -1654,6 +1660,12 @@ test_that("column functions", {
expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
}
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, schema_of_json('{"name":"Bob"}')))
expect_equal(c[[1]], "struct<name:string>")
c <- collect(select(df, schema_of_json(lit('{"name":"Bob"}'))))
expect_equal(c[[1]], "struct<name:string>")
# Test to_json() supports arrays of primitive types and arrays
df <- sql("SELECT array(19, 42, 70) as age")
j <- collect(select(df, alias(to_json(df$age), "json")))