[SPARK-7139] [STREAMING] Allow received block metadata to be saved to WAL and recovered on driver failure

- Enabled ReceivedBlockTracker WAL by default
- Stored block metadata in the WAL
- Optimized WALBackedBlockRDD by skipping block fetch when the block is known to not exist in Spark

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5732 from tdas/SPARK-7139 and squashes the following commits:

575476e [Tathagata Das] Added more tests to get 100% coverage of the WALBackedBlockRDD
19668ba [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
685fab3 [Tathagata Das] Addressed comments in PR
637bc9c [Tathagata Das] Changed segment to handle
466212c [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
5f67a59 [Tathagata Das] Fixed HdfsUtils to handle append in local file system
1bc5bc3 [Tathagata Das] Fixed bug on unexpected recovery
d06fa21 [Tathagata Das] Enabled ReceivedBlockTracker by default, stored block metadata and optimized block fetching in WALBackedBlockRDD

(cherry picked from commit 1854ac326a)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
This commit is contained in:
Tathagata Das 2015-05-05 01:45:19 -07:00
parent 5160437c1b
commit ae27c0ef59
10 changed files with 282 additions and 154 deletions

View file

@ -31,7 +31,7 @@ private[spark]
class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {
@transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@volatile private var _isValid = true
override def getPartitions: Array[Partition] = {
@ -54,7 +54,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
override def getPreferredLocations(split: Partition): Seq[String] = {
assertValid()
locations_(split.asInstanceOf[BlockRDDPartition].blockId)
_locations(split.asInstanceOf[BlockRDDPartition].blockId)
}
/**
@ -79,14 +79,14 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds
/** Check if this BlockRDD is valid. If not valid, exception is thrown. */
private[spark] def assertValid() {
if (!_isValid) {
if (!isValid) {
throw new SparkException(
"Attempted to use %s after its blocks have been removed!".format(toString))
}
}
protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
locations_
_locations
}
}

View file

@ -20,11 +20,11 @@ package org.apache.spark.streaming.dstream
import scala.reflect.ClassTag
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult}
import org.apache.spark.streaming.scheduler.{InputInfo, ReceivedBlockInfo}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.util.WriteAheadLogUtils
/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@ -64,31 +64,30 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
} else {
// Otherwise, ask the tracker for all the blocks that have been allocated to this stream
// for this batch
val blockInfos =
ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
val blockStoreResults = blockInfos.map { _.blockStoreResult }
val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
val receiverTracker = ssc.scheduler.receiverTracker
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
// Register the input blocks information into InputInfoTracker
val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
// Are WAL record handles present with all the blocks
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
// Check whether all the results are of the same type
val resultTypes = blockStoreResults.map { _.getClass }.distinct
if (resultTypes.size > 1) {
logWarning("Multiple result types in block information, WAL information will be ignored.")
}
// If all the results are of type WriteAheadLogBasedStoreResult, then create
// WriteAheadLogBackedBlockRDD else create simple BlockRDD.
if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
val logSegments = blockStoreResults.map {
_.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
}.toArray
// Since storeInBlockManager = false, the storage level does not matter.
new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
if (areWALRecordHandlesPresent) {
// If all the blocks have WAL record handle, then create a WALBackedBlockRDD
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
} else {
// Else, create a BlockRDD. However, if there are some blocks with WAL info but not others
// then that is unexpected and log a warning accordingly.
if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
logError("Some blocks do not have Write Ahead Log information; " +
"this is unexpected and data may not be recoverable after driver failures")
} else {
logWarning("Some blocks have Write Ahead Log information; this is unexpected")
}
}
new BlockRDD[T](ssc.sc, blockIds)
}
}

View file

