[SPARK-22956][SS] Bug fix for 2 streams union failover scenario
## What changes were proposed in this pull request? This problem was reported by yanlin-Lynn, ivoson and LiangchangZ. Thanks! When we union 2 streams from Kafka or other sources, and one of them has no continuous data coming in while, at the same time, the task restarts, an `IllegalStateException` is thrown. This is mainly caused by the code in [MicroBatchExecution](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L190): when one stream has no continuous data, its committedOffset is the same as its availableOffset during `populateStartOffsets`, and `currentPartitionOffsets` is not properly handled in KafkaSource. We should probably also consider this scenario in other Sources. ## How was this patch tested? Added a UT in KafkaSourceSuite.scala. Author: Yuanjian Li <xyliyuanjian@gmail.com> Closes #20150 from xuanyuanking/SPARK-22956.
This commit is contained in:
parent
c7572b79da
commit
07ae39d0ec
|
@ -223,6 +223,14 @@ private[kafka010] class KafkaSource(
|
|||
|
||||
logInfo(s"GetBatch called with start = $start, end = $end")
|
||||
val untilPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(end)
|
||||
// On recovery, getBatch will get called before getOffset
|
||||
if (currentPartitionOffsets.isEmpty) {
|
||||
currentPartitionOffsets = Some(untilPartitionOffsets)
|
||||
}
|
||||
if (start.isDefined && start.get == end) {
|
||||
return sqlContext.internalCreateDataFrame(
|
||||
sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
|
||||
}
|
||||
val fromPartitionOffsets = start match {
|
||||
case Some(prevBatchEndOffset) =>
|
||||
KafkaSourceOffset.getPartitionOffsets(prevBatchEndOffset)
|
||||
|
@ -305,11 +313,6 @@ private[kafka010] class KafkaSource(
|
|||
logInfo("GetBatch generating RDD of offset range: " +
|
||||
offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
|
||||
|
||||
// On recovery, getBatch will get called before getOffset
|
||||
if (currentPartitionOffsets.isEmpty) {
|
||||
currentPartitionOffsets = Some(untilPartitionOffsets)
|
||||
}
|
||||
|
||||
sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
|
||||
}
|
||||
|
||||
|
|
|
@ -318,6 +318,71 @@ class KafkaSourceSuite extends KafkaSourceTest {
|
|||
)
|
||||
}
|
||||
|
||||
test("SPARK-22956: currentPartitionOffsets should be set when no new data comes in") {
|
||||
def getSpecificDF(range: Range.Inclusive): org.apache.spark.sql.Dataset[Int] = {
|
||||
val topic = newTopic()
|
||||
testUtils.createTopic(topic, partitions = 1)
|
||||
testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
|
||||
|
||||
val reader = spark
|
||||
.readStream
|
||||
.format("kafka")
|
||||
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
|
||||
.option("kafka.metadata.max.age.ms", "1")
|
||||
.option("maxOffsetsPerTrigger", 5)
|
||||
.option("subscribe", topic)
|
||||
.option("startingOffsets", "earliest")
|
||||
|
||||
reader.load()
|
||||
.selectExpr("CAST(value AS STRING)")
|
||||
.as[String]
|
||||
.map(k => k.toInt)
|
||||
}
|
||||
|
||||
val df1 = getSpecificDF(0 to 9)
|
||||
val df2 = getSpecificDF(100 to 199)
|
||||
|
||||
val kafka = df1.union(df2)
|
||||
|
||||
val clock = new StreamManualClock
|
||||
|
||||
val waitUntilBatchProcessed = AssertOnQuery { q =>
|
||||
eventually(Timeout(streamingTimeout)) {
|
||||
if (!q.exception.isDefined) {
|
||||
assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
|
||||
}
|
||||
}
|
||||
if (q.exception.isDefined) {
|
||||
throw q.exception.get
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
testStream(kafka)(
|
||||
StartStream(ProcessingTime(100), clock),
|
||||
waitUntilBatchProcessed,
|
||||
// 5 from smaller topic, 5 from bigger one
|
||||
CheckLastBatch((0 to 4) ++ (100 to 104): _*),
|
||||
AdvanceManualClock(100),
|
||||
waitUntilBatchProcessed,
|
||||
// 5 from smaller topic, 5 from bigger one
|
||||
CheckLastBatch((5 to 9) ++ (105 to 109): _*),
|
||||
AdvanceManualClock(100),
|
||||
waitUntilBatchProcessed,
|
||||
// smaller topic empty, 5 from bigger one
|
||||
CheckLastBatch(110 to 114: _*),
|
||||
StopStream,
|
||||
StartStream(ProcessingTime(100), clock),
|
||||
waitUntilBatchProcessed,
|
||||
// smallest now empty, 5 from bigger one
|
||||
CheckLastBatch(115 to 119: _*),
|
||||
AdvanceManualClock(100),
|
||||
waitUntilBatchProcessed,
|
||||
// smallest now empty, 5 from bigger one
|
||||
CheckLastBatch(120 to 124: _*)
|
||||
)
|
||||
}
|
||||
|
||||
test("cannot stop Kafka stream") {
|
||||
val topic = newTopic()
|
||||
testUtils.createTopic(topic, partitions = 5)
|
||||
|
|
|
@ -208,10 +208,8 @@ class MicroBatchExecution(
|
|||
* batch will be executed before getOffset is called again. */
|
||||
availableOffsets.foreach {
|
||||
case (source: Source, end: Offset) =>
|
||||
if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
|
||||
val start = committedOffsets.get(source)
|
||||
source.getBatch(start, end)
|
||||
}
|
||||
val start = committedOffsets.get(source)
|
||||
source.getBatch(start, end)
|
||||
case nonV1Tuple =>
|
||||
// The V2 API does not have the same edge case requiring getBatch to be called
|
||||
// here, so we do nothing here.
|
||||
|
|
|
@ -119,9 +119,15 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
|
|||
val newBlocks = synchronized {
|
||||
val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
|
||||
val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
|
||||
assert(sliceStart <= sliceEnd, s"sliceStart: $sliceStart sliceEnd: $sliceEnd")
|
||||
batches.slice(sliceStart, sliceEnd)
|
||||
}
|
||||
|
||||
if (newBlocks.isEmpty) {
|
||||
return sqlContext.internalCreateDataFrame(
|
||||
sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
|
||||
}
|
||||
|
||||
logDebug(generateDebugString(newBlocks, startOrdinal, endOrdinal))
|
||||
|
||||
newBlocks
|
||||
|
|
Loading…
Reference in a new issue