[SPARK-20168][DSTREAM] Add changes to use kinesis fetches from specific timestamp

## What changes were proposed in this pull request?

Kinesis client can resume from a specified timestamp while creating a stream. We should have an option to pass a timestamp in the config to allow Kinesis to resume from the given timestamp.

The patch introduces a new `KinesisInitialPositionInStream` that takes the `InitialPositionInStream` with the `timestamp` information that can be used to resume kinesis fetches from the provided timestamp.

## How was this patch tested?

Unit Tests

cc : budde brkyvz

Author: Yash Sharma <ysharma@atlassian.com>

Closes #18029 from yssharma/ysharma/kcl_resume.
This commit is contained in:
Yash Sharma 2017-12-26 09:50:39 +02:00 committed by Burak Yavuz
parent be03d3ad79
commit 0e6833006d
8 changed files with 264 additions and 49 deletions

View file

@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.kinesis;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import java.io.Serializable;
import java.util.Date;
/**
 * A Java wrapper for exposing {@link InitialPositionInStream}
 * to the corresponding Kinesis readers.
 */
interface KinesisInitialPosition {
    // The KCL InitialPositionInStream value this position maps to.
    InitialPositionInStream getPosition();
}
/**
 * Concrete {@link KinesisInitialPosition} implementations covering the positions
 * supported by the Kinesis Client Library. {@code AtTimestamp} additionally carries
 * the timestamp from which Kinesis fetches should resume.
 */
public class KinesisInitialPositions {

    /** Start reading from the tip of the stream ({@code InitialPositionInStream.LATEST}). */
    public static class Latest implements KinesisInitialPosition, Serializable {
        public Latest() {}

        @Override
        public InitialPositionInStream getPosition() {
            return InitialPositionInStream.LATEST;
        }
    }

    /**
     * Start reading from the oldest record still retained by the stream
     * ({@code InitialPositionInStream.TRIM_HORIZON}).
     */
    public static class TrimHorizon implements KinesisInitialPosition, Serializable {
        public TrimHorizon() {}

        @Override
        public InitialPositionInStream getPosition() {
            return InitialPositionInStream.AT_TIMESTAMP == null ? null : InitialPositionInStream.TRIM_HORIZON;
        }
    }

    /** Start reading from records at or after the given timestamp. */
    public static class AtTimestamp implements KinesisInitialPosition, Serializable {
        // final: the position is immutable once constructed. Note that java.util.Date is
        // itself mutable; callers should not mutate the instance they pass in.
        private final Date timestamp;

        public AtTimestamp(Date timestamp) {
            this.timestamp = timestamp;
        }

        @Override
        public InitialPositionInStream getPosition() {
            return InitialPositionInStream.AT_TIMESTAMP;
        }

        /** @return the timestamp Kinesis fetches should resume from */
        public Date getTimestamp() {
            return timestamp;
        }
    }

    /**
     * Returns an instance of {@link KinesisInitialPosition} based on the passed
     * {@link InitialPositionInStream}. This method is used in KinesisUtils for translating
     * the InitialPositionInStream to a KinesisInitialPosition. It will be removed when
     * KinesisUtils is deprecated.
     *
     * @param initialPositionInStream the KCL position to translate
     * @return the equivalent {@link KinesisInitialPosition}
     * @throws UnsupportedOperationException for {@code AT_TIMESTAMP}: the enum value carries
     *         no timestamp, so it cannot be translated here; use {@link AtTimestamp} with the
     *         builder's {@code initialPosition()} instead
     */
    public static KinesisInitialPosition fromKinesisInitialPosition(
            InitialPositionInStream initialPositionInStream) throws UnsupportedOperationException {
        if (initialPositionInStream == InitialPositionInStream.LATEST) {
            return new Latest();
        } else if (initialPositionInStream == InitialPositionInStream.TRIM_HORIZON) {
            return new TrimHorizon();
        } else {
            // InitialPositionInStream.AT_TIMESTAMP is not supported here because the enum
            // value carries no timestamp. Use new AtTimestamp(date) instead.
            throw new UnsupportedOperationException(
                "Only InitialPositionInStream.LATEST and InitialPositionInStream.TRIM_HORIZON " +
                "supported in initialPositionInStream(). Please use the initialPosition() from " +
                "builder API in KinesisInputDStream for using InitialPositionInStream.AT_TIMESTAMP");
        }
    }
}

