From 054ddb2f54ab8e6b0088fbf9d576c7770e5abcbf Mon Sep 17 00:00:00 2001
From: Jose Torres
Date: Thu, 14 Sep 2017 11:06:25 -0700
Subject: [PATCH] [SPARK-21988] Add default stats to StreamingExecutionRelation.

## What changes were proposed in this pull request?

Add default stats to StreamingExecutionRelation.

## How was this patch tested?

existing unit tests and an explain() test to be sure

Author: Jose Torres

Closes #19212 from joseph-torres/SPARK-21988.
---
 .../execution/streaming/StreamExecution.scala |  2 +-
 .../streaming/StreamingRelation.scala         | 20 ++++++++++++++++---
 .../sql/execution/streaming/memory.scala      |  2 +-
 .../spark/sql/streaming/StreamSuite.scala     | 20 ++++++++++++++++++-
 .../sql/streaming/StreamingQuerySuite.scala   |  2 +-
 5 files changed, 39 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 71088ff638..952e431fb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -166,7 +166,7 @@ class StreamExecution(
           nextSourceId += 1
           // We still need to use the previous `output` instead of `source.schema` as attributes in
           // "df.logicalPlan" has already used attributes of the previous `output`.
-          StreamingExecutionRelation(source, output)
+          StreamingExecutionRelation(source, output)(sparkSession)
         })
     }
     sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index e8b00094ad..ab716052c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
 
@@ -48,9 +50,21 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
  * Used to link a streaming [[Source]] of data into a
  * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
  */
-case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
+case class StreamingExecutionRelation(
+    source: Source,
+    output: Seq[Attribute])(session: SparkSession)
+  extends LeafNode {
+
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
+
+  // There's no sensible value here. On the execution path, this relation will be
+  // swapped out with microbatches. But some dataframe operations (in particular explain) do lead
+  // to this node surviving analysis. So we satisfy the LeafNode contract with the session default
+  // value.
+  override def computeStats(): Statistics = Statistics(
+    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
+  )
 }
 
 /**
@@ -65,7 +79,7 @@ case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) ext
 }
 
 object StreamingExecutionRelation {
-  def apply(source: Source): StreamingExecutionRelation = {
-    StreamingExecutionRelation(source, source.schema.toAttributes)
+  def apply(source: Source, session: SparkSession): StreamingExecutionRelation = {
+    StreamingExecutionRelation(source, source.schema.toAttributes)(session)
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index c9784c093b..3041d4d703 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -53,7 +53,7 @@ object MemoryStream {
 case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     extends Source with Logging {
   protected val encoder = encoderFor[A]
-  protected val logicalPlan = StreamingExecutionRelation(this)
+  protected val logicalPlan = StreamingExecutionRelation(this, sqlContext.sparkSession)
   protected val output = logicalPlan.output
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index d0b2041a86..9c901062d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -76,6 +76,22 @@ class StreamSuite extends StreamTest {
       CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
   }
 
+  test("explain join") {
+    // Make a table and ensure it will be broadcast.
+    val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+
+    // Join the input stream with a table.
+    val inputData = MemoryStream[Int]
+    val joined = inputData.toDF().join(smallTable, smallTable("number") === $"value")
+
+    val outputStream = new java.io.ByteArrayOutputStream()
+    Console.withOut(outputStream) {
+      joined.explain()
+    }
+    assert(outputStream.toString.contains("StreamingRelation"))
+  }
+
   test("SPARK-20432: union one stream with itself") {
     val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
     val unioned = df.union(df)
@@ -337,7 +353,9 @@ class StreamSuite extends StreamTest {
       override def stop(): Unit = {}
     }
 
-    val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
+    val df = Dataset[Int](
+      sqlContext.sparkSession,
+      StreamingExecutionRelation(source, sqlContext.sparkSession))
     testStream(df)(
       // `ExpectFailure(isFatalError = true)` verifies two things:
      // - Fatal errors can be propagated to `StreamingQuery.exception` and
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index bf7c448ab5..3823e336d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -653,7 +653,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       }
       override def stop(): Unit = {}
     }
-    StreamingExecutionRelation(source)
+    StreamingExecutionRelation(source, spark)
   }
 
   /** Returns the query progress at the end of the first trigger of streaming DF */
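
For anyone who wants to reproduce the fixed behavior outside the test suite, here is a minimal standalone sketch. It is not part of the patch: the `ExplainStreamingJoin` object, the app name, and the local master are illustrative assumptions. The facts it relies on are in the diff above: `LeafNode` subclasses that survive analysis must define `computeStats()` (the default implementation throws `UnsupportedOperationException`), and `explain()` on a stream/batch join makes the planner ask the surviving `StreamingExecutionRelation` for its size.

```scala
// Standalone sketch (assumes Spark 2.2/2.3-era APIs; names are illustrative).
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object ExplainStreamingJoin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("explain-streaming-join")
      .getOrCreate()
    import spark.implicits._
    // MemoryStream.apply needs an implicit SQLContext in scope.
    implicit val sqlContext: SQLContext = spark.sqlContext

    // A small static table, far below spark.sql.autoBroadcastJoinThreshold.
    val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")

    // An unbounded streaming side; its single column is named "value".
    val inputData = MemoryStream[Int]
    val joined = inputData.toDF().join(smallTable, smallTable("number") === $"value")

    // The planner consults sizeInBytes on both sides to pick a broadcast join.
    // Before this patch, StreamingExecutionRelation had no stats and this could
    // throw; after it, the relation reports spark.sql.defaultSizeInBytes
    // (Long.MaxValue by default), so the small table is the broadcast candidate
    // and the plan prints, containing "StreamingRelation".
    joined.explain()

    spark.stop()
  }
}
```

One design note grounded in the diff: `session` is passed in a second (curried) parameter list, so it is excluded from the case class's generated `equals`, `hashCode`, and `unapply`. Existing matches such as `case s: StreamingExecutionRelation` or the two-argument extractor keep working unchanged, and plan comparisons do not start depending on the session.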