[SPARK-19074][SS][DOCS] Updated Structured Streaming Programming Guide for update mode and source/sink options

## What changes were proposed in this pull request?

Updates
- Updated Late Data Handling section by adding a figure for Update Mode. It's more intuitive to explain late data handling with Update Mode, so I added the new figure before the Append Mode figure.
- Updated Output Modes section with Update mode
- Added options for all the sources and sinks

---------------------------
---------------------------

![image](https://cloud.githubusercontent.com/assets/663212/21665176/f150b224-d29f-11e6-8372-14d32da21db9.png)

---------------------------
---------------------------
<img width="931" alt="screen shot 2017-01-03 at 6 09 11 pm" src="https://cloud.githubusercontent.com/assets/663212/21629740/d21c9bb8-d1df-11e6-915b-488a59589fa6.png">
<img width="933" alt="screen shot 2017-01-03 at 6 10 00 pm" src="https://cloud.githubusercontent.com/assets/663212/21629749/e22bdabe-d1df-11e6-86d3-7e51d2f28dbc.png">

---------------------------
---------------------------
![image](https://cloud.githubusercontent.com/assets/663212/21665200/108e18fc-d2a0-11e6-8640-af598cab090b.png)
![image](https://cloud.githubusercontent.com/assets/663212/21665148/cfe414fa-d29f-11e6-9baa-4124ccbab093.png)
![image](https://cloud.githubusercontent.com/assets/663212/21665226/2e8f39e4-d2a0-11e6-85b1-7657e2df5491.png)

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16468 from tdas/SPARK-19074.
Tathagata Das 2017-01-06 11:29:01 -08:00
parent a9a137377e
commit b59cddaba0
6 changed files with 165 additions and 53 deletions

Binary file not shown. (After: 243 KiB)

Binary file not shown. (After: 292 KiB)

Binary file not shown. (Before: 246 KiB)

Binary file not shown.


@ -374,7 +374,7 @@ The "Output" is defined as what gets written out to the external storage. The ou
- *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
- *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (not available yet in Spark 2.0). Note that this is different from the Complete Mode in that this mode does not output the rows that are not changed.
- *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger.
Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes).
@ -424,7 +424,7 @@ Streaming DataFrames can be created through the `DataStreamReader` interface
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamReader.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader) docs)
returned by `SparkSession.readStream()`. Similar to the read interface for creating static DataFrame, you can specify the details of the source data format, schema, options, etc.
#### Data Sources
#### Input Sources
In Spark 2.0, there are a few built-in sources.
- **File source** - Reads files written in a directory as a stream of data. Supported file formats are text, csv, json, parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
@ -433,6 +433,54 @@ In Spark 2.0, there are a few built-in sources.
- **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees.
Some sources are not fault-tolerant because they do not guarantee that data can be replayed using
checkpointed offsets after a failure. See the earlier section on
[fault-tolerance semantics](#fault-tolerance-semantics).
Here are the details of all the sources in Spark.
<table class="table">
<tr>
<th>Source</th>
<th>Options</th>
<th>Fault-tolerant</th>
<th>Notes</th>
</tr>
<tr>
<td><b>File source</b></td>
<td>
<code>path</code>: path to the input directory, and common to all file formats.
<br/><br/>
For file-format-specific options, see the related methods in <code>DataStreamReader</code>
(<a href="api/scala/index.html#org.apache.spark.sql.streaming.DataStreamReader">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>).
E.g. for "parquet" format options see <code>DataStreamReader.parquet()</code></td>
<td>Yes</td>
<td>Supports glob paths, but does not support multiple comma-separated paths/globs.</td>
</tr>
<tr>
<td><b>Socket Source</b></td>
<td>
<code>host</code>: host to connect to, must be specified<br/>
<code>port</code>: port to connect to, must be specified
</td>
<td>No</td>
<td></td>
</tr>
<tr>
<td><b>Kafka Source</b></td>
<td>
See the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a>.
</td>
<td>Yes</td>
<td></td>
</tr>
<tr>
<td></td>
<td></td>
<td></td>
<td></td>
</tr>
</table>
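As a quick illustration of how these options are passed through the `DataStreamReader`, here is a minimal sketch (the `SparkSession` named `spark`, the schema `userSchema`, and the directory and port values are assumptions for illustration):

{% highlight scala %}
// File source: stream CSV files that are atomically placed into a directory.
// A schema must be provided for file sources.
val csvDF = spark.readStream
  .format("csv")
  .schema(userSchema)
  .load("/path/to/input/dir")   // supports glob paths, but not comma-separated lists

// Socket source (testing only): read UTF8 text lines from a socket.
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")  // must be specified
  .option("port", 9999)         // must be specified
  .load()
{% endhighlight %}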
Here are some examples.
<div class="codetabs">
@ -753,34 +801,47 @@ windowedCounts = words
In this example, we are defining the watermark of the query on the value of the column "timestamp",
and also defining "10 minutes" as the threshold of how late is the data allowed to be. If this query
is run in Append output mode (discussed later in [Output Modes](#output-modes) section),
the engine will track the current event time from the column "timestamp" and wait for additional
"10 minutes" in event time before finalizing the windowed counts and adding them to the Result Table.
is run in Update output mode (discussed later in [Output Modes](#output-modes) section),
the engine will keep updating counts of a window in the Result Table until the window is older
than the watermark, which lags behind the current event time in column "timestamp" by 10 minutes.
Here is an illustration.
![Watermarking in Append Mode](img/structured-streaming-watermark.png)
![Watermarking in Update Mode](img/structured-streaming-watermark-update-mode.png)
As shown in the illustration, the maximum event time tracked by the engine is the
*blue dashed line*, and the watermark set as `(max event time - '10 mins')`
at the beginning of every trigger is the red line. For example, when the engine observes the data
`(12:14, dog)`, it sets the watermark for the next trigger as `12:04`.
For the window `12:00 - 12:10`, the partial counts are maintained as internal state while the system
is waiting for late data. After the system finds data (i.e. `(12:21, owl)`) such that the
watermark exceeds 12:10, the partial count is finalized and appended to the table. This count will
not change any further as all "too-late" data older than 12:10 will be ignored.
This watermark lets the engine maintain intermediate state for an additional 10 minutes to allow late
data to be counted. For example, the data `(12:09, cat)` is out of order and late, and it falls in
windows `12:05 - 12:15` and `12:10 - 12:20`. Since it is still ahead of the watermark `12:04` in
the trigger, the engine still maintains the intermediate counts as state and correctly updates the
counts of the related windows. However, when the watermark is updated to 12:11, the intermediate
state for window `(12:00 - 12:10)` is cleared, and all subsequent data (e.g. `(12:04, donkey)`)
is considered "too late" and therefore ignored. Note that after every trigger,
the updated counts (i.e. purple rows) are written to sink as the trigger output, as dictated by
the Update mode.
Note that in Append output mode, the system has to wait for "late threshold" time
before it can output the aggregation of a window. This may not be ideal if data can be very late,
(say 1 day) and you like to have partial counts without waiting for a day. In future, we will add
Update output mode which would allows every update to aggregates to be written to sink every trigger.
Some sinks (e.g. files) may not support the fine-grained updates that Update Mode requires. To work
with them, we also support Append Mode, where only the *final counts* are written to the sink.
This is illustrated below.
![Watermarking in Append Mode](img/structured-streaming-watermark-append-mode.png)
Similar to the Update Mode earlier, the engine maintains intermediate counts for each window.
However, the partial counts are not updated to the Result Table and not written to sink. The engine
waits for "10 mins" for late date to be counted,
then drops intermediate state of a window < watermark, and appends the final
counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` are
appended to the Result Table only after the watermark is updated to `12:11`.
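To make the two modes concrete, here is a sketch of the same watermarked windowed count started in either output mode (assuming the streaming Dataset `words` with columns `timestamp` and `word` from the earlier example, and `import spark.implicits._` in scope):

{% highlight scala %}
import org.apache.spark.sql.functions.window

val windowedCounts = words
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window($"timestamp", "10 minutes", "5 minutes"),
    $"word")
  .count()

// Update mode: rows whose counts changed are written out after every trigger
windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .start()

// Append mode: a window's final count is written out only once,
// after the watermark moves past the end of the window
// windowedCounts.writeStream.outputMode("append").format("console").start()
{% endhighlight %}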
**Conditions for watermarking to clean aggregation state**
It is important to note that the following conditions must be satisfied for the watermarking to
clean the state in aggregation queries *(as of Spark 2.1, subject to change in the future)*.
clean the state in aggregation queries *(as of Spark 2.1.1, subject to change in the future)*.
- **Output mode must be Append.** Complete mode requires all aggregate data to be preserved, and hence
cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) section
for detailed explanation of the semantics of each output mode.
- **Output mode must be Append or Update.** Complete mode requires all aggregate data to be preserved,
and hence cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes)
section for detailed explanation of the semantics of each output mode.
- The aggregation must have either the event-time column, or a `window` on the event-time column.
@ -835,8 +896,9 @@ streamingDf.join(staticDf, "type", "right_join") # right outer join with a stat
</div>
### Unsupported Operations
However, note that all of the operations applicable on static DataFrames/Datasets are not supported in streaming DataFrames/Datasets yet. While some of these unsupported operations will be supported in future releases of Spark, there are others which are fundamentally hard to implement on streaming data efficiently. For example, sorting is not supported on the input streaming Dataset, as it requires keeping track of all the data received in the stream. This is therefore fundamentally hard to execute efficiently. As of Spark 2.0, some of the unsupported operations are as follows
There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets.
Some of them are as follows.
- Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.
- Limit and take first N rows are not supported on streaming Datasets.
@ -863,7 +925,12 @@ In addition, there are some Dataset methods that will not work on streaming Data
- `show()` - Instead use the console sink (see next section).
If you try any of these operations, you will see an AnalysisException like "operation XYZ is not supported with streaming DataFrames/Datasets".
If you try any of these operations, you will see an `AnalysisException` like "operation XYZ is not supported with streaming DataFrames/Datasets".
While some of them may be supported in future releases of Spark,
there are others which are fundamentally hard to implement on streaming data efficiently.
For example, sorting on the input stream is not supported, as it requires keeping
track of all the data received in the stream. This is therefore fundamentally hard to execute
efficiently.
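For example, attempting an unsupported operation fails at analysis time rather than at runtime; a sketch (the streaming DataFrame `streamingDf` with a column `value` is an assumption for illustration):

{% highlight scala %}
// Sorting a streaming DataFrame without aggregation is not supported,
// so starting this query fails with an AnalysisException like the one quoted above.
streamingDf.orderBy("value").writeStream
  .format("console")
  .start()
{% endhighlight %}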
## Starting Streaming Queries
Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the `DataStreamWriter`
@ -894,11 +961,11 @@ fault-tolerant sink). For example, queries with only `select`,
- **Complete mode** - The whole Result Table will be outputted to the sink after every trigger.
This is supported for aggregation queries.
- **Update mode** - (*not available in Spark 2.1*) Only the rows in the Result Table that were
- **Update mode** - (*Available since Spark 2.1.1*) Only the rows in the Result Table that were
updated since the last trigger will be outputted to the sink.
More information to be added in future releases.
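For instance, the output mode is chosen on the `DataStreamWriter` when the query is started; here is a minimal sketch (the aggregated streaming DataFrame `aggDF` and the checkpoint directory path are assumptions for illustration):

{% highlight scala %}
// Complete mode: the whole updated Result Table is written out after every trigger
aggDF.writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/path/to/checkpoint/dir")
  .format("console")
  .start()

// Update mode (since Spark 2.1.1): only the rows updated since the last trigger are written out
// aggDF.writeStream.outputMode("update").format("console").start()
{% endhighlight %}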
Different types of streaming queries support different output modes.
Here is the compatibility matrix.
<table class="table">
@ -909,36 +976,38 @@ Here is the compatibility matrix.
<th>Notes</th>
</tr>
<tr>
<td colspan="2" valign="middle"><br/>Queries without aggregation</td>
<td>Append</td>
<td>
Complete mode note supported as it is infeasible to keep all data in the Result Table.
<td colspan="2" style="vertical-align: middle;">Queries without aggregation</td>
<td style="vertical-align: middle;">Append</td>
<td style="vertical-align: middle;">
Complete mode not supported as it is infeasible to keep all data in the Result Table.
</td>
</tr>
<tr>
<td rowspan="2">Queries with aggregation</td>
<td>Aggregation on event-time with watermark</td>
<td>Append, Complete</td>
<td rowspan="2" style="vertical-align: middle;">Queries with aggregation</td>
<td style="vertical-align: middle;">Aggregation on event-time with watermark</td>
<td style="vertical-align: middle;">Append, Update, Complete</td>
<td>
Append mode uses watermark to drop old aggregation state. But the output of a
windowed aggregation is delayed by the late threshold specified in `withWatermark()` as, by
the mode's semantics, rows can be added to the Result Table only once after they are
finalized (i.e. after watermark is crossed). See
<a href="#handling-late-data">Late Data</a> section for more details.
finalized (i.e. after watermark is crossed). See the
<a href="#handling-late-data-and-watermarking">Late Data</a> section for more details.
<br/><br/>
Update mode uses watermark to drop old aggregation state.
<br/><br/>
Complete mode does not drop old aggregation state since by definition this mode
preserves all data in the Result Table.
</td>
</tr>
<tr>
<td>Other aggregations</td>
<td>Complete</td>
<td style="vertical-align: middle;">Other aggregations</td>
<td style="vertical-align: middle;">Complete, Update</td>
<td>
Since no watermark is defined (it is only defined in the other category),
old aggregation state is not dropped.
<br/><br/>
Append mode is not supported as aggregates can update thus violating the semantics of
this mode.
<br/><br/>
Complete mode does not drop old aggregation state since by definition this mode
preserves all data in the Result Table.
</td>
</tr>
<tr>
@ -954,49 +1023,94 @@ There are a few types of built-in output sinks.
- **File sink** - Stores the output to a directory.
{% highlight scala %}
writeStream
.format("parquet") // can be "orc", "json", "csv", etc.
.option("path", "path/to/destination/dir")
.start()
{% endhighlight %}
- **Foreach sink** - Runs arbitrary computation on the records in the output. See later in the section for more details.
{% highlight scala %}
writeStream
.foreach(...)
.start()
{% endhighlight %}
- **Console sink (for debugging)** - Prints the output to the console/stdout every time there is a trigger. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver's memory after every trigger.
- **Memory sink (for debugging)** - The output is stored in memory as an in-memory table. Both, Append and Complete output modes, are supported. This should be used for debugging purposes on low data volumes as the entire output is collected and stored in the driver's memory after every trigger.
{% highlight scala %}
writeStream
.format("console")
.start()
{% endhighlight %}
Here is a table of all the sinks, and the corresponding settings.
- **Memory sink (for debugging)** - The output is stored in memory as an in-memory table.
Both, Append and Complete output modes, are supported. This should be used for debugging purposes
on low data volumes as the entire output is collected and stored in the driver's memory.
Hence, use it with caution.
{% highlight scala %}
writeStream
.format("memory")
.queryName("tableName")
.start()
{% endhighlight %}
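The resulting in-memory table can then be queried interactively; for example (using the query name "tableName" from the snippet above):

{% highlight scala %}
spark.sql("select * from tableName").show()   // interactively query the in-memory table
{% endhighlight %}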
Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are
meant for debugging purposes only. See the earlier section on
[fault-tolerance semantics](#fault-tolerance-semantics).
Here are the details of all the sinks in Spark.
<table class="table">
<tr>
<th>Sink</th>
<th>Supported Output Modes</th>
<th style="width:30%">Usage</th>
<th>Options</th>
<th>Fault-tolerant</th>
<th>Notes</th>
</tr>
<tr>
<td><b>File Sink</b></td>
<td>Append</td>
<td><pre>writeStream<br/> .format("parquet")<br/> .option(<br/> "checkpointLocation",<br/> "path/to/checkpoint/dir")<br/> .option(<br/> "path",<br/> "path/to/destination/dir")<br/> .start()</pre></td>
<td>
<code>path</code>: path to the output directory, must be specified.
<br/>
<code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
<br/>
<code>latestFirst</code>: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
<br/><br/>
For file-format-specific options, see the related methods in DataFrameWriter
(<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>).
E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code>
</td>
<td>Yes</td>
<td>Supports writes to partitioned tables. Partitioning by time may be useful.</td>
</tr>
<tr>
<td><b>Foreach Sink</b></td>
<td>All modes</td>
<td><pre>writeStream<br/> .foreach(...)<br/> .start()</pre></td>
<td>Append, Update, Complete</td>
<td>None</td>
<td>Depends on ForeachWriter implementation</td>
<td>More details in the <a href="#using-foreach">next section</a></td>
</tr>
<tr>
<td><b>Console Sink</b></td>
<td>Append, Complete</td>
<td><pre>writeStream<br/> .format("console")<br/> .start()</pre></td>
<td>Append, Update, Complete</td>
<td>
<code>numRows</code>: Number of rows to print every trigger (default: 20)
<br/>
<code>truncate</code>: Whether to truncate the output if too long (default: true)
</td>
<td>No</td>
<td></td>
</tr>
<tr>
<td><b>Memory Sink</b></td>
<td>Append, Complete</td>
<td><pre>writeStream<br/> .format("memory")<br/> .queryName("table")<br/> .start()</pre></td>
<td>No</td>
<td>Saves the output data as a table, for interactive querying. Table name is the query name.</td>
<td>None</td>
<td>No. But in Complete Mode, a restarted query will recreate the full table.</td>
<td>Table name is the query name.</td>
</tr>
<tr>
<td></td>
@ -1007,7 +1121,7 @@ Here is a table of all the sinks, and the corresponding settings.
</tr>
</table>
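The Foreach sink above takes a user-provided `ForeachWriter`, which is covered in detail in the foreach section later; here is a minimal sketch that simply prints each record (the untyped streaming DataFrame `streamingDF` is an assumption for illustration):

{% highlight scala %}
import org.apache.spark.sql.{ForeachWriter, Row}

streamingDF.writeStream
  .foreach(new ForeachWriter[Row] {
    def open(partitionId: Long, version: Long): Boolean = true   // e.g. open a connection; return false to skip this partition
    def process(record: Row): Unit = println(record)             // write the record to the external system
    def close(errorOrNull: Throwable): Unit = { }                // clean up resources
  })
  .start()
{% endhighlight %}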
Finally, you have to call `start()` to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, lets understand all this with a few examples.
Note that you have to call `start()` to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let's understand all this with a few examples.
<div class="codetabs">


@ -115,7 +115,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
}
/**
* Specifies the underlying output data source. Built-in options include "parquet" for now.
* Specifies the underlying output data source.
*
* @since 2.0.0
*/
@ -137,9 +137,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
* predicates on the partitioned columns. In order for partitioning to work well, the number
* of distinct values in each column should typically be less than tens of thousands.
*
* This was initially applicable for Parquet but in 1.5+ covers JSON, text, ORC and avro as well.
*
* @since 1.4.0
* @since 2.0.0
*/
@scala.annotation.varargs
def partitionBy(colNames: String*): DataStreamWriter[T] = {