[SPARK-33304][R][SQL] Add from_avro and to_avro functions to SparkR

### What changes were proposed in this pull request?

Adds `from_avro` and `to_avro` functions to SparkR.

### Why are the changes needed?

Feature parity: `from_avro` and `to_avro` are already available in Scala, Java, and Python.

### Does this PR introduce _any_ user-facing change?

Yes, new functions are exposed in the SparkR API.
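
A usage sketch (hedged: `df` and `jsonFormatSchema` stand for any SparkDataFrame and a matching JSON-format Avro schema string; neither name comes from this patch):

```r
# Pack all columns into a struct, encode it as one Avro binary column,
# then decode that column back with the expected schema.
df_serialized <- select(df, alias(to_avro(struct(column("*"))), "payload"))
df_deserialized <- select(
  df_serialized,
  from_avro(df_serialized$payload, jsonFormatSchema)
)
```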

### How was this patch tested?

New unit tests.

Closes #30216 from zero323/SPARK-33304.

Authored-by: zero323 <mszymkiewicz@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
parent 9a4c79073b
commit 56a8510e19
6 changed files with 181 additions and 4 deletions

R/pkg/NAMESPACE

@@ -292,6 +292,7 @@ exportMethods("%<=>%",
"floor",
"format_number",
"format_string",
"from_avro",
"from_csv",
"from_json",
"from_unixtime",
@@ -416,6 +417,7 @@ exportMethods("%<=>%",
"timestamp_seconds",
"toDegrees",
"toRadians",
"to_avro",
"to_csv",
"to_date",
"to_json",

R/pkg/R/functions.R

@@ -361,6 +361,50 @@ NULL
#' }
NULL
#' Avro processing functions for Column operations
#'
#' Avro processing functions defined for \code{Column}.
#'
#' @param x Column to compute on.
#' @param jsonFormatSchema character, Avro schema in JSON string format.
#' @param ... additional argument(s) passed as parser options.
#' @name column_avro_functions
#' @rdname column_avro_functions
#' @family avro functions
#' @note Avro is a built-in but external data source module since Spark 2.4.
#' Please deploy the application as per
#' \href{https://spark.apache.org/docs/latest/sql-data-sources-avro.html#deploying}{
#' the deployment section
#' } of "Apache Avro Data Source Guide".
#' @examples
#' \dontrun{
#' df <- createDataFrame(iris)
#' schema <- paste(
#' c(
#' '{"type": "record", "namespace": "example.avro", "name": "Iris", "fields": [',
#' '{"type": ["double", "null"], "name": "Sepal_Length"},',
#' '{"type": ["double", "null"], "name": "Sepal_Width"},',
#' '{"type": ["double", "null"], "name": "Petal_Length"},',
#' '{"type": ["double", "null"], "name": "Petal_Width"},',
#' '{"type": ["string", "null"], "name": "Species"}]}'
#' ),
#' collapse="\\n"
#' )
#'
#' df_serialized <- select(
#' df,
#' alias(to_avro(alias(struct(column("*")), "fields")), "payload")
#' )
#'
#' df_deserialized <- select(
#' df_serialized,
#' from_avro(df_serialized$payload, schema)
#' )
#'
#' head(df_deserialized)
#' }
NULL
#' @details
#' \code{lit}: A new Column is created to represent the literal value.
#' If the parameter is a Column, it is returned unchanged.
@@ -4547,3 +4591,60 @@ setMethod("vector_to_array",
)
column(jc)
})
#' @details
#' \code{from_avro}: Converts a binary column of Avro format into its corresponding catalyst value.
#' The specified schema must match the read data, otherwise the behavior is undefined:
#' it may fail or return an arbitrary result.
#' To deserialize the data with a compatible and evolved schema, the expected Avro schema can be
#' set via the option \code{avroSchema}.
#'
#' @rdname column_avro_functions
#' @aliases from_avro from_avro,Column-method
#' @note from_avro since 3.1.0
setMethod("from_avro",
signature(x = "characterOrColumn"),
function(x, jsonFormatSchema, ...) {
x <- if (is.character(x)) {
column(x)
} else {
x
}
options <- varargsToStrEnv(...)
jc <- callJStatic(
"org.apache.spark.sql.avro.functions", "from_avro",
x@jc,
jsonFormatSchema,
options
)
column(jc)
})
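# A usage sketch (not part of this file): options supplied through `...` are
# converted by varargsToStrEnv() and forwarded as the JVM options map; the
# "mode" option and its "PERMISSIVE" value are assumptions taken from the
# Avro data source guide, not API added by this patch:
#   decoded <- from_avro(df_serialized$payload, schema, mode = "PERMISSIVE")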
#' @details
#' \code{to_avro}: Converts a column into binary of Avro format.
#'
#' @rdname column_avro_functions
#' @aliases to_avro to_avro,Column-method
#' @note to_avro since 3.1.0
setMethod("to_avro",
signature(x = "characterOrColumn"),
function(x, jsonFormatSchema = NULL) {
x <- if (is.character(x)) {
column(x)
} else {
x
}
jc <- if (is.null(jsonFormatSchema)) {
callJStatic("org.apache.spark.sql.avro.functions", "to_avro", x@jc)
} else {
callJStatic(
"org.apache.spark.sql.avro.functions",
"to_avro",
x@jc,
jsonFormatSchema
)
}
column(jc)
})
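# A usage sketch (not part of this file): jsonFormatSchema is optional for
# to_avro; supplying one pins the output Avro schema instead of deriving it
# from the column's type. The schema literal here is illustrative only:
#   name_schema <- '{"type": "string"}'
#   encoded <- to_avro(column("name"), name_schema)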

R/pkg/R/generics.R

@@ -950,7 +950,6 @@ setGeneric("current_date", function(x = "missing") { standardGeneric("current_da
#' @name NULL
setGeneric("current_timestamp", function(x = "missing") { standardGeneric("current_timestamp") })
#' @rdname column_datetime_diff_functions
#' @name NULL
setGeneric("datediff", function(y, x) { standardGeneric("datediff") })
@@ -1015,6 +1014,10 @@ setGeneric("expr", function(x) { standardGeneric("expr") })
#' @name NULL
setGeneric("flatten", function(x) { standardGeneric("flatten") })
#' @rdname column_avro_functions
#' @name NULL
setGeneric("from_avro", function(x, ...) { standardGeneric("from_avro") })
#' @rdname column_datetime_diff_functions
#' @name NULL
setGeneric("from_utc_timestamp", function(y, x) { standardGeneric("from_utc_timestamp") })
@@ -1388,6 +1391,10 @@ setGeneric("sumDistinct", function(x) { standardGeneric("sumDistinct") })
#' @name timestamp_seconds
setGeneric("timestamp_seconds", function(x) { standardGeneric("timestamp_seconds") })
#' @rdname column_avro_functions
#' @name NULL
setGeneric("to_avro", function(x, ...) { standardGeneric("to_avro") })
#' @rdname column_collection_functions
#' @name NULL
setGeneric("transform_keys", function(x, f) { standardGeneric("transform_keys") })

R/pkg/tests/fulltests/test_sparkSQL.R

@@ -1841,6 +1841,32 @@ test_that("column functions", {
)
})
test_that("avro column functions", {
skip_if_not(
grepl("spark-avro", sparkR.conf("spark.jars", "")),
"spark-avro jar not present"
)
schema <- '{"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_color", "type": ["string", "null"]}
]
}'
c0 <- column("foo")
c1 <- from_avro(c0, schema)
expect_s4_class(c1, "Column")
c2 <- from_avro("foo", schema)
expect_s4_class(c2, "Column")
c3 <- to_avro(c1)
expect_s4_class(c3, "Column")
c4 <- to_avro(c1, schema)
expect_s4_class(c4, "Column")
})
test_that("column binary mathfunctions", {
lines <- c("{\"a\":1, \"b\":5}",
"{\"a\":2, \"b\":6}",

R/run-tests.sh

@@ -23,7 +23,18 @@ FAILED=0
LOGFILE=$FWDIR/unit-tests.out
rm -f $LOGFILE
SPARK_AVRO_JAR_PATH=$(find $FWDIR/../external/avro/ -name "spark-avro*jar" -print | egrep -v "tests.jar|test-sources.jar|sources.jar|javadoc.jar")
if [[ $(echo $SPARK_AVRO_JAR_PATH | wc -l) -eq 1 ]]; then
SPARK_JARS=$SPARK_AVRO_JAR_PATH
fi
if [ -z "$SPARK_JARS" ]; then
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
else
SPARK_TESTING=1 NOT_CRAN=true $FWDIR/../bin/spark-submit --jars $SPARK_JARS --driver-java-options "-Dlog4j.configuration=file:$FWDIR/log4j.properties" --conf spark.hadoop.fs.defaultFS="file:///" --conf spark.driver.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" --conf spark.executor.extraJavaOptions="-Dio.netty.tryReflectionSetAccessible=true" $FWDIR/pkg/tests/run-all.R 2>&1 | tee -a $LOGFILE
fi
FAILED=$((PIPESTATUS[0]||$FAILED))
NUM_TEST_WARNING="$(grep -c -e 'Warnings ----------------' $LOGFILE)"

docs/sql-data-sources-avro.md

@@ -88,8 +88,6 @@ Kafka key-value record will be augmented with some metadata, such as the ingesti
* If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file.
* `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
@@ -183,6 +181,38 @@ query = output\
.option("topic", "topic2")\
.start()
{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}
# `from_avro` requires Avro schema in JSON string format.
jsonFormatSchema <- paste0(readLines("examples/src/main/resources/user.avsc"), collapse=" ")
df <- read.stream(
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
subscribe = "topic1"
)
# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output <- select(
filter(
select(df, alias(from_avro("value", jsonFormatSchema), "user")),
column("user.favorite_color") == "red"
),
alias(to_avro("user.name"), "value")
)
write.stream(
output,
"kafka",
kafka.bootstrap.servers = "host1:port1,host2:port2",
topic = "topic2"
)
{% endhighlight %}
</div>
</div>