[MINOR][SS][DOCS] Update Structured Streaming guide doc and update code typo

### What changes were proposed in this pull request?

This is a minor change to update structured-streaming-programming-guide and typos in code.

### Why are the changes needed?

Keep the user-facing document correct and updated.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

Closes #30074 from viirya/ss-minor.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
Liang-Chi Hsieh 2020-10-16 22:18:12 -07:00 committed by Dongjoon Hyun
parent 911dcd3983
commit 2c4599db4b
2 changed files with 9 additions and 7 deletions

View file

@ -1763,7 +1763,9 @@ Here is the compatibility matrix.
<tr>
<td colspan="2" style="vertical-align: middle;">Queries with <code>mapGroupsWithState</code></td>
<td style="vertical-align: middle;">Update</td>
<td style="vertical-align: middle;"></td>
<td style="vertical-align: middle;">
Aggregations not allowed in a query with <code>mapGroupsWithState</code>.
</td>
</tr>
<tr>
<td rowspan="2" style="vertical-align: middle;">Queries with <code>flatMapGroupsWithState</code></td>
@ -1777,7 +1779,7 @@ Here is the compatibility matrix.
<td style="vertical-align: middle;">Update operation mode</td>
<td style="vertical-align: middle;">Update</td>
<td style="vertical-align: middle;">
Aggregations not allowed after <code>flatMapGroupsWithState</code>.
Aggregations not allowed in a query with <code>flatMapGroupsWithState</code>.
</td>
</tr>
<tr>

View file

@ -212,11 +212,11 @@ object UnsupportedOperationChecker extends Logging {
case m: FlatMapGroupsWithState if m.isStreaming =>
// Check compatibility with output modes and aggregations in query
val aggsAfterFlatMapGroups = collectStreamingAggregates(plan)
val aggsInQuery = collectStreamingAggregates(plan)
if (m.isMapGroupsWithState) { // check mapGroupsWithState
// allowed only in update query output mode and without aggregation
if (aggsAfterFlatMapGroups.nonEmpty) {
if (aggsInQuery.nonEmpty) {
throwError(
"mapGroupsWithState is not supported with aggregation " +
"on a streaming DataFrame/Dataset")
@ -225,8 +225,8 @@ object UnsupportedOperationChecker extends Logging {
"mapGroupsWithState is not supported with " +
s"$outputMode output mode on a streaming DataFrame/Dataset")
}
} else { // check latMapGroupsWithState
if (aggsAfterFlatMapGroups.isEmpty) {
} else { // check flatMapGroupsWithState
if (aggsInQuery.isEmpty) {
// flatMapGroupsWithState without aggregation: operation's output mode must
// match query output mode
m.outputMode match {
@ -252,7 +252,7 @@ object UnsupportedOperationChecker extends Logging {
} else if (collectStreamingAggregates(m).nonEmpty) {
throwError(
"flatMapGroupsWithState in append mode is not supported after " +
s"aggregation on a streaming DataFrame/Dataset")
"aggregation on a streaming DataFrame/Dataset")
}
}
}