[SPARK-21988] Add default stats to StreamingExecutionRelation.
## What changes were proposed in this pull request?

Add default stats to StreamingExecutionRelation.

## How was this patch tested?

Existing unit tests, and an explain() test to be sure.

Author: Jose Torres <jose@databricks.com>

Closes #19212 from joseph-torres/SPARK-21988.
Parent: ddd7f5e11d
Commit: 054ddb2f54
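Some context on why a leaf relation needs stats at all: in this era of Spark, `LeafNode.computeStats()` throws by default, and the planner asks plan nodes for size statistics even when a query is only being explained (for example, to decide whether one side of a join is small enough to broadcast). A minimal sketch of the failure mode, assuming an active `SparkSession` named `spark`; the value names here are illustrative, not part of the patch:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream

import spark.implicits._
// MemoryStream's companion apply takes the SQLContext implicitly.
implicit val sqlContext = spark.sqlContext

val inputData = MemoryStream[Int]
val smallTable = Seq((1, "one"), (2, "two")).toDF("number", "word")

// Planning this join consults size statistics on both sides to choose a
// broadcast strategy. Before this patch, the StreamingExecutionRelation leaf
// had no computeStats and explain() could fail; with the change it reports
// the session's defaultSizeInBytes and planning proceeds.
inputData.toDF().join(smallTable, smallTable("number") === $"value").explain()
```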
@@ -166,7 +166,7 @@ class StreamExecution(
           nextSourceId += 1
           // We still need to use the previous `output` instead of `source.schema` as attributes in
           // "df.logicalPlan" has already used attributes of the previous `output`.
-          StreamingExecutionRelation(source, output)
+          StreamingExecutionRelation(source, output)(sparkSession)
         })
     }
     sources = _logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
@@ -18,9 +18,11 @@
 package org.apache.spark.sql.execution.streaming

 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.datasources.DataSource
@@ -48,9 +50,21 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
  * Used to link a streaming [[Source]] of data into a
  * [[org.apache.spark.sql.catalyst.plans.logical.LogicalPlan]].
  */
-case class StreamingExecutionRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
+case class StreamingExecutionRelation(
+    source: Source,
+    output: Seq[Attribute])(session: SparkSession)
+  extends LeafNode {
+
   override def isStreaming: Boolean = true
   override def toString: String = source.toString
+
+  // There's no sensible value here. On the execution path, this relation will be
+  // swapped out with microbatches. But some dataframe operations (in particular explain) do lead
+  // to this node surviving analysis. So we satisfy the LeafNode contract with the session default
+  // value.
+  override def computeStats(): Statistics = Statistics(
+    sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
+  )
 }

 /**
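Note the shape of the new signature: the session rides in a second, curried parameter list. For Scala case classes, only the first parameter list feeds the generated `equals`, `hashCode`, and `unapply`, so existing equality checks and pattern matches on the relation (such as the `collect` in `StreamExecution` above) are unaffected. A small sketch of that property, where `source`, `output`, `session1`, and `session2` stand for any suitable values:

```scala
// Equality ignores the second parameter list entirely.
val r1 = StreamingExecutionRelation(source, output)(session1)
val r2 = StreamingExecutionRelation(source, output)(session2)
assert(r1 == r2)

// Pattern matching likewise destructures only (source, output).
r1 match {
  case StreamingExecutionRelation(src, _) => assert(src eq source)
}
```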
@@ -65,7 +79,7 @@ case class StreamingRelationExec(sourceName: String, output: Seq[Attribute]) ext
 }

 object StreamingExecutionRelation {
-  def apply(source: Source): StreamingExecutionRelation = {
-    StreamingExecutionRelation(source, source.schema.toAttributes)
+  def apply(source: Source, session: SparkSession): StreamingExecutionRelation = {
+    StreamingExecutionRelation(source, source.schema.toAttributes)(session)
   }
 }
@@ -53,7 +53,7 @@ object MemoryStream {
 case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   extends Source with Logging {
   protected val encoder = encoderFor[A]
-  protected val logicalPlan = StreamingExecutionRelation(this)
+  protected val logicalPlan = StreamingExecutionRelation(this, sqlContext.sparkSession)
   protected val output = logicalPlan.output

   /**
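MemoryStream's public surface is unchanged by this plumbing; only the construction of its logical plan now carries the session. A usage sketch, again assuming an active session `spark`:

```scala
import org.apache.spark.sql.execution.streaming.MemoryStream

import spark.implicits._
implicit val sqlContext = spark.sqlContext

val stream = MemoryStream[Int]
stream.addData(1, 2, 3)

// The DataFrame's logical plan is rooted at the StreamingExecutionRelation
// built above, which now knows its session and can answer computeStats().
val df = stream.toDF()
```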
@@ -76,6 +76,22 @@ class StreamSuite extends StreamTest {
       CheckAnswer(Row(1, 1, "one"), Row(2, 2, "two"), Row(4, 4, "four")))
   }

+
+  test("explain join") {
+    // Make a table and ensure it will be broadcast.
+    val smallTable = Seq((1, "one"), (2, "two"), (4, "four")).toDF("number", "word")
+
+    // Join the input stream with a table.
+    val inputData = MemoryStream[Int]
+    val joined = inputData.toDF().join(smallTable, smallTable("number") === $"value")
+
+    val outputStream = new java.io.ByteArrayOutputStream()
+    Console.withOut(outputStream) {
+      joined.explain()
+    }
+    assert(outputStream.toString.contains("StreamingRelation"))
+  }
+
   test("SPARK-20432: union one stream with itself") {
     val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
     val unioned = df.union(df)
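The new test captures `explain()`'s output with `scala.Console.withOut`, which rebinds standard output for the duration of the thunk (`Dataset.explain` prints via `println`, so the text lands in the buffer). A standalone sketch of the capture pattern:

```scala
val buffer = new java.io.ByteArrayOutputStream()
Console.withOut(buffer) {
  println("captured")  // written to buffer, not the real stdout
}
assert(buffer.toString.contains("captured"))
```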
@@ -337,7 +353,9 @@ class StreamSuite extends StreamTest {

       override def stop(): Unit = {}
     }
-    val df = Dataset[Int](sqlContext.sparkSession, StreamingExecutionRelation(source))
+    val df = Dataset[Int](
+      sqlContext.sparkSession,
+      StreamingExecutionRelation(source, sqlContext.sparkSession))
     testStream(df)(
       // `ExpectFailure(isFatalError = true)` verifies two things:
       // - Fatal errors can be propagated to `StreamingQuery.exception` and
@@ -653,7 +653,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       }
       override def stop(): Unit = {}
     }
-    StreamingExecutionRelation(source)
+    StreamingExecutionRelation(source, spark)
   }

   /** Returns the query progress at the end of the first trigger of streaming DF */
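A closing note on the value being reported: `defaultSizeInBytes` comes from `SQLConf` (the internal `spark.sql.defaultSizeInBytes` entry) and is deliberately very large by default, so that a relation of unknown size is never mistaken for a broadcast candidate. A sketch mirroring the expression the patch itself uses (note that `sessionState` may not be publicly accessible from user code in every Spark version):

```scala
// Deliberately huge default: unknown-size plans should never look "small".
val defaultSize: Long = spark.sessionState.conf.defaultSizeInBytes
println(s"default size in bytes: $defaultSize")
```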