View file

@ -24,7 +24,6 @@ import scala.util.Random
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.amazonaws.services.kinesis.model.PutRecordRequest
import org.apache.log4j.{Level, Logger}
@ -33,9 +32,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisInputDStream
/**
* Consumes messages from an Amazon Kinesis stream and does wordcount.
*
@ -139,7 +138,7 @@ object KinesisWordCountASL extends Logging {
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.initialPosition(new Latest())
.checkpointAppName(appName)
.checkpointInterval(kinesisCheckpointInterval)
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)

View file

@ -28,6 +28,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.{Duration, StreamingContext, Time}
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
@ -36,7 +37,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
val streamName: String,
val endpointUrl: String,
val regionName: String,
val initialPositionInStream: InitialPositionInStream,
val initialPosition: KinesisInitialPosition,
val checkpointAppName: String,
val checkpointInterval: Duration,
val _storageLevel: StorageLevel,
@ -77,7 +78,7 @@ private[kinesis] class KinesisInputDStream[T: ClassTag](
}
override def getReceiver(): Receiver[T] = {
new KinesisReceiver(streamName, endpointUrl, regionName, initialPositionInStream,
new KinesisReceiver(streamName, endpointUrl, regionName, initialPosition,
checkpointAppName, checkpointInterval, _storageLevel, messageHandler,
kinesisCreds, dynamoDBCreds, cloudWatchCreds)
}
@ -100,7 +101,7 @@ object KinesisInputDStream {
// Params with defaults
private var endpointUrl: Option[String] = None
private var regionName: Option[String] = None
private var initialPositionInStream: Option[InitialPositionInStream] = None
private var initialPosition: Option[KinesisInitialPosition] = None
private var checkpointInterval: Option[Duration] = None
private var storageLevel: Option[StorageLevel] = None
private var kinesisCredsProvider: Option[SparkAWSCredentials] = None
@ -180,16 +181,32 @@ object KinesisInputDStream {
this
}
/**
 * Sets the initial position data is read from in the Kinesis stream. Defaults to
 * [[KinesisInitialPositions.Latest]] if no custom value is specified.
 * Passing `null` clears any previously-set value, so the default is used at build time.
 *
 * @param initialPosition [[KinesisInitialPosition]] value specifying where Spark Streaming
 *                        will start reading records in the Kinesis stream from
 * @return Reference to this [[KinesisInputDStream.Builder]]
 */
