spark-instrumented-optimizer/R/pkg/inst/tests/testthat/test_streaming.R
Felix Cheung 57b64703e6 [SPARK-20571][SPARKR][SS] Flaky Structured Streaming tests
## What changes were proposed in this pull request?

Make the tests more reliable by having them wait until the data is processed.
Increasing the timeout value might help, but ultimately the flakiness from processing delays when Jenkins is busy is hard to account for. Note that this relies on `processAllAvailable`, which isn't a supported public API.
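
A minimal sketch of the pattern the tests below use (`q@ssq` and `processAllAvailable` are SparkR internals reached via `callJMethod`, not public API):

```r
q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete")
# Instead of relying on a fixed awaitTermination timeout, block until every
# file currently visible to the source has been processed:
callJMethod(q@ssq, "processAllAvailable")
```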

## How was this patch tested?
unit tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #17857 from felixcheung/rsstestrelia.
2017-05-04 01:54:59 -07:00


#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
library(testthat)

context("Structured Streaming")

# Tests for Structured Streaming functions in SparkR

sparkSession <- sparkR.session(enableHiveSupport = FALSE)

jsonSubDir <- file.path("sparkr-test", "json", "")
if (.Platform$OS.type == "windows") {
  # file.path drops the trailing empty separator on Windows; add it back
  jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
}
jsonDir <- file.path(tempdir(), jsonSubDir)
dir.create(jsonDir, recursive = TRUE)

mockLines <- c("{\"name\":\"Michael\"}",
               "{\"name\":\"Andy\", \"age\":30}",
               "{\"name\":\"Justin\", \"age\":19}")

jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
writeLines(mockLines, jsonPath)

mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
                 "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
                 "{\"name\":\"David\",\"age\":60,\"height\":null}")
jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")

schema <- structType(structField("name", "string"),
                     structField("age", "integer"),
                     structField("count", "double"))
test_that("read.stream, write.stream, awaitTermination, stopQuery", {
skip_on_cran()
df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
expect_true(isStreaming(df))
counts <- count(group_by(df, "name"))
q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete")
expect_false(awaitTermination(q, 5 * 1000))
callJMethod(q@ssq, "processAllAvailable")
expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
writeLines(mockLinesNa, jsonPathNa)
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")
expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
stopQuery(q)
expect_true(awaitTermination(q, 1))
expect_error(awaitTermination(q), NA)
})
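
# Smoke-test the printed output of explain, lastProgress and status, plus the
# queryName and isActive accessors, on a running query.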
test_that("print from explain, lastProgress, status, isActive", {
skip_on_cran()
df <- read.stream("json", path = jsonDir, schema = schema)
expect_true(isStreaming(df))
counts <- count(group_by(df, "name"))
q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")
expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q)))))
expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)))))
expect_equal(queryName(q), "people2")
expect_true(isActive(q))
stopQuery(q)
})
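
# With no source argument, read.stream falls back to the session's default data
# source (parquet, unless spark.sql.sources.default is overridden).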
test_that("Stream other format", {
skip_on_cran()
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
df <- read.df(jsonPath, "json", schema)
write.df(df, parquetPath, "parquet", "overwrite")
df <- read.stream(path = parquetPath, schema = schema)
expect_true(isStreaming(df))
counts <- count(group_by(df, "name"))
q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete")
expect_false(awaitTermination(q, 5 * 1000))
callJMethod(q@ssq, "processAllAvailable")
expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
expect_equal(queryName(q), "people3")
expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet",
capture.output(lastProgress(q)))))
expect_true(isActive(q))
stopQuery(q)
expect_true(awaitTermination(q, 1))
expect_false(isActive(q))
unlink(parquetPath)
})
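
# write.stream is only valid on streaming DataFrames; calling it on a batch
# DataFrame should raise an analysis error.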
test_that("Non-streaming DataFrame", {
skip_on_cran()
c <- as.DataFrame(cars)
expect_false(isStreaming(c))
expect_error(write.stream(c, "memory", queryName = "people", outputMode = "complete"),
paste0(".*(writeStream : analysis error - 'writeStream' can be called only on ",
"streaming Dataset/DataFrame).*"))
})
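
# Complete output mode requires a streaming aggregation, so writing the
# un-aggregated stream should be rejected when the query starts.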
test_that("Unsupported operation", {
skip_on_cran()
# memory sink without aggregation
df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
expect_error(write.stream(df, "memory", queryName = "people", outputMode = "complete"),
paste0(".*(start : analysis error - Complete output mode not supported when there ",
"are no streaming aggregations on streaming DataFrames/Datasets).*"))
})
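
# An invalid maxFilesPerTrigger is not validated until the query runs:
# write.stream returns a StreamingQuery and the error surfaces from awaitTermination.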
test_that("Terminated by error", {
skip_on_cran()
df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = -1)
counts <- count(group_by(df, "name"))
# This would not fail before returning with a StreamingQuery,
# but could dump error log at just about the same time
expect_error(q <- write.stream(counts, "memory", queryName = "people4", outputMode = "complete"),
NA)
expect_error(awaitTermination(q, 5 * 1000),
paste0(".*(awaitTermination : streaming query error - Invalid value '-1' for option",
" 'maxFilesPerTrigger', must be a positive integer).*"))
expect_true(any(grepl("\"message\" : \"Terminated with exception: Invalid value",
capture.output(status(q)))))
expect_true(any(grepl("Streaming query has no progress", capture.output(lastProgress(q)))))
expect_equal(queryName(q), "people4")
expect_false(isActive(q))
stopQuery(q)
})
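
# Clean up the JSON fixtures and stop the Spark session.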
unlink(jsonPath)
unlink(jsonPathNa)
sparkR.session.stop()