[SPARK-22956][SS] Bug fix for 2 streams union failover scenario

## What changes were proposed in this pull request?

This problem reported by yanlin-Lynn ivoson and LiangchangZ. Thanks!

When we union 2 streams from kafka or other sources, while one of them have no continues data coming and in the same time task restart, this will cause an `IllegalStateException`. This mainly cause because the code in [MicroBatchExecution](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L190) , while one stream has no continues data, its comittedOffset same with availableOffset during `populateStartOffsets`, and `currentPartitionOffsets` not properly handled in KafkaSource. Also, maybe we should also consider this scenario in other Source.

## How was this patch tested?

Add a UT in KafkaSourceSuite.scala

Author: Yuanjian Li <xyliyuanjian@gmail.com>

Closes #20150 from xuanyuanking/SPARK-22956.
This commit is contained in:
Yuanjian Li 2018-01-15 22:01:14 -08:00 committed by Shixiong Zhu
parent c7572b79da
commit 07ae39d0ec
4 changed files with 81 additions and 9 deletions

View file

@ -223,6 +223,14 @@ private[kafka010] class KafkaSource(
logInfo(s"GetBatch called with start = $start, end = $end")
val untilPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(end)
// On recovery, getBatch will get called before getOffset
if (currentPartitionOffsets.isEmpty) {
currentPartitionOffsets = Some(untilPartitionOffsets)
}
if (start.isDefined && start.get == end) {
return sqlContext.internalCreateDataFrame(
sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
}
val fromPartitionOffsets = start match {
case Some(prevBatchEndOffset) =>
KafkaSourceOffset.getPartitionOffsets(prevBatchEndOffset)
@ -305,11 +313,6 @@ private[kafka010] class KafkaSource(
logInfo("GetBatch generating RDD of offset range: " +
offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
// On recovery, getBatch will get called before getOffset
if (currentPartitionOffsets.isEmpty) {
currentPartitionOffsets = Some(untilPartitionOffsets)
}
sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
}

View file

@ -318,6 +318,71 @@ class KafkaSourceSuite extends KafkaSourceTest {
)
}
test("SPARK-22956: currentPartitionOffsets should be set when no new data comes in") {
def getSpecificDF(range: Range.Inclusive): org.apache.spark.sql.Dataset[Int] = {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 1)
testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
.option("kafka.metadata.max.age.ms", "1")
.option("maxOffsetsPerTrigger", 5)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
reader.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
.map(k => k.toInt)
}
val df1 = getSpecificDF(0 to 9)
val df2 = getSpecificDF(100 to 199)
val kafka = df1.union(df2)
val clock = new StreamManualClock
val waitUntilBatchProcessed = AssertOnQuery { q =>
eventually(Timeout(streamingTimeout)) {
if (!q.exception.isDefined) {
assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
}
}
if (q.exception.isDefined) {
throw q.exception.get
}
true
}
testStream(kafka)(
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
// 5 from smaller topic, 5 from bigger one
CheckLastBatch((0 to 4) ++ (100 to 104): _*),
AdvanceManualClock(100),
waitUntilBatchProcessed,
// 5 from smaller topic, 5 from bigger one
CheckLastBatch((5 to 9) ++ (105 to 109): _*),
AdvanceManualClock(100),
waitUntilBatchProcessed,
// smaller topic empty, 5 from bigger one
CheckLastBatch(110 to 114: _*),
StopStream,
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
// smallest now empty, 5 from bigger one
CheckLastBatch(115 to 119: _*),
AdvanceManualClock(100),
waitUntilBatchProcessed,
// smallest now empty, 5 from bigger one
CheckLastBatch(120 to 124: _*)
)
}
test("cannot stop Kafka stream") {
val topic = newTopic()
testUtils.createTopic(topic, partitions = 5)

View file

@ -208,10 +208,8 @@ class MicroBatchExecution(
* batch will be executed before getOffset is called again. */
availableOffsets.foreach {
case (source: Source, end: Offset) =>
if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
val start = committedOffsets.get(source)
source.getBatch(start, end)
}
case nonV1Tuple =>
// The V2 API does not have the same edge case requiring getBatch to be called
// here, so we do nothing here.

View file

@ -119,9 +119,15 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
val newBlocks = synchronized {
val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
assert(sliceStart <= sliceEnd, s"sliceStart: $sliceStart sliceEnd: $sliceEnd")
batches.slice(sliceStart, sliceEnd)
}
if (newBlocks.isEmpty) {
return sqlContext.internalCreateDataFrame(
sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
}
logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal))
newBlocks