[SPARK-18461][DOCS][STRUCTUREDSTREAMING] Added more information about monitoring streaming queries

## What changes were proposed in this pull request?
<img width="941" alt="screen shot 2016-11-15 at 6 27 32 pm" src="https://cloud.githubusercontent.com/assets/663212/20332521/4190b858-ab61-11e6-93a6-4bdc05105ed9.png">
<img width="940" alt="screen shot 2016-11-15 at 6 27 45 pm" src="https://cloud.githubusercontent.com/assets/663212/20332525/44a0d01e-ab61-11e6-8668-47f925490d4f.png">

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15897 from tdas/SPARK-18461.

@@ -1087,9 +1087,185 @@ spark.streams().awaitAnyTermination() # block until any one of them terminates
</div>
</div>
## Monitoring Streaming Queries

There are two ways you can monitor streaming queries. You can directly get the current status
of an active query using `streamingQuery.status`, which will return a `StreamingQueryStatus` object
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryStatus)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryStatus.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.StreamingQueryStatus) docs)
containing details such as the current ingestion rates, processing rates, average latency,
and details of the currently active trigger.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
val query: StreamingQuery = ...
println(query.status)
/* Will print the current status of the query

Status of query 'queryName'
    Query id: 1
    Status timestamp: 123
    Input rate: 15.5 rows/sec
    Processing rate: 23.5 rows/sec
    Latency: 345.0 ms
    Trigger details:
        batchId: 5
        isDataPresentInTrigger: true
        isTriggerActive: true
        latency.getBatch.total: 20
        latency.getOffset.total: 10
        numRows.input.total: 100
    Source statuses [1 source]:
        Source 1 - MySource1
            Available offset: 0
            Input rate: 15.5 rows/sec
            Processing rate: 23.5 rows/sec
            Trigger details:
                numRows.input.source: 100
                latency.getOffset.source: 10
                latency.getBatch.source: 20
    Sink status - MySink
        Committed offsets: [1, -]
*/
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
StreamingQuery query = ...
System.out.println(query.status());
/* Will print the current status of the query

Status of query 'queryName'
    Query id: 1
    Status timestamp: 123
    Input rate: 15.5 rows/sec
    Processing rate: 23.5 rows/sec
    Latency: 345.0 ms
    Trigger details:
        batchId: 5
        isDataPresentInTrigger: true
        isTriggerActive: true
        latency.getBatch.total: 20
        latency.getOffset.total: 10
        numRows.input.total: 100
    Source statuses [1 source]:
        Source 1 - MySource1
            Available offset: 0
            Input rate: 15.5 rows/sec
            Processing rate: 23.5 rows/sec
            Trigger details:
                numRows.input.source: 100
                latency.getOffset.source: 10
                latency.getBatch.source: 20
    Sink status - MySink
        Committed offsets: [1, -]
*/
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
query = ...  # a StreamingQuery
print(query.status)
'''
Will print the current status of the query

Status of query 'queryName'
    Query id: 1
    Status timestamp: 123
    Input rate: 15.5 rows/sec
    Processing rate: 23.5 rows/sec
    Latency: 345.0 ms
    Trigger details:
        batchId: 5
        isDataPresentInTrigger: true
        isTriggerActive: true
        latency.getBatch.total: 20
        latency.getOffset.total: 10
        numRows.input.total: 100
    Source statuses [1 source]:
        Source 1 - MySource1
            Available offset: 0
            Input rate: 15.5 rows/sec
            Processing rate: 23.5 rows/sec
            Trigger details:
                numRows.input.source: 100
                latency.getOffset.source: 10
                latency.getBatch.source: 20
    Sink status - MySink
        Committed offsets: [1, -]
'''
{% endhighlight %}
</div>
</div>
You can also asynchronously monitor all queries associated with a
`SparkSession` by attaching a `StreamingQueryListener`
([Scala](api/scala/index.html#org.apache.spark.sql.streaming.StreamingQueryListener)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html) docs).
Once you attach your custom `StreamingQueryListener` object with
`sparkSession.streams.addListener()`, you will get callbacks when a query is started and
stopped, and when there is progress made in an active query. Here is an example.
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
val spark: SparkSession = ...
spark.streams.addListener(new StreamingQueryListener() {
  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
    println("Query started: " + queryStarted.queryStatus.name)
  }
  override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
    println("Query terminated: " + queryTerminated.queryStatus.name)
  }
  override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
    println("Query made progress: " + queryProgress.queryStatus)
  }
})
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
SparkSession spark = ...
spark.streams().addListener(new StreamingQueryListener() {
  @Override
  public void onQueryStarted(QueryStartedEvent queryStarted) {
    System.out.println("Query started: " + queryStarted.queryStatus().name());
  }
  @Override
  public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
    System.out.println("Query terminated: " + queryTerminated.queryStatus().name());
  }
  @Override
  public void onQueryProgress(QueryProgressEvent queryProgress) {
    System.out.println("Query made progress: " + queryProgress.queryStatus());
  }
});
{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight bash %}
Not available in Python.
{% endhighlight %}
</div>
</div>
## Recovering from Failures with Checkpointing

In case of a failure or intentional shutdown, you can recover the previous progress and state of a query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. the range of offsets processed in each trigger) and the running aggregates (e.g. the word counts in the [quick example](#quick-example)) to the checkpoint location. As of Spark 2.0, this checkpoint location has to be a path in an HDFS-compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries).
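For example, here is a minimal sketch in Scala, assuming `aggDF` is a streaming aggregation DataFrame from earlier in the guide and `path/to/HDFS/dir` is a placeholder for your checkpoint directory:

{% highlight scala %}
// Start the aggregation query with a checkpoint location so that its offsets
// and running aggregates can be recovered after a failure or restart.
aggDF
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "path/to/HDFS/dir")
  .format("memory")
  .start()
{% endhighlight %}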