@ -23,6 +23,8 @@ import java.util.UUID
import scala.reflect.ClassTag
import scala.util.control.NonFatal
import org.apache.commons.io.FileUtils
import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
@ -31,30 +33,42 @@ import org.apache.spark.streaming.util._
/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
* It contains information about the id of the blocks having this partition's data and
* the segment of the write ahead log that backs the partition.
* the corresponding record handle in the write ahead log that backs the partition.
* @param index index of the partition
* @param blockId id of the block having the partition data
* @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
* executors). If not, then block lookups by the block ids will be skipped.
* By default, this is an empty array signifying true for all the blocks.
* @param walRecordHandle Handle of the record in a write ahead log having the partition data
*/
private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
val index: Int,
val blockId: BlockId,
val walRecordHandle: WriteAheadLogRecordHandle)
extends Partition
val isBlockIdValid: Boolean,
val walRecordHandle: WriteAheadLogRecordHandle
) extends Partition
/**
* This class represents a special case of the BlockRDD where the data blocks in
* the block manager are also backed by data in write ahead logs. For reading
* the data, this RDD first looks up the blocks by their ids in the block manager.
* If it does not find them, it looks up the corresponding data in the write ahead log.
* If it does not find them, it looks up the WAL using the corresponding record handle.
* The lookup of the blocks from the block manager can be skipped by setting the corresponding
* element in isBlockIdValid to false. This is a performance optimization which does not affect
* correctness, and it can be used in situations where it is known that the block
* does not exist in the Spark executors (e.g. after a failed driver is restarted).
*
*
* @param sc SparkContext
* @param blockIds Ids of the blocks that contains this RDD's data
* @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
* @param storeInBlockManager Whether to store in the block manager after reading
* from the WAL record
* @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
* executors). If not, then block lookups by the block ids will be skipped.
* By default, this is an empty array signifying true for all the blocks.
* @param storeInBlockManager Whether to store a block in the block manager
* after reading it from the WAL
* @param storageLevel storage level to store when storing in block manager
* (applicable when storeInBlockManager = true)
*/
@ -63,23 +77,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient blockIds: Array[BlockId],
@transient walRecordHandles: Array[WriteAheadLogRecordHandle],
storeInBlockManager: Boolean,
storageLevel: StorageLevel)
@transient isBlockIdValid: Array[Boolean] = Array.empty,
storeInBlockManager: Boolean = false,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
extends BlockRDD[T](sc, blockIds) {
require(
blockIds.length == walRecordHandles.length,
s"Number of block ids (${blockIds.length}) must be " +
s"the same as number of WAL record handles (${walRecordHandles.length}})!")
s"Number of block Ids (${blockIds.length}) must be " +
s" same as number of WAL record handles (${walRecordHandles.length}})")
require(
isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " +
s" same as number of block Ids (${blockIds.length})")
// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)
override def isValid(): Boolean = true
override def getPartitions: Array[Partition] = {
assertValid()
Array.tabulate(blockIds.size) { i =>
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
Array.tabulate(blockIds.length) { i =>
val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, walRecordHandles(i))
}
}
@ -94,12 +117,12 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
val blockManager = SparkEnv.get.blockManager
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockId = partition.blockId
blockManager.get(blockId) match {
case Some(block) => // Data is in Block Manager
val iterator = block.data.asInstanceOf[Iterator[T]]
logDebug(s"Read partition data of $this from block manager, block $blockId")
iterator
case None => // Data not found in Block Manager, grab it from write ahead log file
def getBlockFromBlockManager(): Option[Iterator[T]] = {
blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
}
def getBlockFromWriteAheadLog(): Iterator[T] = {
var dataRead: ByteBuffer = null
var writeAheadLog: WriteAheadLog = null
try {
@ -140,6 +163,12 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
}
blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
}
if (partition.isBlockIdValid) {
getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
} else {
getBlockFromWriteAheadLog()
}
}
/**
@ -149,12 +178,23 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
val blockLocations = if (partition.isBlockIdValid) {
getBlockIdLocations().get(partition.blockId)
} else {
None
}
blockLocations.getOrElse {
partition.walRecordHandle match {
case fileSegment: FileBasedWriteAheadLogSegment =>
try {
HdfsUtils.getFileSegmentLocations(
fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
} catch {
case NonFatal(e) =>
logError("Error getting WAL file segment locations", e)
Seq.empty
}
case _ =>
Seq.empty
}

View file

@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl(
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}

View file

@ -17,12 +17,38 @@
package org.apache.spark.streaming.scheduler
import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.{ReceivedBlockStoreResult, WriteAheadLogBasedStoreResult}
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle
/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
numRecords: Long,
metadataOption: Option[Any],
blockStoreResult: ReceivedBlockStoreResult
)
) {
@volatile private var _isBlockIdValid = true
def blockId: StreamBlockId = blockStoreResult.blockId
def walRecordHandleOption: Option[WriteAheadLogRecordHandle] = {
blockStoreResult match {
case walStoreResult: WriteAheadLogBasedStoreResult => Some(walStoreResult.walRecordHandle)
case _ => None
}
}
/** Is the block ID valid, that is, is the block present in the Spark executors. */
def isBlockIdValid(): Boolean = _isBlockIdValid
/**
* Set the block ID as invalid. This is useful when it is known that the block is not present
* in the Spark executors.
*/
def setBlockIdInvalid(): Unit = {
_isBlockIdValid = false
}
}

