diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 391e40924f..bb47d373de 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -23,8 +23,9 @@ import scala.reflect.ClassTag import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.BlockId import org.apache.spark.streaming._ -import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.streaming.receiver.{WriteAheadLogBasedStoreResult, BlockManagerBasedStoreResult, Receiver} import org.apache.spark.streaming.scheduler.ReceivedBlockInfo +import org.apache.spark.SparkException /** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] @@ -65,10 +66,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont if (validTime >= graph.startTime) { val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id) receivedBlockInfo(validTime) = blockInfo - val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId]) + val blockIds = blockInfo.map { _.blockStoreResult.blockId.asInstanceOf[BlockId] } Some(new BlockRDD[T](ssc.sc, blockIds)) } else { - Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) + Some(new BlockRDD[T](ssc.sc, Array.empty)) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala new file mode 100644 index 0000000000..47968afef2 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlock.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.receiver + +import java.nio.ByteBuffer + +import scala.collection.mutable.ArrayBuffer +import scala.language.existentials + +/** Trait representing a received block */ +private[streaming] sealed trait ReceivedBlock + +/** class representing a block received as an ArrayBuffer */ +private[streaming] case class ArrayBufferBlock(arrayBuffer: ArrayBuffer[_]) extends ReceivedBlock + +/** class representing a block received as an Iterator */ +private[streaming] case class IteratorBlock(iterator: Iterator[_]) extends ReceivedBlock + +/** class representing a block received as an ByteBuffer */ +private[streaming] case class ByteBufferBlock(byteBuffer: ByteBuffer) extends ReceivedBlock diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala new file mode 100644 index 0000000000..fdf995320b --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.receiver + +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration._ +import scala.language.{existentials, postfixOps} + +import WriteAheadLogBasedBlockHandler._ +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path + +import org.apache.spark.{Logging, SparkConf, SparkException} +import org.apache.spark.storage._ +import org.apache.spark.streaming.util.{Clock, SystemClock, WriteAheadLogFileSegment, WriteAheadLogManager} +import org.apache.spark.util.Utils + +/** Trait that represents the metadata related to storage of blocks */ +private[streaming] trait ReceivedBlockStoreResult { + def blockId: StreamBlockId // Any implementation of this trait will store a block id +} + +/** Trait that represents a class that handles the storage of blocks received by receiver */ +private[streaming] trait ReceivedBlockHandler { + + /** Store a received block with the given block id and return related metadata */ + def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult + + /** Cleanup old blocks older than the given threshold time */ + def cleanupOldBlock(threshTime: Long) +} + + +/** + * Implementation of [[org.apache.spark.streaming.receiver.ReceivedBlockStoreResult]] + * that stores the metadata related to storage of blocks using + * [[org.apache.spark.streaming.receiver.BlockManagerBasedBlockHandler]] + */ +private[streaming] case class BlockManagerBasedStoreResult(blockId: StreamBlockId) + extends ReceivedBlockStoreResult + + +/** + * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which + * stores the received blocks into a block manager with the specified storage level. + */ +private[streaming] class BlockManagerBasedBlockHandler( + blockManager: BlockManager, storageLevel: StorageLevel) + extends ReceivedBlockHandler with Logging { + + def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { + val putResult: Seq[(BlockId, BlockStatus)] = block match { + case ArrayBufferBlock(arrayBuffer) => + blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true) + case IteratorBlock(iterator) => + blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true) + case ByteBufferBlock(byteBuffer) => + blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true) + case o => + throw new SparkException( + s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") + } + if (!putResult.map { _._1 }.contains(blockId)) { + throw new SparkException( + s"Could not store $blockId to block manager with storage level $storageLevel") + } + BlockManagerBasedStoreResult(blockId) + } + + def cleanupOldBlock(threshTime: Long) { + // this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing + // of BlockRDDs. + } +} + + +/** + * Implementation of [[org.apache.spark.streaming.receiver.ReceivedBlockStoreResult]] + * that stores the metadata related to storage of blocks using + * [[org.apache.spark.streaming.receiver.WriteAheadLogBasedBlockHandler]] + */ +private[streaming] case class WriteAheadLogBasedStoreResult( + blockId: StreamBlockId, + segment: WriteAheadLogFileSegment + ) extends ReceivedBlockStoreResult + + +/** + * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which + * stores the received blocks in both, a write ahead log and a block manager. + */ +private[streaming] class WriteAheadLogBasedBlockHandler( + blockManager: BlockManager, + streamId: Int, + storageLevel: StorageLevel, + conf: SparkConf, + hadoopConf: Configuration, + checkpointDir: String, + clock: Clock = new SystemClock + ) extends ReceivedBlockHandler with Logging { + + private val blockStoreTimeout = conf.getInt( + "spark.streaming.receiver.blockStoreTimeout", 30).seconds + private val rollingInterval = conf.getInt( + "spark.streaming.receiver.writeAheadLog.rollingInterval", 60) + private val maxFailures = conf.getInt( + "spark.streaming.receiver.writeAheadLog.maxFailures", 3) + + // Manages rolling log files + private val logManager = new WriteAheadLogManager( + checkpointDirToLogDir(checkpointDir, streamId), + hadoopConf, rollingInterval, maxFailures, + callerName = this.getClass.getSimpleName, + clock = clock + ) + + // For processing futures used in parallel block storing into block manager and write ahead log + // # threads = 2, so that both writing to BM and WAL can proceed in parallel + implicit private val executionContext = ExecutionContext.fromExecutorService( + Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName)) + + /** + * This implementation stores the block into the block manager as well as a write ahead log. + * It does this in parallel, using Scala Futures, and returns only after the block has + * been stored in both places. + */ + def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { + + // Serialize the block so that it can be inserted into both + val serializedBlock = block match { + case ArrayBufferBlock(arrayBuffer) => + blockManager.dataSerialize(blockId, arrayBuffer.iterator) + case IteratorBlock(iterator) => + blockManager.dataSerialize(blockId, iterator) + case ByteBufferBlock(byteBuffer) => + byteBuffer + case _ => + throw new Exception(s"Could not push $blockId to block manager, unexpected block type") + } + + // Store the block in block manager + val storeInBlockManagerFuture = Future { + val putResult = + blockManager.putBytes(blockId, serializedBlock, storageLevel, tellMaster = true) + if (!putResult.map { _._1 }.contains(blockId)) { + throw new SparkException( + s"Could not store $blockId to block manager with storage level $storageLevel") + } + } + + // Store the block in write ahead log + val storeInWriteAheadLogFuture = Future { + logManager.writeToLog(serializedBlock) + } + + // Combine the futures, wait for both to complete, and return the write ahead log segment + val combinedFuture = for { + _ <- storeInBlockManagerFuture + fileSegment <- storeInWriteAheadLogFuture + } yield fileSegment + val segment = Await.result(combinedFuture, blockStoreTimeout) + WriteAheadLogBasedStoreResult(blockId, segment) + } + + def cleanupOldBlock(threshTime: Long) { + logManager.cleanupOldLogs(threshTime) + } + + def stop() { + logManager.stop() + } +} + +private[streaming] object WriteAheadLogBasedBlockHandler { + def checkpointDirToLogDir(checkpointDir: String, streamId: Int): String = { + new Path(checkpointDir, new Path("receivedData", streamId.toString)).toString + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 53a3e6200e..5360412330 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -25,16 +25,13 @@ import scala.concurrent.Await import akka.actor.{Actor, Props} import akka.pattern.ask - import com.google.common.base.Throwables - -import org.apache.spark.{Logging, SparkEnv} -import org.apache.spark.streaming.scheduler._ -import org.apache.spark.util.{Utils, AkkaUtils} +import org.apache.hadoop.conf.Configuration +import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.storage.StreamBlockId -import org.apache.spark.streaming.scheduler.DeregisterReceiver -import org.apache.spark.streaming.scheduler.AddBlock -import org.apache.spark.streaming.scheduler.RegisterReceiver +import org.apache.spark.streaming.scheduler._ +import org.apache.spark.streaming.util.WriteAheadLogFileSegment +import org.apache.spark.util.{AkkaUtils, Utils} /** * Concrete implementation of [[org.apache.spark.streaming.receiver.ReceiverSupervisor]] @@ -44,12 +41,26 @@ import org.apache.spark.streaming.scheduler.RegisterReceiver */ private[streaming] class ReceiverSupervisorImpl( receiver: Receiver[_], - env: SparkEnv + env: SparkEnv, + hadoopConf: Configuration, + checkpointDirOption: Option[String] ) extends ReceiverSupervisor(receiver, env.conf) with Logging { - private val blockManager = env.blockManager + private val receivedBlockHandler: ReceivedBlockHandler = { + if (env.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)) { + if (checkpointDirOption.isEmpty) { + throw new SparkException( + "Cannot enable receiver write-ahead log without checkpoint directory set. " + + "Please use streamingContext.checkpoint() to set the checkpoint directory. " + + "See documentation for more details.") + } + new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId, + receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) + } else { + new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) + } + } - private val storageLevel = receiver.storageLevel /** Remote Akka actor for the ReceiverTracker */ private val trackerActor = { @@ -105,47 +116,50 @@ private[streaming] class ReceiverSupervisorImpl( /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ def pushArrayBuffer( arrayBuffer: ArrayBuffer[_], - optionalMetadata: Option[Any], - optionalBlockId: Option[StreamBlockId] + metadataOption: Option[Any], + blockIdOption: Option[StreamBlockId] ) { - val blockId = optionalBlockId.getOrElse(nextBlockId) - val time = System.currentTimeMillis - blockManager.putArray(blockId, arrayBuffer.toArray[Any], storageLevel, tellMaster = true) - logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms") - reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata) + pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption) } /** Store a iterator of received data as a data block into Spark's memory. */ def pushIterator( iterator: Iterator[_], - optionalMetadata: Option[Any], - optionalBlockId: Option[StreamBlockId] + metadataOption: Option[Any], + blockIdOption: Option[StreamBlockId] ) { - val blockId = optionalBlockId.getOrElse(nextBlockId) - val time = System.currentTimeMillis - blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true) - logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms") - reportPushedBlock(blockId, -1, optionalMetadata) + pushAndReportBlock(IteratorBlock(iterator), metadataOption, blockIdOption) } /** Store the bytes of received data as a data block into Spark's memory. */ def pushBytes( bytes: ByteBuffer, - optionalMetadata: Option[Any], - optionalBlockId: Option[StreamBlockId] + metadataOption: Option[Any], + blockIdOption: Option[StreamBlockId] ) { - val blockId = optionalBlockId.getOrElse(nextBlockId) - val time = System.currentTimeMillis - blockManager.putBytes(blockId, bytes, storageLevel, tellMaster = true) - logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms") - reportPushedBlock(blockId, -1, optionalMetadata) + pushAndReportBlock(ByteBufferBlock(bytes), metadataOption, blockIdOption) } - /** Report pushed block */ - def reportPushedBlock(blockId: StreamBlockId, numRecords: Long, optionalMetadata: Option[Any]) { - val blockInfo = ReceivedBlockInfo(streamId, blockId, numRecords, optionalMetadata.orNull) - trackerActor ! AddBlock(blockInfo) - logDebug("Reported block " + blockId) + /** Store block and report it to driver */ + def pushAndReportBlock( + receivedBlock: ReceivedBlock, + metadataOption: Option[Any], + blockIdOption: Option[StreamBlockId] + ) { + val blockId = blockIdOption.getOrElse(nextBlockId) + val numRecords = receivedBlock match { + case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size + case _ => -1 + } + + val time = System.currentTimeMillis + val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) + logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") + + val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult) + val future = trackerActor.ask(AddBlock(blockInfo))(askTimeout) + Await.result(future, askTimeout) + logDebug(s"Reported block $blockId") } /** Report error to the receiver tracker */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index a68aecb881..92dc113f39 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -17,8 +17,8 @@ package org.apache.spark.streaming.scheduler -import org.apache.spark.streaming.Time import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.streaming.Time /** * :: DeveloperApi :: diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index a69d743621..8c15a75b1b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -17,7 +17,8 @@ package org.apache.spark.streaming.scheduler -import scala.collection.mutable.{ArrayBuffer, HashSet} +import scala.collection.mutable.HashSet + import org.apache.spark.streaming.Time /** Class representing a set of Jobs diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala new file mode 100644 index 0000000000..94beb590f5 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.scheduler + +import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult + +/** Information about blocks received by the receiver */ +private[streaming] case class ReceivedBlockInfo( + streamId: Int, + numRecords: Long, + blockStoreResult: ReceivedBlockStoreResult + ) + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 7149dbc12a..d696563bce 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -21,21 +21,12 @@ import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue} import scala.language.existentials import akka.actor._ -import org.apache.spark.{Logging, SparkEnv, SparkException} +import org.apache.spark.{SerializableWritable, Logging, SparkEnv, SparkException} import org.apache.spark.SparkContext._ -import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver} import org.apache.spark.util.AkkaUtils -/** Information about blocks received by the receiver */ -private[streaming] case class ReceivedBlockInfo( - streamId: Int, - blockId: StreamBlockId, - numRecords: Long, - metadata: Any - ) - /** * Messages used by the NetworkReceiver and the ReceiverTracker to communicate * with each other. @@ -153,7 +144,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { def addBlocks(receivedBlockInfo: ReceivedBlockInfo) { getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " + - receivedBlockInfo.blockId) + receivedBlockInfo.blockStoreResult.blockId) } /** Report error sent by a receiver */ @@ -188,6 +179,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { sender ! true case AddBlock(receivedBlockInfo) => addBlocks(receivedBlockInfo) + sender ! true case ReportError(streamId, message, error) => reportError(streamId, message, error) case DeregisterReceiver(streamId, message, error) => @@ -252,6 +244,9 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { ssc.sc.makeRDD(receivers, receivers.size) } + val checkpointDirOption = Option(ssc.checkpointDir) + val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration) + // Function to start the receiver on the worker node val startReceiver = (iterator: Iterator[Receiver[_]]) => { if (!iterator.hasNext) { @@ -259,9 +254,10 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging { "Could not start receiver as object not found.") } val receiver = iterator.next() - val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get) - executor.start() - executor.awaitTermination() + val supervisor = new ReceiverSupervisorImpl( + receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) + supervisor.start() + supervisor.awaitTermination() } // Run the dummy Spark job to ensure that all slaves have registered. // This avoids all the receivers to be scheduled on the same node. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala index 92bad7a882..003989092a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala @@ -52,4 +52,3 @@ private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configura HdfsUtils.checkState(!closed, "Stream is closed. Create a new Reader to read from the file.") } } - diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala new file mode 100644 index 0000000000..ad1a6f01b3 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import java.io.File +import java.nio.ByteBuffer + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ +import scala.language.postfixOps + +import akka.actor.{ActorSystem, Props} +import com.google.common.io.Files +import org.apache.commons.io.FileUtils +import org.apache.hadoop.conf.Configuration +import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} +import org.scalatest.concurrent.Eventually._ + +import org.apache.spark._ +import org.apache.spark.network.nio.NioBlockTransferService +import org.apache.spark.scheduler.LiveListenerBus +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.shuffle.hash.HashShuffleManager +import org.apache.spark.storage._ +import org.apache.spark.streaming.receiver._ +import org.apache.spark.streaming.util._ +import org.apache.spark.util.AkkaUtils +import WriteAheadLogBasedBlockHandler._ +import WriteAheadLogSuite._ + +class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matchers with Logging { + + val conf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1") + val hadoopConf = new Configuration() + val storageLevel = StorageLevel.MEMORY_ONLY_SER + val streamId = 1 + val securityMgr = new SecurityManager(conf) + val mapOutputTracker = new MapOutputTrackerMaster(conf) + val shuffleManager = new HashShuffleManager(conf) + val serializer = new KryoSerializer(conf) + val manualClock = new ManualClock + val blockManagerSize = 10000000 + + var actorSystem: ActorSystem = null + var blockManagerMaster: BlockManagerMaster = null + var blockManager: BlockManager = null + var tempDirectory: File = null + + before { + val (actorSystem, boundPort) = AkkaUtils.createActorSystem( + "test", "localhost", 0, conf = conf, securityManager = securityMgr) + this.actorSystem = actorSystem + conf.set("spark.driver.port", boundPort.toString) + + blockManagerMaster = new BlockManagerMaster( + actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), + conf, true) + + blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer, + blockManagerSize, conf, mapOutputTracker, shuffleManager, + new NioBlockTransferService(conf, securityMgr)) + + tempDirectory = Files.createTempDir() + manualClock.setTime(0) + } + + after { + if (blockManager != null) { + blockManager.stop() + blockManager = null + } + if (blockManagerMaster != null) { + blockManagerMaster.stop() + blockManagerMaster = null + } + actorSystem.shutdown() + actorSystem.awaitTermination() + actorSystem = null + + if (tempDirectory != null && tempDirectory.exists()) { + FileUtils.deleteDirectory(tempDirectory) + tempDirectory = null + } + } + + test("BlockManagerBasedBlockHandler - store blocks") { + withBlockManagerBasedBlockHandler { handler => + testBlockStoring(handler) { case (data, blockIds, storeResults) => + // Verify the data in block manager is correct + val storedData = blockIds.flatMap { blockId => + blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty) + }.toList + storedData shouldEqual data + + // Verify that the store results are instances of BlockManagerBasedStoreResult + assert( + storeResults.forall { _.isInstanceOf[BlockManagerBasedStoreResult] }, + "Unexpected store result type" + ) + } + } + } + + test("BlockManagerBasedBlockHandler - handle errors in storing block") { + withBlockManagerBasedBlockHandler { handler => + testErrorHandling(handler) + } + } + + test("WriteAheadLogBasedBlockHandler - store blocks") { + withWriteAheadLogBasedBlockHandler { handler => + testBlockStoring(handler) { case (data, blockIds, storeResults) => + // Verify the data in block manager is correct + val storedData = blockIds.flatMap { blockId => + blockManager.getLocal(blockId).map { _.data.map {_.toString}.toList }.getOrElse(List.empty) + }.toList + storedData shouldEqual data + + // Verify that the store results are instances of WriteAheadLogBasedStoreResult + assert( + storeResults.forall { _.isInstanceOf[WriteAheadLogBasedStoreResult] }, + "Unexpected store result type" + ) + // Verify the data in write ahead log files is correct + val fileSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].segment} + val loggedData = fileSegments.flatMap { segment => + val reader = new WriteAheadLogRandomReader(segment.path, hadoopConf) + val bytes = reader.read(segment) + reader.close() + blockManager.dataDeserialize(generateBlockId(), bytes).toList + } + loggedData shouldEqual data + } + } + } + + test("WriteAheadLogBasedBlockHandler - handle errors in storing block") { + withWriteAheadLogBasedBlockHandler { handler => + testErrorHandling(handler) + } + } + + test("WriteAheadLogBasedBlockHandler - cleanup old blocks") { + withWriteAheadLogBasedBlockHandler { handler => + val blocks = Seq.tabulate(10) { i => IteratorBlock(Iterator(1 to i)) } + storeBlocks(handler, blocks) + + val preCleanupLogFiles = getWriteAheadLogFiles() + preCleanupLogFiles.size should be > 1 + + // this depends on the number of blocks inserted using generateAndStoreData() + manualClock.currentTime() shouldEqual 5000L + + val cleanupThreshTime = 3000L + handler.cleanupOldBlock(cleanupThreshTime) + eventually(timeout(10000 millis), interval(10 millis)) { + getWriteAheadLogFiles().size should be < preCleanupLogFiles.size + } + } + } + + /** + * Test storing of data using different forms of ReceivedBlocks and verify that they succeeded + * using the given verification function + */ + private def testBlockStoring(receivedBlockHandler: ReceivedBlockHandler) + (verifyFunc: (Seq[String], Seq[StreamBlockId], Seq[ReceivedBlockStoreResult]) => Unit) { + val data = Seq.tabulate(100) { _.toString } + + def storeAndVerify(blocks: Seq[ReceivedBlock]) { + blocks should not be empty + val (blockIds, storeResults) = storeBlocks(receivedBlockHandler, blocks) + withClue(s"Testing with ${blocks.head.getClass.getSimpleName}s:") { + // Verify returns store results have correct block ids + (storeResults.map { _.blockId }) shouldEqual blockIds + + // Call handler-specific verification function + verifyFunc(data, blockIds, storeResults) + } + } + + def dataToByteBuffer(b: Seq[String]) = blockManager.dataSerialize(generateBlockId, b.iterator) + + val blocks = data.grouped(10).toSeq + + storeAndVerify(blocks.map { b => IteratorBlock(b.toIterator) }) + storeAndVerify(blocks.map { b => ArrayBufferBlock(new ArrayBuffer ++= b) }) + storeAndVerify(blocks.map { b => ByteBufferBlock(dataToByteBuffer(b)) }) + } + + /** Test error handling when blocks that cannot be stored */ + private def testErrorHandling(receivedBlockHandler: ReceivedBlockHandler) { + // Handle error in iterator (e.g. divide-by-zero error) + intercept[Exception] { + val iterator = (10 to (-10, -1)).toIterator.map { _ / 0 } + receivedBlockHandler.storeBlock(StreamBlockId(1, 1), IteratorBlock(iterator)) + } + + // Handler error in block manager storing (e.g. too big block) + intercept[SparkException] { + val byteBuffer = ByteBuffer.wrap(new Array[Byte](blockManagerSize + 1)) + receivedBlockHandler.storeBlock(StreamBlockId(1, 1), ByteBufferBlock(byteBuffer)) + } + } + + /** Instantiate a BlockManagerBasedBlockHandler and run a code with it */ + private def withBlockManagerBasedBlockHandler(body: BlockManagerBasedBlockHandler => Unit) { + body(new BlockManagerBasedBlockHandler(blockManager, storageLevel)) + } + + /** Instantiate a WriteAheadLogBasedBlockHandler and run a code with it */ + private def withWriteAheadLogBasedBlockHandler(body: WriteAheadLogBasedBlockHandler => Unit) { + val receivedBlockHandler = new WriteAheadLogBasedBlockHandler(blockManager, 1, + storageLevel, conf, hadoopConf, tempDirectory.toString, manualClock) + try { + body(receivedBlockHandler) + } finally { + receivedBlockHandler.stop() + } + } + + /** Store blocks using a handler */ + private def storeBlocks( + receivedBlockHandler: ReceivedBlockHandler, + blocks: Seq[ReceivedBlock] + ): (Seq[StreamBlockId], Seq[ReceivedBlockStoreResult]) = { + val blockIds = Seq.fill(blocks.size)(generateBlockId()) + val storeResults = blocks.zip(blockIds).map { + case (block, id) => + manualClock.addToTime(500) // log rolling interval set to 1000 ms through SparkConf + logDebug("Inserting block " + id) + receivedBlockHandler.storeBlock(id, block) + }.toList + logDebug("Done inserting") + (blockIds, storeResults) + } + + private def getWriteAheadLogFiles(): Seq[String] = { + getLogFilesInDirectory(checkpointDirToLogDir(tempDirectory.toString, streamId)) + } + + private def generateBlockId(): StreamBlockId = StreamBlockId(streamId, scala.util.Random.nextLong) +} diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 5eba93c208..1956a4f1db 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -58,7 +58,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { test("WriteAheadLogWriter - writing data") { val dataToWrite = generateRandomData() val segments = writeDataUsingWriter(testFile, dataToWrite) - val writtenData = readDataManually(testFile, segments) + val writtenData = readDataManually(segments) assert(writtenData === dataToWrite) } @@ -67,7 +67,7 @@ class WriteAheadLogSuite extends FunSuite with BeforeAndAfter { val writer = new WriteAheadLogWriter(testFile, hadoopConf) dataToWrite.foreach { data => val segment = writer.write(stringToByteBuffer(data)) - val dataRead = readDataManually(testFile, Seq(segment)).head + val dataRead = readDataManually(Seq(segment)).head assert(data === dataRead) } writer.close() @@ -281,14 +281,20 @@ object WriteAheadLogSuite { } /** Read data from a segments of a log file directly and return the list of byte buffers.*/ - def readDataManually(file: String, segments: Seq[WriteAheadLogFileSegment]): Seq[String] = { - val reader = HdfsUtils.getInputStream(file, hadoopConf) - segments.map { x => - reader.seek(x.offset) - val data = new Array[Byte](x.length) - reader.readInt() - reader.readFully(data) - Utils.deserialize[String](data) + def readDataManually(segments: Seq[WriteAheadLogFileSegment]): Seq[String] = { + segments.map { segment => + val reader = HdfsUtils.getInputStream(segment.path, hadoopConf) + try { + reader.seek(segment.offset) + val bytes = new Array[Byte](segment.length) + reader.readInt() + reader.readFully(bytes) + val data = Utils.deserialize[String](bytes) + reader.close() + data + } finally { + reader.close() + } } } @@ -335,9 +341,11 @@ object WriteAheadLogSuite { val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf) if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) { - fileSystem.listStatus(logDirectoryPath).map { - _.getPath.toString.stripPrefix("file:") - }.sorted + fileSystem.listStatus(logDirectoryPath).map { _.getPath() }.sortBy { + _.getName().split("-")(1).toLong + }.map { + _.toString.stripPrefix("file:") + } } else { Seq.empty }