From cce25b360ee9e39d9510134c73a1761475eaf4ac Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Mon, 7 Aug 2017 12:27:16 -0700
Subject: [PATCH] [SPARK-21565][SS] Propagate metadata in attribute replacement.

## What changes were proposed in this pull request?

Propagate metadata in attribute replacement during streaming execution. This is necessary for EventTimeWatermarks consuming replaced attributes.

## How was this patch tested?

new unit test, which was verified to fail before the fix

Author: Jose Torres

Closes #18840 from joseph-torres/SPARK-21565.
---
 .../execution/streaming/StreamExecution.scala |  3 +-
 .../streaming/EventTimeWatermarkSuite.scala   | 28 +++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5711262654..1528e7f469 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -628,7 +628,8 @@ class StreamExecution(
     // Rewire the plan to use the new attributes that were returned by the source.
     val replacementMap = AttributeMap(replacements)
     val triggerLogicalPlan = withNewSources transformAllExpressions {
-      case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+      case a: Attribute if replacementMap.contains(a) =>
+        replacementMap(a).withMetadata(a.metadata)
       case ct: CurrentTimestamp =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           ct.dataType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 552911f32e..4f19fa0bb4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -391,6 +391,34 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     checkDataset[Long](df, 1L to 100L: _*)
   }
 
+  test("SPARK-21565: watermark operator accepts attributes from replacement") {
+    withTempDir { dir =>
+      dir.delete()
+
+      val df = Seq(("a", 100.0, new java.sql.Timestamp(100L)))
+        .toDF("symbol", "price", "eventTime")
+      df.write.json(dir.getCanonicalPath)
+
+      val input = spark.readStream.schema(df.schema)
+        .json(dir.getCanonicalPath)
+
+      val groupEvents = input
+        .withWatermark("eventTime", "2 seconds")
+        .groupBy("symbol", "eventTime")
+        .agg(count("price") as 'count)
+        .select("symbol", "eventTime", "count")
+      val q = groupEvents.writeStream
+        .outputMode("append")
+        .format("console")
+        .start()
+      try {
+        q.processAllAvailable()
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
   private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
     val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
     assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)
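
For readers unfamiliar with the Catalyst internals touched above, the sketch below illustrates the idea behind the one-line fix: when an attribute in the logical plan is swapped for its replacement from the source, the replacement must carry over the original attribute's metadata (such as the watermark delay that `withWatermark` attaches to the event-time column), or a downstream `EventTimeWatermark` operator no longer sees it. This is a minimal, self-contained sketch: the `Metadata`, `Attribute`, and `AttributeReplacement` definitions here are simplified stand-ins for illustration, not the real `org.apache.spark.sql.catalyst` classes.

```scala
// Simplified stand-ins for Catalyst's Metadata and Attribute; just enough
// structure to show why withMetadata matters during attribute replacement.
final case class Metadata(entries: Map[String, Long])

final case class Attribute(name: String, exprId: Long, metadata: Metadata) {
  // Mirrors the spirit of Attribute.withMetadata: same attribute, new metadata.
  def withMetadata(m: Metadata): Attribute = copy(metadata = m)
}

object AttributeReplacement {
  // Pre-fix behaviour: the replacement attribute is used as-is, so metadata
  // attached to the original (e.g. the watermark delay) is silently dropped.
  def replaceDroppingMetadata(a: Attribute, replacements: Map[Long, Attribute]): Attribute =
    replacements.getOrElse(a.exprId, a)

  // Post-fix behaviour: the replacement keeps the original attribute's
  // metadata, so a downstream watermark operator still sees the delay.
  def replaceKeepingMetadata(a: Attribute, replacements: Map[Long, Attribute]): Attribute =
    replacements.get(a.exprId).map(_.withMetadata(a.metadata)).getOrElse(a)
}

object Demo extends App {
  val delay        = Metadata(Map("watermarkDelayMs" -> 2000L))
  val original     = Attribute("eventTime", exprId = 1L, metadata = delay)
  val fromSource   = Attribute("eventTime", exprId = 2L, metadata = Metadata(Map.empty))
  val replacements = Map(original.exprId -> fromSource)

  // Without the fix the delay is lost; with it, the new attribute carries it along.
  println(AttributeReplacement.replaceDroppingMetadata(original, replacements).metadata)
  // Metadata(Map())
  println(AttributeReplacement.replaceKeepingMetadata(original, replacements).metadata)
  // Metadata(Map(watermarkDelayMs -> 2000))
}
```

Keying the sketch's replacement map by `exprId` is meant to echo how Catalyst's `AttributeMap` matches attributes by identity rather than by name; the patched `transformAllExpressions` block in `StreamExecution` applies the same "replace, then re-attach metadata" step to every attribute found in the replacement map.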