diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index 8f144a4d97..a003ddf325 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -37,16 +37,18 @@ case class SequenceNumberRange( /** Class representing an array of Kinesis sequence number ranges */ private[kinesis] -case class SequenceNumberRanges(ranges: Array[SequenceNumberRange]) { +case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) { def isEmpty(): Boolean = ranges.isEmpty + def nonEmpty(): Boolean = ranges.nonEmpty + override def toString(): String = ranges.mkString("SequenceNumberRanges(", ", ", ")") } private[kinesis] object SequenceNumberRanges { def apply(range: SequenceNumberRange): SequenceNumberRanges = { - new SequenceNumberRanges(Array(range)) + new SequenceNumberRanges(Seq(range)) } } @@ -66,14 +68,14 @@ class KinesisBackedBlockRDDPartition( */ private[kinesis] class KinesisBackedBlockRDD( - sc: SparkContext, - regionId: String, - endpointUrl: String, + @transient sc: SparkContext, + val regionName: String, + val endpointUrl: String, @transient blockIds: Array[BlockId], - @transient arrayOfseqNumberRanges: Array[SequenceNumberRanges], + @transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges], @transient isBlockIdValid: Array[Boolean] = Array.empty, - retryTimeoutMs: Int = 10000, - awsCredentialsOption: Option[SerializableAWSCredentials] = None + val retryTimeoutMs: Int = 10000, + val awsCredentialsOption: Option[SerializableAWSCredentials] = None ) extends BlockRDD[Array[Byte]](sc, blockIds) { require(blockIds.length == arrayOfseqNumberRanges.length, @@ -104,7 +106,7 @@ class KinesisBackedBlockRDD( } partition.seqNumberRanges.ranges.iterator.flatMap { range => new KinesisSequenceRangeIterator( - credenentials, endpointUrl, regionId, range, retryTimeoutMs) + credenentials, endpointUrl, regionName, range, retryTimeoutMs) } } if (partition.isBlockIdValid) { diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala new file mode 100644 index 0000000000..2e4204dcb6 --- /dev/null +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.kinesis + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream + +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.{BlockId, StorageLevel} +import org.apache.spark.streaming.dstream.ReceiverInputDStream +import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.streaming.scheduler.ReceivedBlockInfo +import org.apache.spark.streaming.{Duration, StreamingContext, Time} + +private[kinesis] class KinesisInputDStream( + @transient _ssc: StreamingContext, + streamName: String, + endpointUrl: String, + regionName: String, + initialPositionInStream: InitialPositionInStream, + checkpointAppName: String, + checkpointInterval: Duration, + storageLevel: StorageLevel, + awsCredentialsOption: Option[SerializableAWSCredentials] + ) extends ReceiverInputDStream[Array[Byte]](_ssc) { + + private[streaming] + override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[Array[Byte]] = { + + // This returns true even for when blockInfos is empty + val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty) + + if (allBlocksHaveRanges) { + // Create a KinesisBackedBlockRDD, even when there are no blocks + val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray + val seqNumRanges = blockInfos.map { + _.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray + val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray + logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " + + s"seq number ranges: ${seqNumRanges.mkString(", ")} ") + new KinesisBackedBlockRDD( + context.sc, regionName, endpointUrl, blockIds, seqNumRanges, + isBlockIdValid = isBlockIdValid, + retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt, + awsCredentialsOption = awsCredentialsOption) + } else { + logWarning("Kinesis sequence number information was not present with some block metadata," + + " it may not be possible to recover from failures") + super.createBlockRDD(time, blockInfos) + } + } + + override def getReceiver(): Receiver[Array[Byte]] = { + new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream, + checkpointAppName, checkpointInterval, storageLevel, awsCredentialsOption) + } +} diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 1a8a4cecc1..a4baeec084 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -18,17 +18,20 @@ package org.apache.spark.streaming.kinesis import java.util.UUID +import scala.collection.JavaConversions.asScalaIterator +import scala.collection.mutable import scala.util.control.NonFatal -import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain} +import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain} import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory} import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker} +import com.amazonaws.services.kinesis.model.Record -import org.apache.spark.Logging -import org.apache.spark.storage.StorageLevel +import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.Duration -import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver} import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SparkEnv} private[kinesis] @@ -42,38 +45,47 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String) * Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver. * This implementation relies on the Kinesis Client Library (KCL) Worker as described here: * https://github.com/awslabs/amazon-kinesis-client - * This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here: - * http://spark.apache.org/docs/latest/streaming-custom-receivers.html - * Instances of this class will get shipped to the Spark Streaming Workers to run within a - * Spark Executor. * - * @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams - * by the Kinesis Client Library. If you change the App name or Stream name, - * the KCL will throw errors. This usually requires deleting the backing - * DynamoDB table with the same name this Kinesis application. + * The way this Receiver works is as follows: + * - The receiver starts a KCL Worker, which is essentially runs a threadpool of multiple + * KinesisRecordProcessor + * - Each KinesisRecordProcessor receives data from a Kinesis shard in batches. Each batch is + * inserted into a Block Generator, and the corresponding range of sequence numbers is recorded. + * - When the block generator defines a block, then the recorded sequence number ranges that were + * inserted into the block are recorded separately for being used later. + * - When the block is ready to be pushed, the block is pushed and the ranges are reported as + * metadata of the block. In addition, the ranges are used to find out the latest sequence + * number for each shard that can be checkpointed through the DynamoDB. + * - Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence + * number for it own shard. + * * @param streamName Kinesis stream name * @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com) * @param regionName Region name used by the Kinesis Client Library for * DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics) - * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. - * See the Kinesis Spark Streaming documentation for more - * details on the different types of checkpoints. * @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the * worker's initial starting position in the stream. * The values are either the beginning of the stream * per Kinesis' limit of 24 hours * (InitialPositionInStream.TRIM_HORIZON) or * the tip of the stream (InitialPositionInStream.LATEST). + * @param checkpointAppName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams + * by the Kinesis Client Library. If you change the App name or Stream name, + * the KCL will throw errors. This usually requires deleting the backing + * DynamoDB table with the same name this Kinesis application. + * @param checkpointInterval Checkpoint interval for Kinesis checkpointing. + * See the Kinesis Spark Streaming documentation for more + * details on the different types of checkpoints. * @param storageLevel Storage level to use for storing the received objects * @param awsCredentialsOption Optional AWS credentials, used when user directly specifies * the credentials */ private[kinesis] class KinesisReceiver( - appName: String, - streamName: String, + val streamName: String, endpointUrl: String, regionName: String, initialPositionInStream: InitialPositionInStream, + checkpointAppName: String, checkpointInterval: Duration, storageLevel: StorageLevel, awsCredentialsOption: Option[SerializableAWSCredentials] @@ -90,7 +102,7 @@ private[kinesis] class KinesisReceiver( * workerId is used by the KCL should be based on the ip address of the actual Spark Worker * where this code runs (not the driver's IP address.) */ - private var workerId: String = null + @volatile private var workerId: String = null /** * Worker is the core client abstraction from the Kinesis Client Library (KCL). @@ -98,22 +110,40 @@ private[kinesis] class KinesisReceiver( * Each shard is assigned its own IRecordProcessor and the worker run multiple such * processors. */ - private var worker: Worker = null + @volatile private var worker: Worker = null + @volatile private var workerThread: Thread = null - /** Thread running the worker */ - private var workerThread: Thread = null + /** BlockGenerator used to generates blocks out of Kinesis data */ + @volatile private var blockGenerator: BlockGenerator = null + /** + * Sequence number ranges added to the current block being generated. + * Accessing and updating of this map is synchronized by locks in BlockGenerator. + */ + private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange] + + /** Sequence number ranges of data added to each generated block */ + private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges] + with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges] + + /** + * Latest sequence number ranges that have been stored successfully. + * This is used for checkpointing through KCL */ + private val shardIdToLatestStoredSeqNum = new mutable.HashMap[String, String] + with mutable.SynchronizedMap[String, String] /** * This is called when the KinesisReceiver starts and must be non-blocking. * The KCL creates and manages the receiving/processing thread pool through Worker.run(). */ override def onStart() { + blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, SparkEnv.get.conf) + workerId = Utils.localHostName() + ":" + UUID.randomUUID() // KCL config instance val awsCredProvider = resolveAWSCredentialsProvider() val kinesisClientLibConfiguration = - new KinesisClientLibConfiguration(appName, streamName, awsCredProvider, workerId) + new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId) .withKinesisEndpoint(endpointUrl) .withInitialPositionInStream(initialPositionInStream) .withTaskBackoffTimeMillis(500) @@ -141,6 +171,10 @@ private[kinesis] class KinesisReceiver( } } } + + blockIdToSeqNumRanges.clear() + blockGenerator.start() + workerThread.setName(s"Kinesis Receiver ${streamId}") workerThread.setDaemon(true) workerThread.start() @@ -165,6 +199,81 @@ private[kinesis] class KinesisReceiver( workerId = null } + /** Add records of the given shard to the current block being generated */ + private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = { + if (records.size > 0) { + val dataIterator = records.iterator().map { record => + val byteBuffer = record.getData() + val byteArray = new Array[Byte](byteBuffer.remaining()) + byteBuffer.get(byteArray) + byteArray + } + val metadata = SequenceNumberRange(streamName, shardId, + records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber()) + blockGenerator.addMultipleDataWithCallback(dataIterator, metadata) + + } + } + + /** Get the latest sequence number for the given shard that can be checkpointed through KCL */ + private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = { + shardIdToLatestStoredSeqNum.get(shardId) + } + + /** + * Remember the range of sequence numbers that was added to the currently active block. + * Internally, this is synchronized with `finalizeRangesForCurrentBlock()`. + */ + private def rememberAddedRange(range: SequenceNumberRange): Unit = { + seqNumRangesInCurrentBlock += range + } + + /** + * Finalize the ranges added to the block that was active and prepare the ranges buffer + * for next block. Internally, this is synchronized with `rememberAddedRange()`. + */ + private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = { + blockIdToSeqNumRanges(blockId) = SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray) + seqNumRangesInCurrentBlock.clear() + logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges") + } + + /** Store the block along with its associated ranges */ + private def storeBlockWithRanges( + blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[Array[Byte]]): Unit = { + val rangesToReportOption = blockIdToSeqNumRanges.remove(blockId) + if (rangesToReportOption.isEmpty) { + stop("Error while storing block into Spark, could not find sequence number ranges " + + s"for block $blockId") + return + } + + val rangesToReport = rangesToReportOption.get + var attempt = 0 + var stored = false + var throwable: Throwable = null + while (!stored && attempt <= 3) { + try { + store(arrayBuffer, rangesToReport) + stored = true + } catch { + case NonFatal(th) => + attempt += 1 + throwable = th + } + } + if (!stored) { + stop("Error while storing block into Spark", throwable) + } + + // Update the latest sequence number that have been successfully stored for each shard + // Note that we are doing this sequentially because the array of sequence number ranges + // is assumed to be + rangesToReport.ranges.foreach { range => + shardIdToLatestStoredSeqNum(range.shardId) = range.toSeqNumber + } + } + /** * If AWS credential is provided, return a AWSCredentialProvider returning that credential. * Otherwise, return the DefaultAWSCredentialsProviderChain. @@ -182,4 +291,46 @@ private[kinesis] class KinesisReceiver( new DefaultAWSCredentialsProviderChain() } } + + + /** + * Class to handle blocks generated by this receiver's block generator. Specifically, in + * the context of the Kinesis Receiver, this handler does the following. + * + * - When an array of records is added to the current active block in the block generator, + * this handler keeps track of the corresponding sequence number range. + * - When the currently active block is ready to sealed (not more records), this handler + * keep track of the list of ranges added into this block in another H + */ + private class GeneratedBlockHandler extends BlockGeneratorListener { + + /** + * Callback method called after a data item is added into the BlockGenerator. + * The data addition, block generation, and calls to onAddData and onGenerateBlock + * are all synchronized through the same lock. + */ + def onAddData(data: Any, metadata: Any): Unit = { + rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange]) + } + + /** + * Callback method called after a block has been generated. + * The data addition, block generation, and calls to onAddData and onGenerateBlock + * are all synchronized through the same lock. + */ + def onGenerateBlock(blockId: StreamBlockId): Unit = { + finalizeRangesForCurrentBlock(blockId) + } + + /** Callback method called when a block is ready to be pushed / stored. */ + def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { + storeBlockWithRanges(blockId, + arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Array[Byte]]]) + } + + /** Callback called in case of any error in internal of the BlockGenerator */ + def onError(message: String, throwable: Throwable): Unit = { + reportError(message, throwable) + } + } } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala index fe9e3a0c79..b240512332 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisRecordProcessor.scala @@ -18,20 +18,16 @@ package org.apache.spark.streaming.kinesis import java.util.List -import scala.collection.JavaConversions.asScalaBuffer import scala.util.Random +import scala.util.control.NonFatal -import org.apache.spark.Logging - -import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException -import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException -import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor -import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer +import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException} +import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer} import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason import com.amazonaws.services.kinesis.model.Record +import org.apache.spark.Logging + /** * Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor. * This implementation operates on the Array[Byte] from the KinesisReceiver. @@ -51,6 +47,7 @@ private[kinesis] class KinesisRecordProcessor( checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging { // shardId to be populated during initialize() + @volatile private var shardId: String = _ /** @@ -75,47 +72,38 @@ private[kinesis] class KinesisRecordProcessor( override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) { if (!receiver.isStopped()) { try { - /* - * Notes: - * 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming - * Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the - * internally-configured Spark serializer (kryo, etc). - * 2) This is not desirable, so we instead store a raw Array[Byte] and decouple - * ourselves from Spark's internal serialization strategy. - * 3) For performance, the BlockGenerator is asynchronously queuing elements within its - * memory before creating blocks. This prevents the small block scenario, but requires - * that you register callbacks to know when a block has been generated and stored - * (WAL is sufficient for storage) before can checkpoint back to the source. - */ - batch.foreach(record => receiver.store(record.getData().array())) - - logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId") + receiver.addRecords(shardId, batch) + logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId") /* - * Checkpoint the sequence number of the last record successfully processed/stored - * in the batch. - * In this implementation, we're checkpointing after the given checkpointIntervalMillis. - * Note that this logic requires that processRecords() be called AND that it's time to - * checkpoint. I point this out because there is no background thread running the - * checkpointer. Checkpointing is tested and trigger only when a new batch comes in. - * If the worker is shutdown cleanly, checkpoint will happen (see shutdown() below). - * However, if the worker dies unexpectedly, a checkpoint may not happen. - * This could lead to records being processed more than once. + * + * Checkpoint the sequence number of the last record successfully stored. + * Note that in this current implementation, the checkpointing occurs only when after + * checkpointIntervalMillis from the last checkpoint, AND when there is new record + * to process. This leads to the checkpointing lagging behind what records have been + * stored by the receiver. Ofcourse, this can lead records processed more than once, + * under failures and restarts. + * + * TODO: Instead of checkpointing here, run a separate timer task to perform + * checkpointing so that it checkpoints in a timely manner independent of whether + * new records are available or not. */ if (checkpointState.shouldCheckpoint()) { - /* Perform the checkpoint */ - KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100) + receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum => + /* Perform the checkpoint */ + KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(latestSeqNum), 4, 100) - /* Update the next checkpoint time */ - checkpointState.advanceCheckpoint() + /* Update the next checkpoint time */ + checkpointState.advanceCheckpoint() - logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" + + logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" + s" records for shardId $shardId") - logDebug(s"Checkpoint: Next checkpoint is at " + + logDebug(s"Checkpoint: Next checkpoint is at " + s" ${checkpointState.checkpointClock.getTimeMillis()} for shardId $shardId") + } } } catch { - case e: Throwable => { + case NonFatal(e) => { /* * If there is a failure within the batch, the batch will not be checkpointed. * This will potentially cause records since the last checkpoint to be processed @@ -130,7 +118,7 @@ private[kinesis] class KinesisRecordProcessor( } } else { /* RecordProcessor has been stopped. */ - logInfo(s"Stopped: The Spark KinesisReceiver has stopped for workerId $workerId" + + logInfo(s"Stopped: KinesisReceiver has stopped for workerId $workerId" + s" and shardId $shardId. No more records will be processed.") } } @@ -154,7 +142,11 @@ private[kinesis] class KinesisRecordProcessor( * It's now OK to read from the new shards that resulted from a resharding event. */ case ShutdownReason.TERMINATE => - KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100) + val latestSeqNumToCheckpointOption = receiver.getLatestSeqNumToCheckpoint(shardId) + if (latestSeqNumToCheckpointOption.nonEmpty) { + KinesisRecordProcessor.retryRandom( + checkpointer.checkpoint(latestSeqNumToCheckpointOption.get), 4, 100) + } /* * ZOMBIE Use Case. NoOp. diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 255ac27f79..711aade182 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -36,22 +36,10 @@ import org.apache.spark.Logging /** * Shared utility methods for performing Kinesis tests that actually transfer data */ -private class KinesisTestUtils(val endpointUrl: String, _regionName: String) extends Logging { - - def this() { - this("https://kinesis.us-west-2.amazonaws.com", "") - } - - def this(endpointUrl: String) { - this(endpointUrl, "") - } - - val regionName = if (_regionName.length == 0) { - RegionUtils.getRegionByEndpoint(endpointUrl).getName() - } else { - RegionUtils.getRegion(_regionName).getName() - } +private class KinesisTestUtils extends Logging { + val endpointUrl = KinesisTestUtils.endpointUrl + val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() val streamShardCount = 2 private val createStreamTimeoutSeconds = 300 @@ -81,11 +69,11 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext } def createStream(): Unit = { - logInfo("Creating stream") require(!streamCreated, "Stream already created") _streamName = findNonExistentStreamName() // Create a stream. The number of shards determines the provisioned throughput. + logInfo(s"Creating stream ${_streamName}") val createStreamRequest = new CreateStreamRequest() createStreamRequest.setStreamName(_streamName) createStreamRequest.setShardCount(2) @@ -94,7 +82,7 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext // The stream is now being created. Wait for it to become active. waitForStreamToBeActive(_streamName) streamCreated = true - logInfo("Created stream") + logInfo(s"Created stream ${_streamName}") } /** @@ -191,9 +179,38 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext private[kinesis] object KinesisTestUtils { - val envVarName = "ENABLE_KINESIS_TESTS" + val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS" + val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL" + val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com" - val shouldRunTests = sys.env.get(envVarName) == Some("1") + lazy val shouldRunTests = { + val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1") + if (isEnvSet) { + // scalastyle:off println + // Print this so that they are easily visible on the console and not hidden in the log4j logs. + println( + s""" + |Kinesis tests that actually send data has been enabled by setting the environment + |variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and + |DynamoDB tables in AWS. Please be aware that this may incur some AWS costs. + |By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams. + |To change this endpoint URL to a different region, you can set the environment variable + |$endVarNameForEndpoint to the desired endpoint URL + |(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com"). + """.stripMargin) + // scalastyle:on println + } + isEnvSet + } + + lazy val endpointUrl = { + val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl) + // scalastyle:off println + // Print this so that they are easily visible on the console and not hidden in the log4j logs. + println(s"Using endpoint URL $url for creating Kinesis streams for tests.") + // scalastyle:on println + url + } def isAWSCredentialsPresent: Boolean = { Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess @@ -205,7 +222,13 @@ private[kinesis] object KinesisTestUtils { Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match { case Success(cred) => cred case Failure(e) => - throw new Exception("Kinesis tests enabled, but could get not AWS credentials") + throw new Exception( + s""" + |Kinesis tests enabled using environment variable $envVarNameForEnablingTests + |but could not find AWS credentials. Please follow instructions in AWS documentation + |to set the credentials in your system such that the DefaultAWSCredentialsProviderChain + |can find the credentials. + """.stripMargin) } } } diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala index 7dab17eba8..c799fadf2d 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisUtils.scala @@ -65,9 +65,8 @@ object KinesisUtils { ): ReceiverInputDStream[Array[Byte]] = { // Setting scope to override receiver stream's scope of "receiver stream" ssc.withNamedScope("kinesis stream") { - ssc.receiverStream( - new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, checkpointInterval, storageLevel, None)) + new KinesisInputDStream(ssc, streamName, endpointUrl, validateRegion(regionName), + initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, None) } } @@ -112,10 +111,11 @@ object KinesisUtils { awsAccessKeyId: String, awsSecretKey: String ): ReceiverInputDStream[Array[Byte]] = { - ssc.receiverStream( - new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName), - initialPositionInStream, checkpointInterval, storageLevel, - Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))) + ssc.withNamedScope("kinesis stream") { + new KinesisInputDStream(ssc, streamName, endpointUrl, validateRegion(regionName), + initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, + Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))) + } } /** @@ -155,9 +155,10 @@ object KinesisUtils { initialPositionInStream: InitialPositionInStream, storageLevel: StorageLevel ): ReceiverInputDStream[Array[Byte]] = { - ssc.receiverStream( - new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl, getRegionByEndpoint(endpointUrl), - initialPositionInStream, checkpointInterval, storageLevel, None)) + ssc.withNamedScope("kinesis stream") { + new KinesisInputDStream(ssc, streamName, endpointUrl, getRegionByEndpoint(endpointUrl), + initialPositionInStream, ssc.sc.appName, checkpointInterval, storageLevel, None) + } } /** diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala index e81fb11e59..a89e5627e0 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -24,8 +24,6 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException} class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll { - private val regionId = "us-east-1" - private val endpointUrl = "https://kinesis.us-east-1.amazonaws.com" private val testData = 1 to 8 private var testUtils: KinesisTestUtils = null @@ -42,7 +40,7 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll override def beforeAll(): Unit = { runIfTestsEnabled("Prepare KinesisTestUtils") { - testUtils = new KinesisTestUtils(endpointUrl) + testUtils = new KinesisTestUtils() testUtils.createStream() shardIdToDataAndSeqNumbers = testUtils.pushData(testData) @@ -75,21 +73,21 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll testIfEnabled("Basic reading from Kinesis") { // Verify all data using multiple ranges in a single RDD partition - val receivedData1 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, + val receivedData1 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl, fakeBlockIds(1), Array(SequenceNumberRanges(allRanges.toArray)) ).map { bytes => new String(bytes).toInt }.collect() assert(receivedData1.toSet === testData.toSet) // Verify all data using one range in each of the multiple RDD partitions - val receivedData2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, + val receivedData2 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl, fakeBlockIds(allRanges.size), allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray ).map { bytes => new String(bytes).toInt }.collect() assert(receivedData2.toSet === testData.toSet) // Verify ordering within each partition - val receivedData3 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, + val receivedData3 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl, fakeBlockIds(allRanges.size), allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray ).map { bytes => new String(bytes).toInt }.collectPartitions() @@ -211,7 +209,8 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll }, "Incorrect configuration of RDD, unexpected ranges set" ) - val rdd = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds, ranges) + val rdd = new KinesisBackedBlockRDD( + sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges) val collectedData = rdd.map { bytes => new String(bytes).toInt }.collect() @@ -224,8 +223,9 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll if (testIsBlockValid) { require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager") require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis") - val rdd2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds.toArray, - ranges, isBlockIdValid = Array.fill(blockIds.length)(false)) + val rdd2 = new KinesisBackedBlockRDD( + sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges, + isBlockIdValid = Array.fill(blockIds.length)(false)) intercept[SparkException] { rdd2.collect() } diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala index 8373138785..ee428f31d6 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisFunSuite.scala @@ -31,7 +31,7 @@ trait KinesisFunSuite extends SparkFunSuite { if (shouldRunTests) { test(testName)(testBody) } else { - ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody) + ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody) } } @@ -40,7 +40,7 @@ trait KinesisFunSuite extends SparkFunSuite { if (shouldRunTests) { body } else { - ignore(s"$message [enable by setting env var $envVarName=1]")() + ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")() } } } diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index 98f2c7c4f1..ceb135e065 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -22,15 +22,14 @@ import scala.collection.JavaConversions.seqAsJavaList import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException} import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer -import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason import com.amazonaws.services.kinesis.model.Record +import org.mockito.Matchers._ import org.mockito.Mockito._ -import org.scalatest.{BeforeAndAfter, Matchers} import org.scalatest.mock.MockitoSugar +import org.scalatest.{BeforeAndAfter, Matchers} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase} +import org.apache.spark.streaming.{Milliseconds, TestSuiteBase} import org.apache.spark.util.{Clock, ManualClock, Utils} /** @@ -44,6 +43,8 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft val endpoint = "endpoint-url" val workerId = "dummyWorkerId" val shardId = "dummyShardId" + val seqNum = "dummySeqNum" + val someSeqNum = Some(seqNum) val record1 = new Record() record1.setData(ByteBuffer.wrap("Spark In Action".getBytes())) @@ -80,16 +81,18 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft test("process records including store and checkpoint") { when(receiverMock.isStopped()).thenReturn(false) + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) when(checkpointStateMock.shouldCheckpoint()).thenReturn(true) val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock) + recordProcessor.initialize(shardId) recordProcessor.processRecords(batch, checkpointerMock) verify(receiverMock, times(1)).isStopped() - verify(receiverMock, times(1)).store(record1.getData().array()) - verify(receiverMock, times(1)).store(record2.getData().array()) + verify(receiverMock, times(1)).addRecords(shardId, batch) + verify(receiverMock, times(1)).getLatestSeqNumToCheckpoint(shardId) verify(checkpointStateMock, times(1)).shouldCheckpoint() - verify(checkpointerMock, times(1)).checkpoint() + verify(checkpointerMock, times(1)).checkpoint(anyString) verify(checkpointStateMock, times(1)).advanceCheckpoint() } @@ -100,19 +103,25 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft recordProcessor.processRecords(batch, checkpointerMock) verify(receiverMock, times(1)).isStopped() + verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record])) + verify(checkpointerMock, never).checkpoint(anyString) } test("shouldn't checkpoint when exception occurs during store") { when(receiverMock.isStopped()).thenReturn(false) - when(receiverMock.store(record1.getData().array())).thenThrow(new RuntimeException()) + when( + receiverMock.addRecords(anyString, anyListOf(classOf[Record])) + ).thenThrow(new RuntimeException()) intercept[RuntimeException] { val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock) + recordProcessor.initialize(shardId) recordProcessor.processRecords(batch, checkpointerMock) } verify(receiverMock, times(1)).isStopped() - verify(receiverMock, times(1)).store(record1.getData().array()) + verify(receiverMock, times(1)).addRecords(shardId, batch) + verify(checkpointerMock, never).checkpoint(anyString) } test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") { @@ -158,19 +167,25 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft } test("shutdown should checkpoint if the reason is TERMINATE") { - val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock) - val reason = ShutdownReason.TERMINATE - recordProcessor.shutdown(checkpointerMock, reason) + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) - verify(checkpointerMock, times(1)).checkpoint() + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock) + recordProcessor.initialize(shardId) + recordProcessor.shutdown(checkpointerMock, ShutdownReason.TERMINATE) + + verify(receiverMock, times(1)).getLatestSeqNumToCheckpoint(shardId) + verify(checkpointerMock, times(1)).checkpoint(anyString) } test("shutdown should not checkpoint if the reason is something other than TERMINATE") { + when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum) + val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock) + recordProcessor.initialize(shardId) recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE) recordProcessor.shutdown(checkpointerMock, null) - verify(checkpointerMock, never()).checkpoint() + verify(checkpointerMock, never).checkpoint(anyString) } test("retry success on first attempt") { diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala index b88c9c6478..1177dc7581 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisStreamSuite.scala @@ -22,34 +22,67 @@ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.Random +import com.amazonaws.regions.RegionUtils import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream +import org.scalatest.Matchers._ import org.scalatest.concurrent.Eventually import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} -import org.apache.spark.storage.StorageLevel +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming._ -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.streaming.kinesis.KinesisTestUtils._ +import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult +import org.apache.spark.streaming.scheduler.ReceivedBlockInfo +import org.apache.spark.util.Utils +import org.apache.spark.{SparkConf, SparkContext} class KinesisStreamSuite extends KinesisFunSuite with Eventually with BeforeAndAfter with BeforeAndAfterAll { - // This is the name that KCL uses to save metadata to DynamoDB - private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}" + // This is the name that KCL will use to save metadata to DynamoDB + private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}" + private val batchDuration = Seconds(1) - private var ssc: StreamingContext = _ - private var sc: SparkContext = _ + // Dummy parameters for API testing + private val dummyEndpointUrl = defaultEndpointUrl + private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName() + private val dummyAWSAccessKey = "dummyAccessKey" + private val dummyAWSSecretKey = "dummySecretKey" + + private var testUtils: KinesisTestUtils = null + private var ssc: StreamingContext = null + private var sc: SparkContext = null override def beforeAll(): Unit = { val conf = new SparkConf() .setMaster("local[4]") .setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name sc = new SparkContext(conf) + + runIfTestsEnabled("Prepare KinesisTestUtils") { + testUtils = new KinesisTestUtils() + testUtils.createStream() + } } override def afterAll(): Unit = { - sc.stop() - // Delete the Kinesis stream as well as the DynamoDB table generated by - // Kinesis Client Library when consuming the stream + if (ssc != null) { + ssc.stop() + } + if (sc != null) { + sc.stop() + } + if (testUtils != null) { + // Delete the Kinesis stream as well as the DynamoDB table generated by + // Kinesis Client Library when consuming the stream + testUtils.deleteStream() + testUtils.deleteDynamoDBTable(appName) + } + } + + before { + ssc = new StreamingContext(sc, batchDuration) } after { @@ -57,21 +90,75 @@ class KinesisStreamSuite extends KinesisFunSuite ssc.stop(stopSparkContext = false) ssc = null } + if (testUtils != null) { + testUtils.deleteDynamoDBTable(appName) + } } test("KinesisUtils API") { - ssc = new StreamingContext(sc, Seconds(1)) // Tests the API, does not actually test data receiving val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", Seconds(2), + dummyEndpointUrl, Seconds(2), InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2) val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", "us-west-2", + dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2) val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream", - "https://kinesis.us-west-2.amazonaws.com", "us-west-2", + dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2, - "awsAccessKey", "awsSecretKey") + dummyAWSAccessKey, dummyAWSSecretKey) + } + + test("RDD generation") { + val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream", + dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2), + StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey) + assert(inputStream.isInstanceOf[KinesisInputDStream]) + + val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream] + val time = Time(1000) + + // Generate block info data for testing + val seqNumRanges1 = SequenceNumberRanges( + SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy")) + val blockId1 = StreamBlockId(kinesisStream.id, 123) + val blockInfo1 = ReceivedBlockInfo( + 0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None)) + + val seqNumRanges2 = SequenceNumberRanges( + SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb")) + val blockId2 = StreamBlockId(kinesisStream.id, 345) + val blockInfo2 = ReceivedBlockInfo( + 0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None)) + + // Verify that the generated KinesisBackedBlockRDD has the all the right information + val blockInfos = Seq(blockInfo1, blockInfo2) + val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos) + nonEmptyRDD shouldBe a [KinesisBackedBlockRDD] + val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD] + assert(kinesisRDD.regionName === dummyRegionName) + assert(kinesisRDD.endpointUrl === dummyEndpointUrl) + assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds) + assert(kinesisRDD.awsCredentialsOption === + Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey))) + assert(nonEmptyRDD.partitions.size === blockInfos.size) + nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] } + val partitions = nonEmptyRDD.partitions.map { + _.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq + assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2)) + assert(partitions.map { _.blockId } === Seq(blockId1, blockId2)) + assert(partitions.forall { _.isBlockIdValid === true }) + + // Verify that KinesisBackedBlockRDD is generated even when there are no blocks + val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty) + emptyRDD shouldBe a [KinesisBackedBlockRDD] + emptyRDD.partitions shouldBe empty + + // Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid + blockInfos.foreach { _.setBlockIdInvalid() } + kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition => + assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false) + } } @@ -84,32 +171,91 @@ class KinesisStreamSuite extends KinesisFunSuite * and you have to set the system environment variable RUN_KINESIS_TESTS=1 . */ testIfEnabled("basic operation") { - val kinesisTestUtils = new KinesisTestUtils() - try { - kinesisTestUtils.createStream() - ssc = new StreamingContext(sc, Seconds(1)) - val awsCredentials = KinesisTestUtils.getAWSCredentials() - val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName, - kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST, - Seconds(10), StorageLevel.MEMORY_ONLY, - awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) + val awsCredentials = KinesisTestUtils.getAWSCredentials() + val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName, + testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, + Seconds(10), StorageLevel.MEMORY_ONLY, + awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) - val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] - stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => - collected ++= rdd.collect() - logInfo("Collected = " + rdd.collect().toSeq.mkString(", ")) - } - ssc.start() - - val testData = 1 to 10 - eventually(timeout(120 seconds), interval(10 second)) { - kinesisTestUtils.pushData(testData) - assert(collected === testData.toSet, "\nData received does not match data sent") - } - ssc.stop() - } finally { - kinesisTestUtils.deleteStream() - kinesisTestUtils.deleteDynamoDBTable(kinesisAppName) + val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int] + stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd => + collected ++= rdd.collect() + logInfo("Collected = " + rdd.collect().toSeq.mkString(", ")) } + ssc.start() + + val testData = 1 to 10 + eventually(timeout(120 seconds), interval(10 second)) { + testUtils.pushData(testData) + assert(collected === testData.toSet, "\nData received does not match data sent") + } + ssc.stop(stopSparkContext = false) } + + testIfEnabled("failure recovery") { + val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName) + val checkpointDir = Utils.createTempDir().getAbsolutePath + + ssc = new StreamingContext(sc, Milliseconds(1000)) + ssc.checkpoint(checkpointDir) + + val awsCredentials = KinesisTestUtils.getAWSCredentials() + val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])] + with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])] + + val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName, + testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST, + Seconds(10), StorageLevel.MEMORY_ONLY, + awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey) + + // Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch + kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => { + val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD] + val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq + collectedData(time) = (kRdd.arrayOfseqNumberRanges, data) + }) + + ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint + ssc.start() + + def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty) + + def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty + + // Run until there are at least 10 batches with some data in them + // If this times out because numBatchesWithData is empty, then its likely that foreachRDD + // function failed with exceptions, and nothing got added to `collectedData` + eventually(timeout(2 minutes), interval(1 seconds)) { + testUtils.pushData(1 to 5) + assert(isCheckpointPresent && numBatchesWithData > 10) + } + ssc.stop(stopSparkContext = true) // stop the SparkContext so that the blocks are not reused + + // Restart the context from checkpoint and verify whether the + logInfo("Restarting from checkpoint") + ssc = new StreamingContext(checkpointDir) + ssc.start() + val recoveredKinesisStream = ssc.graph.getInputStreams().head + + // Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges + // and return the same data + val times = collectedData.keySet + times.foreach { time => + val (arrayOfSeqNumRanges, data) = collectedData(time) + val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]] + rdd shouldBe a [KinesisBackedBlockRDD] + + // Verify the recovered sequence ranges + val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD] + assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size) + arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) => + assert(expected.ranges.toSeq === found.ranges.toSeq) + } + + // Verify the recovered data + assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data) + } + ssc.stop() + } + } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index 670ef8d296..a15800917c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -21,12 +21,12 @@ import scala.reflect.ClassTag import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.BlockId -import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo} import org.apache.spark.streaming.scheduler.rate.RateEstimator +import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, RateController, StreamInputInfo} import org.apache.spark.streaming.util.WriteAheadLogUtils +import org.apache.spark.streaming.{StreamingContext, Time} /** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] @@ -79,50 +79,57 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont // for this batch val receiverTracker = ssc.scheduler.receiverTracker val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) - val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray // Register the input blocks information into InputInfoTracker val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) - if (blockInfos.nonEmpty) { - // Are WAL record handles present with all the blocks - val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty } - - if (areWALRecordHandlesPresent) { - // If all the blocks have WAL record handle, then create a WALBackedBlockRDD - val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray - val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray - new WriteAheadLogBackedBlockRDD[T]( - ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid) - } else { - // Else, create a BlockRDD. However, if there are some blocks with WAL info but not - // others then that is unexpected and log a warning accordingly. - if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) { - if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { - logError("Some blocks do not have Write Ahead Log information; " + - "this is unexpected and data may not be recoverable after driver failures") - } else { - logWarning("Some blocks have Write Ahead Log information; this is unexpected") - } - } - new BlockRDD[T](ssc.sc, blockIds) - } - } else { - // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD - // according to the configuration - if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { - new WriteAheadLogBackedBlockRDD[T]( - ssc.sparkContext, Array.empty, Array.empty, Array.empty) - } else { - new BlockRDD[T](ssc.sc, Array.empty) - } - } + // Create the BlockRDD + createBlockRDD(validTime, blockInfos) } } Some(blockRDD) } + private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = { + + if (blockInfos.nonEmpty) { + val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray + + // Are WAL record handles present with all the blocks + val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty } + + if (areWALRecordHandlesPresent) { + // If all the blocks have WAL record handle, then create a WALBackedBlockRDD + val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray + val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray + new WriteAheadLogBackedBlockRDD[T]( + ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid) + } else { + // Else, create a BlockRDD. However, if there are some blocks with WAL info but not + // others then that is unexpected and log a warning accordingly. + if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) { + if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { + logError("Some blocks do not have Write Ahead Log information; " + + "this is unexpected and data may not be recoverable after driver failures") + } else { + logWarning("Some blocks have Write Ahead Log information; this is unexpected") + } + } + new BlockRDD[T](ssc.sc, blockIds) + } + } else { + // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD + // according to the configuration + if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { + new WriteAheadLogBackedBlockRDD[T]( + ssc.sparkContext, Array.empty, Array.empty, Array.empty) + } else { + new BlockRDD[T](ssc.sc, Array.empty) + } + } + } + /** * A RateController that sends the new rate to receivers, via the receiver tracker. */