Chris Fregly 91f9504e60 [SPARK-1981] Add AWS Kinesis streaming support
Author: Chris Fregly <chris@fregly.com>

Closes #1434 from cfregly/master and squashes the following commits:

4774581 [Chris Fregly] updated docs, renamed retry to retryRandom to be more clear, removed retries around store() method
0393795 [Chris Fregly] moved Kinesis examples out of examples/ and back into extras/kinesis-asl
691a6be [Chris Fregly] fixed tests and formatting, fixed a bug with JavaKinesisWordCount during union of streams
0e1c67b [Chris Fregly] Merge remote-tracking branch 'upstream/master'
74e5c7c [Chris Fregly] updated per TD's feedback.  simplified examples, updated docs
e33cbeb [Chris Fregly] Merge remote-tracking branch 'upstream/master'
bf614e9 [Chris Fregly] per matei's feedback:  moved the kinesis examples into the examples/ dir
d17ca6d [Chris Fregly] per TD's feedback:  updated docs, simplified the KinesisUtils api
912640c [Chris Fregly] changed the foundKinesis class to be a publically-avail class
db3eefd [Chris Fregly] Merge remote-tracking branch 'upstream/master'
21de67f [Chris Fregly] Merge remote-tracking branch 'upstream/master'
6c39561 [Chris Fregly] parameterized the versions of the aws java sdk and kinesis client
338997e [Chris Fregly] improve build docs for kinesis
828f8ae [Chris Fregly] more cleanup
e7c8978 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
cd68c0d [Chris Fregly] fixed typos and backward compatibility
d18e680 [Chris Fregly] Merge remote-tracking branch 'upstream/master'
b3b0ff1 [Chris Fregly] [SPARK-1981] Add AWS Kinesis streaming support
2014-08-02 13:35:35 -07:00


layout: global
title: Spark Streaming Kinesis Receiver

Kinesis

Build notes:

  • Spark supports a Kinesis Streaming Receiver, which is not included in the default build due to licensing restrictions.
  • _**Note that by embedding this library you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your Spark package**_.
  • The Spark Kinesis Streaming Receiver source code, examples, tests, and artifacts live in $SPARK_HOME/extras/kinesis-asl.
  • To build with Kinesis, you must run the Maven or sbt builds with `-Pkinesis-asl`.
  • Applications will need to link to the `spark-streaming-kinesis-asl` artifact.

Kinesis examples notes:

  • To build the Kinesis examples, you must run the Maven or sbt builds with `-Pkinesis-asl`.
  • These examples automatically determine the number of local threads and KinesisReceivers to spin up based on the number of shards for the stream.
  • KinesisWordCountProducerASL will generate random data to put onto the Kinesis stream for testing.
  • Checkpointing is disabled (no checkpoint dir is set). The examples as written will not recover from a driver failure.
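The sizing rule behind the examples (one receiver per shard, never more receivers than shards) can be sketched as a pair of pure functions. This is a minimal sketch and is not taken from the examples. `ReceiverSizing`, `receivers`, and `localThreads` are hypothetical names. The one-extra-thread rule assumes that each Spark Streaming receiver occupies a thread for its lifetime, which is true of receivers in general. Under that assumption, a `local[n]` master needs more threads than receivers, or no batches get processed.

```java
/**
 * Hypothetical sizing helper (not part of Spark): picks how many KinesisReceivers
 * to create for a stream and how many threads to give a local-mode master.
 */
final class ReceiverSizing {
    private ReceiverSizing() {}

    /** Caps the requested receiver count at the shard count, with a floor of one receiver. */
    static int receivers(int numShards, int requested) {
        if (numShards < 1) {
            throw new IllegalArgumentException("a Kinesis stream has at least one shard");
        }
        return Math.max(1, Math.min(requested, numShards));
    }

    /** Each receiver holds one thread while it runs, so reserve one more for batch processing. */
    static int localThreads(int numReceivers) {
        return numReceivers + 1;
    }

    public static void main(String[] args) {
        int numShards = 4; // assumed value; in practice read from the stream's description
        int r = receivers(numShards, 10);
        System.out.println("receivers=" + r + ", master=local[" + localThreads(r) + "]");
    }
}
```

For the assumed four-shard stream, running `main` prints `receivers=4, master=local[5]`.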
Deployment and runtime notes:

  • A single KinesisReceiver can process many shards of a stream.
  • Each shard of a stream is processed by one or more KinesisReceivers managed by the Kinesis Client Library (KCL) Worker.
  • You never need more KinesisReceivers than the number of shards in your stream.
  • You can scale receiving horizontally by creating more KinesisReceivers/DStreams, up to the number of shards in a given stream.
  • The Kinesis libraries, including the Kinesis Client Library, must be present on all worker nodes, because the KinesisReceivers run there.
  • This code uses the DefaultAWSCredentialsProviderChain, which searches for credentials in the following order of precedence:
    1) Environment Variables - AWS_ACCESS_KEY_ID and AWS_SECRET_KEY
    2) Java System Properties - aws.accessKeyId and aws.secretKey
    3) Credential profiles file - default location (~/.aws/credentials) shared by all AWS SDKs
    4) Instance profile credentials - delivered through the Amazon EC2 metadata service
  • You need to set up a Kinesis stream with one or more shards, as described here:
    http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html
  • Valid Kinesis endpoint URLs are listed here: http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region
  • When you first start up the KinesisReceiver, the Kinesis Client Library (KCL) needs ~30s to establish connectivity with the AWS Kinesis service, retrieve any checkpoint data, and negotiate with other KCL workers reading from the same stream.
  • Be careful when changing the app name. Kinesis maintains a mapping table in DynamoDB based on this app name (http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app.html#kinesis-record-processor-initialization). Changing the app name could lead to Kinesis errors as only 1 logical application can process a stream. In order to start fresh, it's always best to delete the DynamoDB table that matches your app name. This DynamoDB table lives in us-east-1 regardless of the Kinesis endpoint URL.
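The following simplified model makes the credential order of precedence concrete. It is a hypothetical sketch and is not the AWS SDK's `DefaultAWSCredentialsProviderChain`, which does more than this (for example, refreshing instance-profile credentials and accepting alternative variable names). `CredentialChainModel` and its methods are illustrative names. Plain maps stand in for the environment, the Java system properties, and an already-parsed `[default]` section of `~/.aws/credentials`. The instance-profile lookup is stubbed as a supplier.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/**
 * Simplified, hypothetical model of the credential lookup order listed above.
 * NOT the AWS SDK implementation: it only shows that sources are tried in a fixed
 * order and the first one yielding a complete key pair wins.
 */
final class CredentialChainModel {
    private CredentialChainModel() {}

    /** A named credential source that may or may not produce an {accessKey, secretKey} pair. */
    static final class Source {
        final String name;
        final Supplier<Optional<String[]>> lookup;

        Source(String name, Supplier<Optional<String[]>> lookup) {
            this.name = name;
            this.lookup = lookup;
        }
    }

    /** Succeeds only when both the key and the secret are present in the map. */
    static Optional<String[]> pair(Map<String, String> m, String keyName, String secretName) {
        String key = m.get(keyName);
        String secret = m.get(secretName);
        return (key != null && secret != null)
                ? Optional.of(new String[] {key, secret})
                : Optional.empty();
    }

    /** Tries each source in order and returns the name of the first that yields credentials. */
    static Optional<String> resolve(List<Source> chain) {
        for (Source source : chain) {
            if (source.lookup.get().isPresent()) {
                return Optional.of(source.name);
            }
        }
        return Optional.empty();
    }

    /** Builds the chain in the documented order; profileFile models a pre-parsed [default] section. */
    static List<Source> documentedOrder(Map<String, String> env,
                                        Map<String, String> systemProperties,
                                        Map<String, String> profileFile,
                                        Supplier<Optional<String[]>> instanceProfile) {
        return List.of(
                new Source("environment", () -> pair(env, "AWS_ACCESS_KEY_ID", "AWS_SECRET_KEY")),
                new Source("system properties",
                        () -> pair(systemProperties, "aws.accessKeyId", "aws.secretKey")),
                new Source("profile file",
                        () -> pair(profileFile, "aws_access_key_id", "aws_secret_access_key")),
                new Source("instance profile", instanceProfile));
    }

    public static void main(String[] args) {
        Map<String, String> env = Map.of(); // nothing set in the environment
        Map<String, String> props = Map.of("aws.accessKeyId", "AKIDEXAMPLE", "aws.secretKey", "example");
        Map<String, String> profile = Map.of("aws_access_key_id", "AKIDOTHER", "aws_secret_access_key", "other");
        // System properties win because the environment has no complete pair.
        System.out.println(resolve(documentedOrder(env, props, profile, Optional::empty)).orElse("none"));
    }
}
```

Running `main` prints `system properties`. The profile file also holds credentials, but it comes later in the order and is never consulted.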
Failure recovery notes:

  • The combination of Spark Streaming and Kinesis creates 3 different checkpoints as follows:
    1) RDD data checkpoint (Spark Streaming) - frequency is configurable with DStream.checkpoint(Duration)
    2) RDD metadata checkpoint (Spark Streaming) - frequency is every DStream batch
    3) Kinesis checkpointing (Kinesis) - frequency is controlled by the developer calling ICheckpointer.checkpoint() directly
  • Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling.
  • Upon startup, a KinesisReceiver will begin processing records with sequence numbers greater than the last checkpoint sequence number recorded per shard.
  • If no checkpoint info exists, the worker will start either from the oldest record available (InitialPositionInStream.TRIM_HORIZON) or from the tip/latest (InitialPositionInStream.LATEST). This is configurable.
  • When pulling from the stream tip (InitialPositionInStream.LATEST), only new stream data will be picked up after the KinesisReceiver starts.
  • InitialPositionInStream.LATEST could lead to missed records if data is added to the stream while no KinesisReceivers are running.
  • In production, you'll want to switch to InitialPositionInStream.TRIM_HORIZON, which will read up to 24 hours (the Kinesis limit) of previous stream data, depending on the checkpoint frequency.
  • InitialPositionInStream.TRIM_HORIZON may lead to duplicate processing of records depending on the checkpoint frequency.
  • Record processing should be idempotent when possible.
  • Failed or latent KinesisReceivers will be detected and automatically shut down and load-balanced by the KCL.
  • If possible, explicitly shut down the worker if a failure occurs in order to trigger the final checkpoint.
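The start-position rules in these notes can be modeled per shard as a single predicate. This is a hypothetical sketch and is not KCL code. `ShardStartModel` and `shouldProcess` are illustrative names, and the `InitialPosition` enum only mirrors the two `InitialPositionInStream` values. The sketch assumes that sequence numbers increase within a shard and that every record is still within the retention window.

```java
import java.math.BigInteger;
import java.util.Optional;

/**
 * Hypothetical model (not KCL code) of which records a restarted KinesisReceiver
 * processes for one shard. Kinesis sequence numbers are large decimal values that
 * increase within a shard, so they are compared as BigInteger.
 */
final class ShardStartModel {
    private ShardStartModel() {}

    /** Mirrors the two InitialPositionInStream values described in the notes. */
    enum InitialPosition { TRIM_HORIZON, LATEST }

    static boolean shouldProcess(BigInteger recordSeq,
                                 Optional<BigInteger> lastCheckpoint,
                                 InitialPosition initial,
                                 BigInteger latestSeqAtStartup) {
        if (lastCheckpoint.isPresent()) {
            // A checkpoint takes precedence: resume strictly after the checkpointed record.
            return recordSeq.compareTo(lastCheckpoint.get()) > 0;
        }
        if (initial == InitialPosition.TRIM_HORIZON) {
            // No checkpoint: start from the oldest record still retained.
            return true;
        }
        // LATEST with no checkpoint: only records added after the receiver started.
        return recordSeq.compareTo(latestSeqAtStartup) > 0;
    }

    public static void main(String[] args) {
        BigInteger tip = BigInteger.valueOf(500);
        Optional<BigInteger> none = Optional.empty();
        // Record 200 was written while no receiver was running.
        System.out.println(shouldProcess(BigInteger.valueOf(200), none, InitialPosition.LATEST, tip));
        System.out.println(shouldProcess(BigInteger.valueOf(200), none, InitialPosition.TRIM_HORIZON, tip));
        System.out.println(shouldProcess(BigInteger.valueOf(200), Optional.of(BigInteger.valueOf(150)),
                InitialPosition.LATEST, tip));
    }
}
```

Running `main` prints `false`, `true`, `true`. The first line shows how LATEST misses data that was added while nothing was running. The last line shows that an existing checkpoint overrides the configured initial position.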
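The caller controls how often Kinesis checkpoints happen. One common way to avoid checkpointing so often that AWS throttles the requests is to gate the call on elapsed time. The following is a minimal sketch under that assumption. `CheckpointGate` is a hypothetical helper, and the actual checkpoint call (the `ICheckpointer.checkpoint()` named in these notes) appears only as a comment. Times are passed in explicitly so the logic is deterministic.

```java
/**
 * Hypothetical time-based gate (not Spark or KCL code): allows a Kinesis checkpoint
 * at most once per interval, bounding the load placed on the checkpoint store.
 */
final class CheckpointGate {
    private final long intervalMillis;
    private long lastCheckpointMillis;

    CheckpointGate(long intervalMillis, long startMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.intervalMillis = intervalMillis;
        this.lastCheckpointMillis = startMillis;
    }

    /** Returns true, and records the time, when at least one interval has passed. */
    boolean shouldCheckpoint(long nowMillis) {
        if (nowMillis - lastCheckpointMillis >= intervalMillis) {
            lastCheckpointMillis = nowMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CheckpointGate gate = new CheckpointGate(2_000, 0); // assumed 2-second interval
        for (long t = 0; t <= 5_000; t += 500) {
            if (gate.shouldCheckpoint(t)) {
                // The ICheckpointer.checkpoint() call from the notes above would go here.
                System.out.println("checkpoint at " + t + " ms");
            }
        }
    }
}
```

With the assumed 2-second interval and a 500 ms loop step, `main` prints checkpoints at 2000 ms and 4000 ms only. A larger interval reduces load on the checkpoint store but widens the window of records that may be replayed after a failure.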
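TRIM_HORIZON and checkpoint replay can both deliver a record more than once. One way to make processing idempotent is to key each output by a unique record ID and to upsert rather than append. This is a hypothetical sketch. `IdempotentSink` is illustrative and keeps results in memory. It assumes the producer embeds a unique ID in each record, and the IDs used in `main` are made up. A durable store would achieve the same effect with keyed puts or upserts.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical idempotent sink (not part of Spark): each result is stored under a
 * unique record ID, so a record delivered twice overwrites its earlier result
 * instead of being counted twice.
 */
final class IdempotentSink {
    private final Map<String, String> resultsById = new HashMap<>();

    /** Upsert keyed by record ID: repeating the same write leaves a single entry. */
    void write(String recordId, String result) {
        resultsById.put(recordId, result);
    }

    int size() {
        return resultsById.size();
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.write("order-101", "hello");
        sink.write("order-102", "world");
        // Replayed after restarting from an older checkpoint.
        sink.write("order-101", "hello");
        System.out.println(sink.size());
    }
}
```

Running `main` prints `2`. An append-only sink would have stored three entries for the same input.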