[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy
## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #20129 from felixcheung/rwater.
This commit is contained in: parent 7d045c5f00, commit df95a908ba.
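For context, a minimal end-to-end SparkR sketch combining the three additions in this patch (the paths, schema, and column names below are hypothetical and not part of the change itself):

    sparkR.session()
    schema <- structType(structField("time", "timestamp"), structField("key", "string"))
    events <- read.stream("json", path = "/tmp/in", schema = schema)
    events <- withWatermark(events, "time", "10 minutes")            # new in this patch
    q <- write.stream(events, "parquet", path = "/tmp/out",
                      checkpointLocation = "/tmp/cp",
                      partitionBy = "key",                           # new in this patch
                      trigger.processingTime = "30 seconds")         # new in this patch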
@@ -179,6 +179,7 @@ exportMethods("arrange",
               "with",
               "withColumn",
               "withColumnRenamed",
+              "withWatermark",
               "write.df",
               "write.jdbc",
               "write.json",
@@ -3661,7 +3661,8 @@ setMethod("getNumPartitions",
 #' isStreaming
 #'
 #' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
-#' as it arrives.
+#' as it arrives. A dataset that reads data from a streaming source must be executed as a
+#' \code{StreamingQuery} using \code{write.stream}.
 #'
 #' @param x A SparkDataFrame
 #' @return TRUE if this SparkDataFrame is from a streaming source
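A quick illustration of the sharpened wording above (a sketch, assuming a JSON directory at a hypothetical path):

    batch <- read.df("/tmp/in", "json")                              # regular SparkDataFrame
    stream <- read.stream("json", path = "/tmp/in", schema = schema(batch))
    isStreaming(batch)    # FALSE
    isStreaming(stream)   # TRUE; it only executes once started as a query via write.stream()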
@@ -3707,7 +3708,17 @@ setMethod("isStreaming",
 #' @param df a streaming SparkDataFrame.
 #' @param source a name for external data source.
 #' @param outputMode one of 'append', 'complete', 'update'.
-#' @param ... additional argument(s) passed to the method.
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#'        system. If specified, the output is laid out on the file system similar to Hive's
+#'        partitioning scheme.
+#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
+#'        '1 minute'. This is a trigger that runs a query periodically based on the processing
+#'        time. If value is '0 seconds', the query will run as fast as possible, this is the
+#'        default. Only one trigger can be set.
+#' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
+#'        one batch of data in a streaming query then terminates the query. Only one trigger can be
+#'        set.
+#' @param ... additional external data source specific named options.
 #'
 #' @family SparkDataFrame functions
 #' @seealso \link{read.stream}
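The "Hive's partitioning scheme" wording above means one sub-directory per distinct value of each partitioning column. A hedged sketch (df is assumed to be a streaming SparkDataFrame with year and month columns; paths are hypothetical):

    q <- write.stream(df, "parquet", path = "/tmp/out", checkpointLocation = "/tmp/cp",
                      partitionBy = c("year", "month"))
    # output directories then look like:
    #   /tmp/out/year=2018/month=01/part-00000-....parquet
    #   /tmp/out/year=2018/month=02/part-00000-....parquet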
@@ -3725,7 +3736,8 @@ setMethod("isStreaming",
 #' # console
 #' q <- write.stream(wordCounts, "console", outputMode = "complete")
 #' # text stream
-#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp",
+#'                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
 #' # memory stream
 #' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
 #' head(sql("SELECT * from outs"))
@@ -3737,7 +3749,8 @@ setMethod("isStreaming",
 #' @note experimental
 setMethod("write.stream",
           signature(df = "SparkDataFrame"),
-          function(df, source = NULL, outputMode = NULL, ...) {
+          function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
+                   trigger.processingTime = NULL, trigger.once = NULL, ...) {
             if (!is.null(source) && !is.character(source)) {
               stop("source should be character, NULL or omitted. It is the data source specified ",
                    "in 'spark.sql.sources.default' configuration by default.")
@@ -3748,12 +3761,43 @@ setMethod("write.stream",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
+            cols <- NULL
+            if (!is.null(partitionBy)) {
+              if (!all(sapply(partitionBy, function(c) { is.character(c) }))) {
+                stop("All partitionBy column names should be characters.")
+              }
+              cols <- as.list(partitionBy)
+            }
+            jtrigger <- NULL
+            if (!is.null(trigger.processingTime) && !is.na(trigger.processingTime)) {
+              if (!is.null(trigger.once)) {
+                stop("Multiple triggers not allowed.")
+              }
+              interval <- as.character(trigger.processingTime)
+              if (nchar(interval) == 0) {
+                stop("Value for trigger.processingTime must be a non-empty string.")
+              }
+              jtrigger <- handledCallJStatic("org.apache.spark.sql.streaming.Trigger",
+                                             "ProcessingTime",
+                                             interval)
+            } else if (!is.null(trigger.once) && !is.na(trigger.once)) {
+              if (!is.logical(trigger.once) || !trigger.once) {
+                stop("Value for trigger.once must be TRUE.")
+              }
+              jtrigger <- callJStatic("org.apache.spark.sql.streaming.Trigger", "Once")
+            }
             options <- varargsToStrEnv(...)
             write <- handledCallJMethod(df@sdf, "writeStream")
             write <- callJMethod(write, "format", source)
             if (!is.null(outputMode)) {
               write <- callJMethod(write, "outputMode", outputMode)
             }
+            if (!is.null(cols)) {
+              write <- callJMethod(write, "partitionBy", cols)
+            }
+            if (!is.null(jtrigger)) {
+              write <- callJMethod(write, "trigger", jtrigger)
+            }
             write <- callJMethod(write, "options", options)
             ssq <- handledCallJMethod(write, "start")
             streamingQuery(ssq)
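A short sketch of how the validation above behaves from the R side (df is assumed to be a streaming SparkDataFrame; the error messages correspond to the stop() calls in the hunk):

    write.stream(df, "console", trigger.processingTime = "5 seconds", trigger.once = TRUE)
    # Error: Multiple triggers not allowed.
    write.stream(df, "console", trigger.once = FALSE)
    # Error: Value for trigger.once must be TRUE.
    q <- write.stream(df, "console", trigger.once = TRUE)   # processes one batch, then terminates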
@@ -3967,3 +4011,47 @@ setMethod("broadcast",
             sdf <- callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf)
             dataFrame(sdf)
           })
+
+#' withWatermark
+#'
+#' Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point in
+#' time before which we assume no more late data is going to arrive.
+#'
+#' Spark will use this watermark for several purposes:
+#' \itemize{
+#'  \item{-} To know when a given time window aggregation can be finalized and thus can be emitted
+#'           when using output modes that do not allow updates.
+#'  \item{-} To minimize the amount of state that we need to keep for on-going aggregations.
+#' }
+#' The current watermark is computed by looking at the \code{MAX(eventTime)} seen across
+#' all of the partitions in the query minus a user specified \code{delayThreshold}. Due to the cost
+#' of coordinating this value across partitions, the actual watermark used is only guaranteed
+#' to be at least \code{delayThreshold} behind the actual event time. In some cases we may still
+#' process records that arrive more than \code{delayThreshold} late.
+#'
+#' @param x a streaming SparkDataFrame
+#' @param eventTime a string specifying the name of the Column that contains the event time of the
+#'                  row.
+#' @param delayThreshold a string specifying the minimum delay to wait to data to arrive late,
+#'                       relative to the latest record that has been processed in the form of an
+#'                       interval (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
+#' @return a SparkDataFrame.
+#' @aliases withWatermark,SparkDataFrame,character,character-method
+#' @family SparkDataFrame functions
+#' @rdname withWatermark
+#' @name withWatermark
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' schema <- structType(structField("time", "timestamp"), structField("value", "double"))
+#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
+#' df <- withWatermark(df, "time", "10 minutes")
+#' }
+#' @note withWatermark since 2.3.0
+setMethod("withWatermark",
+          signature(x = "SparkDataFrame", eventTime = "character", delayThreshold = "character"),
+          function(x, eventTime, delayThreshold) {
+            sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
+            dataFrame(sdf)
+          })
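To make the delayThreshold arithmetic concrete, a hedged sketch of a windowed count with a watermark (the source path and column names are hypothetical):

    schema <- structType(structField("time", "timestamp"), structField("word", "string"))
    events <- read.stream("json", path = "/tmp/events", schema = schema)
    events <- withWatermark(events, "time", "10 minutes")
    # If the maximum event time seen so far is 12:20, the watermark is roughly 12:10; in append
    # mode, only 5-minute windows ending at or before 12:10 are emitted and their state dropped.
    counts <- count(group_by(events, window(events$time, "5 minutes")))
    q <- write.stream(counts, "console", outputMode = "append")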
@@ -727,7 +727,9 @@ read.jdbc <- function(url, tableName,
 #' @param schema The data schema defined in structType or a DDL-formatted string, this is
 #'               required for file-based streaming data source
 #' @param ... additional external data source specific named options, for instance \code{path} for
-#'        file-based streaming data source
+#'        file-based streaming data source. \code{timeZone} to indicate a timezone to be used to
+#'        parse timestamps in the JSON/CSV data sources or partition values; If it isn't set, it
+#'        uses the default value, session local timezone.
 #' @return SparkDataFrame
 #' @rdname read.stream
 #' @name read.stream
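A small sketch of the documented timeZone option in use (path and schema are hypothetical; the schema is given as a DDL-formatted string, which the same hunk documents as accepted):

    df <- read.stream("json", path = "/tmp/in",
                      schema = "time TIMESTAMP, value DOUBLE",
                      timeZone = "America/Los_Angeles")   # used when parsing timestamps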
@@ -799,6 +799,12 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn
 setGeneric("withColumnRenamed",
            function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })
 
+#' @rdname withWatermark
+#' @export
+setGeneric("withWatermark", function(x, eventTime, delayThreshold) {
+  standardGeneric("withWatermark")
+})
+
 #' @rdname write.df
 #' @export
 setGeneric("write.df", function(df, path = NULL, ...) { standardGeneric("write.df") })
@@ -172,6 +172,113 @@ test_that("Terminated by error", {
   stopQuery(q)
 })
 
+test_that("PartitionBy", {
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  checkpointPath <- tempfile(pattern = "sparkr-test", fileext = ".checkpoint")
+  textPath <- tempfile(pattern = "sparkr-test", fileext = ".text")
+  df <- read.df(jsonPath, "json", stringSchema)
+  write.df(df, parquetPath, "parquet", "overwrite")
+
+  df <- read.stream(path = parquetPath, schema = stringSchema)
+
+  expect_error(write.stream(df, "json", path = textPath, checkpointLocation = "append",
+                            partitionBy = c(1, 2)),
+               "All partitionBy column names should be characters")
+
+  q <- write.stream(df, "json", path = textPath, checkpointLocation = "append",
+                    partitionBy = "name")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  dirs <- list.files(textPath)
+  expect_equal(length(dirs[substring(dirs, 1, nchar("name=")) == "name="]), 3)
+
+  unlink(checkpointPath)
+  unlink(textPath)
+  unlink(parquetPath)
+})
+
+test_that("Watermark", {
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  schema <- structType(structField("value", "string"))
+  t <- Sys.time()
+  df <- as.DataFrame(lapply(list(t), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  df <- read.stream(path = parquetPath, schema = "value STRING")
+  df <- withColumn(df, "eventTime", cast(df$value, "timestamp"))
+  df <- withWatermark(df, "eventTime", "10 seconds")
+  counts <- count(group_by(df, "eventTime"))
+  q <- write.stream(counts, "memory", queryName = "times", outputMode = "append")
+
+  # first events
+  df <- as.DataFrame(lapply(list(t + 1, t, t + 2), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  # advance watermark to 15
+  df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  # old events, should be dropped
+  df <- as.DataFrame(lapply(list(t), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  # evict events less than previous watermark
+  df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  times <- collect(sql("SELECT * FROM times"))
+  # looks like write timing can affect the first bucket; but it should be t
+  expect_equal(times[order(times$eventTime), ][1, 2], 2)
+
+  stopQuery(q)
+  unlink(parquetPath)
+})
+
+test_that("Trigger", {
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  schema <- structType(structField("value", "string"))
+  df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  df <- read.stream(path = parquetPath, schema = "value STRING")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+                            trigger.processingTime = "", trigger.once = ""),
+               "Multiple triggers not allowed.")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+                            trigger.processingTime = ""),
+               "Value for trigger.processingTime must be a non-empty string.")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+                            trigger.processingTime = "invalid"), "illegal argument")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+                            trigger.once = ""), "Value for trigger.once must be TRUE.")
+
+  expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
+                            trigger.once = FALSE), "Value for trigger.once must be TRUE.")
+
+  q <- write.stream(df, "memory", queryName = "times", outputMode = "append", trigger.once = TRUE)
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+  df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
+  write.df(df, parquetPath, "parquet", "append")
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  expect_equal(nrow(collect(sql("SELECT * FROM times"))), 1)
+
+  stopQuery(q)
+  unlink(parquetPath)
+})
+
 unlink(jsonPath)
 unlink(jsonPathNa)
 
@@ -793,6 +793,10 @@ class DataStreamWriter(object):
         .. note:: Evolving.
 
         :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
+                               Set a trigger that runs a query periodically based on the processing
+                               time. Only one trigger can be set.
+        :param once: if set to True, set a trigger that processes only one batch of data in a
+                     streaming query then terminates the query. Only one trigger can be set.
 
         >>> # trigger the query for execution every 5 seconds
         >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql.streaming.Trigger
 
 /**
- * A [[Trigger]] that process only one batch of data in a streaming query then terminates
+ * A [[Trigger]] that processes only one batch of data in a streaming query then terminates
  * the query.
  */
 @Experimental