[SPARK-23688][SS] Refactor tests away from rate source

## What changes were proposed in this pull request?

Replace the rate source with a memory source in the continuous mode test suite. The "rate" source is kept where a test needs data produced periodically in the background, or needs to load a source by its short name, since "memory" does not have a provider for the source side.
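
For illustration, here is the new test shape in condensed form, drawn from the `map` test in the diff below (it assumes the `ContinuousSuiteBase` harness, so `testStream`, `AddData`, `CheckAnswer`, etc. are in scope):

```scala
// A ContinuousMemoryStream lets the test push rows explicitly instead of
// waiting for the rate source to emit them on a timer.
val input = ContinuousMemoryStream[Int]
val df = input.toDF().map(_.getInt(0) * 2)

testStream(df)(
  AddData(input, 0, 1),
  CheckAnswer(0, 2),
  StopStream,
  AddData(input, 2, 3, 4),
  StartStream(),
  CheckAnswer(0, 2, 4, 6, 8))
```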

## How was this patch tested?

Ran the relevant test suite from the IDE.

Author: Jungtaek Lim <kabhwan@gmail.com>

Closes #21152 from HeartSaVioR/SPARK-23688.
Jungtaek Lim 2018-04-28 09:55:56 +08:00 committed by jerryshao
parent 8614edd445
commit 1fb46f30f8

@@ -75,73 +75,50 @@ class ContinuousSuite extends ContinuousSuiteBase {
   }
 
   test("map") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .map(r => r.getLong(0) * 2)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().map(_.getInt(0) * 2)
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0, 1),
+      CheckAnswer(0, 2),
+      StopStream,
+      AddData(input, 2, 3, 4),
+      StartStream(),
+      CheckAnswer(0, 2, 4, 6, 8))
   }
 
   test("flatMap") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2))
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 2))
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n * 2)).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0, 1),
+      CheckAnswer((0 to 1).flatMap(n => Seq(0, n, n * 2)): _*),
+      StopStream,
+      AddData(input, 2, 3, 4),
+      StartStream(),
+      CheckAnswer((0 to 4).flatMap(n => Seq(0, n, n * 2)): _*))
   }
 
   test("filter") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .where('value > 5)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().where('value > 2)
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(6, 20).map(Row(_))))
+    testStream(df)(
+      AddData(input, 0, 1),
+      CheckAnswer(),
+      StopStream,
+      AddData(input, 2, 3, 4),
+      StartStream(),
+      CheckAnswer(3, 4))
   }
 
   test("deduplicate") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
-      .dropDuplicates()
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().dropDuplicates()
 
     val except = intercept[AnalysisException] {
-      testStream(df, useV2Sink = true)(StartStream(longContinuousTrigger))
+      testStream(df)(StartStream())
     }
 
     assert(except.message.contains(
@@ -149,15 +126,11 @@ class ContinuousSuite extends ContinuousSuiteBase {
   }
 
   test("timestamp") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select(current_timestamp())
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF().select(current_timestamp())
 
     val except = intercept[AnalysisException] {
-      testStream(df, useV2Sink = true)(StartStream(longContinuousTrigger))
+      testStream(df)(StartStream())
     }
 
     assert(except.message.contains(
@@ -165,58 +138,43 @@ class ContinuousSuite extends ContinuousSuiteBase {
   }
 
   test("subquery alias") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .createOrReplaceTempView("rate")
-    val test = spark.sql("select value from rate where value > 5")
+    val input = ContinuousMemoryStream[Int]
+    input.toDF().createOrReplaceTempView("memory")
+    val test = spark.sql("select value from memory where value > 2")
 
-    testStream(test, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      Execute(waitForRateSourceTriggers(_, 4)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(6, 20).map(Row(_))))
+    testStream(test)(
+      AddData(input, 0, 1),
+      CheckAnswer(),
+      StopStream,
+      AddData(input, 2, 3, 4),
+      StartStream(),
+      CheckAnswer(3, 4))
   }
 
   test("repeatedly restart") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF()
 
-    testStream(df, useV2Sink = true)(
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(0),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 10).map(Row(_))),
+    testStream(df)(
+      StartStream(),
+      AddData(input, 0, 1),
+      CheckAnswer(0, 1),
       StopStream,
-      StartStream(longContinuousTrigger),
+      StartStream(),
       StopStream,
-      StartStream(longContinuousTrigger),
+      StartStream(),
       StopStream,
-      StartStream(longContinuousTrigger),
-      AwaitEpoch(2),
-      Execute(waitForRateSourceTriggers(_, 2)),
-      IncrementEpoch(),
-      CheckAnswerRowsContains(scala.Range(0, 20).map(Row(_))),
+      StartStream(),
+      StopStream,
+      AddData(input, 2, 3),
+      StartStream(),
+      CheckAnswer(0, 1, 2, 3),
       StopStream)
   }
 
   test("task failure kills the query") {
-    val df = spark.readStream
-      .format("rate")
-      .option("numPartitions", "5")
-      .option("rowsPerSecond", "5")
-      .load()
-      .select('value)
+    val input = ContinuousMemoryStream[Int]
+    val df = input.toDF()
 
     // Get an arbitrary task from this query to kill. It doesn't matter which one.
     var taskId: Long = -1
@@ -227,9 +185,9 @@ class ContinuousSuite extends ContinuousSuiteBase {
     }
     spark.sparkContext.addSparkListener(listener)
    try {
-      testStream(df, useV2Sink = true)(
+      testStream(df)(
         StartStream(Trigger.Continuous(100)),
-        Execute(waitForRateSourceTriggers(_, 2)),
+        AddData(input, 0, 1, 2, 3),
         Execute { _ =>
           // Wait until a task is started, then kill its first attempt.
           eventually(timeout(streamingTimeout)) {
@@ -252,6 +210,7 @@ class ContinuousSuite extends ContinuousSuiteBase {
       .option("rowsPerSecond", "2")
       .load()
       .select('value)
+
     val query = df.writeStream
       .format("memory")
       .queryName("noharness")