spark-instrumented-optimizer/docs/streaming-kafka-integration.md

60 lines
4 KiB
Markdown
Raw Normal View History

---
layout: global
title: Spark Streaming + Kafka Integration Guide
---
[Apache Kafka](http://kafka.apache.org/) is publish-subscribe messaging rethought as a distributed, partitioned, replicated commit log service. Here we explain how to configure Spark Streaming to receive data from Kafka.
1. **Linking:** In your SBT/Maven project definition, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
groupId = org.apache.spark
artifactId = spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}
version = {{site.SPARK_VERSION_SHORT}}
2. **Programming:** In the streaming application code, import `KafkaUtils` and create input DStream as follows.
<div class="codetabs">
<div data-lang="scala" markdown="1">
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(
streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume])
See the [API docs](api/scala/index.html#org.apache.spark.streaming.kafka.KafkaUtils$)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/KafkaWordCount.scala).
</div>
<div data-lang="java" markdown="1">
import org.apache.spark.streaming.kafka.*;
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(
streamingContext, [zookeeperQuorum], [group id of the consumer], [per-topic number of Kafka partitions to consume]);
See the [API docs](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html)
and the [example]({{site.SPARK_GITHUB_URL}}/blob/master/examples/scala-2.10/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java).
</div>
</div>
*Points to remember:*
- Topic partitions in Kafka does not correlate to partitions of RDDs generated in Spark Streaming. So increasing the number of topic-specific partitions in the `KafkaUtils.createStream()` only increases the number of threads using which topics that are consumed within a single receiver. It does not increase the parallelism of Spark in processing the data. Refer to the main document for more information on that.
- Multiple Kafka input DStreams can be created with different groups and topics for parallel receiving of data using multiple receivers.
3. **Deploying:** Package `spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}}` and its dependencies (except `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` which are provided by `spark-submit`) into the application JAR. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
[SPARK-4806] Streaming doc update for 1.2 Important updates to the streaming programming guide - Make the fault-tolerance properties easier to understand, with information about write ahead logs - Update the information about deploying the spark streaming app with information about Driver HA - Update Receiver guide to discuss reliable vs unreliable receivers. Author: Tathagata Das <tathagata.das1565@gmail.com> Author: Josh Rosen <joshrosen@databricks.com> Author: Josh Rosen <rosenville@gmail.com> Closes #3653 from tdas/streaming-doc-update-1.2 and squashes the following commits: f53154a [Tathagata Das] Addressed Josh's comments. ce299e4 [Tathagata Das] Minor update. ca19078 [Tathagata Das] Minor change f746951 [Tathagata Das] Mentioned performance problem with WAL 7787209 [Tathagata Das] Merge branch 'streaming-doc-update-1.2' of github.com:tdas/spark into streaming-doc-update-1.2 2184729 [Tathagata Das] Updated Kafka and Flume guides with reliability information. 2f3178c [Tathagata Das] Added more information about writing reliable receivers in the custom receiver guide. 91aa5aa [Tathagata Das] Improved API Docs menu 5707581 [Tathagata Das] Added Pythn API badge b9c8c24 [Tathagata Das] Merge pull request #26 from JoshRosen/streaming-programming-guide b8c8382 [Josh Rosen] minor fixes a4ef126 [Josh Rosen] Restructure parts of the fault-tolerance section to read a bit nicer when skipping over the headings 65f66cd [Josh Rosen] Fix broken link to fault-tolerance semantics section. f015397 [Josh Rosen] Minor grammar / pluralization fixes. 3019f3a [Josh Rosen] Fix minor Markdown formatting issues aa8bb87 [Tathagata Das] Small update. 195852c [Tathagata Das] Updated based on Josh's comments, updated receiver reliability and deploying section, and also updated configuration. 17b99fb [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-doc-update-1.2 a0217c0 [Tathagata Das] Changed Deploying menu layout 67fcffc [Tathagata Das] Added cluster mode + supervise example to submitting application guide. e45453b [Tathagata Das] Update streaming guide, added deploying section. 192c7a7 [Tathagata Das] Added more info about Python API, and rewrote the checkpointing section.
2014-12-11 09:21:23 -05:00
Note that the Kafka receiver used by default is an
[*unreliable* receiver](streaming-programming-guide.html#receiver-reliability) section in the
programming guide). In Spark 1.2, we have added an experimental *reliable* Kafka receiver that
provides stronger
[fault-tolerance guarantees](streaming-programming-guide.html#fault-tolerance-semantics) of zero
data loss on failures. This receiver is automatically used when the write ahead log
(also introduced in Spark 1.2) is enabled
(see [Deployment](#deploying-applications.html) section in the programming guide). This
may reduce the receiving throughput of individual Kafka receivers compared to the unreliable
receivers, but this can be corrected by running
[more receivers in parallel](streaming-programming-guide.html#level-of-parallelism-in-data-receiving)
to increase aggregate throughput. Additionally, it is recommended that the replication of the
received data within Spark be disabled when the write ahead log is enabled as the log is already stored
in a replicated storage system. This can be done by setting the storage level for the input
stream to `StorageLevel.MEMORY_AND_DISK_SER` (that is, use
`KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)`).