[SPARK-23303][SQL] improve the explain result for data source v2 relations

## What changes were proposed in this pull request?

The proposed explain format:
**[streaming header] [RelationV2/ScanV2] [data source name] [output] [pushed filters] [options]**

**streaming header**: if it's a streaming relation, prefix the line with "Streaming".
**RelationV2/ScanV2**: "RelationV2" for a logical plan, "ScanV2" for a physical plan.
**data source name**: the simple class name of the data source implementation.
**output**: a string of the plan's output attributes.
**pushed filters**: a string of all the filters that have been pushed down to this data source.
**options**: the options used to create the data source reader.
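
To make the format concrete, the helper below is a hypothetical sketch of how these pieces compose into a single explain line; its name, signature, and the exact option formatting are invented for this description and are not code from the patch:

```
// Hypothetical helper, for illustration only: composes one explain line following
// the format [streaming header] [RelationV2/ScanV2] [data source name] [output]
// [pushed filters] [options] described above.
object ExplainFormatSketch {
  def explainLine(
      isStreaming: Boolean,
      isLogicalPlan: Boolean,
      sourceName: String,
      output: Seq[String],
      pushedFilters: Seq[String],
      options: Map[String, String]): String = {
    val header = if (isStreaming) "Streaming " else ""              // [streaming header]
    val node = if (isLogicalPlan) "RelationV2" else "ScanV2"        // [RelationV2/ScanV2]
    val outputStr = output.mkString("[", ", ", "]")                 // [output]
    val filtersStr =                                                // [pushed filters]
      if (pushedFilters.nonEmpty) pushedFilters.mkString(" (Pushed Filters: [", ", ", "])") else ""
    val optionsStr =                                                // [options]
      if (options.nonEmpty) options.map { case (k, v) => s"$k=$v" }.mkString(" (Options: [", ",", "])") else ""
    s"$header$node $sourceName$outputStr$filtersStr$optionsStr"
  }
}
```

For instance, `explainLine(isStreaming = false, isLogicalPlan = false, "JavaAdvancedDataSourceV2", Seq("i#88", "j#89"), Seq("GreaterThan(i,3)"), Map.empty)` would return `ScanV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])`, which lines up with the physical plan line in the second example below (minus the `*(1)` whole-stage-codegen prefix).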

The current explain result for data source v2 relations is hard to read:
```
== Parsed Logical Plan ==
'Filter ('i > 6)
+- AnalysisBarrier
      +- Project [j#1]
         +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Filter (i#0 > 6)
   +- Project [j#1, i#0]
      +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Optimized Logical Plan ==
Project [j#1]
+- Filter isnotnull(i#0)
   +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940

== Physical Plan ==
*(1) Project [j#1]
+- *(1) Filter isnotnull(i#0)
   +- *(1) DataSourceV2Scan [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader3b415940
```

After this PR:
```
== Parsed Logical Plan ==
'Project [unresolvedalias('j, None)]
+- AnalysisBarrier
      +- RelationV2 AdvancedDataSourceV2[i#0, j#1]

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- RelationV2 AdvancedDataSourceV2[i#0, j#1]

== Optimized Logical Plan ==
RelationV2 AdvancedDataSourceV2[j#1]

== Physical Plan ==
*(1) ScanV2 AdvancedDataSourceV2[j#1]
```
Another example, showing pushed filters:
```
== Analyzed Logical Plan ==
i: int, j: int
Filter (i#88 > 3)
+- RelationV2 JavaAdvancedDataSourceV2[i#88, j#89]

== Optimized Logical Plan ==
Filter isnotnull(i#88)
+- RelationV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])

== Physical Plan ==
*(1) Filter isnotnull(i#88)
+- *(1) ScanV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])
```

An example for a streaming query:
```
== Parsed Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Optimized Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject value#25.toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Physical Plan ==
*(4) HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#11L])
+- StateStoreSave [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *(3) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
      +- StateStoreRestore [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5]
         +- *(2) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *(1) HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#16L])
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *(1) MapElements <function1>, obj#5: java.lang.String
                        +- *(1) DeserializeToObject value#25.toString, obj#4: java.lang.String
                           +- *(1) ScanV2 MemoryStreamDataSource[value#25]
```
## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20647 from cloud-fan/explain.
Commit ad640a5aff (parent 8c5b34c425), authored by Wenchen Fan on 2018-03-05 20:35:14 -08:00.
13 changed files with 183 additions and 105 deletions.

```
@@ -60,7 +60,7 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
     eventually(timeout(streamingTimeout)) {
       assert(
         query.lastExecution.logical.collectFirst {
-          case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
+          case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
         }.exists { r =>
           // Ensure the new topic is present and the old topic is gone.
           r.knownPartitions.exists(_.topic == topic2)
```

```
@@ -47,7 +47,7 @@ trait KafkaContinuousTest extends KafkaSourceTest {
     eventually(timeout(streamingTimeout)) {
       assert(
         query.lastExecution.logical.collectFirst {
-          case StreamingDataSourceV2Relation(_, r: KafkaContinuousReader) => r
+          case StreamingDataSourceV2Relation(_, _, _, r: KafkaContinuousReader) => r
         }.exists(_.knownPartitions.size == newCount),
         s"query never reconfigured to $newCount partitions")
     }
```

```
@@ -124,7 +124,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext {
       } ++ (query.get.lastExecution match {
         case null => Seq()
         case e => e.logical.collect {
-          case StreamingDataSourceV2Relation(_, reader: KafkaContinuousReader) => reader
+          case StreamingDataSourceV2Relation(_, _, _, reader: KafkaContinuousReader) => reader
         }
       })
     }.distinct
```

```
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources.v2
-
-import java.util.Objects
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.sources.v2.reader._
-
-/**
- * A base class for data source reader holder with customized equals/hashCode methods.
- */
-trait DataSourceReaderHolder {
-
-  /**
-   * The output of the data source reader, w.r.t. column pruning.
-   */
-  def output: Seq[Attribute]
-
-  /**
-   * The held data source reader.
-   */
-  def reader: DataSourceReader
-
-  /**
-   * The metadata of this data source reader that can be used for equality test.
-   */
-  private def metadata: Seq[Any] = {
-    val filters: Any = reader match {
-      case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
-      case s: SupportsPushDownFilters => s.pushedFilters().toSet
-      case _ => Nil
-    }
-    Seq(output, reader.getClass, filters)
-  }
-
-  def canEqual(other: Any): Boolean
-
-  override def equals(other: Any): Boolean = other match {
-    case other: DataSourceReaderHolder =>
-      canEqual(other) && metadata.length == other.metadata.length &&
-        metadata.zip(other.metadata).forall { case (l, r) => l == r }
-    case _ => false
-  }
-
-  override def hashCode(): Int = {
-    metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b)
-  }
-}
```

```
@@ -35,15 +35,12 @@ case class DataSourceV2Relation(
     options: Map[String, String],
     projection: Seq[AttributeReference],
     filters: Option[Seq[Expression]] = None,
-    userSpecifiedSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation {
+    userSpecifiedSchema: Option[StructType] = None)
+  extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
 
   import DataSourceV2Relation._
 
-  override def simpleString: String = {
-    s"DataSourceV2Relation(source=${source.name}, " +
-      s"schema=[${output.map(a => s"$a ${a.dataType.simpleString}").mkString(", ")}], " +
-      s"filters=[${pushedFilters.mkString(", ")}], options=$options)"
-  }
+  override def simpleString: String = "RelationV2 " + metadataString
 
   override lazy val schema: StructType = reader.readSchema()
 
@@ -107,19 +104,36 @@ case class DataSourceV2Relation(
 }
 
 /**
- * A specialization of DataSourceV2Relation with the streaming bit set to true. Otherwise identical
- * to the non-streaming relation.
+ * A specialization of [[DataSourceV2Relation]] with the streaming bit set to true.
+ *
+ * Note that, this plan has a mutable reader, so Spark won't apply operator push-down for this plan,
+ * to avoid making the plan mutable. We should consolidate this plan and [[DataSourceV2Relation]]
+ * after we figure out how to apply operator push-down for streaming data sources.
  */
 case class StreamingDataSourceV2Relation(
     output: Seq[AttributeReference],
+    source: DataSourceV2,
+    options: Map[String, String],
     reader: DataSourceReader)
-  extends LeafNode with DataSourceReaderHolder with MultiInstanceRelation {
+  extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
+
   override def isStreaming: Boolean = true
 
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[StreamingDataSourceV2Relation]
+  override def simpleString: String = "Streaming RelationV2 " + metadataString
 
   override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
 
+  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
+  override def equals(other: Any): Boolean = other match {
+    case other: StreamingDataSourceV2Relation =>
+      output == other.output && reader.getClass == other.reader.getClass && options == other.options
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Seq(output, source, options).hashCode()
+  }
+
   override def computeStats(): Statistics = reader match {
     case r: SupportsReportStatistics =>
       Statistics(sizeInBytes = r.getStatistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
```

```
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical
 import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.streaming.continuous._
+import org.apache.spark.sql.sources.v2.DataSourceV2
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
 import org.apache.spark.sql.types.StructType
@@ -36,10 +37,23 @@ import org.apache.spark.sql.types.StructType
  */
 case class DataSourceV2ScanExec(
     output: Seq[AttributeReference],
+    @transient source: DataSourceV2,
+    @transient options: Map[String, String],
     @transient reader: DataSourceReader)
-  extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
+  extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
 
-  override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]
+  override def simpleString: String = "ScanV2 " + metadataString
+
+  // TODO: unify the equal/hashCode implementation for all data source v2 query plans.
+  override def equals(other: Any): Boolean = other match {
+    case other: DataSourceV2ScanExec =>
+      output == other.output && reader.getClass == other.reader.getClass && options == other.options
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Seq(output, source, options).hashCode()
+  }
 
   override def outputPartitioning: physical.Partitioning = reader match {
     case s: SupportsReportPartitioning =>
```

```
@@ -23,11 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case relation: DataSourceV2Relation =>
-      DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
+    case r: DataSourceV2Relation =>
+      DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil
 
-    case relation: StreamingDataSourceV2Relation =>
-      DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
+    case r: StreamingDataSourceV2Relation =>
+      DataSourceV2ScanExec(r.output, r.source, r.options, r.reader) :: Nil
 
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
```

```
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.sources.v2.DataSourceV2
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.util.Utils
+
+/**
+ * A trait that can be used by data source v2 related query plans(both logical and physical), to
+ * provide a string format of the data source information for explain.
+ */
+trait DataSourceV2StringFormat {
+
+  /**
+   * The instance of this data source implementation. Note that we only consider its class in
+   * equals/hashCode, not the instance itself.
+   */
+  def source: DataSourceV2
+
+  /**
+   * The output of the data source reader, w.r.t. column pruning.
+   */
+  def output: Seq[Attribute]
+
+  /**
+   * The options for this data source reader.
+   */
+  def options: Map[String, String]
+
+  /**
+   * The created data source reader. Here we use it to get the filters that has been pushed down
+   * so far, itself doesn't take part in the equals/hashCode.
+   */
+  def reader: DataSourceReader
+
+  private lazy val filters = reader match {
+    case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet
+    case s: SupportsPushDownFilters => s.pushedFilters().toSet
+    case _ => Set.empty
+  }
+
+  private def sourceName: String = source match {
+    case registered: DataSourceRegister => registered.shortName()
+    case _ => source.getClass.getSimpleName.stripSuffix("$")
+  }
+
+  def metadataString: String = {
+    val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
+
+    if (filters.nonEmpty) {
+      entries += "Filters" -> filters.mkString("[", ", ", "]")
+    }
+
+    // TODO: we should only display some standard options like path, table, etc.
+    if (options.nonEmpty) {
+      entries += "Options" -> Utils.redact(options).map {
+        case (k, v) => s"$k=$v"
+      }.mkString("[", ",", "]")
+    }
+
+    val outputStr = Utils.truncatedString(output, "[", ", ", "]")
+
+    val entriesStr = if (entries.nonEmpty) {
+      Utils.truncatedString(entries.map {
+        case (key, value) => key + ": " + StringUtils.abbreviate(value, 100)
+      }, " (", ", ", ")")
+    } else {
+      ""
+    }
+
+    s"$sourceName$outputStr$entriesStr"
+  }
+}
```
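
A note on the data source name shown in the new output: per the `DataSourceV2StringFormat` trait above, it is the `DataSourceRegister` short name when the implementation registers one, and otherwise the class's simple name (with a trailing `$` stripped for Scala objects). A minimal hypothetical source for illustration (the class and short name below are made up, not part of this patch):

```
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.DataSourceV2

// Hypothetical data source: because it implements DataSourceRegister, the explain
// output would show "my-source[...]" rather than the class name "MyDataSource[...]".
class MyDataSource extends DataSourceV2 with DataSourceRegister {
  override def shortName(): String = "my-source"
}
```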

```
@@ -20,16 +20,16 @@ package org.apache.spark.sql.execution.streaming
 import java.util.Optional
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
+import scala.collection.mutable.{Map => MutableMap}
 
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
+import org.apache.spark.sql.catalyst.expressions.{Alias, CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
-import org.apache.spark.sql.sources.v2.{DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
 import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
 import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@@ -52,6 +52,9 @@ class MicroBatchExecution(
 
   @volatile protected var sources: Seq[BaseStreamingSource] = Seq.empty
 
+  private val readerToDataSourceMap =
+    MutableMap.empty[MicroBatchReader, (DataSourceV2, Map[String, String])]
+
   private val triggerExecutor = trigger match {
     case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
     case OneTimeTrigger => OneTimeExecutor()
@@ -97,6 +100,7 @@ class MicroBatchExecution(
             metadataPath,
             new DataSourceOptions(options.asJava))
           nextSourceId += 1
+          readerToDataSourceMap(reader) = dataSourceV2 -> options
          logInfo(s"Using MicroBatchReader [$reader] from " +
            s"DataSourceV2 named '$sourceName' [$dataSourceV2]")
          StreamingExecutionRelation(reader, output)(sparkSession)
@@ -419,8 +423,19 @@ class MicroBatchExecution(
             toJava(current),
             Optional.of(availableV2))
           logDebug(s"Retrieving data from $reader: $current -> $availableV2")
-          Some(reader ->
-            new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
+
+          val (source, options) = reader match {
+            // `MemoryStream` is special. It's for test only and doesn't have a `DataSourceV2`
+            // implementation. We provide a fake one here for explain.
+            case _: MemoryStream[_] => MemoryStreamDataSource -> Map.empty[String, String]
+            // Provide a fake value here just in case something went wrong, e.g. the reader gives
+            // a wrong `equals` implementation.
+            case _ => readerToDataSourceMap.getOrElse(reader, {
+              FakeDataSourceV2 -> Map.empty[String, String]
+            })
+          }
+          Some(reader -> StreamingDataSourceV2Relation(
+            reader.readSchema().toAttributes, source, options, reader))
         case _ => None
       }
     }
@@ -525,3 +540,7 @@ class MicroBatchExecution(
 object MicroBatchExecution {
   val BATCH_ID_KEY = "streaming.sql.batchId"
 }
+
+object MemoryStreamDataSource extends DataSourceV2
+
+object FakeDataSourceV2 extends DataSourceV2
```

```
@@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
 import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset}
@@ -167,7 +167,7 @@ class ContinuousExecution(
 
     var insertedSourceId = 0
     val withNewSources = logicalPlan transform {
-      case ContinuousExecutionRelation(_, _, output) =>
+      case ContinuousExecutionRelation(source, options, output) =>
        val reader = continuousSources(insertedSourceId)
        insertedSourceId += 1
        val newOutput = reader.readSchema().toAttributes
@@ -180,7 +180,7 @@ class ContinuousExecution(
        val loggedOffset = offsets.offsets(0)
        val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
        reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
-        new StreamingDataSourceV2Relation(newOutput, reader)
+        StreamingDataSourceV2Relation(newOutput, source, options, reader)
     }
 
     // Rewire the plan to use the new attributes that were returned by the source.
@@ -201,7 +201,7 @@ class ContinuousExecution(
     val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
 
     val reader = withSink.collect {
-      case StreamingDataSourceV2Relation(_, r: ContinuousReader) => r
+      case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
     }.head
 
     reportTimeTaken("queryPlanning") {
```

```
@@ -492,16 +492,20 @@ class StreamSuite extends StreamTest {
       val explainWithoutExtended = q.explainInternal(false)
 
       // `extended = false` only displays the physical plan.
-      assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithoutExtended).size === 0)
-      assert("DataSourceV2Scan".r.findAllMatchIn(explainWithoutExtended).size === 1)
+      assert("Streaming RelationV2 MemoryStreamDataSource".r
+        .findAllMatchIn(explainWithoutExtended).size === 0)
+      assert("ScanV2 MemoryStreamDataSource".r
+        .findAllMatchIn(explainWithoutExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming physical plan
       assert(explainWithoutExtended.contains("StateStoreRestore"))
 
       val explainWithExtended = q.explainInternal(true)
       // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical
       // plan.
-      assert("StreamingDataSourceV2Relation".r.findAllMatchIn(explainWithExtended).size === 3)
-      assert("DataSourceV2Scan".r.findAllMatchIn(explainWithExtended).size === 1)
+      assert("Streaming RelationV2 MemoryStreamDataSource".r
+        .findAllMatchIn(explainWithExtended).size === 3)
+      assert("ScanV2 MemoryStreamDataSource".r
+        .findAllMatchIn(explainWithExtended).size === 1)
       // Use "StateStoreRestore" to verify that it does output a streaming physical plan
       assert(explainWithExtended.contains("StateStoreRestore"))
     } finally {
```

```
@@ -629,8 +629,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
         def findSourceIndex(plan: LogicalPlan): Option[Int] = {
           plan
             .collect {
-              case StreamingExecutionRelation(s, _) => s
-              case StreamingDataSourceV2Relation(_, r) => r
+              case r: StreamingExecutionRelation => r.source
+              case r: StreamingDataSourceV2Relation => r.reader
             }
             .zipWithIndex
             .find(_._1 == source)
```

```
@@ -17,15 +17,12 @@
 package org.apache.spark.sql.streaming.continuous
 
 import java.util.UUID
 
-import org.apache.spark.{SparkContext, SparkEnv, SparkException}
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskStart}
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanExec, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExec
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.continuous._
 import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.test.TestSparkSession
@@ -43,7 +40,7 @@ class ContinuousSuiteBase extends StreamTest {
       case s: ContinuousExecution =>
         assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
         val reader = s.lastExecution.executedPlan.collectFirst {
-          case DataSourceV2ScanExec(_, r: RateStreamContinuousReader) => r
+          case DataSourceV2ScanExec(_, _, _, r: RateStreamContinuousReader) => r
        }.get
 
        val deltaMs = numTriggers * 1000 + 300
```