View file

@ -45,7 +45,7 @@ private[streaming] case class BatchCleanupEvent(times: Seq[Time])
private[streaming]
case class AllocatedBlocks(streamIdToAllocatedBlocks: Map[Int, Seq[ReceivedBlockInfo]]) {
def getBlocksOfStream(streamId: Int): Seq[ReceivedBlockInfo] = {
streamIdToAllocatedBlocks.get(streamId).getOrElse(Seq.empty)
streamIdToAllocatedBlocks.getOrElse(streamId, Seq.empty)
}
}
@ -63,6 +63,7 @@ private[streaming] class ReceivedBlockTracker(
hadoopConf: Configuration,
streamIds: Seq[Int],
clock: Clock,
recoverFromWriteAheadLog: Boolean,
checkpointDirOption: Option[String])
extends Logging {
@ -75,7 +76,9 @@ private[streaming] class ReceivedBlockTracker(
private var lastAllocatedBatchTime: Time = null
// Recover block information from write ahead logs
recoverFromWriteAheadLogs()
if (recoverFromWriteAheadLog) {
recoverPastEvents()
}
/** Add received block. This event will get written to the write ahead log (if enabled). */
def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = synchronized {
@ -167,10 +170,11 @@ private[streaming] class ReceivedBlockTracker(
* Recover all the tracker actions from the write ahead logs to recover the state (unallocated
* and allocated block info) prior to failure.
*/
private def recoverFromWriteAheadLogs(): Unit = synchronized {
private def recoverPastEvents(): Unit = synchronized {
// Insert the recovered block information
def insertAddedBlock(receivedBlockInfo: ReceivedBlockInfo) {
logTrace(s"Recovery: Inserting added block $receivedBlockInfo")
receivedBlockInfo.setBlockIdInvalid()
getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
}
@ -224,19 +228,9 @@ private[streaming] class ReceivedBlockTracker(
/** Optionally create the write ahead log manager only if the feature is enabled */
private def createWriteAheadLog(): Option[WriteAheadLog] = {
if (WriteAheadLogUtils.enableReceiverLog(conf)) {
if (checkpointDirOption.isEmpty) {
throw new SparkException(
"Cannot enable receiver write-ahead log without checkpoint directory set. " +
"Please use streamingContext.checkpoint() to set the checkpoint directory. " +
"See documentation for more details.")
}
checkpointDirOption.map { checkpointDir =>
val logDir = ReceivedBlockTracker.checkpointDirToLogDir(checkpointDirOption.get)
val log = WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
Some(log)
} else {
None
WriteAheadLogUtils.createLogForDriver(conf, logDir, hadoopConf)
}
}

View file

@ -62,6 +62,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
ssc.sparkContext.hadoopConfiguration,
receiverInputStreamIds,
ssc.scheduler.clock,
ssc.isCheckpointPresent,
Option(ssc.checkpointDir)
)
private val listenerBus = ssc.scheduler.listenerBus

View file

@ -27,7 +27,7 @@ private[streaming] object HdfsUtils {
// If the file exists and we have append support, append instead of creating a new file
val stream: FSDataOutputStream = {
if (dfs.isFile(dfsPath)) {
if (conf.getBoolean("hdfs.append.support", false)) {
if (conf.getBoolean("hdfs.append.support", false) || dfs.isInstanceOf[RawLocalFileSystem]) {
dfs.append(dfsPath)
} else {
throw new IllegalStateException("File exists and there is no append support!")

View file

@ -67,15 +67,20 @@ class ReceivedBlockTrackerSuite
// Verify added blocks are unallocated blocks
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
receivedBlockTracker.hasUnallocatedReceivedBlocks should be (true)
// Allocate the blocks to a batch and verify that all of them have been allocated
receivedBlockTracker.allocateBlocksToBatch(1)
receivedBlockTracker.getBlocksOfBatchAndStream(1, streamId) shouldEqual blockInfos
receivedBlockTracker.getBlocksOfBatch(1) shouldEqual Map(streamId -> blockInfos)
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldBe empty
receivedBlockTracker.hasUnallocatedReceivedBlocks should be (false)
// Allocate no blocks to another batch
receivedBlockTracker.allocateBlocksToBatch(2)
receivedBlockTracker.getBlocksOfBatchAndStream(2, streamId) shouldBe empty
receivedBlockTracker.getBlocksOfBatch(2) shouldEqual Map(streamId -> Seq.empty)
// Verify that older batches have no operation on batch allocation,
// will return the same blocks as previously allocated.
@ -88,7 +93,7 @@ class ReceivedBlockTrackerSuite
receivedBlockTracker.getUnallocatedBlocks(streamId) shouldEqual blockInfos
}
test("block addition, block to batch allocation and clean up with write ahead log") {
test("recovery and cleanup with write ahead logs") {
val manualClock = new ManualClock
// Set the time increment level to twice the rotation interval so that every increment creates
// a new log file
@ -114,9 +119,7 @@ class ReceivedBlockTrackerSuite
}
// Set WAL configuration
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
conf.set("spark.streaming.driver.writeAheadLog.rollingIntervalSecs", "1")
require(WriteAheadLogUtils.enableReceiverLog(conf))
require(WriteAheadLogUtils.getRollingIntervalSecs(conf, isDriver = true) === 1)
// Start tracker and add blocks
@ -131,15 +134,27 @@ class ReceivedBlockTrackerSuite
getWrittenLogData() shouldEqual expectedWrittenData1
getWriteAheadLogFiles() should have size 1
// Restart tracker and verify recovered list of unallocated blocks
incrementTime()
val tracker2 = createTracker(clock = manualClock)
tracker2.getUnallocatedBlocks(streamId).toList shouldEqual blockInfos1
// Recovery without recovery from WAL and verify list of unallocated blocks is empty
val tracker1_ = createTracker(clock = manualClock, recoverFromWriteAheadLog = false)
tracker1_.getUnallocatedBlocks(streamId) shouldBe empty
tracker1_.hasUnallocatedReceivedBlocks should be (false)
// Restart tracker and verify recovered list of unallocated blocks
val tracker2 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
val unallocatedBlocks = tracker2.getUnallocatedBlocks(streamId).toList
unallocatedBlocks shouldEqual blockInfos1
unallocatedBlocks.foreach { block =>
block.isBlockIdValid() should be (false)
}
// Allocate blocks to batch and verify whether the unallocated blocks got allocated
val batchTime1 = manualClock.getTimeMillis()
tracker2.allocateBlocksToBatch(batchTime1)
tracker2.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
tracker2.getBlocksOfBatch(batchTime1) shouldEqual Map(streamId -> blockInfos1)
// Add more blocks and allocate to another batch
incrementTime()
@ -157,7 +172,7 @@ class ReceivedBlockTrackerSuite
// Restart tracker and verify recovered state
incrementTime()
val tracker3 = createTracker(clock = manualClock)
val tracker3 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
tracker3.getBlocksOfBatchAndStream(batchTime1, streamId) shouldEqual blockInfos1
tracker3.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
tracker3.getUnallocatedBlocks(streamId) shouldBe empty
@ -180,28 +195,16 @@ class ReceivedBlockTrackerSuite
// Restart tracker and verify recovered state, specifically whether info about the first
// batch has been removed, but not the second batch
incrementTime()
val tracker4 = createTracker(clock = manualClock)
val tracker4 = createTracker(clock = manualClock, recoverFromWriteAheadLog = true)
tracker4.getUnallocatedBlocks(streamId) shouldBe empty
tracker4.getBlocksOfBatchAndStream(batchTime1, streamId) shouldBe empty // should be cleaned
tracker4.getBlocksOfBatchAndStream(batchTime2, streamId) shouldEqual blockInfos2
}
test("enabling write ahead log but not setting checkpoint dir") {
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
intercept[SparkException] {
createTracker(setCheckpointDir = false)
}
}
test("setting checkpoint dir but not enabling write ahead log") {
// When WAL config is not set, log manager should not be enabled
val tracker1 = createTracker(setCheckpointDir = true)
test("disable write ahead log when checkpoint directory is not set") {
// When checkpoint is disabled, then the write ahead log is disabled
val tracker1 = createTracker(setCheckpointDir = false)
tracker1.isWriteAheadLogEnabled should be (false)
// When WAL is explicitly disabled, log manager should not be enabled
conf.set("spark.streaming.receiver.writeAheadLog.enable", "false")
val tracker2 = createTracker(setCheckpointDir = true)
tracker2.isWriteAheadLogEnabled should be(false)
}
/**
@ -210,16 +213,18 @@ class ReceivedBlockTrackerSuite
*/
def createTracker(
setCheckpointDir: Boolean = true,
recoverFromWriteAheadLog: Boolean = false,
clock: Clock = new SystemClock): ReceivedBlockTracker = {
val cpDirOption = if (setCheckpointDir) Some(checkpointDirectory.toString) else None
val tracker = new ReceivedBlockTracker(conf, hadoopConf, Seq(streamId), clock, cpDirOption)
val tracker = new ReceivedBlockTracker(
conf, hadoopConf, Seq(streamId), clock, recoverFromWriteAheadLog, cpDirOption)
allReceivedBlockTrackers += tracker
tracker
}
/** Generate blocks infos using random ids */
def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
List.fill(5)(ReceivedBlockInfo(streamId, 0,
List.fill(5)(ReceivedBlockInfo(streamId, 0, None,
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
}

View file

@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, FileBasedWriteAheadLogWriter}
import org.apache.spark.util.Utils
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext, SparkException}
class WriteAheadLogBackedBlockRDDSuite
extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
@ -60,24 +60,35 @@ class WriteAheadLogBackedBlockRDDSuite
System.clearProperty("spark.driver.port")
}
test("Read data available in block manager and write ahead log") {
testRDD(5, 5)
test("Read data available in both block manager and write ahead log") {
testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5)
}
test("Read data available only in block manager, not in write ahead log") {
testRDD(5, 0)
testRDD(numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0)
}
test("Read data available only in write ahead log, not in block manager") {
testRDD(0, 5)
}
test("Read data available only in write ahead log, and test storing in block manager") {
testRDD(0, 5, testStoreInBM = true)
testRDD(numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5)
}
test("Read data with partially available in block manager, and rest in write ahead log") {
testRDD(3, 2)
testRDD(numPartitions = 5, numPartitionsInBM = 3, numPartitionsInWAL = 2)
}
test("Test isBlockValid skips block fetching from BlockManager") {
testRDD(
numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 0, testIsBlockValid = true)
}
test("Test whether RDD is valid after removing blocks from block manager") {
testRDD(
numPartitions = 5, numPartitionsInBM = 5, numPartitionsInWAL = 5, testBlockRemove = true)
}
test("Test storing of blocks recovered from write ahead log back into block manager") {
testRDD(
numPartitions = 5, numPartitionsInBM = 0, numPartitionsInWAL = 5, testStoreInBM = true)
}
/**
@ -85,23 +96,52 @@ class WriteAheadLogBackedBlockRDDSuite
* and the rest to a write ahead log, and then reading reading it all back using the RDD.
* It can also test if the partitions that were read from the log were again stored in
* block manager.
* @param numPartitionsInBM Number of partitions to write to the Block Manager
* @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log
*
*
*
* @param numPartitions Number of partitions in RDD
* @param numPartitionsInBM Number of partitions to write to the BlockManager.
* Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager
* @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log.
* Partitions (numPartitions - 1 - numPartitionsInWAL) to
* (numPartitions - 1) will be written to WAL
* @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
* @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with
* reads falling back to the WAL
* @param testStoreInBM Test whether blocks read from log are stored back into block manager
*
* Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4
*
* numPartitionsInBM = 3
* |------------------|
* | |
* 0 1 2 3 4
* | |
* |-------------------------|
* numPartitionsInWAL = 4
*/
private def testRDD(
numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) {
val numBlocks = numPartitionsInBM + numPartitionsInWAL
val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50))
numPartitions: Int,
numPartitionsInBM: Int,
numPartitionsInWAL: Int,
testIsBlockValid: Boolean = false,
testBlockRemove: Boolean = false,
testStoreInBM: Boolean = false
) {
require(numPartitionsInBM <= numPartitions,
"Can't put more partitions in BlockManager than that in RDD")
require(numPartitionsInWAL <= numPartitions,
"Can't put more partitions in write ahead log than that in RDD")
val data = Seq.fill(numPartitions, 10)(scala.util.Random.nextString(50))
// Put the necessary blocks in the block manager
val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt()))
val blockIds = Array.fill(numPartitions)(StreamBlockId(Random.nextInt(), Random.nextInt()))
data.zip(blockIds).take(numPartitionsInBM).foreach { case(block, blockId) =>
blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
}
// Generate write ahead log file segments
val recordHandles = generateFakeRecordHandles(numPartitionsInBM) ++
// Generate write ahead log record handles
val recordHandles = generateFakeRecordHandles(numPartitions - numPartitionsInWAL) ++
generateWALRecordHandles(data.takeRight(numPartitionsInWAL),
blockIds.takeRight(numPartitionsInWAL))
@ -111,7 +151,7 @@ class WriteAheadLogBackedBlockRDDSuite
"Expected blocks not in BlockManager"
)
require(
blockIds.takeRight(numPartitionsInWAL).forall(blockManager.get(_).isEmpty),
blockIds.takeRight(numPartitions - numPartitionsInBM).forall(blockManager.get(_).isEmpty),
"Unexpected blocks in BlockManager"
)
@ -122,19 +162,42 @@ class WriteAheadLogBackedBlockRDDSuite
"Expected blocks not in write ahead log"
)
require(
recordHandles.take(numPartitionsInBM).forall(s =>
recordHandles.take(numPartitions - numPartitionsInWAL).forall(s =>
!new File(s.path.stripPrefix("file://")).exists()),
"Unexpected blocks in write ahead log"
)
// Create the RDD and verify whether the returned data is correct
val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
recordHandles.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
recordHandles.toArray, storeInBlockManager = false)
assert(rdd.collect() === data.flatten)
// Verify that the block fetching is skipped when isBlockValid is set to false.
// This is done by using a RDD whose data is only in memory but is set to skip block fetching
// Using that RDD will throw exception, as it skips block fetching even if the blocks are in
// in BlockManager.
if (testIsBlockValid) {
require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
require(numPartitionsInWAL === 0, "No partitions must be in WAL")
val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
recordHandles.toArray, isBlockIdValid = Array.fill(blockIds.length)(false))
intercept[SparkException] {
rdd2.collect()
}
}
// Verify that the RDD is not invalid after the blocks are removed and can still read data
// from write ahead log
if (testBlockRemove) {
require(numPartitions === numPartitionsInWAL, "All partitions must be in WAL for this test")
require(numPartitionsInBM > 0, "Some partitions must be in BlockManager for this test")
rdd.removeBlocks()
assert(rdd.collect() === data.flatten)
}
if (testStoreInBM) {
val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
recordHandles.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
recordHandles.toArray, storeInBlockManager = true, storageLevel = StorageLevel.MEMORY_ONLY)
assert(rdd2.collect() === data.flatten)
assert(
blockIds.forall(blockManager.get(_).nonEmpty),