[SPARK-19654][SPARKR][SS] Structured Streaming API for R

## What changes were proposed in this pull request?

Add "experimental" API for SS in R

## How was this patch tested?

Manual and unit tests.

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16982 from felixcheung/rss.
Committed by Felix Cheung on 2017-03-18 16:26:48 -07:00
commit 5c165596da (parent 54e61df263)
8 changed files with 573 additions and 5 deletions

R/pkg/DESCRIPTION

@@ -54,5 +54,6 @@ Collate:
'types.R'
'utils.R'
'window.R'
'streaming.R'
RoxygenNote: 5.0.1
VignetteBuilder: knitr

R/pkg/NAMESPACE

@@ -121,6 +121,7 @@ exportMethods("arrange",
"insertInto",
"intersect",
"isLocal",
"isStreaming",
"join",
"limit",
"merge",
@@ -169,6 +170,7 @@ exportMethods("arrange",
"write.json",
"write.orc",
"write.parquet",
"write.stream",
"write.text",
"write.ml")
@@ -365,6 +367,7 @@ export("as.DataFrame",
"read.json",
"read.orc",
"read.parquet",
"read.stream",
"read.text",
"spark.lapply",
"spark.addFile",
@@ -402,6 +405,16 @@ export("partitionBy",
export("windowPartitionBy",
"windowOrderBy")
exportClasses("StreamingQuery")
export("awaitTermination",
"isActive",
"lastProgress",
"queryName",
"status",
"stopQuery")
S3method(print, jobj)
S3method(print, structField)
S3method(print, structType)

R/pkg/R/DataFrame.R

@@ -133,9 +133,6 @@ setMethod("schema",
#'
#' Print the logical and physical Catalyst plans to the console for debugging.
#'
#' @param x a SparkDataFrame.
#' @param extended Logical. If extended is FALSE, explain() only prints the physical plan.
#' @param ... further arguments to be passed to or from other methods.
#' @family SparkDataFrame functions
#' @aliases explain,SparkDataFrame-method
#' @rdname explain
@@ -3515,3 +3512,104 @@ setMethod("getNumPartitions",
function(x) {
callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
})
#' isStreaming
#'
#' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
#' as it arrives.
#'
#' @param x A SparkDataFrame
#' @return TRUE if this SparkDataFrame is from a streaming source
#' @family SparkDataFrame functions
#' @aliases isStreaming,SparkDataFrame-method
#' @rdname isStreaming
#' @name isStreaming
#' @seealso \link{read.stream} \link{write.stream}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- read.stream("socket", host = "localhost", port = 9999)
#' isStreaming(df)
#' }
#' @note isStreaming since 2.2.0
#' @note experimental
setMethod("isStreaming",
signature(x = "SparkDataFrame"),
function(x) {
callJMethod(x@sdf, "isStreaming")
})
#' Write the streaming SparkDataFrame to a data source.
#'
#' The data source is specified by the \code{source} and a set of options (...).
#' If \code{source} is not specified, the default data source configured by
#' spark.sql.sources.default will be used.
#'
#' Additionally, \code{outputMode} specifies how data of a streaming SparkDataFrame is written to an
#' output data source. There are three modes:
#' \itemize{
#' \item append: Only the new rows in the streaming SparkDataFrame will be written out. This
#' output mode can only be used in queries that do not contain any aggregation.
#' \item complete: All the rows in the streaming SparkDataFrame will be written out every time
#' there are some updates. This output mode can only be used in queries that
#' contain aggregations.
#' \item update: Only the rows that were updated in the streaming SparkDataFrame will be written
#' out every time there are some updates. If the query doesn't contain aggregations,
#' it will be equivalent to \code{append} mode.
#' }
#'
#' @param df a streaming SparkDataFrame.
#' @param source the name of an external data source.
#' @param outputMode one of 'append', 'complete', 'update'.
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @seealso \link{read.stream}
#' @aliases write.stream,SparkDataFrame-method
#' @rdname write.stream
#' @name write.stream
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- read.stream("socket", host = "localhost", port = 9999)
#' isStreaming(df)
#' wordCounts <- count(group_by(df, "value"))
#'
#' # console
#' q <- write.stream(wordCounts, "console", outputMode = "complete")
#' # text stream
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
#' # memory stream
#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
#' head(sql("SELECT * from outs"))
#' queryName(q)
#'
#' stopQuery(q)
#' }
#' @note write.stream since 2.2.0
#' @note experimental
setMethod("write.stream",
signature(df = "SparkDataFrame"),
function(df, source = NULL, outputMode = NULL, ...) {
if (!is.null(source) && !is.character(source)) {
stop("source should be character, NULL or omitted. It is the data source specified ",
"in 'spark.sql.sources.default' configuration by default.")
}
if (!is.null(outputMode) && !is.character(outputMode)) {
stop("outputMode should be charactor or omitted.")
}
if (is.null(source)) {
source <- getDefaultSqlSource()
}
options <- varargsToStrEnv(...)
write <- handledCallJMethod(df@sdf, "writeStream")
write <- callJMethod(write, "format", source)
if (!is.null(outputMode)) {
write <- callJMethod(write, "outputMode", outputMode)
}
write <- callJMethod(write, "options", options)
ssq <- handledCallJMethod(write, "start")
streamingQuery(ssq)
})
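
For reference, a short sketch of how the three output modes might be exercised (paths and query names are illustrative; the console sink accepts all three modes):

```r
df <- read.stream("socket", host = "localhost", port = 9999)

# append: only new rows are emitted; no aggregations allowed.
q1 <- write.stream(df, "text", path = "/tmp/out", checkpointLocation = "/tmp/cp",
                   outputMode = "append")

# complete: the full aggregated result is rewritten on every trigger.
counts <- count(group_by(df, "value"))
q2 <- write.stream(counts, "memory", queryName = "counts", outputMode = "complete")

# update: only rows changed since the last trigger are written out.
q3 <- write.stream(counts, "console", outputMode = "update")
```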

R/pkg/R/SQLContext.R

@@ -937,3 +937,53 @@ read.jdbc <- function(url, tableName,
}
dataFrame(sdf)
}
#' Load a streaming SparkDataFrame
#'
#' Returns the dataset in a data source as a SparkDataFrame
#'
#' The data source is specified by the \code{source} and a set of options (...).
#' If \code{source} is not specified, the default data source configured by
#' "spark.sql.sources.default" will be used.
#'
#' @param source The name of the external data source
#' @param schema The data schema defined in structType; this is required for file-based streaming
#' data sources
#' @param ... additional external data source specific named options, for instance \code{path} for
#' file-based streaming data source
#' @return SparkDataFrame
#' @rdname read.stream
#' @name read.stream
#' @seealso \link{write.stream}
#' @export
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- read.stream("socket", host = "localhost", port = 9999)
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
#'
#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
#' }
#' @note read.stream since 2.2.0
#' @note experimental
read.stream <- function(source = NULL, schema = NULL, ...) {
sparkSession <- getSparkSession()
if (!is.null(source) && !is.character(source)) {
stop("source should be character, NULL or omitted. It is the data source specified ",
"in 'spark.sql.sources.default' configuration by default.")
}
if (is.null(source)) {
source <- getDefaultSqlSource()
}
options <- varargsToStrEnv(...)
read <- callJMethod(sparkSession, "readStream")
read <- callJMethod(read, "format", source)
if (!is.null(schema)) {
stopifnot(class(schema) == "structType")
read <- callJMethod(read, "schema", schema$jobj)
}
read <- callJMethod(read, "options", options)
sdf <- handledCallJMethod(read, "load")
dataFrame(callJMethod(sdf, "toDF"))
}
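
A short sketch of the file-based case, where a structType schema is required (the directory name is illustrative):

```r
# File-based streaming sources require an explicit schema.
schema <- structType(structField("name", "string"),
                     structField("age", "integer"))
people <- read.stream("json", path = "/tmp/people-json", schema = schema,
                      maxFilesPerTrigger = 1)
isStreaming(people)  # TRUE
```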

R/pkg/R/generics.R

@@ -539,6 +539,9 @@ setGeneric("dtypes", function(x) { standardGeneric("dtypes") })
#' @rdname explain
#' @export
#' @param x a SparkDataFrame or a StreamingQuery.
#' @param extended Logical. If extended is FALSE, prints only the physical plan.
#' @param ... further arguments to be passed to or from other methods.
setGeneric("explain", function(x, ...) { standardGeneric("explain") })
#' @rdname except
@@ -577,6 +580,10 @@ setGeneric("intersect", function(x, y) { standardGeneric("intersect") })
#' @export
setGeneric("isLocal", function(x) { standardGeneric("isLocal") })
#' @rdname isStreaming
#' @export
setGeneric("isStreaming", function(x) { standardGeneric("isStreaming") })
#' @rdname limit
#' @export
setGeneric("limit", function(x, num) {standardGeneric("limit") })
@@ -682,6 +689,12 @@ setGeneric("write.parquet", function(x, path, ...) {
#' @export
setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") })
#' @rdname write.stream
#' @export
setGeneric("write.stream", function(df, source = NULL, outputMode = NULL, ...) {
standardGeneric("write.stream")
})
#' @rdname write.text
#' @export
setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") })
@@ -1428,10 +1441,36 @@ setGeneric("spark.posterior", function(object, newData) { standardGeneric("spark
#' @export
setGeneric("spark.perplexity", function(object, data) { standardGeneric("spark.perplexity") })
#' @param object a fitted ML model object.
#' @param path the directory where the model is saved.
#' @param ... additional argument(s) passed to the method.
#' @rdname write.ml
#' @export
setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") })
###################### Streaming Methods ##########################
#' @rdname awaitTermination
#' @export
setGeneric("awaitTermination", function(x, timeout) { standardGeneric("awaitTermination") })
#' @rdname isActive
#' @export
setGeneric("isActive", function(x) { standardGeneric("isActive") })
#' @rdname lastProgress
#' @export
setGeneric("lastProgress", function(x) { standardGeneric("lastProgress") })
#' @rdname queryName
#' @export
setGeneric("queryName", function(x) { standardGeneric("queryName") })
#' @rdname status
#' @export
setGeneric("status", function(x) { standardGeneric("status") })
#' @rdname stopQuery
#' @export
setGeneric("stopQuery", function(x) { standardGeneric("stopQuery") })

R/pkg/R/streaming.R (new file, 208 lines)

@@ -0,0 +1,208 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# streaming.R - Structured Streaming / StreamingQuery class and methods implemented in S4 OO classes
#' @include generics.R jobj.R
NULL
#' S4 class that represents a StreamingQuery
#'
#' A StreamingQuery is returned by \code{write.stream} on a streaming SparkDataFrame created with \code{read.stream}
#'
#' @rdname StreamingQuery
#' @seealso \link{read.stream}
#'
#' @param ssq A Java object reference to the backing Scala StreamingQuery
#' @export
#' @note StreamingQuery since 2.2.0
#' @note experimental
setClass("StreamingQuery",
slots = list(ssq = "jobj"))
setMethod("initialize", "StreamingQuery", function(.Object, ssq) {
.Object@ssq <- ssq
.Object
})
streamingQuery <- function(ssq) {
stopifnot(class(ssq) == "jobj")
new("StreamingQuery", ssq)
}
#' @rdname show
#' @export
#' @note show(StreamingQuery) since 2.2.0
setMethod("show", "StreamingQuery",
function(object) {
name <- callJMethod(object@ssq, "name")
if (!is.null(name)) {
cat(paste0("StreamingQuery '", name, "'\n"))
} else {
cat("StreamingQuery", "\n")
}
})
#' queryName
#'
#' Returns the user-specified name of the query. This is specified in
#' \code{write.stream(df, queryName = "query")}. This name, if set, must be unique across all active
#' queries.
#'
#' @param x a StreamingQuery.
#' @return The name of the query, or NULL if not specified.
#' @rdname queryName
#' @name queryName
#' @aliases queryName,StreamingQuery-method
#' @family StreamingQuery methods
#' @seealso \link{write.stream}
#' @export
#' @examples
#' \dontrun{ queryName(sq) }
#' @note queryName(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("queryName",
signature(x = "StreamingQuery"),
function(x) {
callJMethod(x@ssq, "name")
})
#' @rdname explain
#' @name explain
#' @aliases explain,StreamingQuery-method
#' @family StreamingQuery methods
#' @export
#' @examples
#' \dontrun{ explain(sq) }
#' @note explain(StreamingQuery) since 2.2.0
setMethod("explain",
signature(x = "StreamingQuery"),
function(x, extended = FALSE) {
cat(callJMethod(x@ssq, "explainInternal", extended), "\n")
})
#' lastProgress
#'
#' Prints the most recent progress update of this streaming query in JSON format.
#'
#' @param x a StreamingQuery.
#' @rdname lastProgress
#' @name lastProgress
#' @aliases lastProgress,StreamingQuery-method
#' @family StreamingQuery methods
#' @export
#' @examples
#' \dontrun{ lastProgress(sq) }
#' @note lastProgress(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("lastProgress",
signature(x = "StreamingQuery"),
function(x) {
p <- callJMethod(x@ssq, "lastProgress")
if (is.null(p)) {
cat("Streaming query has no progress")
} else {
cat(callJMethod(p, "toString"), "\n")
}
})
#' status
#'
#' Prints the current status of the query in JSON format.
#'
#' @param x a StreamingQuery.
#' @rdname status
#' @name status
#' @aliases status,StreamingQuery-method
#' @family StreamingQuery methods
#' @export
#' @examples
#' \dontrun{ status(sq) }
#' @note status(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("status",
signature(x = "StreamingQuery"),
function(x) {
cat(callJMethod(callJMethod(x@ssq, "status"), "toString"), "\n")
})
#' isActive
#'
#' Returns TRUE if this query is actively running.
#'
#' @param x a StreamingQuery.
#' @return TRUE if query is actively running, FALSE if stopped.
#' @rdname isActive
#' @name isActive
#' @aliases isActive,StreamingQuery-method
#' @family StreamingQuery methods
#' @export
#' @examples
#' \dontrun{ isActive(sq) }
#' @note isActive(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("isActive",
signature(x = "StreamingQuery"),
function(x) {
callJMethod(x@ssq, "isActive")
})
#' awaitTermination
#'
#' Waits for the termination of the query, either by \code{stopQuery} or by an error.
#'
#' If the query has terminated, then all subsequent calls to this method will return TRUE
#' immediately.
#'
#' @param x a StreamingQuery.
#' @param timeout time to wait in milliseconds
#' @return TRUE if query has terminated within the timeout period.
#' @rdname awaitTermination
#' @name awaitTermination
#' @aliases awaitTermination,StreamingQuery-method
#' @family StreamingQuery methods
#' @export
#' @examples
#' \dontrun{ awaitTermination(sq, 10000) }
#' @note awaitTermination(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("awaitTermination",
signature(x = "StreamingQuery"),
function(x, timeout) {
handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout))
})
#' stopQuery
#'
#' Stops the execution of this query if it is running. This method blocks until the execution is
#' stopped.
#'
#' @param x a StreamingQuery.
#' @rdname stopQuery
#' @name stopQuery
#' @aliases stopQuery,StreamingQuery-method
#' @family StreamingQuery methods
#' @export
#' @examples
#' \dontrun{ stopQuery(sq) }
#' @note stopQuery(StreamingQuery) since 2.2.0
#' @note experimental
setMethod("stopQuery",
signature(x = "StreamingQuery"),
function(x) {
invisible(callJMethod(x@ssq, "stop"))
})
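
Taken together, a sketch of a query's lifecycle using the methods above (source and sink options are illustrative):

```r
df <- read.stream("socket", host = "localhost", port = 9999)
counts <- count(group_by(df, "value"))
q <- write.stream(counts, "memory", queryName = "counts", outputMode = "complete")

queryName(q)                    # "counts"
isActive(q)                     # TRUE while the query is running
status(q)                       # prints the current status as JSON
lastProgress(q)                 # prints the most recent progress update
explain(q)                      # prints the physical plan

awaitTermination(q, 10 * 1000)  # wait up to 10 seconds; FALSE on timeout
stopQuery(q)                    # blocks until execution has stopped
isActive(q)                     # FALSE
```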

R/pkg/R/utils.R

@@ -823,7 +823,16 @@ captureJVMException <- function(e, method) {
stacktrace <- rawmsg
}
if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) {
# StreamingQueryException could wrap an IllegalArgumentException, so look for that first
if (any(grep("org.apache.spark.sql.streaming.StreamingQueryException: ", stacktrace))) {
msg <- strsplit(stacktrace, "org.apache.spark.sql.streaming.StreamingQueryException: ",
fixed = TRUE)[[1]]
# Extract "Error in ..." message.
rmsg <- msg[1]
# Extract the first message of JVM exception.
first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
stop(paste0(rmsg, "streaming query error - ", first), call. = FALSE)
} else if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) {
msg <- strsplit(stacktrace, "java.lang.IllegalArgumentException: ", fixed = TRUE)[[1]]
# Extract "Error in ..." message.
rmsg <- msg[1]
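
A standalone sketch of the message extraction above, run on a hypothetical stacktrace string:

```r
# Hypothetical stacktrace illustrating how the splitting logic extracts
# the first line of the wrapped JVM exception message.
stacktrace <- paste0(
  "Error in awaitTermination : ",
  "org.apache.spark.sql.streaming.StreamingQueryException: ",
  "Invalid value '-1' for option 'maxFilesPerTrigger'\n",
  "\tat org.apache.spark.sql.execution.streaming.StreamExecution.run(...)")

msg <- strsplit(stacktrace, "org.apache.spark.sql.streaming.StreamingQueryException: ",
                fixed = TRUE)[[1]]
rmsg <- msg[1]                                   # "Error in awaitTermination : "
first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]   # message up to the first "\n\tat"
paste0(rmsg, "streaming query error - ", first)
# "Error in awaitTermination : streaming query error - Invalid value '-1'
#  for option 'maxFilesPerTrigger'"
```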

R/pkg/inst/tests/testthat/test_streaming.R (new file, 150 lines)

@@ -0,0 +1,150 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
library(testthat)
context("Structured Streaming")
# Tests for Structured Streaming functions in SparkR
sparkSession <- sparkR.session(enableHiveSupport = FALSE)
jsonSubDir <- file.path("sparkr-test", "json", "")
if (.Platform$OS.type == "windows") {
# file.path removes the trailing empty separator on Windows, so add it back
jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
}
jsonDir <- file.path(tempdir(), jsonSubDir)
dir.create(jsonDir, recursive = TRUE)
mockLines <- c("{\"name\":\"Michael\"}",
"{\"name\":\"Andy\", \"age\":30}",
"{\"name\":\"Justin\", \"age\":19}")
jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
writeLines(mockLines, jsonPath)
mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
"{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
"{\"name\":\"David\",\"age\":60,\"height\":null}")
jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
schema <- structType(structField("name", "string"),
structField("age", "integer"),
structField("count", "double"))
test_that("read.stream, write.stream, awaitTermination, stopQuery", {
df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
expect_true(isStreaming(df))
counts <- count(group_by(df, "name"))
q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete")
expect_false(awaitTermination(q, 5 * 1000))
expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
writeLines(mockLinesNa, jsonPathNa)
awaitTermination(q, 5 * 1000)
expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
stopQuery(q)
expect_true(awaitTermination(q, 1))
})
test_that("print from explain, lastProgress, status, isActive", {
df <- read.stream("json", path = jsonDir, schema = schema)
expect_true(isStreaming(df))
counts <- count(group_by(df, "name"))
q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete")
awaitTermination(q, 5 * 1000)
expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q)))))
expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)))))
expect_equal(queryName(q), "people2")
expect_true(isActive(q))
stopQuery(q)
})
test_that("Stream other format", {
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
df <- read.df(jsonPath, "json", schema)
write.df(df, parquetPath, "parquet", "overwrite")
df <- read.stream(path = parquetPath, schema = schema)
expect_true(isStreaming(df))
counts <- count(group_by(df, "name"))
q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete")
expect_false(awaitTermination(q, 5 * 1000))
expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
expect_equal(queryName(q), "people3")
expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet",
capture.output(lastProgress(q)))))
expect_true(isActive(q))
stopQuery(q)
expect_true(awaitTermination(q, 1))
expect_false(isActive(q))
unlink(parquetPath)
})
test_that("Non-streaming DataFrame", {
c <- as.DataFrame(cars)
expect_false(isStreaming(c))
expect_error(write.stream(c, "memory", queryName = "people", outputMode = "complete"),
paste0(".*(writeStream : analysis error - 'writeStream' can be called only on ",
"streaming Dataset/DataFrame).*"))
})
test_that("Unsupported operation", {
# memory sink without aggregation
df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
expect_error(write.stream(df, "memory", queryName = "people", outputMode = "complete"),
paste0(".*(start : analysis error - Complete output mode not supported when there ",
"are no streaming aggregations on streaming DataFrames/Datasets).*"))
})
test_that("Terminated by error", {
df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = -1)
counts <- count(group_by(df, "name"))
# This should not fail before write.stream returns a StreamingQuery,
# but the error log may be dumped at about the same time
expect_error(q <- write.stream(counts, "memory", queryName = "people4", outputMode = "complete"),
NA)
expect_error(awaitTermination(q, 1),
paste0(".*(awaitTermination : streaming query error - Invalid value '-1' for option",
" 'maxFilesPerTrigger', must be a positive integer).*"))
expect_true(any(grepl("\"message\" : \"Terminated with exception: Invalid value",
capture.output(status(q)))))
expect_true(any(grepl("Streaming query has no progress", capture.output(lastProgress(q)))))
expect_equal(queryName(q), "people4")
expect_false(isActive(q))
stopQuery(q)
})
unlink(jsonPath)
unlink(jsonPathNa)
sparkR.session.stop()