def initialPosition(initialPosition: KinesisInitialPosition): Builder = {
// Option(null) == None, which falls back to DEFAULT_INITIAL_POSITION in build().
this.initialPosition = Option(initialPosition)
this
}
/**
* Sets the initial position data is read from in the Kinesis stream. Defaults to
* [[InitialPositionInStream.LATEST]] if no custom value is specified.
* This function would be removed when we deprecate the KinesisUtils.
*
* @param initialPosition InitialPositionInStream value specifying where Spark Streaming
* will start reading records in the Kinesis stream from
* @return Reference to this [[KinesisInputDStream.Builder]]
*/
@deprecated("use initialPosition(initialPosition: KinesisInitialPosition)", "2.3.0")
def initialPositionInStream(initialPosition: InitialPositionInStream): Builder = {
initialPositionInStream = Option(initialPosition)
this.initialPosition = Option(
KinesisInitialPositions.fromKinesisInitialPosition(initialPosition))
this
}
@ -266,7 +283,7 @@ object KinesisInputDStream {
getRequiredParam(streamName, "streamName"),
endpointUrl.getOrElse(DEFAULT_KINESIS_ENDPOINT_URL),
regionName.getOrElse(DEFAULT_KINESIS_REGION_NAME),
initialPositionInStream.getOrElse(DEFAULT_INITIAL_POSITION_IN_STREAM),
initialPosition.getOrElse(DEFAULT_INITIAL_POSITION),
getRequiredParam(checkpointAppName, "checkpointAppName"),
checkpointInterval.getOrElse(ssc.graph.batchDuration),
storageLevel.getOrElse(DEFAULT_STORAGE_LEVEL),
@ -293,7 +310,6 @@ object KinesisInputDStream {
* Creates a [[KinesisInputDStream.Builder]] for constructing [[KinesisInputDStream]] instances.
*
* @since 2.2.0
*
* @return [[KinesisInputDStream.Builder]] instance
*/
def builder: Builder = new Builder
@ -309,7 +325,6 @@ object KinesisInputDStream {
private[kinesis] val DEFAULT_KINESIS_ENDPOINT_URL: String =
"https://kinesis.us-east-1.amazonaws.com"
private[kinesis] val DEFAULT_KINESIS_REGION_NAME: String = "us-east-1"
private[kinesis] val DEFAULT_INITIAL_POSITION_IN_STREAM: InitialPositionInStream =
InitialPositionInStream.LATEST
private[kinesis] val DEFAULT_INITIAL_POSITION: KinesisInitialPosition = new Latest()
private[kinesis] val DEFAULT_STORAGE_LEVEL: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
}

View file

@ -24,12 +24,13 @@ import scala.collection.mutable
import scala.util.control.NonFatal
import com.amazonaws.services.kinesis.clientlibrary.interfaces.{IRecordProcessor, IRecordProcessorCheckpointer, IRecordProcessorFactory}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{InitialPositionInStream, KinesisClientLibConfiguration, Worker}
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.{KinesisClientLibConfiguration, Worker}
import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.internal.Logging
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.Duration
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.AtTimestamp
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
import org.apache.spark.util.Utils
@ -56,12 +57,13 @@ import org.apache.spark.util.Utils
* @param endpointUrl Url of Kinesis service (e.g., https://kinesis.us-east-1.amazonaws.com)
* @param regionName Region name used by the Kinesis Client Library for
* DynamoDB (lease coordination and checkpointing) and CloudWatch (metrics)
* @param initialPositionInStream In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* (InitialPositionInStream.TRIM_HORIZON) or
* the tip of the stream (InitialPositionInStream.LATEST).
* @param initialPosition Instance of [[KinesisInitialPosition]]
* In the absence of Kinesis checkpoint info, this is the
* worker's initial starting position in the stream.
* The values are either the beginning of the stream
* per Kinesis' limit of 24 hours
* ([[KinesisInitialPositions.TrimHorizon]]) or
* the tip of the stream ([[KinesisInitialPositions.Latest]]).
* @param checkpointAppName Kinesis application name. Kinesis Apps are mapped to Kinesis Streams
* by the Kinesis Client Library. If you change the App name or Stream name,
* the KCL will throw errors. This usually requires deleting the backing
@ -83,7 +85,7 @@ private[kinesis] class KinesisReceiver[T](
val streamName: String,
endpointUrl: String,
regionName: String,
initialPositionInStream: InitialPositionInStream,
initialPosition: KinesisInitialPosition,
checkpointAppName: String,
checkpointInterval: Duration,
storageLevel: StorageLevel,
@ -148,18 +150,29 @@ private[kinesis] class KinesisReceiver[T](
kinesisCheckpointer = new KinesisCheckpointer(receiver, checkpointInterval, workerId)
val kinesisProvider = kinesisCreds.provider
val kinesisClientLibConfiguration = new KinesisClientLibConfiguration(
checkpointAppName,
streamName,
kinesisProvider,
dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
workerId)
val kinesisClientLibConfiguration = {
val baseClientLibConfiguration = new KinesisClientLibConfiguration(
checkpointAppName,
streamName,
kinesisProvider,
dynamoDBCreds.map(_.provider).getOrElse(kinesisProvider),
cloudWatchCreds.map(_.provider).getOrElse(kinesisProvider),
workerId)
.withKinesisEndpoint(endpointUrl)
.withInitialPositionInStream(initialPositionInStream)
.withInitialPositionInStream(initialPosition.getPosition)
.withTaskBackoffTimeMillis(500)
.withRegionName(regionName)
// Update the Kinesis client lib config with timestamp
// if InitialPositionInStream.AT_TIMESTAMP is passed
initialPosition match {
case ts: AtTimestamp =>
baseClientLibConfiguration.withTimestampAtInitialPositionInStream(ts.getTimestamp)
case _ => baseClientLibConfiguration
}
}
/*
* RecordProcessorFactory creates impls of IRecordProcessor.
* IRecordProcessor adapts the KCL to our Spark KinesisReceiver via the

View file

@ -73,7 +73,8 @@ object KinesisUtils {
// Setting scope to override receiver stream's scope of "receiver stream"
ssc.withNamedScope("kinesis stream") {
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
cleanedHandler, DefaultCredentials, None, None)
}
}
@ -129,7 +130,8 @@ object KinesisUtils {
awsAccessKeyId = awsAccessKeyId,
awsSecretKey = awsSecretKey)
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
cleanedHandler, kinesisCredsProvider, None, None)
}
}
@ -198,7 +200,8 @@ object KinesisUtils {
awsAccessKeyId = awsAccessKeyId,
awsSecretKey = awsSecretKey))
new KinesisInputDStream[T](ssc, streamName, endpointUrl, validateRegion(regionName),
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
cleanedHandler, kinesisCredsProvider, None, None)
}
}
@ -243,7 +246,8 @@ object KinesisUtils {
// Setting scope to override receiver stream's scope of "receiver stream"
ssc.withNamedScope("kinesis stream") {
new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
KinesisInputDStream.defaultMessageHandler, DefaultCredentials, None, None)
}
}
@ -293,7 +297,8 @@ object KinesisUtils {
awsAccessKeyId = awsAccessKeyId,
awsSecretKey = awsSecretKey)
new KinesisInputDStream[Array[Byte]](ssc, streamName, endpointUrl, validateRegion(regionName),
initialPositionInStream, kinesisAppName, checkpointInterval, storageLevel,
KinesisInitialPositions.fromKinesisInitialPosition(initialPositionInStream),
kinesisAppName, checkpointInterval, storageLevel,
KinesisInputDStream.defaultMessageHandler, kinesisCredsProvider, None, None)
}
}

View file

@ -17,14 +17,13 @@
package org.apache.spark.streaming.kinesis;
import org.junit.Test;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.TrimHorizon;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.Seconds;
import org.junit.Test;
public class JavaKinesisInputDStreamBuilderSuite extends LocalJavaStreamingContext {
/**
@ -35,7 +34,7 @@ public class JavaKinesisInputDStreamBuilderSuite extends LocalJavaStreamingConte
String streamName = "a-very-nice-stream-name";
String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";
String region = "us-west-2";
InitialPositionInStream initialPosition = InitialPositionInStream.TRIM_HORIZON;
KinesisInitialPosition initialPosition = new TrimHorizon();
String appName = "a-very-nice-kinesis-app";
Duration checkpointInterval = Seconds.apply(30);
StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
@ -45,7 +44,7 @@ public class JavaKinesisInputDStreamBuilderSuite extends LocalJavaStreamingConte
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(region)
.initialPositionInStream(initialPosition)
.initialPosition(initialPosition)
.checkpointAppName(appName)
.checkpointInterval(checkpointInterval)
.storageLevel(storageLevel)
@ -53,7 +52,41 @@ public class JavaKinesisInputDStreamBuilderSuite extends LocalJavaStreamingConte
assert(kinesisDStream.streamName() == streamName);
assert(kinesisDStream.endpointUrl() == endpointUrl);
assert(kinesisDStream.regionName() == region);
assert(kinesisDStream.initialPositionInStream() == initialPosition);
assert(kinesisDStream.initialPosition().getPosition() == initialPosition.getPosition());
assert(kinesisDStream.checkpointAppName() == appName);
assert(kinesisDStream.checkpointInterval() == checkpointInterval);
assert(kinesisDStream._storageLevel() == storageLevel);
ssc.stop();
}
/**
* Test to ensure that the old API for InitialPositionInStream
* is supported in KinesisDStream.Builder.
* This test would be removed when we deprecate the KinesisUtils.
*/
@Test
public void testJavaKinesisDStreamBuilderOldApi() {
String streamName = "a-very-nice-stream-name";
String endpointUrl = "https://kinesis.us-west-2.amazonaws.com";
String region = "us-west-2";
String appName = "a-very-nice-kinesis-app";
Duration checkpointInterval = Seconds.apply(30);
StorageLevel storageLevel = StorageLevel.MEMORY_ONLY();
KinesisInputDStream<byte[]> kinesisDStream = KinesisInputDStream.builder()
.streamingContext(ssc)
.streamName(streamName)
.endpointUrl(endpointUrl)
.regionName(region)
.initialPositionInStream(InitialPositionInStream.LATEST)
.checkpointAppName(appName)
.checkpointInterval(checkpointInterval)
.storageLevel(storageLevel)
.build();
assert(kinesisDStream.streamName() == streamName);
assert(kinesisDStream.endpointUrl() == endpointUrl);
assert(kinesisDStream.regionName() == region);
assert(kinesisDStream.initialPosition().getPosition() == InitialPositionInStream.LATEST);
assert(kinesisDStream.checkpointAppName() == appName);
assert(kinesisDStream.checkpointInterval() == checkpointInterval);
assert(kinesisDStream._storageLevel() == storageLevel);

View file

@ -17,12 +17,15 @@
package org.apache.spark.streaming.kinesis
import java.util.Calendar
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.scalatest.BeforeAndAfterEach
import org.scalatest.mockito.MockitoSugar
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.{AtTimestamp, TrimHorizon}
class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterEach
with MockitoSugar {
@ -69,7 +72,7 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
val dstream = builder.build()
assert(dstream.endpointUrl == DEFAULT_KINESIS_ENDPOINT_URL)
assert(dstream.regionName == DEFAULT_KINESIS_REGION_NAME)
assert(dstream.initialPositionInStream == DEFAULT_INITIAL_POSITION_IN_STREAM)
assert(dstream.initialPosition == DEFAULT_INITIAL_POSITION)
assert(dstream.checkpointInterval == batchDuration)
assert(dstream._storageLevel == DEFAULT_STORAGE_LEVEL)
assert(dstream.kinesisCreds == DefaultCredentials)
@ -80,7 +83,7 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
test("should propagate custom non-auth values to KinesisInputDStream") {
val customEndpointUrl = "https://kinesis.us-west-2.amazonaws.com"
val customRegion = "us-west-2"
val customInitialPosition = InitialPositionInStream.TRIM_HORIZON
val customInitialPosition = new TrimHorizon()
val customAppName = "a-very-nice-kinesis-app"
val customCheckpointInterval = Seconds(30)
val customStorageLevel = StorageLevel.MEMORY_ONLY
@ -91,7 +94,7 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
val dstream = builder
.endpointUrl(customEndpointUrl)
.regionName(customRegion)
.initialPositionInStream(customInitialPosition)
.initialPosition(customInitialPosition)
.checkpointAppName(customAppName)
.checkpointInterval(customCheckpointInterval)
.storageLevel(customStorageLevel)
@ -101,12 +104,67 @@ class KinesisInputDStreamBuilderSuite extends TestSuiteBase with BeforeAndAfterE
.build()
assert(dstream.endpointUrl == customEndpointUrl)
assert(dstream.regionName == customRegion)
assert(dstream.initialPositionInStream == customInitialPosition)
assert(dstream.initialPosition == customInitialPosition)
assert(dstream.checkpointAppName == customAppName)
assert(dstream.checkpointInterval == customCheckpointInterval)
assert(dstream._storageLevel == customStorageLevel)
assert(dstream.kinesisCreds == customKinesisCreds)
assert(dstream.dynamoDBCreds == Option(customDynamoDBCreds))
assert(dstream.cloudWatchCreds == Option(customCloudWatchCreds))
// Testing with AtTimestamp
val cal = Calendar.getInstance()
cal.add(Calendar.DATE, -1)
val timestamp = cal.getTime()
val initialPositionAtTimestamp = new AtTimestamp(timestamp)
val dstreamAtTimestamp = builder
.endpointUrl(customEndpointUrl)
.regionName(customRegion)
.initialPosition(initialPositionAtTimestamp)
.checkpointAppName(customAppName)
.checkpointInterval(customCheckpointInterval)
.storageLevel(customStorageLevel)
.kinesisCredentials(customKinesisCreds)
.dynamoDBCredentials(customDynamoDBCreds)
.cloudWatchCredentials(customCloudWatchCreds)
.build()
assert(dstreamAtTimestamp.endpointUrl == customEndpointUrl)
assert(dstreamAtTimestamp.regionName == customRegion)
assert(dstreamAtTimestamp.initialPosition.getPosition
== initialPositionAtTimestamp.getPosition)
assert(
dstreamAtTimestamp.initialPosition.asInstanceOf[AtTimestamp].getTimestamp.equals(timestamp))
assert(dstreamAtTimestamp.checkpointAppName == customAppName)
assert(dstreamAtTimestamp.checkpointInterval == customCheckpointInterval)
assert(dstreamAtTimestamp._storageLevel == customStorageLevel)
assert(dstreamAtTimestamp.kinesisCreds == customKinesisCreds)
assert(dstreamAtTimestamp.dynamoDBCreds == Option(customDynamoDBCreds))
assert(dstreamAtTimestamp.cloudWatchCreds == Option(customCloudWatchCreds))
}
test("old Api should throw UnsupportedOperationException with AT_TIMESTAMP") {
  val streamName: String = "a-very-nice-stream-name"
  val endpointUrl: String = "https://kinesis.us-west-2.amazonaws.com"
  val region: String = "us-west-2"
  val appName: String = "a-very-nice-kinesis-app"
  val checkpointInterval: Duration = Seconds.apply(30)
  val storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY

  // The deprecated initialPositionInStream() cannot express AT_TIMESTAMP because the
  // enum value carries no timestamp; fromKinesisInitialPosition rejects it, so the
  // builder chain must throw. The new initialPosition() builder API with
  // KinesisInitialPositions.AtTimestamp should be used instead.
  intercept[UnsupportedOperationException] {
    KinesisInputDStream.builder
      .streamingContext(ssc)
      .streamName(streamName)
      .endpointUrl(endpointUrl)
      .regionName(region)
      .initialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
      .checkpointAppName(appName)
      .checkpointInterval(checkpointInterval)
      .storageLevel(storageLevel)
      .build
  }
}
}

View file

@ -34,6 +34,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest
import org.apache.spark.streaming.kinesis.KinesisReadConfigurations._
import org.apache.spark.streaming.kinesis.KinesisTestUtils._
import org.apache.spark.streaming.receiver.BlockManagerBasedStoreResult
@ -178,7 +179,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
.streamName(testUtils.streamName)
.endpointUrl(testUtils.endpointUrl)
.regionName(testUtils.regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.initialPosition(new Latest())
.checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_ONLY)
.build()
@ -209,7 +210,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
.streamName(testUtils.streamName)
.endpointUrl(testUtils.endpointUrl)
.regionName(testUtils.regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.initialPosition(new Latest())
.checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_ONLY)
.buildWithMessageHandler(addFive(_))
@ -245,7 +246,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
.streamName("dummyStream")
.endpointUrl(dummyEndpointUrl)
.regionName(dummyRegionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.initialPosition(new Latest())
.checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_ONLY)
.build()
@ -293,7 +294,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
.streamName(localTestUtils.streamName)
.endpointUrl(localTestUtils.endpointUrl)
.regionName(localTestUtils.regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.initialPosition(new Latest())
.checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_ONLY)
.build()
@ -369,7 +370,7 @@ abstract class KinesisStreamTests(aggregateTestData: Boolean) extends KinesisFun
.streamName(testUtils.streamName)
.endpointUrl(testUtils.endpointUrl)
.regionName(testUtils.regionName)
.initialPositionInStream(InitialPositionInStream.LATEST)
.initialPosition(new Latest())
.checkpointInterval(Seconds(10))
.storageLevel(StorageLevel.MEMORY_ONLY)
.build()