[SPARK-7777][Streaming] Handle the case when there is no block in a batch
In the old implementation, if a batch has no block, `areWALRecordHandlesPresent` will be `true` and it will return `WriteAheadLogBackedBlockRDD`.
This PR handles this case by returning `WriteAheadLogBackedBlockRDD` or `BlockRDD` according to the configuration.
Author: zsxwing <zsxwing@gmail.com>
Closes #6372 from zsxwing/SPARK-7777 and squashes the following commits:
788f895 [zsxwing] Handle the case when there is no block in a batch
(cherry picked from commit ad0badba14)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
This commit is contained in:
parent
c8eb76ba67
commit
ea9db50bc3
|
@@ -73,27 +73,38 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
|
|||
val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
|
||||
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
|
||||
|
||||
// Are WAL record handles present with all the blocks
|
||||
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
|
||||
if (blockInfos.nonEmpty) {
|
||||
// Are WAL record handles present with all the blocks
|
||||
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
|
||||
|
||||
if (areWALRecordHandlesPresent) {
|
||||
// If all the blocks have WAL record handle, then create a WALBackedBlockRDD
|
||||
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
|
||||
val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
|
||||
new WriteAheadLogBackedBlockRDD[T](
|
||||
ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
|
||||
} else {
|
||||
// Else, create a BlockRDD. However, if there are some blocks with WAL info but not others
|
||||
// then that is unexpected and log a warning accordingly.
|
||||
if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
|
||||
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
|
||||
logError("Some blocks do not have Write Ahead Log information; " +
|
||||
"this is unexpected and data may not be recoverable after driver failures")
|
||||
} else {
|
||||
logWarning("Some blocks have Write Ahead Log information; this is unexpected")
|
||||
if (areWALRecordHandlesPresent) {
|
||||
// If all the blocks have WAL record handle, then create a WALBackedBlockRDD
|
||||
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
|
||||
val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
|
||||
new WriteAheadLogBackedBlockRDD[T](
|
||||
ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
|
||||
} else {
|
||||
// Else, create a BlockRDD. However, if there are some blocks with WAL info but not
|
||||
// others then that is unexpected and log a warning accordingly.
|
||||
if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
|
||||
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
|
||||
logError("Some blocks do not have Write Ahead Log information; " +
|
||||
"this is unexpected and data may not be recoverable after driver failures")
|
||||
} else {
|
||||
logWarning("Some blocks have Write Ahead Log information; this is unexpected")
|
||||
}
|
||||
}
|
||||
new BlockRDD[T](ssc.sc, blockIds)
|
||||
}
|
||||
} else {
|
||||
// If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
|
||||
// according to the configuration
|
||||
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
|
||||
new WriteAheadLogBackedBlockRDD[T](
|
||||
ssc.sparkContext, Array.empty, Array.empty, Array.empty)
|
||||
} else {
|
||||
new BlockRDD[T](ssc.sc, Array.empty)
|
||||
}
|
||||
new BlockRDD[T](ssc.sc, blockIds)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@@ -39,6 +39,7 @@ import org.apache.spark.storage.StorageLevel
|
|||
import org.apache.spark.streaming.scheduler.{StreamingListenerBatchCompleted, StreamingListener}
|
||||
import org.apache.spark.util.{ManualClock, Utils}
|
||||
import org.apache.spark.streaming.dstream.{InputDStream, ReceiverInputDStream}
|
||||
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
|
||||
import org.apache.spark.streaming.receiver.Receiver
|
||||
|
||||
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
||||
|
@@ -105,6 +106,36 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
}
|
||||
}
|
||||
|
||||
test("socket input stream - no block in a batch") {
|
||||
withTestServer(new TestServer()) { testServer =>
|
||||
testServer.start()
|
||||
|
||||
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
|
||||
ssc.addStreamingListener(ssc.progressListener)
|
||||
|
||||
val batchCounter = new BatchCounter(ssc)
|
||||
val networkStream = ssc.socketTextStream(
|
||||
"localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
|
||||
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
|
||||
val outputStream = new TestOutputStream(networkStream, outputBuffer)
|
||||
outputStream.register()
|
||||
ssc.start()
|
||||
|
||||
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
|
||||
clock.advance(batchDuration.milliseconds)
|
||||
|
||||
// Make sure the first batch is finished
|
||||
if (!batchCounter.waitUntilBatchesCompleted(1, 30000)) {
|
||||
fail("Timeout: cannot finish all batches in 30 seconds")
|
||||
}
|
||||
|
||||
networkStream.generatedRDDs.foreach { case (_, rdd) =>
|
||||
assert(!rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("binary records stream") {
|
||||
val testDir: File = null
|
||||
try {
|
||||
|
|
Loading…
Reference in a new issue