[SPARK-9217] [STREAMING] Make the kinesis receiver reliable by recording sequence numbers

This PR is the second one in the larger effort to make the Kinesis integration reliable and provide a WAL-free at-least-once guarantee. It is based on the design doc - https://docs.google.com/document/d/1k0dl270EnK7uExrsCE7jYw7PYx0YC935uBcxn3p0f58/edit

In this PR, I have updated the Kinesis Receiver to do the following.
- Control block generation by creating its own BlockGenerator with its own callback methods, and use it to keep track of the ranges of sequence numbers that go into each block.
- More specifically, as the KinesisRecordProcessor provides small batches of records, each batch is atomically inserted into the current block (that is, either the whole batch is in the block, or none of it is), and the sequence number range of the batch is recorded. Since many batches may be added to a block, the receiver tracks all the ranges of sequence numbers that are added to each block.
- When the block is ready to be pushed, the block is pushed and the ranges are reported as metadata of the block. In addition, the ranges are used to find the latest sequence number for each shard that can be checkpointed through DynamoDB.
- Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence number for its own shard.
- The array of ranges in the block metadata is used to create KinesisBackedBlockRDDs. The ReceiverInputDStream has been slightly refactored to allow the creation of KinesisBackedBlockRDDs instead of WriteAheadLogBackedBlockRDDs. A sketch of this bookkeeping and an example of the user-facing API are included below.
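
Roughly, the bookkeeping can be pictured as follows. This is a simplified, self-contained sketch only; the type and method names here are illustrative and not the real ones, and the actual implementation uses SequenceNumberRange / SequenceNumberRanges plus a BlockGenerator and its listener callbacks, as shown in the KinesisReceiver diff below.

    import scala.collection.mutable

    // Illustrative, simplified names; the real receiver uses SequenceNumberRange(s),
    // a BlockGenerator, and StreamBlockId instead of the plain types used here.
    case class SeqNumRange(streamName: String, shardId: String,
                           fromSeqNumber: String, toSeqNumber: String)

    class RangeTracker {
      // Ranges added to the block that is currently being generated.
      private val rangesInCurrentBlock = mutable.ArrayBuffer.empty[SeqNumRange]
      // Ranges of each sealed block, keyed by block id, reported later as block metadata.
      private val blockIdToRanges = mutable.HashMap.empty[Long, Seq[SeqNumRange]]
      // Latest successfully stored sequence number per shard, used for KCL checkpointing.
      private val shardIdToLatestStoredSeqNum = mutable.HashMap.empty[String, String]

      // A record-processor batch was atomically added to the current block.
      def onBatchAdded(range: SeqNumRange): Unit = synchronized {
        rangesInCurrentBlock += range
      }

      // The current block was sealed: remember its ranges and start a fresh buffer.
      def onBlockGenerated(blockId: Long): Unit = synchronized {
        blockIdToRanges(blockId) = rangesInCurrentBlock.toList
        rangesInCurrentBlock.clear()
      }

      // The block was stored successfully: report its ranges as block metadata and
      // advance the per-shard position that may now be checkpointed through DynamoDB.
      def onBlockStored(blockId: Long): Seq[SeqNumRange] = synchronized {
        val ranges = blockIdToRanges.remove(blockId).getOrElse(Seq.empty)
        ranges.foreach(r => shardIdToLatestStoredSeqNum(r.shardId) = r.toSeqNumber)
        ranges
      }

      // Queried periodically by each record processor before checkpointing via the KCL.
      def latestSeqNumToCheckpoint(shardId: String): Option[String] = synchronized {
        shardIdToLatestStoredSeqNum.get(shardId)
      }
    }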

Things to be done
- [x] Add new test to verify that the sequence numbers are recovered.
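
For context, here is a sketch of how an application would consume such a stream after this change, assuming it links against the spark-streaming-kinesis-asl module. The createStream overload is the one exercised in KinesisStreamSuite below; the app name, stream name, endpoint and region are placeholders.

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kinesis.KinesisUtils

    object KinesisStreamSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[4]").setAppName("KinesisStreamSketch")
        val ssc = new StreamingContext(conf, Seconds(1))

        // App name, stream name, endpoint and region below are placeholders.
        val stream = KinesisUtils.createStream(
          ssc, "myKinesisApp", "myKinesisStream",
          "https://kinesis.us-west-2.amazonaws.com", "us-west-2",
          InitialPositionInStream.LATEST, Seconds(10), StorageLevel.MEMORY_AND_DISK_2)

        // Each batch is now backed by a KinesisBackedBlockRDD carrying sequence number
        // ranges, so lost blocks can be re-read from Kinesis instead of from a WAL.
        stream.map(bytes => new String(bytes)).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }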

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #7825 from tdas/kinesis-receiver and squashes the following commits:

2159be9 [Tathagata Das] Fixed bug
569be83 [Tathagata Das] Fix scala style issue
bf31e22 [Tathagata Das] Added more documentation to make the kinesis test endpoint more configurable
3ad8361 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into kinesis-receiver
c693a63 [Tathagata Das] Removed unnecessary constructor params from KinesisTestUtils
e1f1d0a [Tathagata Das] Addressed PR comments
b9fa6bf [Tathagata Das] Fix serialization issues
f8b7680 [Tathagata Das] Updated doc
33fe43a [Tathagata Das] Added more tests
7997138 [Tathagata Das] Fix style errors
a806710 [Tathagata Das] Fixed unit test and use KinesisInputDStream
40a1709 [Tathagata Das] Fixed KinesisReceiverSuite tests
7e44df6 [Tathagata Das] Added documentation and fixed checkpointing
096383f [Tathagata Das] Added test, and addressed some of the comments.
84a7892 [Tathagata Das] fixed scala style issue
e19e37d [Tathagata Das] Added license
1cd7b66 [Tathagata Das] Updated kinesis receiver
Tathagata Das 2015-08-05 00:20:26 -07:00
parent 781c8d71a0
commit c2a71f0714
11 changed files with 611 additions and 203 deletions

@ -37,16 +37,18 @@ case class SequenceNumberRange(
/** Class representing an array of Kinesis sequence number ranges */
private[kinesis]
case class SequenceNumberRanges(ranges: Array[SequenceNumberRange]) {
case class SequenceNumberRanges(ranges: Seq[SequenceNumberRange]) {
def isEmpty(): Boolean = ranges.isEmpty
def nonEmpty(): Boolean = ranges.nonEmpty
override def toString(): String = ranges.mkString("SequenceNumberRanges(", ", ", ")")
}
private[kinesis]
object SequenceNumberRanges {
def apply(range: SequenceNumberRange): SequenceNumberRanges = {
new SequenceNumberRanges(Array(range))
new SequenceNumberRanges(Seq(range))
}
}
@ -66,14 +68,14 @@ class KinesisBackedBlockRDDPartition(
*/
private[kinesis]
class KinesisBackedBlockRDD(
sc: SparkContext,
regionId: String,
endpointUrl: String,
@transient sc: SparkContext,
val regionName: String,
val endpointUrl: String,
@transient blockIds: Array[BlockId],
@transient arrayOfseqNumberRanges: Array[SequenceNumberRanges],
@transient val arrayOfseqNumberRanges: Array[SequenceNumberRanges],
@transient isBlockIdValid: Array[Boolean] = Array.empty,
retryTimeoutMs: Int = 10000,
awsCredentialsOption: Option[SerializableAWSCredentials] = None
val retryTimeoutMs: Int = 10000,
val awsCredentialsOption: Option[SerializableAWSCredentials] = None
) extends BlockRDD[Array[Byte]](sc, blockIds) {
require(blockIds.length == arrayOfseqNumberRanges.length,
@ -104,7 +106,7 @@ class KinesisBackedBlockRDD(
}
partition.seqNumberRanges.ranges.iterator.flatMap { range =>
new KinesisSequenceRangeIterator(
credenentials, endpointUrl, regionId, range, retryTimeoutMs)
credenentials, endpointUrl, regionName, range, retryTimeoutMs)
}
}
if (partition.isBlockIdValid) {

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kinesis
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
private[kinesis] class KinesisInputDStream(
@transient _ssc: StreamingContext,
streamName: String,
endpointUrl: String,
regionName: String,
initialPositionInStream: InitialPositionInStream,
checkpointAppName: String,
checkpointInterval: Duration,
storageLevel: StorageLevel,
awsCredentialsOption: Option[SerializableAWSCredentials]
) extends ReceiverInputDStream[Array[Byte]](_ssc) {
private[streaming]
override def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[Array[Byte]] = {
// This returns true even when blockInfos is empty
val allBlocksHaveRanges = blockInfos.map { _.metadataOption }.forall(_.nonEmpty)
if (allBlocksHaveRanges) {
// Create a KinesisBackedBlockRDD, even when there are no blocks
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
val seqNumRanges = blockInfos.map {
_.metadataOption.get.asInstanceOf[SequenceNumberRanges] }.toArray
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
logDebug(s"Creating KinesisBackedBlockRDD for $time with ${seqNumRanges.length} " +
s"seq number ranges: ${seqNumRanges.mkString(", ")} ")
new KinesisBackedBlockRDD(
context.sc, regionName, endpointUrl, blockIds, seqNumRanges,
isBlockIdValid = isBlockIdValid,
retryTimeoutMs = ssc.graph.batchDuration.milliseconds.toInt,
awsCredentialsOption = awsCredentialsOption)
} else {
logWarning("Kinesis sequence number information was not present with some block metadata," +
" it may not be possible to recover from failures")
super.createBlockRDD(time, blockInfos)
}
}
override def getReceiver(): Receiver[Array[Byte]] = {
new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
checkpointAppName, checkpointInterval, storageLevel, awsCredentialsOption)
}
}

@ -18,17 +18,20 @@ package org.apache.spark.streaming.kinesis
import java.util.UUID
import scala.collection.JavaConversions.asScalaIterator
import scala.collection.mutable
import scala.util.control.NonFatal
import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, BasicAWSCredentials, DefaultAWSCredentialsProviderChain}
import com.amazonaws.auth.{AWSCredentials, AWSCredentialsProvider, DefaultAWSCredentialsProviderChain}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkEnv}
private[kinesis]
@ -42,38 +45,47 @@ case class SerializableAWSCredentials(accessKeyId: String, secretKey: String)
* Custom AWS Kinesis-specific implementation of Spark Streaming's Receiver.
* This implementation relies on the Kinesis Client Library (KCL) Worker as described here:
* https://github.com/awslabs/amazon-kinesis-client
* This is a custom receiver used with StreamingContext.receiverStream(Receiver) as described here:
* http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* Instances of this class will get shipped to the Spark Streaming Workers to run within a
* Spark Executor.
*
* @param appName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
* by the Kinesis Client Library. If you change the App name or Stream name,
* the KCL will throw errors. This usually requires deleting the backing
* DynamoDB table with the same name this Kinesis application.
* The way this Receiver works is as follows:
* - The receiver starts a KCL Worker, which essentially runs a threadpool of multiple
* KinesisRecordProcessors.
* - Each KinesisRecordProcessor receives data from a Kinesis shard in batches. Each batch is
* inserted into a Block Generator, and the corresponding range of sequence numbers is recorded.
* - When the block generator defines a block, the sequence number ranges that were
* inserted into that block are recorded separately for later use.
* - When the block is ready to be pushed, the block is pushed and the ranges are reported as
* metadata of the block. In addition, the ranges are used to find out the latest sequence
* number for each shard that can be checkpointed through the DynamoDB.
* - Periodically, each KinesisRecordProcessor checkpoints the latest successfully stored sequence
* number for its own shard.
*
* @param streamName Kinesis stream name
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Region name used by the Kinesis Client Library for
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
* @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
* See the Kinesis Spark Streaming documentation for more
* details on the different types of checkpoints.
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
* @param checkpointAppName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
* by the Kinesis Client Library. If you change the App name or Stream name,
* the KCL will throw errors. This usually requires deleting the backing
* DynamoDB table with the same name as this Kinesis application.
* @param checkpointInterval Checkpoint interval for Kinesis checkpointing.
* See the Kinesis Spark Streaming documentation for more
* details on the different types of checkpoints.
* @param storageLevel Storage level to use for storing the received objects
* @param awsCredentialsOption Optional AWS credentials, used when user directly specifies
* the credentials
*/
private[kinesis] class KinesisReceiver(
appName: String,
streamName: String,
val streamName: String,
endpointUrl: String,
regionName: String,
initialPositionInStream: InitialPositionInStream,
checkpointAppName: String,
checkpointInterval: Duration,
storageLevel: StorageLevel,
awsCredentialsOption: Option[SerializableAWSCredentials]
@ -90,7 +102,7 @@ private[kinesis] class KinesisReceiver(
* workerId is used by the KCL should be based on the ip address of the actual Spark Worker
* where this code runs (not the driver's IP address.)
*/
private var workerId: String = null
@volatile private var workerId: String = null
/**
* Worker is the core client abstraction from the Kinesis Client Library (KCL).
@ -98,22 +110,40 @@ private[kinesis] class KinesisReceiver(
* Each shard is assigned its own IRecordProcessor and the worker runs multiple such
* processors.
*/
private var worker: Worker = null
@volatile private var worker: Worker = null
@volatile private var workerThread: Thread = null
/** Thread running the worker */
private var workerThread: Thread = null
/** BlockGenerator used to generate blocks out of Kinesis data */
@volatile private var blockGenerator: BlockGenerator = null
/**
* Sequence number ranges added to the current block being generated.
* Accessing and updating of this map is synchronized by locks in BlockGenerator.
*/
private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]
/** Sequence number ranges of data added to each generated block */
private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
/**
* Latest sequence number that has been successfully stored for each shard.
* This is used for checkpointing through KCL. */
private val shardIdToLatestStoredSeqNum = new mutable.HashMap[String, String]
with mutable.SynchronizedMap[String, String]
/**
* This is called when the KinesisReceiver starts and must be non-blocking.
* The KCL creates and manages the receiving/processing thread pool through Worker.run().
*/
override def onStart() {
blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, SparkEnv.get.conf)
workerId = Utils.localHostName() + ":" + UUID.randomUUID()
// KCL config instance
val awsCredProvider = resolveAWSCredentialsProvider()
val kinesisClientLibConfiguration =
new KinesisClientLibConfiguration(appName, streamName, awsCredProvider, workerId)
new KinesisClientLibConfiguration(checkpointAppName, streamName, awsCredProvider, workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withTaskBackoffTimeMillis(500)
@ -141,6 +171,10 @@ private[kinesis] class KinesisReceiver(
}
}
}
blockIdToSeqNumRanges.clear()
blockGenerator.start()
workerThread.setName(s"Kinesis Receiver ${streamId}")
workerThread.setDaemon(true)
workerThread.start()
@ -165,6 +199,81 @@ private[kinesis] class KinesisReceiver(
workerId = null
}
/** Add records of the given shard to the current block being generated */
private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = {
if (records.size > 0) {
val dataIterator = records.iterator().map { record =>
val byteBuffer = record.getData()
val byteArray = new Array[Byte](byteBuffer.remaining())
byteBuffer.get(byteArray)
byteArray
}
val metadata = SequenceNumberRange(streamName, shardId,
records.get(0).getSequenceNumber(), records.get(records.size() - 1).getSequenceNumber())
blockGenerator.addMultipleDataWithCallback(dataIterator, metadata)
}
}
/** Get the latest sequence number for the given shard that can be checkpointed through KCL */
private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
shardIdToLatestStoredSeqNum.get(shardId)
}
/**
* Remember the range of sequence numbers that was added to the currently active block.
* Internally, this is synchronized with `finalizeRangesForCurrentBlock()`.
*/
private def rememberAddedRange(range: SequenceNumberRange): Unit = {
seqNumRangesInCurrentBlock += range
}
/**
* Finalize the ranges added to the block that was active and prepare the ranges buffer
* for next block. Internally, this is synchronized with `rememberAddedRange()`.
*/
private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
blockIdToSeqNumRanges(blockId) = SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray)
seqNumRangesInCurrentBlock.clear()
logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
}
/** Store the block along with its associated ranges */
private def storeBlockWithRanges(
blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[Array[Byte]]): Unit = {
val rangesToReportOption = blockIdToSeqNumRanges.remove(blockId)
if (rangesToReportOption.isEmpty) {
stop("Error while storing block into Spark, could not find sequence number ranges " +
s"for block $blockId")
return
}
val rangesToReport = rangesToReportOption.get
var attempt = 0
var stored = false
var throwable: Throwable = null
while (!stored && attempt <= 3) {
try {
store(arrayBuffer, rangesToReport)
stored = true
} catch {
case NonFatal(th) =>
attempt += 1
throwable = th
}
}
if (!stored) {
stop("Error while storing block into Spark", throwable)
}
// Update the latest sequence number that has been successfully stored for each shard.
// Note that we are doing this sequentially because the array of sequence number ranges
// is assumed to be in order of addition, so the last range seen for a shard gives its
// latest sequence number.
rangesToReport.ranges.foreach { range =>
shardIdToLatestStoredSeqNum(range.shardId) = range.toSeqNumber
}
}
/**
* If AWS credential is provided, return a AWSCredentialProvider returning that credential.
* Otherwise, return the DefaultAWSCredentialsProviderChain.
@ -182,4 +291,46 @@ private[kinesis] class KinesisReceiver(
new DefaultAWSCredentialsProviderChain()
}
}
/**
* Class to handle blocks generated by this receiver's block generator. Specifically, in
* the context of the Kinesis Receiver, this handler does the following.
*
* - When an array of records is added to the current active block in the block generator,
* this handler keeps track of the corresponding sequence number range.
* - When the currently active block is ready to be sealed (no more records will be added),
* this handler keeps track of the list of ranges added into that block in another
* HashMap (blockIdToSeqNumRanges).
*/
private class GeneratedBlockHandler extends BlockGeneratorListener {
/**
* Callback method called after a data item is added into the BlockGenerator.
* The data addition, block generation, and calls to onAddData and onGenerateBlock
* are all synchronized through the same lock.
*/
def onAddData(data: Any, metadata: Any): Unit = {
rememberAddedRange(metadata.asInstanceOf[SequenceNumberRange])
}
/**
* Callback method called after a block has been generated.
* The data addition, block generation, and calls to onAddData and onGenerateBlock
* are all synchronized through the same lock.
*/
def onGenerateBlock(blockId: StreamBlockId): Unit = {
finalizeRangesForCurrentBlock(blockId)
}
/** Callback method called when a block is ready to be pushed / stored. */
def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
storeBlockWithRanges(blockId,
arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Array[Byte]]])
}
/** Callback called in case of any error in the internals of the BlockGenerator */
def onError(message: String, throwable: Throwable): Unit = {
reportError(message, throwable)
}
}
}

@ -18,20 +18,16 @@ package org.apache.spark.streaming.kinesis
import java.util.List
import scala.collection.JavaConversions.asScalaBuffer
import scala.util.Random
import scala.util.control.NonFatal
import org.apache.spark.Logging
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer}
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.Logging
/**
* Kinesis-specific implementation of the Kinesis Client Library (KCL) IRecordProcessor.
* This implementation operates on the Array[Byte] from the KinesisReceiver.
@ -51,6 +47,7 @@ private[kinesis] class KinesisRecordProcessor(
checkpointState: KinesisCheckpointState) extends IRecordProcessor with Logging {
// shardId to be populated during initialize()
@volatile
private var shardId: String = _
/**
@ -75,47 +72,38 @@ private[kinesis] class KinesisRecordProcessor(
override def processRecords(batch: List[Record], checkpointer: IRecordProcessorCheckpointer) {
if (!receiver.isStopped()) {
try {
/*
* Notes:
* 1) If we try to store the raw ByteBuffer from record.getData(), the Spark Streaming
* Receiver.store(ByteBuffer) attempts to deserialize the ByteBuffer using the
* internally-configured Spark serializer (kryo, etc).
* 2) This is not desirable, so we instead store a raw Array[Byte] and decouple
* ourselves from Spark's internal serialization strategy.
* 3) For performance, the BlockGenerator is asynchronously queuing elements within its
* memory before creating blocks. This prevents the small block scenario, but requires
* that you register callbacks to know when a block has been generated and stored
* (WAL is sufficient for storage) before can checkpoint back to the source.
*/
batch.foreach(record => receiver.store(record.getData().array()))
logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
receiver.addRecords(shardId, batch)
logDebug(s"Stored: Worker $workerId stored ${batch.size} records for shardId $shardId")
/*
* Checkpoint the sequence number of the last record successfully processed/stored
* in the batch.
* In this implementation, we're checkpointing after the given checkpointIntervalMillis.
* Note that this logic requires that processRecords() be called AND that it's time to
* checkpoint. I point this out because there is no background thread running the
* checkpointer. Checkpointing is tested and trigger only when a new batch comes in.
* If the worker is shutdown cleanly, checkpoint will happen (see shutdown() below).
* However, if the worker dies unexpectedly, a checkpoint may not happen.
* This could lead to records being processed more than once.
*
* Checkpoint the sequence number of the last record successfully stored.
* Note that in the current implementation, checkpointing occurs only after
* checkpointIntervalMillis have elapsed since the last checkpoint, AND when there are new
* records to process. As a result, checkpointing lags behind the records that have been
* stored by the receiver. Of course, this can lead to records being processed more than
* once under failures and restarts.
*
* TODO: Instead of checkpointing here, run a separate timer task to perform
* checkpointing so that it checkpoints in a timely manner independent of whether
* new records are available or not.
*/
if (checkpointState.shouldCheckpoint()) {
/* Perform the checkpoint */
KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
receiver.getLatestSeqNumToCheckpoint(shardId).foreach { latestSeqNum =>
/* Perform the checkpoint */
KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(latestSeqNum), 4, 100)
/* Update the next checkpoint time */
checkpointState.advanceCheckpoint()
/* Update the next checkpoint time */
checkpointState.advanceCheckpoint()
logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" +
logDebug(s"Checkpoint: WorkerId $workerId completed checkpoint of ${batch.size}" +
s" records for shardId $shardId")
logDebug(s"Checkpoint: Next checkpoint is at " +
logDebug(s"Checkpoint: Next checkpoint is at " +
s" ${checkpointState.checkpointClock.getTimeMillis()} for shardId $shardId")
}
}
} catch {
case e: Throwable => {
case NonFatal(e) => {
/*
* If there is a failure within the batch, the batch will not be checkpointed.
* This will potentially cause records since the last checkpoint to be processed
@ -130,7 +118,7 @@ private[kinesis] class KinesisRecordProcessor(
}
} else {
/* RecordProcessor has been stopped. */
logInfo(s"Stopped: The Spark KinesisReceiver has stopped for workerId $workerId" +
logInfo(s"Stopped: KinesisReceiver has stopped for workerId $workerId" +
s" and shardId $shardId. No more records will be processed.")
}
}
@ -154,7 +142,11 @@ private[kinesis] class KinesisRecordProcessor(
* It's now OK to read from the new shards that resulted from a resharding event.
*/
case ShutdownReason.TERMINATE =>
KinesisRecordProcessor.retryRandom(checkpointer.checkpoint(), 4, 100)
val latestSeqNumToCheckpointOption = receiver.getLatestSeqNumToCheckpoint(shardId)
if (latestSeqNumToCheckpointOption.nonEmpty) {
KinesisRecordProcessor.retryRandom(
checkpointer.checkpoint(latestSeqNumToCheckpointOption.get), 4, 100)
}
/*
* ZOMBIE Use Case. NoOp.

@ -36,22 +36,10 @@ import org.apache.spark.Logging
/**
* Shared utility methods for performing Kinesis tests that actually transfer data
*/
private class KinesisTestUtils(val endpointUrl: String, _regionName: String) extends Logging {
def this() {
this("https://kinesis.us-west-2.amazonaws.com", "")
}
def this(endpointUrl: String) {
this(endpointUrl, "")
}
val regionName = if (_regionName.length == 0) {
RegionUtils.getRegionByEndpoint(endpointUrl).getName()
} else {
RegionUtils.getRegion(_regionName).getName()
}
private class KinesisTestUtils extends Logging {
val endpointUrl = KinesisTestUtils.endpointUrl
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
val streamShardCount = 2
private val createStreamTimeoutSeconds = 300
@ -81,11 +69,11 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext
}
def createStream(): Unit = {
logInfo("Creating stream")
require(!streamCreated, "Stream already created")
_streamName = findNonExistentStreamName()
// Create a stream. The number of shards determines the provisioned throughput.
logInfo(s"Creating stream ${_streamName}")
val createStreamRequest = new CreateStreamRequest()
createStreamRequest.setStreamName(_streamName)
createStreamRequest.setShardCount(2)
@ -94,7 +82,7 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext
// The stream is now being created. Wait for it to become active.
waitForStreamToBeActive(_streamName)
streamCreated = true
logInfo("Created stream")
logInfo(s"Created stream ${_streamName}")
}
/**
@ -191,9 +179,38 @@ private class KinesisTestUtils(val endpointUrl: String, _regionName: String) ext
private[kinesis] object KinesisTestUtils {
val envVarName = "ENABLE_KINESIS_TESTS"
val envVarNameForEnablingTests = "ENABLE_KINESIS_TESTS"
val endVarNameForEndpoint = "KINESIS_TEST_ENDPOINT_URL"
val defaultEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
val shouldRunTests = sys.env.get(envVarName) == Some("1")
lazy val shouldRunTests = {
val isEnvSet = sys.env.get(envVarNameForEnablingTests) == Some("1")
if (isEnvSet) {
// scalastyle:off println
// Print this so that they are easily visible on the console and not hidden in the log4j logs.
println(
s"""
|Kinesis tests that actually send data have been enabled by setting the environment
|variable $envVarNameForEnablingTests to 1. This will create Kinesis Streams and
|DynamoDB tables in AWS. Please be aware that this may incur some AWS costs.
|By default, the tests use the endpoint URL $defaultEndpointUrl to create Kinesis streams.
|To change this endpoint URL to a different region, you can set the environment variable
|$endVarNameForEndpoint to the desired endpoint URL
|(e.g. $endVarNameForEndpoint="https://kinesis.us-west-2.amazonaws.com").
""".stripMargin)
// scalastyle:on println
}
isEnvSet
}
lazy val endpointUrl = {
val url = sys.env.getOrElse(endVarNameForEndpoint, defaultEndpointUrl)
// scalastyle:off println
// Print this so that they are easily visible on the console and not hidden in the log4j logs.
println(s"Using endpoint URL $url for creating Kinesis streams for tests.")
// scalastyle:on println
url
}
def isAWSCredentialsPresent: Boolean = {
Try { new DefaultAWSCredentialsProviderChain().getCredentials() }.isSuccess
@ -205,7 +222,13 @@ private[kinesis] object KinesisTestUtils {
Try { new DefaultAWSCredentialsProviderChain().getCredentials() } match {
case Success(cred) => cred
case Failure(e) =>
throw new Exception("Kinesis tests enabled, but could get not AWS credentials")
throw new Exception(
s"""
|Kinesis tests enabled using environment variable $envVarNameForEnablingTests
|but could not find AWS credentials. Please follow instructions in AWS documentation
|to set the credentials in your system such that the DefaultAWSCredentialsProviderChain
|can find the credentials.
""".stripMargin)
}
}
}

@ -65,9 +65,8 @@ object KinesisUtils {
): ReceiverInputDStream[Array[Byte]] = {
// Setting scope to override receiver stream's scope of "receiver stream"
ssc.withNamedScope("kinesis stream") {
ssc.receiverStream(
new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
initialPositionInStream, checkpointInterval, storageLevel, None))
new KinesisInputDStream(ssc, streamName, endpointUrl, validateRegion(regionName),
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel, None)
}
}
@ -112,10 +111,11 @@ object KinesisUtils {
awsAccessKeyId: String,
awsSecretKey: String
): ReceiverInputDStream[Array[Byte]] = {
ssc.receiverStream(
new KinesisReceiver(kinesisAppName, streamName, endpointUrl, validateRegion(regionName),
initialPositionInStream, checkpointInterval, storageLevel,
Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey))))
ssc.withNamedScope("kinesis stream") {
new KinesisInputDStream(ssc, streamName, endpointUrl, validateRegion(regionName),
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
Some(SerializableAWSCredentials(awsAccessKeyId, awsSecretKey)))
}
}
/**
@ -155,9 +155,10 @@ object KinesisUtils {
initialPositionInStream: InitialPositionInStream,
storageLevel: StorageLevel
): ReceiverInputDStream[Array[Byte]] = {
ssc.receiverStream(
new KinesisReceiver(ssc.sc.appName, streamName, endpointUrl, getRegionByEndpoint(endpointUrl),
initialPositionInStream, checkpointInterval, storageLevel, None))
ssc.withNamedScope("kinesis stream") {
new KinesisInputDStream(ssc, streamName, endpointUrl, getRegionByEndpoint(endpointUrl),
initialPositionInStream, ssc.sc.appName, checkpointInterval, storageLevel, None)
}
}
/**

@ -24,8 +24,6 @@ import org.apache.spark.{SparkConf, SparkContext, SparkException}
class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll {
private val regionId = "us-east-1"
private val endpointUrl = "https://kinesis.us-east-1.amazonaws.com"
private val testData = 1 to 8
private var testUtils: KinesisTestUtils = null
@ -42,7 +40,7 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
override def beforeAll(): Unit = {
runIfTestsEnabled("Prepare KinesisTestUtils") {
testUtils = new KinesisTestUtils(endpointUrl)
testUtils = new KinesisTestUtils()
testUtils.createStream()
shardIdToDataAndSeqNumbers = testUtils.pushData(testData)
@ -75,21 +73,21 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
testIfEnabled("Basic reading from Kinesis") {
// Verify all data using multiple ranges in a single RDD partition
val receivedData1 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
val receivedData1 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl,
fakeBlockIds(1),
Array(SequenceNumberRanges(allRanges.toArray))
).map { bytes => new String(bytes).toInt }.collect()
assert(receivedData1.toSet === testData.toSet)
// Verify all data using one range in each of the multiple RDD partitions
val receivedData2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
val receivedData2 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl,
fakeBlockIds(allRanges.size),
allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
).map { bytes => new String(bytes).toInt }.collect()
assert(receivedData2.toSet === testData.toSet)
// Verify ordering within each partition
val receivedData3 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl,
val receivedData3 = new KinesisBackedBlockRDD(sc, testUtils.regionName, testUtils.endpointUrl,
fakeBlockIds(allRanges.size),
allRanges.map { range => SequenceNumberRanges(Array(range)) }.toArray
).map { bytes => new String(bytes).toInt }.collectPartitions()
@ -211,7 +209,8 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
}, "Incorrect configuration of RDD, unexpected ranges set"
)
val rdd = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds, ranges)
val rdd = new KinesisBackedBlockRDD(
sc, testUtils.regionName, testUtils.endpointUrl, blockIds, ranges)
val collectedData = rdd.map { bytes =>
new String(bytes).toInt
}.collect()
@ -224,8 +223,9 @@ class KinesisBackedBlockRDDSuite extends KinesisFunSuite with BeforeAndAfterAll
if (testIsBlockValid) {
require(numPartitionsInBM === numPartitions, "All partitions must be in BlockManager")
require(numPartitionsInKinesis === 0, "No partitions must be in Kinesis")
val rdd2 = new KinesisBackedBlockRDD(sc, regionId, endpointUrl, blockIds.toArray,
ranges, isBlockIdValid = Array.fill(blockIds.length)(false))
val rdd2 = new KinesisBackedBlockRDD(
sc, testUtils.regionName, testUtils.endpointUrl, blockIds.toArray, ranges,
isBlockIdValid = Array.fill(blockIds.length)(false))
intercept[SparkException] {
rdd2.collect()
}

@ -31,7 +31,7 @@ trait KinesisFunSuite extends SparkFunSuite {
if (shouldRunTests) {
test(testName)(testBody)
} else {
ignore(s"$testName [enable by setting env var $envVarName=1]")(testBody)
ignore(s"$testName [enable by setting env var $envVarNameForEnablingTests=1]")(testBody)
}
}
@ -40,7 +40,7 @@ trait KinesisFunSuite extends SparkFunSuite {
if (shouldRunTests) {
body
} else {
ignore(s"$message [enable by setting env var $envVarName=1]")()
ignore(s"$message [enable by setting env var $envVarNameForEnablingTests=1]")()
}
}
}

@ -22,15 +22,14 @@ import scala.collection.JavaConversions.seqAsJavaList
import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException}
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason
import com.amazonaws.services.kinesis.model.Record
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.mock.MockitoSugar
import org.scalatest.{BeforeAndAfter, Matchers}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.{Milliseconds, TestSuiteBase}
import org.apache.spark.util.{Clock, ManualClock, Utils}
/**
@ -44,6 +43,8 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
val endpoint = "endpoint-url"
val workerId = "dummyWorkerId"
val shardId = "dummyShardId"
val seqNum = "dummySeqNum"
val someSeqNum = Some(seqNum)
val record1 = new Record()
record1.setData(ByteBuffer.wrap("Spark In Action".getBytes()))
@ -80,16 +81,18 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
test("process records including store and checkpoint") {
when(receiverMock.isStopped()).thenReturn(false)
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
when(checkpointStateMock.shouldCheckpoint()).thenReturn(true)
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
recordProcessor.initialize(shardId)
recordProcessor.processRecords(batch, checkpointerMock)
verify(receiverMock, times(1)).isStopped()
verify(receiverMock, times(1)).store(record1.getData().array())
verify(receiverMock, times(1)).store(record2.getData().array())
verify(receiverMock, times(1)).addRecords(shardId, batch)
verify(receiverMock, times(1)).getLatestSeqNumToCheckpoint(shardId)
verify(checkpointStateMock, times(1)).shouldCheckpoint()
verify(checkpointerMock, times(1)).checkpoint()
verify(checkpointerMock, times(1)).checkpoint(anyString)
verify(checkpointStateMock, times(1)).advanceCheckpoint()
}
@ -100,19 +103,25 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
recordProcessor.processRecords(batch, checkpointerMock)
verify(receiverMock, times(1)).isStopped()
verify(receiverMock, never).addRecords(anyString, anyListOf(classOf[Record]))
verify(checkpointerMock, never).checkpoint(anyString)
}
test("shouldn't checkpoint when exception occurs during store") {
when(receiverMock.isStopped()).thenReturn(false)
when(receiverMock.store(record1.getData().array())).thenThrow(new RuntimeException())
when(
receiverMock.addRecords(anyString, anyListOf(classOf[Record]))
).thenThrow(new RuntimeException())
intercept[RuntimeException] {
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
recordProcessor.initialize(shardId)
recordProcessor.processRecords(batch, checkpointerMock)
}
verify(receiverMock, times(1)).isStopped()
verify(receiverMock, times(1)).store(record1.getData().array())
verify(receiverMock, times(1)).addRecords(shardId, batch)
verify(checkpointerMock, never).checkpoint(anyString)
}
test("should set checkpoint time to currentTime + checkpoint interval upon instantiation") {
@ -158,19 +167,25 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft
}
test("shutdown should checkpoint if the reason is TERMINATE") {
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
val reason = ShutdownReason.TERMINATE
recordProcessor.shutdown(checkpointerMock, reason)
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
verify(checkpointerMock, times(1)).checkpoint()
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
recordProcessor.initialize(shardId)
recordProcessor.shutdown(checkpointerMock, ShutdownReason.TERMINATE)
verify(receiverMock, times(1)).getLatestSeqNumToCheckpoint(shardId)
verify(checkpointerMock, times(1)).checkpoint(anyString)
}
test("shutdown should not checkpoint if the reason is something other than TERMINATE") {
when(receiverMock.getLatestSeqNumToCheckpoint(shardId)).thenReturn(someSeqNum)
val recordProcessor = new KinesisRecordProcessor(receiverMock, workerId, checkpointStateMock)
recordProcessor.initialize(shardId)
recordProcessor.shutdown(checkpointerMock, ShutdownReason.ZOMBIE)
recordProcessor.shutdown(checkpointerMock, null)
verify(checkpointerMock, never()).checkpoint()
verify(checkpointerMock, never).checkpoint(anyString)
}
test("retry success on first attempt") {

@ -22,34 +22,67 @@ import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Random
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.scalatest.Matchers._
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.streaming.kinesis.KinesisTestUtils._
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
import org.apache.spark.util.Utils
import org.apache.spark.{SparkConf, SparkContext}
class KinesisStreamSuite extends KinesisFunSuite
with Eventually with BeforeAndAfter with BeforeAndAfterAll {
// This is the name that KCL uses to save metadata to DynamoDB
private val kinesisAppName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
// This is the name that KCL will use to save metadata to DynamoDB
private val appName = s"KinesisStreamSuite-${math.abs(Random.nextLong())}"
private val batchDuration = Seconds(1)
private var ssc: StreamingContext = _
private var sc: SparkContext = _
// Dummy parameters for API testing
private val dummyEndpointUrl = defaultEndpointUrl
private val dummyRegionName = RegionUtils.getRegionByEndpoint(dummyEndpointUrl).getName()
private val dummyAWSAccessKey = "dummyAccessKey"
private val dummyAWSSecretKey = "dummySecretKey"
private var testUtils: KinesisTestUtils = null
private var ssc: StreamingContext = null
private var sc: SparkContext = null
override def beforeAll(): Unit = {
val conf = new SparkConf()
.setMaster("local[4]")
.setAppName("KinesisStreamSuite") // Setting Spark app name to Kinesis app name
sc = new SparkContext(conf)
runIfTestsEnabled("Prepare KinesisTestUtils") {
testUtils = new KinesisTestUtils()
testUtils.createStream()
}
}
override def afterAll(): Unit = {
sc.stop()
// Delete the Kinesis stream as well as the DynamoDB table generated by
// Kinesis Client Library when consuming the stream
if (ssc != null) {
ssc.stop()
}
if (sc != null) {
sc.stop()
}
if (testUtils != null) {
// Delete the Kinesis stream as well as the DynamoDB table generated by
// Kinesis Client Library when consuming the stream
testUtils.deleteStream()
testUtils.deleteDynamoDBTable(appName)
}
}
before {
ssc = new StreamingContext(sc, batchDuration)
}
after {
@ -57,21 +90,75 @@ class KinesisStreamSuite extends KinesisFunSuite
ssc.stop(stopSparkContext = false)
ssc = null
}
if (testUtils != null) {
testUtils.deleteDynamoDBTable(appName)
}
}
test("KinesisUtils API") {
ssc = new StreamingContext(sc, Seconds(1))
// Tests the API, does not actually test data receiving
val kinesisStream1 = KinesisUtils.createStream(ssc, "mySparkStream",
"https://kinesis.us-west-2.amazonaws.com", Seconds(2),
dummyEndpointUrl, Seconds(2),
InitialPositionInStream.LATEST, StorageLevel.MEMORY_AND_DISK_2)
val kinesisStream2 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
"https://kinesis.us-west-2.amazonaws.com", "us-west-2",
dummyEndpointUrl, dummyRegionName,
InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2)
val kinesisStream3 = KinesisUtils.createStream(ssc, "myAppNam", "mySparkStream",
"https://kinesis.us-west-2.amazonaws.com", "us-west-2",
dummyEndpointUrl, dummyRegionName,
InitialPositionInStream.LATEST, Seconds(2), StorageLevel.MEMORY_AND_DISK_2,
"awsAccessKey", "awsSecretKey")
dummyAWSAccessKey, dummyAWSSecretKey)
}
test("RDD generation") {
val inputStream = KinesisUtils.createStream(ssc, appName, "dummyStream",
dummyEndpointUrl, dummyRegionName, InitialPositionInStream.LATEST, Seconds(2),
StorageLevel.MEMORY_AND_DISK_2, dummyAWSAccessKey, dummyAWSSecretKey)
assert(inputStream.isInstanceOf[KinesisInputDStream])
val kinesisStream = inputStream.asInstanceOf[KinesisInputDStream]
val time = Time(1000)
// Generate block info data for testing
val seqNumRanges1 = SequenceNumberRanges(
SequenceNumberRange("fakeStream", "fakeShardId", "xxx", "yyy"))
val blockId1 = StreamBlockId(kinesisStream.id, 123)
val blockInfo1 = ReceivedBlockInfo(
0, None, Some(seqNumRanges1), new BlockManagerBasedStoreResult(blockId1, None))
val seqNumRanges2 = SequenceNumberRanges(
SequenceNumberRange("fakeStream", "fakeShardId", "aaa", "bbb"))
val blockId2 = StreamBlockId(kinesisStream.id, 345)
val blockInfo2 = ReceivedBlockInfo(
0, None, Some(seqNumRanges2), new BlockManagerBasedStoreResult(blockId2, None))
// Verify that the generated KinesisBackedBlockRDD has all the right information
val blockInfos = Seq(blockInfo1, blockInfo2)
val nonEmptyRDD = kinesisStream.createBlockRDD(time, blockInfos)
nonEmptyRDD shouldBe a [KinesisBackedBlockRDD]
val kinesisRDD = nonEmptyRDD.asInstanceOf[KinesisBackedBlockRDD]
assert(kinesisRDD.regionName === dummyRegionName)
assert(kinesisRDD.endpointUrl === dummyEndpointUrl)
assert(kinesisRDD.retryTimeoutMs === batchDuration.milliseconds)
assert(kinesisRDD.awsCredentialsOption ===
Some(SerializableAWSCredentials(dummyAWSAccessKey, dummyAWSSecretKey)))
assert(nonEmptyRDD.partitions.size === blockInfos.size)
nonEmptyRDD.partitions.foreach { _ shouldBe a [KinesisBackedBlockRDDPartition] }
val partitions = nonEmptyRDD.partitions.map {
_.asInstanceOf[KinesisBackedBlockRDDPartition] }.toSeq
assert(partitions.map { _.seqNumberRanges } === Seq(seqNumRanges1, seqNumRanges2))
assert(partitions.map { _.blockId } === Seq(blockId1, blockId2))
assert(partitions.forall { _.isBlockIdValid === true })
// Verify that KinesisBackedBlockRDD is generated even when there are no blocks
val emptyRDD = kinesisStream.createBlockRDD(time, Seq.empty)
emptyRDD shouldBe a [KinesisBackedBlockRDD]
emptyRDD.partitions shouldBe empty
// Verify that the KinesisBackedBlockRDD has isBlockValid = false when blocks are invalid
blockInfos.foreach { _.setBlockIdInvalid() }
kinesisStream.createBlockRDD(time, blockInfos).partitions.foreach { partition =>
assert(partition.asInstanceOf[KinesisBackedBlockRDDPartition].isBlockIdValid === false)
}
}
@ -84,32 +171,91 @@ class KinesisStreamSuite extends KinesisFunSuite
* and you have to set the system environment variable ENABLE_KINESIS_TESTS=1.
*/
testIfEnabled("basic operation") {
val kinesisTestUtils = new KinesisTestUtils()
try {
kinesisTestUtils.createStream()
ssc = new StreamingContext(sc, Seconds(1))
val awsCredentials = KinesisTestUtils.getAWSCredentials()
val stream = KinesisUtils.createStream(ssc, kinesisAppName, kinesisTestUtils.streamName,
kinesisTestUtils.endpointUrl, kinesisTestUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
val awsCredentials = KinesisTestUtils.getAWSCredentials()
val stream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
collected ++= rdd.collect()
logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
}
ssc.start()
val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
kinesisTestUtils.pushData(testData)
assert(collected === testData.toSet, "\nData received does not match data sent")
}
ssc.stop()
} finally {
kinesisTestUtils.deleteStream()
kinesisTestUtils.deleteDynamoDBTable(kinesisAppName)
val collected = new mutable.HashSet[Int] with mutable.SynchronizedSet[Int]
stream.map { bytes => new String(bytes).toInt }.foreachRDD { rdd =>
collected ++= rdd.collect()
logInfo("Collected = " + rdd.collect().toSeq.mkString(", "))
}
ssc.start()
val testData = 1 to 10
eventually(timeout(120 seconds), interval(10 second)) {
testUtils.pushData(testData)
assert(collected === testData.toSet, "\nData received does not match data sent")
}
ssc.stop(stopSparkContext = false)
}
testIfEnabled("failure recovery") {
val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
val checkpointDir = Utils.createTempDir().getAbsolutePath
ssc = new StreamingContext(sc, Milliseconds(1000))
ssc.checkpoint(checkpointDir)
val awsCredentials = KinesisTestUtils.getAWSCredentials()
val collectedData = new mutable.HashMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
with mutable.SynchronizedMap[Time, (Array[SequenceNumberRanges], Seq[Int])]
val kinesisStream = KinesisUtils.createStream(ssc, appName, testUtils.streamName,
testUtils.endpointUrl, testUtils.regionName, InitialPositionInStream.LATEST,
Seconds(10), StorageLevel.MEMORY_ONLY,
awsCredentials.getAWSAccessKeyId, awsCredentials.getAWSSecretKey)
// Verify that the generated RDDs are KinesisBackedBlockRDDs, and collect the data in each batch
kinesisStream.foreachRDD((rdd: RDD[Array[Byte]], time: Time) => {
val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD]
val data = rdd.map { bytes => new String(bytes).toInt }.collect().toSeq
collectedData(time) = (kRdd.arrayOfseqNumberRanges, data)
})
ssc.remember(Minutes(60)) // remember all the batches so that they are all saved in checkpoint
ssc.start()
def numBatchesWithData: Int = collectedData.count(_._2._2.nonEmpty)
def isCheckpointPresent: Boolean = Checkpoint.getCheckpointFiles(checkpointDir).nonEmpty
// Run until there are at least 10 batches with some data in them
// If this times out because numBatchesWithData stays at 0, then it's likely that the foreachRDD
// function failed with exceptions, and nothing got added to `collectedData`
eventually(timeout(2 minutes), interval(1 seconds)) {
testUtils.pushData(1 to 5)
assert(isCheckpointPresent && numBatchesWithData > 10)
}
ssc.stop(stopSparkContext = true) // stop the SparkContext so that the blocks are not reused
// Restart the context from the checkpoint and verify whether the RDDs are recovered correctly
logInfo("Restarting from checkpoint")
ssc = new StreamingContext(checkpointDir)
ssc.start()
val recoveredKinesisStream = ssc.graph.getInputStreams().head
// Verify that the recomputed RDDs are KinesisBackedBlockRDDs with the same sequence ranges
// and return the same data
val times = collectedData.keySet
times.foreach { time =>
val (arrayOfSeqNumRanges, data) = collectedData(time)
val rdd = recoveredKinesisStream.getOrCompute(time).get.asInstanceOf[RDD[Array[Byte]]]
rdd shouldBe a [KinesisBackedBlockRDD]
// Verify the recovered sequence ranges
val kRdd = rdd.asInstanceOf[KinesisBackedBlockRDD]
assert(kRdd.arrayOfseqNumberRanges.size === arrayOfSeqNumRanges.size)
arrayOfSeqNumRanges.zip(kRdd.arrayOfseqNumberRanges).foreach { case (expected, found) =>
assert(expected.ranges.toSeq === found.ranges.toSeq)
}
// Verify the recovered data
assert(rdd.map { bytes => new String(bytes).toInt }.collect().toSeq === data)
}
ssc.stop()
}
}

@ -21,12 +21,12 @@ import scala.reflect.ClassTag
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, RateController, StreamInputInfo}
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.streaming.{StreamingContext, Time}
/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@ -79,50 +79,57 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
// for this batch
val receiverTracker = ssc.scheduler.receiverTracker
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
// Register the input blocks information into InputInfoTracker
val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
if (blockInfos.nonEmpty) {
// Are WAL record handles present with all the blocks
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
if (areWALRecordHandlesPresent) {
// If all the blocks have WAL record handle, then create a WALBackedBlockRDD
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
} else {
// Else, create a BlockRDD. However, if there are some blocks with WAL info but not
// others then that is unexpected and log a warning accordingly.
if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
logError("Some blocks do not have Write Ahead Log information; " +
"this is unexpected and data may not be recoverable after driver failures")
} else {
logWarning("Some blocks have Write Ahead Log information; this is unexpected")
}
}
new BlockRDD[T](ssc.sc, blockIds)
}
} else {
// If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
// according to the configuration
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, Array.empty, Array.empty, Array.empty)
} else {
new BlockRDD[T](ssc.sc, Array.empty)
}
}
// Create the BlockRDD
createBlockRDD(validTime, blockInfos)
}
}
Some(blockRDD)
}
private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {
if (blockInfos.nonEmpty) {
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
// Are WAL record handles present with all the blocks
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
if (areWALRecordHandlesPresent) {
// If all the blocks have WAL record handle, then create a WALBackedBlockRDD
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
} else {
// Else, create a BlockRDD. However, if there are some blocks with WAL info but not
// others then that is unexpected and log a warning accordingly.
if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
logError("Some blocks do not have Write Ahead Log information; " +
"this is unexpected and data may not be recoverable after driver failures")
} else {
logWarning("Some blocks have Write Ahead Log information; this is unexpected")
}
}
new BlockRDD[T](ssc.sc, blockIds)
}
} else {
// If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
// according to the configuration
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, Array.empty, Array.empty, Array.empty)
} else {
new BlockRDD[T](ssc.sc, Array.empty)
}
}
}
/**
* A RateController that sends the new rate to receivers, via the receiver tracker.
*/