[SPARK-20373][SQL][SS] Batch queries with `Dataset/DataFrame.withWatermark()` do not execute

## What changes were proposed in this pull request?

Any Dataset/DataFrame batch query that uses `withWatermark` fails to execute, because the batch planner has no rule to handle the `EventTimeWatermark` logical plan node.
The right solution is simply to remove the plan node, as the watermark should not affect a batch query in any way.
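For illustration, a minimal batch query that hits this planner gap might look as follows (a hypothetical sketch, not from the PR; it assumes a `SparkSession` named `spark` with implicits imported):

```scala
import spark.implicits._

// A *batch* (non-streaming) DataFrame carrying an EventTimeWatermark node.
val df = Seq((1L, "a"), (2L, "b")).toDF("time", "value")
  .withColumn("eventTime", $"time".cast("timestamp"))
  .withWatermark("eventTime", "10 seconds") // inserts EventTimeWatermark into the logical plan

// Before this fix, triggering execution (e.g. df.show() or df.collect())
// failed at planning time, since no batch strategy matched EventTimeWatermark.
df.show()
```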

Changes:
- In this PR, we add a new rule, `EliminateEventTimeWatermark`, which checks whether the event time watermark should be ignored; the watermark is ignored in any batch query.

Depends upon:
- [SPARK-20672](https://issues.apache.org/jira/browse/SPARK-20672). We cannot add this rule to the analyzer directly, because the streaming query's plan is copied to `triggerLogicalPlan` on every trigger, and the rule would mistakenly be applied to `triggerLogicalPlan`.

Others:
- A typo fix in example.

## How was this patch tested?

Added a new unit test.

Author: uncleGen <hustyugm@gmail.com>

Closes #17896 from uncleGen/SPARK-20373.
uncleGen 2017-05-09 15:08:09 -07:00 committed by Shixiong Zhu
parent f79aa285cf
commit c0189abc7c
5 changed files with 27 additions and 3 deletions


@@ -901,6 +901,9 @@ Some sinks (e.g. files) may not support fine-grained updates that Update Mode requires. To work
with them, we also support Append Mode, where only the *final counts* are written to the sink.
This is illustrated below.
+Note that using `withWatermark` on a non-streaming Dataset is a no-op. As the watermark should not affect
+any batch query in any way, we will ignore it directly.
![Watermarking in Append Mode](img/structured-streaming-watermark-append-mode.png)
Similar to the Update Mode earlier, the engine maintains intermediate counts for each window.


@ -34,14 +34,14 @@ import org.apache.spark.sql.streaming._
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ bin/run-example sql.streaming.StructuredNetworkWordCount
+ * `$ bin/run-example sql.streaming.StructuredSessionization
* localhost 9999`
*/
object StructuredSessionization {
def main(args: Array[String]): Unit = {
if (args.length < 2) {
-      System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")
+      System.err.println("Usage: StructuredSessionization <hostname> <port>")
System.exit(1)
}


@ -2456,6 +2456,16 @@ object CleanupAliases extends Rule[LogicalPlan] {
}
}
+/**
+ * Ignore event time watermark in batch query, which is only supported in Structured Streaming.
+ * TODO: add this rule into analyzer rule list.
+ */
+object EliminateEventTimeWatermark extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case EventTimeWatermark(_, _, child) if !child.isStreaming => child
+  }
+}
/**
* Maps a time column to multiple time windows using the Expand operator. Since it's non-trivial to
* figure out how many windows a time column can map to, we over-estimate the number of windows and


@ -615,7 +615,8 @@ class Dataset[T] private[sql](
.getOrElse(throw new AnalysisException(s"Unable to parse time delay '$delayThreshold'"))
require(parsedDelay.milliseconds >= 0 && parsedDelay.months >= 0,
s"delay threshold ($delayThreshold) should not be negative.")
-    EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan)
+    EliminateEventTimeWatermark(
+      EventTimeWatermark(UnresolvedAttribute(eventTime), parsedDelay, logicalPlan))
}
/**

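With this change, the watermark node is eliminated eagerly inside `withWatermark` for batch Datasets. The effect can be sketched as follows (a hypothetical check, not from the PR; it assumes the same `spark` session as above):

```scala
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import spark.implicits._

val batchDf = Seq(1L, 2L, 3L).toDF("time")
  .withColumn("eventTime", $"time".cast("timestamp"))
  .withWatermark("eventTime", "1 minute")

// For a batch Dataset, the analyzed plan should contain no EventTimeWatermark node,
// so planning and execution proceed as if withWatermark had never been called.
val watermarkNodes = batchDf.queryExecution.analyzed.collect {
  case e: EventTimeWatermark => e
}
assert(watermarkNodes.isEmpty)
```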

@ -344,6 +344,16 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
assert(eventTimeColumns(0).name === "second")
}
+  test("EventTime watermark should be ignored in batch query.") {
+    val df = testData
+      .withColumn("eventTime", $"key".cast("timestamp"))
+      .withWatermark("eventTime", "1 minute")
+      .select("eventTime")
+      .as[Long]
+    checkDataset[Long](df, 1L to 100L: _*)
+  }
private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)