From 56a8510e19b3e0349e41d2a8903f4bf05ca00a28 Mon Sep 17 00:00:00 2001
From: zero323
Date: Thu, 19 Nov 2020 09:52:29 +0900
Subject: [PATCH] [SPARK-33304][R][SQL] Add from_avro and to_avro functions to SparkR

### What changes were proposed in this pull request?

Adds `from_avro` and `to_avro` functions to SparkR.

### Why are the changes needed?

Feature parity.

### Does this PR introduce _any_ user-facing change?

New functions exposed in the SparkR API.

### How was this patch tested?

New unit tests.

Closes #30216 from zero323/SPARK-33304.

Authored-by: zero323
Signed-off-by: HyukjinKwon
---
 R/pkg/NAMESPACE                       |   2 +
 R/pkg/R/functions.R                   | 101 ++++++++++++++++++++++++++
 R/pkg/R/generics.R                    |   9 ++-
 R/pkg/tests/fulltests/test_sparkSQL.R |  26 +++++++
 R/run-tests.sh                        |  13 +++-
 docs/sql-data-sources-avro.md         |  34 ++++++++-
 6 files changed, 181 insertions(+), 4 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 404a6968ea..b927a6b96b 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -292,6 +292,7 @@ exportMethods("%<=>%",
               "floor",
               "format_number",
               "format_string",
+              "from_avro",
               "from_csv",
               "from_json",
               "from_unixtime",
@@ -416,6 +417,7 @@ exportMethods("%<=>%",
               "timestamp_seconds",
               "toDegrees",
               "toRadians",
+              "to_avro",
               "to_csv",
               "to_date",
               "to_json",
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index bcd798a8c3..039d28a3a3 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -361,6 +361,50 @@ NULL
 #' }
 NULL
 
+#' Avro processing functions for Column operations
+#'
+#' Avro processing functions defined for \code{Column}.
+#'
+#' @param x Column to compute on.
+#' @param jsonFormatSchema character Avro schema in JSON string format.
+#' @param ... additional argument(s) passed as parser options.
+#' @name column_avro_functions
+#' @rdname column_avro_functions
+#' @family avro functions
+#' @note Avro is a built-in but external data source module since Spark 2.4.
+#'   Please deploy the application as per
+#'   \href{https://spark.apache.org/docs/latest/sql-data-sources-avro.html#deploying}{
+#'   the deployment section
+#'   } of the "Apache Avro Data Source Guide".
+#' @examples
+#' \dontrun{
+#' df <- createDataFrame(iris)
+#' schema <- paste(
+#'   c(
+#'     '{"type": "record", "namespace": "example.avro", "name": "Iris", "fields": [',
+#'     '{"type": ["double", "null"], "name": "Sepal_Length"},',
+#'     '{"type": ["double", "null"], "name": "Sepal_Width"},',
+#'     '{"type": ["double", "null"], "name": "Petal_Length"},',
+#'     '{"type": ["double", "null"], "name": "Petal_Width"},',
+#'     '{"type": ["string", "null"], "name": "Species"}]}'
+#'   ),
+#'   collapse = "\\n"
+#' )
+#'
+#' df_serialized <- select(
+#'   df,
+#'   alias(to_avro(alias(struct(column("*")), "fields")), "payload")
+#' )
+#'
+#' df_deserialized <- select(
+#'   df_serialized,
+#'   from_avro(df_serialized$payload, schema)
+#' )
+#'
+#' head(df_deserialized)
+#' }
+NULL
+
 #' @details
 #' \code{lit}: A new Column is created to represent the literal value.
 #' If the parameter is a Column, it is returned unchanged.
@@ -4547,3 +4591,60 @@ setMethod("vector_to_array",
         )
         column(jc)
       })
+
+#' @details
+#' \code{from_avro}: Converts a binary column of Avro format into its corresponding
+#' Catalyst value. The specified schema must match the read data, otherwise the behavior
+#' is undefined: it may fail or return an arbitrary result. To deserialize the data with
+#' a compatible and evolved schema, the expected Avro schema can be set via the option
+#' \code{avroSchema}.
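+#' For example, assuming a hypothetical evolved reader schema held in a JSON string
+#' \code{readerSchemaJson}, it can be supplied through \code{...} as
+#' \code{from_avro(df$payload, jsonFormatSchema, avroSchema = readerSchemaJson)}.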
+#'
+#' @rdname column_avro_functions
+#' @aliases from_avro from_avro,Column-method
+#' @note from_avro since 3.1.0
+setMethod("from_avro",
+          signature(x = "characterOrColumn"),
+          function(x, jsonFormatSchema, ...) {
+            x <- if (is.character(x)) {
+              column(x)
+            } else {
+              x
+            }
+
+            options <- varargsToStrEnv(...)
+            jc <- callJStatic(
+              "org.apache.spark.sql.avro.functions", "from_avro",
+              x@jc,
+              jsonFormatSchema,
+              options
+            )
+            column(jc)
+          })
+
+#' @details
+#' \code{to_avro}: Converts a column into a binary column in Avro format.
+#'
+#' @rdname column_avro_functions
+#' @aliases to_avro to_avro,Column-method
+#' @note to_avro since 3.1.0
+setMethod("to_avro",
+          signature(x = "characterOrColumn"),
+          function(x, jsonFormatSchema = NULL) {
+            x <- if (is.character(x)) {
+              column(x)
+            } else {
+              x
+            }
+
+            jc <- if (is.null(jsonFormatSchema)) {
+              callJStatic("org.apache.spark.sql.avro.functions", "to_avro", x@jc)
+            } else {
+              callJStatic(
+                "org.apache.spark.sql.avro.functions",
+                "to_avro",
+                x@jc,
+                jsonFormatSchema
+              )
+            }
+            column(jc)
+          })
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index e372ae27e3..1fe6599bf1 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -950,7 +950,6 @@ setGeneric("current_date", function(x = "missing") { standardGeneric("current_da
 #' @name NULL
 setGeneric("current_timestamp", function(x = "missing") { standardGeneric("current_timestamp") })
 
-
 #' @rdname column_datetime_diff_functions
 #' @name NULL
 setGeneric("datediff", function(y, x) { standardGeneric("datediff") })
@@ -1015,6 +1014,10 @@ setGeneric("expr", function(x) { standardGeneric("expr") })
 #' @name NULL
 setGeneric("flatten", function(x) { standardGeneric("flatten") })
 
+#' @rdname column_avro_functions
+#' @name NULL
+setGeneric("from_avro", function(x, ...) { standardGeneric("from_avro") })
+
 #' @rdname column_datetime_diff_functions
 #' @name NULL
 setGeneric("from_utc_timestamp", function(y, x) { standardGeneric("from_utc_timestamp") })
@@ -1388,6 +1391,10 @@ setGeneric("sumDistinct", function(x) { standardGeneric("sumDistinct") })
 #' @name timestamp_seconds
 setGeneric("timestamp_seconds", function(x) { standardGeneric("timestamp_seconds") })
 
+#' @rdname column_avro_functions
+#' @name NULL
+setGeneric("to_avro", function(x, ...) { standardGeneric("to_avro") })
+
 #' @rdname column_collection_functions
 #' @name NULL
 setGeneric("transform_keys", function(x, f) { standardGeneric("transform_keys") })
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 3a0d359e2a..45de1ef1bd 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1841,6 +1841,32 @@ test_that("column functions", {
   )
 })
 
+test_that("avro column functions", {
+  skip_if_not(
+    grepl("spark-avro", sparkR.conf("spark.jars", "")),
+    "spark-avro jar not present"
+  )
+
+  schema <- '{"namespace": "example.avro",
+             "type": "record",
+             "name": "User",
+             "fields": [
+               {"name": "name", "type": "string"},
+               {"name": "favorite_color", "type": ["string", "null"]}
+             ]
+           }'
+
+  c0 <- column("foo")
+  c1 <- from_avro(c0, schema)
+  expect_s4_class(c1, "Column")
+  c2 <- from_avro("foo", schema)
+  expect_s4_class(c2, "Column")
+  c3 <- to_avro(c1)
+  expect_s4_class(c3, "Column")
+  c4 <- to_avro(c1, schema)
+  expect_s4_class(c4, "Column")
+})
+
 test_that("column binary mathfunctions", {
   lines <- c("{\"a\":1, \"b\":5}",
              "{\"a\":2, \"b\":6}",
diff --git a/R/run-tests.sh b/R/run-tests.sh
index 51ca7d600c..edc2b2b60b 100755
--- a/R/run-tests.sh
+++ b/R/run-tests.sh
@@ -23,7 +23,18 @@ FAILED=0
 LOGFILE=$FWDIR/unit-tests.out
 rm -f $LOGFILE
 
-SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+SPARK_AVRO_JAR_PATH=$(find $FWDIR/../external/avro/ -name "spark-avro*jar" -print | egrep -v "tests.jar|test-sources.jar|sources.jar|javadoc.jar")
+
+if [[ $(echo "$SPARK_AVRO_JAR_PATH" | wc -l) -eq 1 ]]; then
+  SPARK_JARS=$SPARK_AVRO_JAR_PATH
+fi
+
+if [ -z "$SPARK_JARS" ]; then
+  SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+else
+  SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --jars $SPARK_JARS --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
+fi
 
 FAILED=$((PIPESTATUS[0]||$FAILED))
 NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"
diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md
index 69b165ed28..9ecc6eb91d 100644
--- a/docs/sql-data-sources-avro.md
+++ b/docs/sql-data-sources-avro.md
@@ -88,8 +88,6 @@ Kafka key-value record will be augmented with some metadata, such as the ingesti
 * If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file.
 * `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.
 
-Both functions are currently only available in Scala, Java, and Python.
-
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
@@ -183,6 +181,38 @@ query = output\
   .option("topic", "topic2")\
   .start()
 
+{% endhighlight %}
+</div>
+<div data-lang="r" markdown="1">
+{% highlight r %}
+
+# `from_avro` requires Avro schema in JSON string format.
+jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse = " ")
+
+df <- read.stream(
+  "kafka",
+  kafka.bootstrap.servers = "host1:port1,host2:port2",
+  subscribe = "topic1"
+)
+
+# 1. Decode the Avro data into a struct;
+# 2. Filter by column `favorite_color`;
+# 3. Encode the column `name` in Avro format.
+
+output <- select(
+  filter(
+    select(df, alias(from_avro("value", jsonFormatSchema), "user")),
+    column("user.favorite_color") == "red"
+  ),
+  alias(to_avro("user.name"), "value")
+)
+
+write.stream(
+  output,
+  "kafka",
+  kafka.bootstrap.servers = "host1:port1,host2:port2",
+  topic = "topic2"
+)
 {% endhighlight %}
 </div>
 </div>
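
For anyone reviewing this locally, here is a minimal, self-contained round-trip sketch (not part of the patch). The `spark-avro` coordinates and version below are an assumption; adjust them to match your Spark/Scala build. The schema and data are made up for illustration.

```r
library(SparkR)

# Assumption: spark-avro coordinates/version must match your Spark build,
# per the deployment section of the Avro data source guide.
sparkR.session(sparkPackages = "org.apache.spark:spark-avro_2.12:3.1.0")

users <- createDataFrame(data.frame(
  name = c("Alyssa", "Ben"),
  favorite_color = c("green", "red"),
  stringsAsFactors = FALSE
))

# Reader schema matching the struct serialized below; fields are nullable unions.
schema <- paste0(
  '{"type": "record", "namespace": "example.avro", "name": "User", "fields": [',
  '{"name": "name", "type": ["string", "null"]},',
  '{"name": "favorite_color", "type": ["string", "null"]}]}'
)

# Struct -> Avro binary -> struct, through the new functions.
serialized <- select(
  users,
  alias(to_avro(struct(column("name"), column("favorite_color"))), "payload")
)
deserialized <- select(serialized, alias(from_avro(serialized$payload, schema), "user"))
head(select(deserialized, column("user.name"), column("user.favorite_color")))
```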