[SPARK-19906][SS][DOCS] Documentation describing how to write queries to Kafka

## What changes were proposed in this pull request?

Add documentation that describes how to write streaming and batch queries to Kafka.

zsxwing tdas

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>

Closes #17246 from tcondie/kafka-write-docs.
Tyson Condie 2017-03-20 17:18:59 -07:00 committed by Tathagata Das
parent bec6b16c19
commit c2d1761a57


@@ -3,9 +3,9 @@ layout: global
title: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
---
Structured Streaming integration for Kafka 0.10 to poll data from Kafka.
Structured Streaming integration for Kafka 0.10 to read data from and write data to Kafka.
### Linking
## Linking
For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:
groupId = org.apache.spark
@@ -15,40 +15,42 @@ For Scala/Java applications using SBT/Maven project definitions, link your appli
For Python applications, you need to add the above library and its dependencies when deploying your
application. See the [Deploying](#deploying) subsection below.
### Creating a Kafka Source Stream
## Reading Data from Kafka
### Creating a Kafka Source for Streaming Queries
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
// Subscribe to 1 topic
val ds1 = spark
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics
val ds2 = spark
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern
val ds3 = spark
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
{% endhighlight %}
@@ -57,31 +59,31 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
{% highlight java %}
// Subscribe to 1 topic
Dataset<Row> ds1 = spark
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to multiple topics
Dataset<Row> ds2 = spark
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
.load();
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern
Dataset<Row> ds3 = spark
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
.load();
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
{% endhighlight %}
</div>
@@ -89,37 +91,37 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
{% highlight python %}
# Subscribe to 1 topic
ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to multiple topics
ds2 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1,topic2")
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1,topic2") \
.load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern
ds3 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribePattern", "topic.*")
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribePattern", "topic.*") \
.load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
{% endhighlight %}
</div>
</div>
### Creating a Kafka Source Batch
### Creating a Kafka Source for Batch Queries
If you have a use case that is better suited to batch processing,
you can create a Dataset/DataFrame for a defined range of offsets.
@@ -128,17 +130,17 @@ you can create an Dataset/DataFrame for a defined range of offsets.
{% highlight scala %}
// Subscribe to 1 topic, defaulting to the earliest and latest offsets
val ds1 = spark
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to multiple topics, specifying explicit Kafka offsets
val ds2 = spark
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -146,11 +148,11 @@ val ds2 = spark
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
.load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
// Subscribe to a pattern, at the earliest and latest offsets
val ds3 = spark
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -158,7 +160,7 @@ val ds3 = spark
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
{% endhighlight %}
@@ -167,16 +169,16 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
{% highlight java %}
// Subscribe to 1 topic, defaulting to the earliest and latest offsets
Dataset<Row> ds1 = spark
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load();
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> ds2 = spark
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -184,10 +186,10 @@ Dataset<Row> ds2 = spark
.option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
.option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
.load();
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> ds3 = spark
Dataset<Row> df = spark
.read()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
@@ -195,7 +197,7 @@ Dataset<Row> ds3 = spark
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load();
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
{% endhighlight %}
</div>
@@ -203,16 +205,16 @@ ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
{% highlight python %}
# Subscribe to 1 topic, defaulting to the earliest and latest offsets
ds1 = spark \
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "topic1") \
.load()
ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to multiple topics, specifying explicit Kafka offsets
ds2 = spark \
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
@@ -220,10 +222,10 @@ ds2 = spark \
.option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
.option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
.load()
ds2.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
# Subscribe to a pattern, at the earliest and latest offsets
ds3 = spark \
df = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
@@ -231,8 +233,7 @@ ds3 = spark \
.option("startingOffsets", "earliest") \
.option("endingOffsets", "latest") \
.load()
ds3.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
{% endhighlight %}
</div>
</div>
@@ -373,11 +374,213 @@ The following configurations are optional:
</tr>
</table>
Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g,
`stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafkaParams, see
[Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs).
## Writing Data to Kafka
Note that the following Kafka params cannot be set and the Kafka source will throw an exception:
Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
Apache Kafka only supports at-least-once write semantics. Consequently, when writing either Streaming Queries
or Batch Queries to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
to retry a message that was not acknowledged by a broker, even though that broker received and wrote the message record.
Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
if writing the query is successful, then you can assume that the query output was written at least once. A possible
solution for removing duplicates when reading the written data is to introduce a primary (unique) key
that can be used to perform de-duplication when reading, as the sketch below illustrates.
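For example, if each record carries a unique identifier in its value, duplicates introduced by broker retries can be dropped after the topic is read back. The following is a minimal sketch, assuming a JSON-encoded value with an `eventId` field; the field name and schema are illustrative, not part of the sink contract.

{% highlight scala %}
// Minimal de-duplication sketch (illustrative schema): drop re-delivered
// records by their unique key after reading them back from Kafka.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val schema = new StructType()
  .add("eventId", StringType)   // assumed unique per logical record
  .add("payload", StringType)

val deduped = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .select(from_json(col("value").cast("string"), schema).as("record"))
  .select("record.*")
  .dropDuplicates("eventId")    // records with an already-seen eventId are dropped
{% endhighlight %}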
The DataFrame being written to Kafka should have the following columns in its schema:
<table class="table">
<tr><th>Column</th><th>Type</th></tr>
<tr>
<td>key (optional)</td>
<td>string or binary</td>
</tr>
<tr>
<td>value (required)</td>
<td>string or binary</td>
</tr>
<tr>
<td>topic (*optional)</td>
<td>string</td>
</tr>
</table>
\* The topic column is required if the "topic" configuration option is not specified.<br>
The value column is the only required column. If a key column is not specified then
a ```null``` valued key column will be automatically added (see Kafka semantics on
how ```null``` valued keys are handled). If a topic column exists then its value
is used as the topic when writing the given row to Kafka, unless the "topic" configuration
option is set, i.e., the "topic" configuration option overrides the topic column.
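For instance, an arbitrary DataFrame can be projected into this schema before writing; the following is a minimal sketch in which the `id` and `data` column names are illustrative:

{% highlight scala %}
// Minimal sketch (illustrative column names): project an arbitrary DataFrame
// into the key/value/topic schema expected by the Kafka sink.
import org.apache.spark.sql.functions._

val shaped = df.select(
  col("id").cast("string").as("key"),        // optional; a null key is added if omitted
  to_json(struct(col("data"))).as("value"),  // required; string or binary
  lit("topic1").as("topic")                  // may be omitted if the "topic" option is set
)
{% endhighlight %}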
The following options must be set for the Kafka sink
for both batch and streaming queries.
<table class="table">
<tr><th>Option</th><th>value</th><th>meaning</th></tr>
<tr>
<td>kafka.bootstrap.servers</td>
<td>A comma-separated list of host:port</td>
<td>The Kafka "bootstrap.servers" configuration.</td>
</tr>
</table>
The following configurations are optional:
<table class="table">
<tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
<tr>
<td>topic</td>
<td>string</td>
<td>none</td>
<td>streaming and batch</td>
<td>Sets the topic that all rows will be written to in Kafka. This option overrides any
topic column that may exist in the data.</td>
</tr>
</table>
### Creating a Kafka Sink for Streaming Queries
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start()
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start();
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.start();
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.start()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.start()
{% endhighlight %}
</div>
</div>
### Writing the output of Batch Queries to Kafka
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.save()
// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
.write()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.save()
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "topic1") \
.save()
# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.save()
{% endhighlight %}
</div>
</div>
## Kafka Specific Configurations
Kafka's own configurations can be set via `DataStreamReader.option` / `DataStreamWriter.option` with a `kafka.` prefix, e.g.,
`stream.option("kafka.bootstrap.servers", "host:port")`. For possible Kafka parameters, see the
[Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs) for
parameters related to reading data, and the [Kafka producer config docs](http://kafka.apache.org/documentation/#producerconfigs)
for parameters related to writing data.
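For example, a producer-side parameter such as Kafka's `compression.type` can be forwarded to the sink through this prefix; the following is a minimal sketch in which the chosen parameter and value are illustrative:

{% highlight scala %}
// Minimal sketch: any Kafka producer parameter can be passed to the sink
// by prefixing it with "kafka." (compression.type is an illustrative choice).
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("kafka.compression.type", "snappy")  // forwarded to the Kafka producer
  .option("topic", "topic1")
  .start()
{% endhighlight %}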
Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
- **group.id**: Kafka source will create a unique group id for each query automatically.
- **auto.offset.reset**: Set the source option `startingOffsets` to specify
@@ -389,11 +592,15 @@ Note that the following Kafka params cannot be set and the Kafka source will thr
DataFrame operations to explicitly deserialize the keys.
- **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer.
Use DataFrame operations to explicitly deserialize the values.
- **key.serializer**: Keys are always serialized with ByteArraySerializer or StringSerializer. Use
DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
- **value.serializer**: Values are always serialized with ByteArraySerializer or StringSerializer. Use
DataFrame operations to explicitly serialize the values into either strings or byte arrays (see the sketch after this list).
- **enable.auto.commit**: Kafka source doesn't commit any offset.
- **interceptor.classes**: Kafka source always reads keys and values as byte arrays. It's not safe to
use ConsumerInterceptor as it may break the query.
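To illustrate the serializer and deserializer points above, both directions can be expressed as ordinary DataFrame operations; in this minimal sketch, `input` and `kafkaDF` are illustrative DataFrames and JSON is just one possible encoding:

{% highlight scala %}
// Minimal sketch: (de)serialization as DataFrame operations rather than
// custom Kafka serializer/deserializer classes.
import org.apache.spark.sql.functions._

// Writing: pack arbitrary columns into a single string value column...
val toKafka = input.select(to_json(struct(col("*"))).as("value"))

// Reading: ...and cast the raw bytes back to a string for downstream parsing.
val fromKafka = kafkaDF.select(col("value").cast("string").as("json"))
{% endhighlight %}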
### Deploying
## Deploying
As with any Spark application, `spark-submit` is used to launch your application. `spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}`
and its dependencies can be directly added to `spark-submit` using `--packages`, such as,