[SPARK-26695][SQL] data source v2 API refactor - continuous read

## What changes were proposed in this pull request?

Following https://github.com/apache/spark/pull/23430, this PR refactors the data source v2 API for continuous read, per the [doc](https://docs.google.com/document/d/1uUmKCpWLdh9vHxP7AWJ9EgbwB_U6T3EJYNjhISGmiQg/edit?usp=sharing).

The major changes:
1. Rename `XXXContinuousReadSupport` to `XXXContinuousStream` (a minimal sketch of the new API follows this list).
2. At the beginning of continuous streaming execution, convert `StreamingRelationV2` directly to `StreamingDataSourceV2Relation` instead of to `StreamingExecutionRelation`.
3. Remove all the remaining hacks, now that the read-side API refactor is complete.
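
For illustration only (not part of this commit), here is a minimal sketch of how a source plugs into the refactored read path: the `Table` mixes in `SupportsContinuousRead`, its `ScanBuilder` builds a `Scan`, and the `Scan` overrides `toContinuousStream`. All `Example*` names are hypothetical; `ExampleContinuousStream` is sketched after the new `ContinuousStream` interface further down this page.

```scala
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsContinuousRead, Table}
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hypothetical table that only supports continuous streaming scans.
class ExampleTable extends Table with SupportsContinuousRead {
  private val tableSchema = StructType(StructField("value", LongType) :: Nil)

  override def name(): String = "example"
  override def schema(): StructType = tableSchema

  override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new ScanBuilder {
    override def build(): Scan = new Scan {
      override def readSchema(): StructType = tableSchema

      // The checkpoint location plays the role of the old `metadataPath` argument
      // of `createContinuousReadSupport`.
      override def toContinuousStream(checkpointLocation: String): ContinuousStream =
        new ExampleContinuousStream(numPartitions = 2)
    }
  }
}
```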

## How was this patch tested?

Existing tests.

Closes #23619 from cloud-fan/continuous.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
Wenchen Fan 2019-01-29 00:07:27 -08:00 committed by gatorsmile
parent 16990f9299
commit e97ab1d980
55 changed files with 520 additions and 1252 deletions


@ -30,10 +30,9 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
import org.apache.spark.sql.types.StructType
/**
* A [[ContinuousReadSupport]] for data from kafka.
* A [[ContinuousStream]] for data from kafka.
*
* @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
* read by per-task consumers generated later.
@ -46,17 +45,23 @@ import org.apache.spark.sql.types.StructType
* scenarios, where some offsets after the specified initial ones can't be
* properly read.
*/
class KafkaContinuousReadSupport(
class KafkaContinuousStream(
offsetReader: KafkaOffsetReader,
kafkaParams: ju.Map[String, Object],
sourceOptions: Map[String, String],
metadataPath: String,
initialOffsets: KafkaOffsetRangeLimit,
failOnDataLoss: Boolean)
extends ContinuousReadSupport with Logging {
extends ContinuousStream with Logging {
private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
// Initialized when creating reader factories. If this diverges from the partitions at the latest
// offsets, we need to reconfigure.
// Exposed outside this object only for unit tests.
@volatile private[sql] var knownPartitions: Set[TopicPartition] = _
override def initialOffset(): Offset = {
val offsets = initialOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
@ -67,18 +72,32 @@ class KafkaContinuousReadSupport(
offsets
}
override def fullSchema(): StructType = KafkaOffsetReader.kafkaSchema
override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
new KafkaContinuousScanConfigBuilder(fullSchema(), start, offsetReader, reportDataLoss)
}
override def deserializeOffset(json: String): Offset = {
KafkaSourceOffset(JsonUtils.partitionOffsets(json))
}
override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
val startOffsets = config.asInstanceOf[KafkaContinuousScanConfig].startOffsets
override def planInputPartitions(start: Offset): Array[InputPartition] = {
val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(start)
val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
if (deletedPartitions.nonEmpty) {
val message = if (
offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
} else {
s"$deletedPartitions are gone. Some data may have been missed."
}
reportDataLoss(message)
}
val startOffsets = newPartitionOffsets ++
oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
knownPartitions = startOffsets.keySet
startOffsets.toSeq.map {
case (topicPartition, start) =>
KafkaContinuousInputPartition(
@ -86,8 +105,7 @@ class KafkaContinuousReadSupport(
}.toArray
}
override def createContinuousReaderFactory(
config: ScanConfig): ContinuousPartitionReaderFactory = {
override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
KafkaContinuousReaderFactory
}
@ -105,8 +123,7 @@ class KafkaContinuousReadSupport(
KafkaSourceOffset(mergedMap)
}
override def needsReconfiguration(config: ScanConfig): Boolean = {
val knownPartitions = config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
override def needsReconfiguration(): Boolean = {
offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
}
@ -151,47 +168,6 @@ object KafkaContinuousReaderFactory extends ContinuousPartitionReaderFactory {
}
}
class KafkaContinuousScanConfigBuilder(
schema: StructType,
startOffset: Offset,
offsetReader: KafkaOffsetReader,
reportDataLoss: String => Unit)
extends ScanConfigBuilder {
override def build(): ScanConfig = {
val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(startOffset)
val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
if (deletedPartitions.nonEmpty) {
val message = if (
offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
s"$deletedPartitions are gone. ${KafkaSourceProvider.CUSTOM_GROUP_ID_ERROR_MESSAGE}"
} else {
s"$deletedPartitions are gone. Some data may have been missed."
}
reportDataLoss(message)
}
val startOffsets = newPartitionOffsets ++
oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
KafkaContinuousScanConfig(schema, startOffsets)
}
}
case class KafkaContinuousScanConfig(
readSchema: StructType,
startOffsets: Map[TopicPartition, Long])
extends ScanConfig {
// Created when building the scan config builder. If this diverges from the partitions at the
// latest offsets, we need to reconfigure the kafka read support.
def knownPartitions: Set[TopicPartition] = startOffsets.keySet
}
/**
* A per-task data reader for continuous Kafka processing.
*


@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@ -48,7 +48,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
with RelationProvider
with CreatableRelationProvider
with StreamingWriteSupportProvider
with ContinuousReadSupportProvider
with TableProvider
with Logging {
import KafkaSourceProvider._
@ -107,46 +106,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
new KafkaTable(strategy(options.asMap().asScala.toMap))
}
/**
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport]] to read
* Kafka data in a continuous streaming query.
*/
override def createContinuousReadSupport(
metadataPath: String,
options: DataSourceOptions): KafkaContinuousReadSupport = {
val parameters = options.asMap().asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = streamingUniqueGroupId(parameters, metadataPath)
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams =
parameters
.keySet
.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
.toMap
val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
val kafkaOffsetReader = new KafkaOffsetReader(
strategy(caseInsensitiveParams),
kafkaParamsForDriver(specifiedKafkaParams),
parameters,
driverGroupIdPrefix = s"$uniqueGroupId-driver")
new KafkaContinuousReadSupport(
kafkaOffsetReader,
kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
parameters,
metadataPath,
startingStreamOffsets,
failOnDataLoss(caseInsensitiveParams))
}
/**
* Returns a new base relation with the given parameters.
*
@ -406,7 +365,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
class KafkaTable(strategy: => ConsumerStrategy) extends Table
with SupportsMicroBatchRead {
with SupportsMicroBatchRead with SupportsContinuousRead {
override def name(): String = s"Kafka $strategy"
@ -449,6 +408,40 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
startingStreamOffsets,
failOnDataLoss(caseInsensitiveParams))
}
override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
val parameters = options.asMap().asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since Kafka will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
val uniqueGroupId = streamingUniqueGroupId(parameters, checkpointLocation)
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val specifiedKafkaParams =
parameters
.keySet
.filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
.map { k => k.drop(6).toString -> parameters(k) }
.toMap
val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(
caseInsensitiveParams, STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
val kafkaOffsetReader = new KafkaOffsetReader(
strategy(caseInsensitiveParams),
kafkaParamsForDriver(specifiedKafkaParams),
parameters,
driverGroupIdPrefix = s"$uniqueGroupId-driver")
new KafkaContinuousStream(
kafkaOffsetReader,
kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
parameters,
checkpointLocation,
startingStreamOffsets,
failOnDataLoss(caseInsensitiveParams))
}
}
}


@ -209,11 +209,11 @@ class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
assert(
query.lastExecution.executedPlan.collectFirst {
case scan: ContinuousScanExec
if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
}.exists { config =>
if scan.stream.isInstanceOf[KafkaContinuousStream] =>
scan.stream.asInstanceOf[KafkaContinuousStream]
}.exists { stream =>
// Ensure the new topic is present and the old topic is gone.
config.knownPartitions.exists(_.topic == topic2)
stream.knownPartitions.exists(_.topic == topic2)
},
s"query never reconfigured to new topic $topic2")
}


@ -48,8 +48,8 @@ trait KafkaContinuousTest extends KafkaSourceTest {
assert(
query.lastExecution.executedPlan.collectFirst {
case scan: ContinuousScanExec
if scan.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
scan.scanConfig.asInstanceOf[KafkaContinuousScanConfig]
if scan.stream.isInstanceOf[KafkaContinuousStream] =>
scan.stream.asInstanceOf[KafkaContinuousStream]
}.exists(_.knownPartitions.size == newCount),
s"query never reconfigured to $newCount partitions")
}


@ -28,14 +28,13 @@ import scala.collection.JavaConverters._
import scala.io.Source
import scala.util.Random
import org.apache.kafka.clients.admin.{AdminClient, ConsumerGroupListing}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
import org.apache.kafka.common.TopicPartition
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.time.SpanSugar._
import org.apache.spark.sql.{Dataset, ForeachWriter, SparkSession}
import org.apache.spark.sql.execution.datasources.v2.{OldStreamingDataSourceV2Relation, StreamingDataSourceV2Relation}
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
@ -118,17 +117,10 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext with Kaf
val sources: Seq[BaseStreamingSource] = {
query.get.logicalPlan.collect {
case StreamingExecutionRelation(source: KafkaSource, _) => source
case r: StreamingDataSourceV2Relation
if r.stream.isInstanceOf[KafkaMicroBatchStream] =>
r.stream.asInstanceOf[KafkaMicroBatchStream]
} ++ (query.get.lastExecution match {
case null => Seq()
case e => e.logical.collect {
case r: OldStreamingDataSourceV2Relation
if r.readSupport.isInstanceOf[KafkaContinuousReadSupport] =>
r.readSupport.asInstanceOf[KafkaContinuousReadSupport]
}
})
case r: StreamingDataSourceV2Relation if r.stream.isInstanceOf[KafkaMicroBatchStream] ||
r.stream.isInstanceOf[KafkaContinuousStream] =>
r.stream
}
}.distinct
if (sources.isEmpty) {


@ -1,61 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
import org.apache.spark.sql.sources.v2.reader.BatchReadSupport;
import org.apache.spark.sql.types.StructType;
/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability for batch processing.
*
* This interface is used to create {@link BatchReadSupport} instances when end users run
* {@code SparkSession.read.format(...).option(...).load()}.
*/
@Evolving
public interface BatchReadSupportProvider extends DataSourceV2 {
/**
* Creates a {@link BatchReadSupport} instance to load the data from this data source with a user
* specified schema, which is called by Spark at the beginning of each batch query.
*
* Spark will call this method at the beginning of each batch query to create a
* {@link BatchReadSupport} instance.
*
* By default this method throws {@link UnsupportedOperationException}, implementations should
* override this method to handle user specified schema.
*
* @param schema the user specified schema.
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
default BatchReadSupport createBatchReadSupport(StructType schema, DataSourceOptions options) {
return DataSourceV2Utils.failForUserSpecifiedSchema(this);
}
/**
* Creates a {@link BatchReadSupport} instance to scan the data from this data source, which is
* called by Spark at the beginning of each batch query.
*
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
BatchReadSupport createBatchReadSupport(DataSourceOptions options);
}


@ -1,70 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport;
import org.apache.spark.sql.types.StructType;
/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability for continuous stream processing.
*
* This interface is used to create {@link ContinuousReadSupport} instances when end users run
* {@code SparkSession.readStream.format(...).option(...).load()} with a continuous trigger.
*/
@Evolving
public interface ContinuousReadSupportProvider extends DataSourceV2 {
/**
* Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
* source with a user specified schema, which is called by Spark at the beginning of each
* continuous streaming query.
*
* By default this method throws {@link UnsupportedOperationException}, implementations should
* override this method to handle user specified schema.
*
* @param schema the user provided schema.
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Readers for the same logical source in the same query
* will be given the same checkpointLocation.
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
default ContinuousReadSupport createContinuousReadSupport(
StructType schema,
String checkpointLocation,
DataSourceOptions options) {
return DataSourceV2Utils.failForUserSpecifiedSchema(this);
}
/**
* Creates a {@link ContinuousReadSupport} instance to scan the data from this streaming data
* source, which is called by Spark at the beginning of each continuous streaming query.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Readers for the same logical source in the same query
* will be given the same checkpointLocation.
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
ContinuousReadSupport createContinuousReadSupport(
String checkpointLocation,
DataSourceOptions options);
}


@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2;
import org.apache.spark.annotation.Evolving;
/**
* TODO: remove it when we finish the API refactor for streaming side.
* TODO: remove it when we finish the API refactor for streaming write side.
*/
@Evolving
public interface DataSourceV2 {}


@ -15,16 +15,20 @@
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2.reader;
package org.apache.spark.sql.sources.v2;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.Scan;
import org.apache.spark.sql.sources.v2.reader.ScanBuilder;
/**
* An interface for building the {@link ScanConfig}. Implementations can mixin those
* SupportsPushDownXYZ interfaces to do operator pushdown, and keep the operator pushdown result in
* the returned {@link ScanConfig}.
* An empty mix-in interface for {@link Table}, to indicate this table supports streaming scan with
* continuous mode.
* <p>
* If a {@link Table} implements this interface, the
* {@link SupportsRead#newScanBuilder(DataSourceOptions)} must return a {@link ScanBuilder} that
* builds {@link Scan} with {@link Scan#toContinuousStream(String)} implemented.
* </p>
*/
@Evolving
public interface ScanConfigBuilder {
ScanConfig build();
}
public interface SupportsContinuousRead extends SupportsRead { }


@ -42,7 +42,7 @@ public interface Batch {
InputPartition[] planInputPartitions();
/**
* Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
* Returns a factory to create a {@link PartitionReader} for each {@link InputPartition}.
*/
PartitionReaderFactory createReaderFactory();
}


@ -1,51 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.Evolving;
/**
* An interface that defines how to load the data from data source for batch processing.
*
* The execution engine will get an instance of this interface from a data source provider
* (e.g. {@link org.apache.spark.sql.sources.v2.BatchReadSupportProvider}) at the start of a batch
* query, then call {@link #newScanConfigBuilder()} and create an instance of {@link ScanConfig}.
* The {@link ScanConfigBuilder} can apply operator pushdown and keep the pushdown result in
* {@link ScanConfig}. The {@link ScanConfig} will be used to create input partitions and reader
* factory to scan data from the data source with a Spark job.
*/
@Evolving
public interface BatchReadSupport extends ReadSupport {
/**
* Returns a builder of {@link ScanConfig}. Spark will call this method and create a
* {@link ScanConfig} for each data scanning job.
*
* The builder can take some query specific information to do operators pushdown, and keep these
* information in the created {@link ScanConfig}.
*
* This is the first step of the data scan. All other methods in {@link BatchReadSupport} needs
* to take {@link ScanConfig} as an input.
*/
ScanConfigBuilder newScanConfigBuilder();
/**
* Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
*/
PartitionReaderFactory createReaderFactory(ScanConfig config);
}


@ -23,7 +23,7 @@ import org.apache.spark.annotation.Evolving;
/**
* A serializable representation of an input partition returned by
* {@link ReadSupport#planInputPartitions(ScanConfig)}.
* {@link Batch#planInputPartitions()} and the corresponding ones in streaming.
*
* Note that {@link InputPartition} will be serialized and sent to executors, then
* {@link PartitionReader} will be created by


@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
/**
* A mix in interface for {@link BatchReadSupport}. Data sources can implement this interface to
* report data partitioning and try to avoid shuffle at Spark side.
*
* Note that, when a {@link ReadSupport} implementation creates exactly one {@link InputPartition},
* Spark may avoid adding a shuffle even if the reader does not implement this interface.
*/
@Evolving
// TODO: remove it, after we finish the API refactor completely.
public interface OldSupportsReportPartitioning extends ReadSupport {
/**
* Returns the output data partitioning that this reader guarantees.
*/
Partitioning outputPartitioning(ScanConfig config);
}


@ -1,38 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.Evolving;
/**
* A mix in interface for {@link BatchReadSupport}. Data sources can implement this interface to
* report statistics to Spark.
*
* As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the
* data source. Implementations that return more accurate statistics based on pushed operators will
* not improve query performance until the planner can push operators before getting stats.
*/
@Evolving
// TODO: remove it, after we finish the API refactor completely.
public interface OldSupportsReportStatistics extends ReadSupport {
/**
* Returns the estimated statistics of this data source scan.
*/
Statistics estimateStatistics(ScanConfig config);
}


@ -1,50 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.StructType;
/**
* The base interface for all the batch and streaming read supports. Data sources should implement
* concrete read support interfaces like {@link BatchReadSupport}.
*
* If Spark fails to execute any methods in the implementations of this interface (by throwing an
* exception), the read action will fail and no Spark job will be submitted.
*/
@Evolving
public interface ReadSupport {
/**
* Returns the full schema of this data source, which is usually the physical schema of the
* underlying storage. This full schema should not be affected by column pruning or other
* optimizations.
*/
StructType fullSchema();
/**
* Returns a list of {@link InputPartition input partitions}. Each {@link InputPartition}
* represents a data split that can be processed by one Spark task. The number of input
* partitions returned here is the same as the number of RDD partitions this scan outputs.
*
* Note that, this may not be a full scan if the data source supports optimization like filter
* push-down. Implementations should check the input {@link ScanConfig} and adjust the resulting
* {@link InputPartition input partitions}.
*/
InputPartition[] planInputPartitions(ScanConfig config);
}


@ -18,9 +18,11 @@
package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousStream;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.sources.v2.SupportsBatchRead;
import org.apache.spark.sql.sources.v2.SupportsContinuousRead;
import org.apache.spark.sql.sources.v2.SupportsMicroBatchRead;
import org.apache.spark.sql.sources.v2.Table;
@ -65,7 +67,7 @@ public interface Scan {
* @throws UnsupportedOperationException
*/
default Batch toBatch() {
throw new UnsupportedOperationException("Batch scans are not supported");
throw new UnsupportedOperationException(description() + ": Batch scans are not supported");
}
/**
@ -81,6 +83,22 @@ public interface Scan {
* @throws UnsupportedOperationException
*/
default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
throw new UnsupportedOperationException("Micro-batch scans are not supported");
throw new UnsupportedOperationException(description() + ": Micro-batch scans are not supported");
}
/**
* Returns the physical representation of this scan for a streaming query with continuous mode. By
* default this method throws an exception; data sources must override it to provide an
* implementation if the {@link Table} that creates this scan implements
* {@link SupportsContinuousRead}.
*
* @param checkpointLocation a path to Hadoop FS scratch space that can be used for failure
* recovery. Data streams for the same logical source in the same query
* will be given the same checkpointLocation.
*
* @throws UnsupportedOperationException
*/
default ContinuousStream toContinuousStream(String checkpointLocation) {
throw new UnsupportedOperationException(description() + ": Continuous scans are not supported");
}
}


@ -1,45 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.StructType;
/**
* An interface that carries query specific information for the data scanning job, like operator
* pushdown information and streaming query offsets. This is defined as an empty interface, and data
* sources should define their own {@link ScanConfig} classes.
*
* For APIs that take a {@link ScanConfig} as input, like
* {@link ReadSupport#planInputPartitions(ScanConfig)},
* {@link BatchReadSupport#createReaderFactory(ScanConfig)} and
* {@link OldSupportsReportStatistics#estimateStatistics(ScanConfig)}, implementations mostly need
* to cast the input {@link ScanConfig} to the concrete {@link ScanConfig} class of the data source.
*/
@Evolving
public interface ScanConfig {
/**
* Returns the actual schema of this data source reader, which may be different from the physical
* schema of the underlying storage, as column pruning or other optimizations may happen.
*
* If this method fails (by throwing an exception), the action will fail and no Spark job will be
* submitted.
*/
StructType readSchema();
}


@ -35,7 +35,7 @@ public interface SupportsPushDownRequiredColumns extends ScanBuilder {
* also OK to do the pruning partially, e.g., a data source may not be able to prune nested
* fields, and only prune top-level columns.
*
* Note that, {@link ScanConfig#readSchema()} implementation should take care of the column
* Note that, {@link Scan#readSchema()} implementation should take care of the column
* pruning applied here.
*/
void pruneColumns(StructType requiredSchema);


@ -21,14 +21,14 @@ import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
/**
* A mix in interface for {@link Batch}. Data sources can implement this interface to
* A mix in interface for {@link Scan}. Data sources can implement this interface to
* report data partitioning and try to avoid shuffle at Spark side.
*
* Note that, when a {@link Batch} implementation creates exactly one {@link InputPartition},
* Note that, when a {@link Scan} implementation creates exactly one {@link InputPartition},
* Spark may avoid adding a shuffle even if the reader does not implement this interface.
*/
@Evolving
public interface SupportsReportPartitioning extends Batch {
public interface SupportsReportPartitioning extends Scan {
/**
* Returns the output data partitioning that this reader guarantees.


@ -20,7 +20,7 @@ package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.Evolving;
/**
* A mix in interface for {@link Batch}. Data sources can implement this interface to
* A mix in interface for {@link Scan}. Data sources can implement this interface to
* report statistics to Spark.
*
* As of Spark 2.4, statistics are reported to the optimizer before any operator is pushed to the
@ -28,7 +28,7 @@ import org.apache.spark.annotation.Evolving;
* not improve query performance until the planner can push operators before getting stats.
*/
@Evolving
public interface SupportsReportStatistics extends Batch {
public interface SupportsReportStatistics extends Scan {
/**
* Returns the estimated statistics of this data source scan.


@ -1,77 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2.reader.streaming;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.ScanConfig;
import org.apache.spark.sql.sources.v2.reader.ScanConfigBuilder;
/**
* An interface that defines how to load the data from data source for continuous streaming
* processing.
*
* The execution engine will get an instance of this interface from a data source provider
* (e.g. {@link org.apache.spark.sql.sources.v2.ContinuousReadSupportProvider}) at the start of a
* streaming query, then call {@link #newScanConfigBuilder(Offset)} and create an instance of
* {@link ScanConfig} for the duration of the streaming query or until
* {@link #needsReconfiguration(ScanConfig)} is true. The {@link ScanConfig} will be used to create
* input partitions and reader factory to scan data with a Spark job for its duration. At the end
* {@link #stop()} will be called when the streaming execution is completed. Note that a single
* query may have multiple executions due to restart or failure recovery.
*/
@Evolving
public interface ContinuousReadSupport extends StreamingReadSupport, BaseStreamingSource {
/**
* Returns a builder of {@link ScanConfig}. Spark will call this method and create a
* {@link ScanConfig} for each data scanning job.
*
* The builder can take some query specific information to do operators pushdown, store streaming
* offsets, etc., and keep these information in the created {@link ScanConfig}.
*
* This is the first step of the data scan. All other methods in {@link ContinuousReadSupport}
* needs to take {@link ScanConfig} as an input.
*/
ScanConfigBuilder newScanConfigBuilder(Offset start);
/**
* Returns a factory, which produces one {@link ContinuousPartitionReader} for one
* {@link InputPartition}.
*/
ContinuousPartitionReaderFactory createContinuousReaderFactory(ScanConfig config);
/**
* Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
* for each partition to a single global offset.
*/
Offset mergeOffsets(PartitionOffset[] offsets);
/**
* The execution engine will call this method in every epoch to determine if new input
* partitions need to be generated, which may be required if for example the underlying
* source system has had partitions added or removed.
*
* If true, the query will be shut down and restarted with a new {@link ContinuousReadSupport}
* instance.
*/
default boolean needsReconfiguration(ScanConfig config) {
return false;
}
}


@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2.reader.streaming;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.Scan;
/**
* A {@link SparkDataStream} for streaming queries with continuous mode.
*/
@Evolving
public interface ContinuousStream extends SparkDataStream {
/**
* Returns a list of {@link InputPartition input partitions} given the start offset. Each
* {@link InputPartition} represents a data split that can be processed by one Spark task. The
* number of input partitions returned here is the same as the number of RDD partitions this scan
* outputs.
* <p>
* If the {@link Scan} supports filter pushdown, this stream is likely configured with a filter
* and is responsible for creating splits for that filter, which is not a full scan.
* </p>
* <p>
* This method will be called to launch one Spark job for reading the data stream. It will be
* called more than once, if {@link #needsReconfiguration()} returns true and Spark needs to
* launch a new job.
* </p>
*/
InputPartition[] planInputPartitions(Offset start);
/**
* Returns a factory to create a {@link ContinuousPartitionReader} for each
* {@link InputPartition}.
*/
ContinuousPartitionReaderFactory createContinuousReaderFactory();
/**
* Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
* for each partition to a single global offset.
*/
Offset mergeOffsets(PartitionOffset[] offsets);
/**
* The execution engine will call this method in every epoch to determine if new input
* partitions need to be generated, which may be required if for example the underlying
* source system has had partitions added or removed.
*
* If true, the Spark job to scan this continuous data stream will be interrupted and Spark will
* launch it again with a new list of {@link InputPartition input partitions}.
*/
default boolean needsReconfiguration() {
return false;
}
}
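
For illustration only (not part of this commit), a minimal `ContinuousStream` that emits an unbounded sequence of longs from a fixed number of partitions could look roughly like the sketch below; since the partition set never changes, `needsReconfiguration()` keeps its default. All `Example*` names are hypothetical, and `ExampleContinuousStream` is the class referenced by the provider sketch near the top of this page.

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming._

// Hypothetical offset types: one Long position per partition.
case class ExamplePartitionOffset(partition: Int, position: Long) extends PartitionOffset
case class ExampleOffset(positions: Map[Int, Long]) extends Offset {
  override def json(): String =
    positions.map { case (p, pos) => s"$p:$pos" }.mkString(",")
}
case class ExampleInputPartition(partition: Int, start: Long) extends InputPartition

class ExampleContinuousStream(numPartitions: Int) extends ContinuousStream {
  override def initialOffset(): Offset =
    ExampleOffset((0 until numPartitions).map(_ -> 0L).toMap)

  override def deserializeOffset(json: String): Offset =
    ExampleOffset(json.split(",").map { s =>
      val Array(p, pos) = s.split(":")
      p.toInt -> pos.toLong
    }.toMap)

  // With the refactor the start offset is passed in directly, instead of through a ScanConfig.
  override def planInputPartitions(start: Offset): Array[InputPartition] =
    start.asInstanceOf[ExampleOffset].positions.map {
      case (p, pos) => ExampleInputPartition(p, pos): InputPartition
    }.toArray

  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory =
    ExampleReaderFactory

  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset =
    ExampleOffset(offsets.map {
      case ExamplePartitionOffset(p, pos) => p -> pos
    }.toMap)

  override def commit(end: Offset): Unit = {} // nothing to clean up in this sketch
  override def stop(): Unit = {}
}

// One reader per input partition; getOffset() reports how far this partition has read.
object ExampleReaderFactory extends ContinuousPartitionReaderFactory {
  override def createReader(partition: InputPartition): ContinuousPartitionReader[InternalRow] = {
    val p = partition.asInstanceOf[ExampleInputPartition]
    new ContinuousPartitionReader[InternalRow] {
      private var current = p.start - 1
      override def next(): Boolean = { current += 1; true } // never-ending stream of longs
      override def get(): InternalRow = InternalRow(current)
      override def getOffset(): PartitionOffset = ExamplePartitionOffset(p.partition, current + 1)
      override def close(): Unit = {}
    }
  }
}
```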


@ -51,7 +51,7 @@ public interface MicroBatchStream extends SparkDataStream {
InputPartition[] planInputPartitions(Offset start, Offset end);
/**
* Returns a factory, which produces one {@link PartitionReader} for one {@link InputPartition}.
* Returns a factory to create a {@link PartitionReader} for each {@link InputPartition}.
*/
PartitionReaderFactory createReaderFactory();
}


@ -21,7 +21,7 @@ import org.apache.spark.annotation.Evolving;
/**
* An abstract representation of progress through a {@link MicroBatchStream} or
* {@link ContinuousReadSupport}.
* {@link ContinuousStream}.
* During execution, offsets provided by the data source implementation will be logged and used as
* restart checkpoints. Each source should provide an offset implementation which the source can use
* to reconstruct a position in the stream up to which data has been seen/processed.


@ -24,7 +24,8 @@ import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
* The base interface representing a readable data stream in a Spark streaming query. It's
* responsible to manage the offsets of the streaming source in the streaming query.
*
* Data sources should implement concrete data stream interfaces: {@link MicroBatchStream}.
* Data sources should implement concrete data stream interfaces:
* {@link MicroBatchStream} and {@link ContinuousStream}.
*/
@Evolving
public interface SparkDataStream extends BaseStreamingSource {


@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.sources.v2.reader.streaming;
import com.google.common.annotations.VisibleForTesting;
import org.apache.spark.sql.sources.v2.reader.ReadSupport;
/**
* A base interface for streaming read support. Data sources should implement concrete streaming
* read support interfaces: {@link ContinuousReadSupport}.
* This is exposed for a testing purpose.
*/
@VisibleForTesting
public interface StreamingReadSupport extends ReadSupport {
/**
* Returns the initial offset for a streaming query to start reading from. Note that the
* streaming data source should not assume that it will start reading from its initial offset:
* if Spark is restarting an existing query, it will restart from the check-pointed offset rather
* than the initial one.
*/
Offset initialOffset();
/**
* Deserialize a JSON string into an Offset of the implementation-defined offset type.
*
* @throws IllegalArgumentException if the JSON does not encode a valid offset for this reader
*/
Offset deserializeOffset(String json);
/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
void commit(Offset end);
}


@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.sources.v2.reader._
/**
* Physical plan node for scanning a batch of data from a data source v2.
*/
case class BatchScanExec(
output: Seq[AttributeReference],
@transient scan: Scan) extends DataSourceV2ScanExecBase {
@transient lazy val batch = scan.toBatch
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
case other: BatchScanExec => this.batch == other.batch
case _ => false
}
override def hashCode(): Int = batch.hashCode()
override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions()
override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
override lazy val inputRDD: RDD[InternalRow] = {
new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
}
}


@ -20,99 +20,44 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousReadSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, Offset}
/**
* Physical plan node for scanning data from a streaming data source with continuous mode.
*/
// TODO: merge it and `MicroBatchScanExec`.
case class ContinuousScanExec(
output: Seq[AttributeReference],
@transient source: DataSourceV2,
@transient options: Map[String, String],
@transient pushedFilters: Seq[Expression],
@transient readSupport: ReadSupport,
@transient scanConfig: ScanConfig)
extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan {
override def simpleString(maxFields: Int): String = "ScanV2 " + metadataString(maxFields)
output: Seq[Attribute],
@transient scan: Scan,
@transient stream: ContinuousStream,
@transient start: Offset) extends DataSourceV2ScanExecBase {
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
case other: ContinuousScanExec =>
output == other.output && readSupport.getClass == other.readSupport.getClass &&
options == other.options
case other: ContinuousScanExec => this.stream == other.stream
case _ => false
}
override def hashCode(): Int = {
Seq(output, source, options).hashCode()
override def hashCode(): Int = stream.hashCode()
override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start)
override lazy val readerFactory: ContinuousPartitionReaderFactory = {
stream.createContinuousReaderFactory()
}
override def outputPartitioning: physical.Partitioning = readSupport match {
case _ if partitions.length == 1 =>
SinglePartition
case s: OldSupportsReportPartitioning =>
new DataSourcePartitioning(
s.outputPartitioning(scanConfig), AttributeMap(output.map(a => a -> a.name)))
case _ => super.outputPartitioning
}
private lazy val partitions: Seq[InputPartition] = readSupport.planInputPartitions(scanConfig)
private lazy val readerFactory = readSupport match {
case r: ContinuousReadSupport => r.createContinuousReaderFactory(scanConfig)
case _ => throw new IllegalStateException("unknown read support: " + readSupport)
}
override val supportsBatch: Boolean = {
require(partitions.forall(readerFactory.supportColumnarReads) ||
!partitions.exists(readerFactory.supportColumnarReads),
"Cannot mix row-based and columnar input partitions.")
partitions.exists(readerFactory.supportColumnarReads)
}
private lazy val inputRDD: RDD[InternalRow] = readSupport match {
case _: ContinuousReadSupport =>
assert(!supportsBatch,
"continuous stream reader does not support columnar read yet.")
EpochCoordinatorRef.get(
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
sparkContext.env)
.askSync[Unit](SetReaderPartitions(partitions.size))
new ContinuousDataSourceRDD(
sparkContext,
sqlContext.conf.continuousStreamingExecutorQueueSize,
sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
partitions,
schema,
readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
case _ =>
new DataSourceRDD(
sparkContext, partitions, readerFactory.asInstanceOf[PartitionReaderFactory], supportsBatch)
}
override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
override protected def doExecute(): RDD[InternalRow] = {
if (supportsBatch) {
WholeStageCodegenExec(this)(codegenStageId = 0).execute()
} else {
val numOutputRows = longMetric("numOutputRows")
inputRDD.map { r =>
numOutputRows += 1
r
}
}
override lazy val inputRDD: RDD[InternalRow] = {
EpochCoordinatorRef.get(
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
sparkContext.env)
.askSync[Unit](SetReaderPartitions(partitions.size))
new ContinuousDataSourceRDD(
sparkContext,
sqlContext.conf.continuousStreamingExecutorQueueSize,
sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
partitions,
schema,
readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
}
}


@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.sources.v2._
@ -58,8 +58,6 @@ case class DataSourceV2Relation(
case _ => throw new AnalysisException(s"Table is not readable: ${table.name()}")
}
def newWriteBuilder(schema: StructType): WriteBuilder = table match {
case s: SupportsBatchWrite =>
val dsOptions = new DataSourceOptions(options.asJava)
@ -94,7 +92,7 @@ case class DataSourceV2Relation(
*/
case class StreamingDataSourceV2Relation(
output: Seq[Attribute],
scanDesc: String,
scan: Scan,
stream: SparkDataStream,
startOffset: Option[Offset] = None,
endOffset: Option[Offset] = None)
@ -104,7 +102,7 @@ case class StreamingDataSourceV2Relation(
override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
override def computeStats(): Statistics = stream match {
override def computeStats(): Statistics = scan match {
case r: SupportsReportStatistics =>
val statistics = r.estimateStatistics()
Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
@ -113,46 +111,6 @@ case class StreamingDataSourceV2Relation(
}
}
// TODO: remove it after finish API refactor for continuous streaming.
case class OldStreamingDataSourceV2Relation(
output: Seq[AttributeReference],
source: DataSourceV2,
options: Map[String, String],
readSupport: ReadSupport,
scanConfigBuilder: ScanConfigBuilder)
extends LeafNode with MultiInstanceRelation with DataSourceV2StringFormat {
override def isStreaming: Boolean = true
override def simpleString(maxFields: Int): String = {
"Streaming RelationV2 " + metadataString(maxFields)
}
override def pushedFilters: Seq[Expression] = Nil
override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
case other: OldStreamingDataSourceV2Relation =>
output == other.output && readSupport.getClass == other.readSupport.getClass &&
options == other.options
case _ => false
}
override def hashCode(): Int = {
Seq(output, source, options).hashCode()
}
override def computeStats(): Statistics = readSupport match {
case r: OldSupportsReportStatistics =>
val statistics = r.estimateStatistics(scanConfigBuilder.build())
Statistics(sizeInBytes = statistics.sizeInBytes().orElse(conf.defaultSizeInBytes))
case _ =>
Statistics(sizeInBytes = conf.defaultSizeInBytes)
}
}
object DataSourceV2Relation {
def create(table: Table, options: Map[String, String]): DataSourceV2Relation = {
val output = table.schema().toAttributes


@ -19,39 +19,26 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.AttributeMap
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan, SupportsReportPartitioning}
/**
* Physical plan node for scanning a batch of data from a data source.
*/
case class DataSourceV2ScanExec(
output: Seq[AttributeReference],
scanDesc: String,
@transient batch: Batch)
extends LeafExecNode with ColumnarBatchScan {
trait DataSourceV2ScanExecBase extends LeafExecNode with ColumnarBatchScan {
def scan: Scan
def partitions: Seq[InputPartition]
def readerFactory: PartitionReaderFactory
override def simpleString(maxFields: Int): String = {
s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc"
s"$nodeName${truncatedString(output, "[", ", ", "]", maxFields)} ${scan.description()}"
}
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
case other: DataSourceV2ScanExec => this.batch == other.batch
case _ => false
}
override def hashCode(): Int = batch.hashCode()
private lazy val partitions = batch.planInputPartitions()
private lazy val readerFactory = batch.createReaderFactory()
override def outputPartitioning: physical.Partitioning = batch match {
override def outputPartitioning: physical.Partitioning = scan match {
case _ if partitions.length == 1 =>
SinglePartition
@ -70,13 +57,11 @@ case class DataSourceV2ScanExec(
partitions.exists(readerFactory.supportColumnarReads)
}
private lazy val inputRDD: RDD[InternalRow] = {
new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
}
def inputRDD: RDD[InternalRow]
override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
override protected def doExecute(): RDD[InternalRow] = {
override def doExecute(): RDD[InternalRow] = {
if (supportsBatch) {
WholeStageCodegenExec(this)(codegenStageId = 0).execute()
} else {


@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.sources.v2.writer.SupportsSaveMode
object DataSourceV2Strategy extends Strategy {
@ -117,7 +117,7 @@ object DataSourceV2Strategy extends Strategy {
|Output: ${output.mkString(", ")}
""".stripMargin)
val plan = DataSourceV2ScanExec(output, scan.description(), scan.toBatch)
val plan = BatchScanExec(output, scan)
val filterCondition = postScanFilters.reduceLeftOption(And)
val withFilter = filterCondition.map(FilterExec(_, plan)).getOrElse(plan)
@ -130,15 +130,14 @@ object DataSourceV2Strategy extends Strategy {
// ensure there is a projection, which will produce unsafe rows required by some operators
ProjectExec(r.output,
MicroBatchScanExec(
r.output, r.scanDesc, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil
r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)) :: Nil
case r: OldStreamingDataSourceV2Relation =>
// TODO: support operator pushdown for streaming data sources.
val scanConfig = r.scanConfigBuilder.build()
case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty =>
val continuousStream = r.stream.asInstanceOf[ContinuousStream]
// ensure there is a projection, which will produce unsafe rows required by some operators
ProjectExec(r.output,
ContinuousScanExec(
r.output, r.source, r.options, r.pushedFilters, r.readSupport, scanConfig)) :: Nil
r.output, r.scan, continuousStream, r.startOffset.get)) :: Nil
case WriteToDataSourceV2(writer, query) =>
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
@ -158,8 +157,7 @@ object DataSourceV2Strategy extends Strategy {
case Repartition(1, false, child) =>
val isContinuous = child.find {
case s: OldStreamingDataSourceV2Relation =>
s.readSupport.isInstanceOf[ContinuousReadSupport]
case r: StreamingDataSourceV2Relation => r.stream.isInstanceOf[ContinuousStream]
case _ => false
}.isDefined

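Worth spelling out: after this change the strategy no longer inspects reader classes at all; it tells micro-batch and continuous scans apart purely by which offsets have been filled in on StreamingDataSourceV2Relation (both offsets set by MicroBatchExecution, only a start offset set by ContinuousExecution). A small self-contained sketch of that dispatch, using hypothetical stand-in types rather than Spark's internal classes:

// Hypothetical stand-ins, purely to illustrate the offset-based dispatch above;
// these are not Spark classes.
object StreamingScanDispatch {
  sealed trait PlannedScan
  case object MicroBatchScan extends PlannedScan
  case object ContinuousScan extends PlannedScan

  final case class RelationOffsets(startOffset: Option[String], endOffset: Option[String])

  def planStreamingScan(rel: RelationOffsets): Option[PlannedScan] = rel match {
    // both offsets resolved (micro-batch execution) -> MicroBatchScanExec
    case RelationOffsets(Some(_), Some(_)) => Some(MicroBatchScan)
    // only a start offset (continuous execution) -> ContinuousScanExec
    case RelationOffsets(Some(_), None) => Some(ContinuousScan)
    // offsets not populated yet -> not plannable as a streaming scan
    case _ => None
  }
}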
@ -19,12 +19,8 @@ package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.sources.v2.reader.{InputPartition, PartitionReaderFactory, Scan}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset}
/**
@ -32,14 +28,10 @@ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offse
*/
case class MicroBatchScanExec(
output: Seq[Attribute],
scanDesc: String,
@transient scan: Scan,
@transient stream: MicroBatchStream,
@transient start: Offset,
@transient end: Offset) extends LeafExecNode with ColumnarBatchScan {
override def simpleString(maxFields: Int): String = {
s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc"
}
@transient end: Offset) extends DataSourceV2ScanExecBase {
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
@ -49,44 +41,11 @@ case class MicroBatchScanExec(
override def hashCode(): Int = stream.hashCode()
private lazy val partitions = stream.planInputPartitions(start, end)
override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start, end)
private lazy val readerFactory = stream.createReaderFactory()
override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()
override def outputPartitioning: physical.Partitioning = stream match {
case _ if partitions.length == 1 =>
SinglePartition
case s: SupportsReportPartitioning =>
new DataSourcePartitioning(
s.outputPartitioning(), AttributeMap(output.map(a => a -> a.name)))
case _ => super.outputPartitioning
}
override def supportsBatch: Boolean = {
require(partitions.forall(readerFactory.supportColumnarReads) ||
!partitions.exists(readerFactory.supportColumnarReads),
"Cannot mix row-based and columnar input partitions.")
partitions.exists(readerFactory.supportColumnarReads)
}
private lazy val inputRDD: RDD[InternalRow] = {
override lazy val inputRDD: RDD[InternalRow] = {
new DataSourceRDD(sparkContext, partitions, readerFactory, supportsBatch)
}
override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
override protected def doExecute(): RDD[InternalRow] = {
if (supportsBatch) {
WholeStageCodegenExec(this)(codegenStageId = 0).execute()
} else {
val numOutputRows = longMetric("numOutputRows")
inputRDD.map { r =>
numOutputRows += 1
r
}
}
}
}

@ -99,7 +99,7 @@ class MicroBatchExecution(
// TODO: operator pushdown.
val scan = table.newScanBuilder(dsOptions).build()
val stream = scan.toMicroBatchStream(metadataPath)
StreamingDataSourceV2Relation(output, scan.description(), stream)
StreamingDataSourceV2Relation(output, scan, stream)
})
case s @ StreamingRelationV2(ds, dsName, _, _, output, v1Relation) =>
v2ToExecutionRelationMap.getOrElseUpdate(s, {

@ -1,40 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import org.apache.spark.sql.sources.v2.reader.{ScanConfig, ScanConfigBuilder}
import org.apache.spark.sql.types.StructType
/**
* A very simple [[ScanConfigBuilder]] implementation that creates a simple [[ScanConfig]] to
* carry schema and offsets for streaming data sources.
*/
class SimpleStreamingScanConfigBuilder(
schema: StructType,
start: Offset,
end: Option[Offset] = None)
extends ScanConfigBuilder {
override def build(): ScanConfig = SimpleStreamingScanConfig(schema, start, end)
}
case class SimpleStreamingScanConfig(
readSchema: StructType,
start: Offset,
end: Option[Offset])
extends ScanConfig

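With SimpleStreamingScanConfig(Builder) deleted, offsets no longer ride inside a ScanConfig: a ContinuousStream is handed the start Offset directly and returns reader factories without any config object. A hedged, do-nothing implementation showing the slimmed-down surface (it mirrors the FakeDataStream added to the streaming test suite near the end of this commit; package names are as of this commit and may differ in later versions):

import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory, ContinuousStream, Offset, PartitionOffset}

// A hypothetical offset type; the v2 streaming Offset only requires a JSON form.
case class NoOpOffset(epoch: Long) extends Offset {
  override def json(): String = s"""{"epoch":$epoch}"""
}

// A do-nothing ContinuousStream: planInputPartitions takes the start Offset
// directly and createContinuousReaderFactory no longer takes a ScanConfig.
class NoOpContinuousStream extends ContinuousStream {
  override def initialOffset(): Offset = NoOpOffset(0L)

  override def deserializeOffset(json: String): Offset =
    NoOpOffset("""\d+""".r.findFirstIn(json).map(_.toLong).getOrElse(0L))

  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = NoOpOffset(0L)

  override def planInputPartitions(start: Offset): Array[InputPartition] = Array.empty

  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory =
    throw new UnsupportedOperationException("no-op stream cannot actually be read")

  override def commit(end: Offset): Unit = {}

  override def stop(): Unit = {}
}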
@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceV2, Table}
import org.apache.spark.sql.sources.v2.{DataSourceV2, Table}
object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
@ -110,30 +110,6 @@ case class StreamingRelationV2(
override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session)
}
/**
* Used to link a [[DataSourceV2]] into a continuous processing execution.
*/
case class ContinuousExecutionRelation(
source: ContinuousReadSupportProvider,
extraOptions: Map[String, String],
output: Seq[Attribute])(session: SparkSession)
extends LeafNode with MultiInstanceRelation {
override def otherCopyArgs: Seq[AnyRef] = session :: Nil
override def isStreaming: Boolean = true
override def toString: String = source.toString
// There's no sensible value here. On the execution path, this relation will be
// swapped out with microbatches. But some dataframe operations (in particular explain) do lead
// to this node surviving analysis. So we satisfy the LeafNode contract with the session default
// value.
override def computeStats(): Statistics = Statistics(
sizeInBytes = BigInt(session.sessionState.conf.defaultSizeInBytes)
)
override def newInstance(): LogicalPlan = this.copy(output = output.map(_.newInstance()))(session)
}
/**
* A dummy physical plan for [[StreamingRelation]] to support
* [[org.apache.spark.sql.Dataset.explain]]

@ -22,20 +22,18 @@ import java.util.concurrent.TimeUnit
import java.util.function.UnaryOperator
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}
import scala.collection.mutable.{Map => MutableMap}
import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.expressions.{CurrentDate, CurrentTimestamp}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{ContinuousScanExec, OldStreamingDataSourceV2Relation}
import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2
import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, StreamingWriteSupportProvider}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, StreamingWriteSupportProvider, SupportsContinuousRead}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.Clock
@ -54,25 +52,39 @@ class ContinuousExecution(
sparkSession, name, checkpointRoot, analyzedPlan, sink,
trigger, triggerClock, outputMode, deleteCheckpointOnStop) {
@volatile protected var continuousSources: Seq[ContinuousReadSupport] = Seq()
override protected def sources: Seq[BaseStreamingSource] = continuousSources
@volatile protected var sources: Seq[ContinuousStream] = Seq()
// For use only in test harnesses.
private[sql] var currentEpochCoordinatorId: String = _
override val logicalPlan: LogicalPlan = {
val toExecutionRelationMap = MutableMap[StreamingRelationV2, ContinuousExecutionRelation]()
analyzedPlan.transform {
case r @ StreamingRelationV2(
source: ContinuousReadSupportProvider, _, _, extraReaderOptions, output, _) =>
// TODO: shall we create `ContinuousReadSupport` here instead of each reconfiguration?
toExecutionRelationMap.getOrElseUpdate(r, {
ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
var nextSourceId = 0
val _logicalPlan = analyzedPlan.transform {
case s @ StreamingRelationV2(
ds, dsName, table: SupportsContinuousRead, options, output, _) =>
v2ToRelationMap.getOrElseUpdate(s, {
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
nextSourceId += 1
logInfo(s"Reading table [$table] from DataSourceV2 named '$dsName' [$ds]")
val dsOptions = new DataSourceOptions(options.asJava)
// TODO: operator pushdown.
val scan = table.newScanBuilder(dsOptions).build()
val stream = scan.toContinuousStream(metadataPath)
StreamingDataSourceV2Relation(output, scan, stream)
})
case StreamingRelationV2(_, sourceName, _, _, _, _) =>
throw new UnsupportedOperationException(
s"Data source $sourceName does not support continuous processing.")
}
sources = _logicalPlan.collect {
case r: StreamingDataSourceV2Relation => r.stream.asInstanceOf[ContinuousStream]
}
uniqueSources = sources.distinct
_logicalPlan
}
private val triggerExecutor = trigger match {
@ -92,6 +104,8 @@ class ContinuousExecution(
do {
runContinuous(sparkSessionForStream)
} while (state.updateAndGet(stateUpdate) == ACTIVE)
stopSources()
}
/**
@ -135,7 +149,7 @@ class ContinuousExecution(
updateStatusMessage("Starting new streaming query")
logInfo(s"Starting new streaming query.")
currentBatchId = 0
OffsetSeq.fill(continuousSources.map(_ => null): _*)
OffsetSeq.fill(sources.map(_ => null): _*)
}
}
@ -144,47 +158,17 @@ class ContinuousExecution(
* @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
*/
private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
// A list of attributes that will need to be updated.
val replacements = new ArrayBuffer[(Attribute, Attribute)]
// Translate from continuous relation to the underlying data source.
var nextSourceId = 0
continuousSources = logicalPlan.collect {
case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
nextSourceId += 1
dataSource.createContinuousReadSupport(
metadataPath,
new DataSourceOptions(extraReaderOptions.asJava))
}
uniqueSources = continuousSources.distinct
val offsets = getStartOffsets(sparkSessionForQuery)
var insertedSourceId = 0
val withNewSources = logicalPlan transform {
case ContinuousExecutionRelation(source, options, output) =>
val readSupport = continuousSources(insertedSourceId)
insertedSourceId += 1
val newOutput = readSupport.fullSchema().toAttributes
val maxFields = SQLConf.get.maxToStringFields
assert(output.size == newOutput.size,
s"Invalid reader: ${truncatedString(output, ",", maxFields)} != " +
s"${truncatedString(newOutput, ",", maxFields)}")
replacements ++= output.zip(newOutput)
val withNewSources: LogicalPlan = logicalPlan transform {
case relation: StreamingDataSourceV2Relation =>
val loggedOffset = offsets.offsets(0)
val realOffset = loggedOffset.map(off => readSupport.deserializeOffset(off.json))
val startOffset = realOffset.getOrElse(readSupport.initialOffset)
val scanConfigBuilder = readSupport.newScanConfigBuilder(startOffset)
OldStreamingDataSourceV2Relation(newOutput, source, options, readSupport, scanConfigBuilder)
val realOffset = loggedOffset.map(off => relation.stream.deserializeOffset(off.json))
val startOffset = realOffset.getOrElse(relation.stream.initialOffset)
relation.copy(startOffset = Some(startOffset))
}
// Rewire the plan to use the new attributes that were returned by the source.
val replacementMap = AttributeMap(replacements)
val triggerLogicalPlan = withNewSources transformAllExpressions {
case a: Attribute if replacementMap.contains(a) =>
replacementMap(a).withMetadata(a.metadata)
withNewSources.transformAllExpressions {
case (_: CurrentTimestamp | _: CurrentDate) =>
throw new IllegalStateException(
"CurrentTimestamp and CurrentDate not yet supported for continuous processing")
@ -192,15 +176,15 @@ class ContinuousExecution(
val writer = sink.createStreamingWriteSupport(
s"$runId",
triggerLogicalPlan.schema,
withNewSources.schema,
outputMode,
new DataSourceOptions(extraOptions.asJava))
val withSink = WriteToContinuousDataSource(writer, triggerLogicalPlan)
val planWithSink = WriteToContinuousDataSource(writer, withNewSources)
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionForQuery,
withSink,
planWithSink,
outputMode,
checkpointFile("state"),
id,
@ -210,10 +194,9 @@ class ContinuousExecution(
lastExecution.executedPlan // Force the lazy generation of execution plan
}
val (readSupport, scanConfig) = lastExecution.executedPlan.collect {
case scan: ContinuousScanExec
if scan.readSupport.isInstanceOf[ContinuousReadSupport] =>
scan.readSupport.asInstanceOf[ContinuousReadSupport] -> scan.scanConfig
val stream = planWithSink.collect {
case relation: StreamingDataSourceV2Relation =>
relation.stream.asInstanceOf[ContinuousStream]
}.head
sparkSessionForQuery.sparkContext.setLocalProperty(
@ -233,16 +216,14 @@ class ContinuousExecution(
// Use the parent Spark session for the endpoint since it's where this query ID is registered.
val epochEndpoint =
EpochCoordinatorRef.create(
writer, readSupport, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
writer, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
val epochUpdateThread = new Thread(new Runnable {
override def run: Unit = {
try {
triggerExecutor.execute(() => {
startTrigger()
val shouldReconfigure = readSupport.needsReconfiguration(scanConfig) &&
state.compareAndSet(ACTIVE, RECONFIGURING)
if (shouldReconfigure) {
if (stream.needsReconfiguration && state.compareAndSet(ACTIVE, RECONFIGURING)) {
if (queryExecutionThread.isAlive) {
queryExecutionThread.interrupt()
}
@ -289,7 +270,6 @@ class ContinuousExecution(
epochUpdateThread.interrupt()
epochUpdateThread.join()
stopSources()
sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
}
@ -299,11 +279,11 @@ class ContinuousExecution(
*/
def addOffset(
epoch: Long,
readSupport: ContinuousReadSupport,
stream: ContinuousStream,
partitionOffsets: Seq[PartitionOffset]): Unit = {
assert(continuousSources.length == 1, "only one continuous source supported currently")
assert(sources.length == 1, "only one continuous source supported currently")
val globalOffset = readSupport.mergeOffsets(partitionOffsets.toArray)
val globalOffset = stream.mergeOffsets(partitionOffsets.toArray)
val oldOffset = synchronized {
offsetLog.add(epoch, OffsetSeq.fill(globalOffset))
offsetLog.get(epoch - 1)
@ -329,7 +309,7 @@ class ContinuousExecution(
def commit(epoch: Long): Unit = {
updateStatusMessage(s"Committing epoch $epoch")
assert(continuousSources.length == 1, "only one continuous source supported currently")
assert(sources.length == 1, "only one continuous source supported currently")
assert(offsetLog.get(epoch).isDefined, s"offset for epoch $epoch not reported before commit")
synchronized {
@ -338,9 +318,9 @@ class ContinuousExecution(
if (queryExecutionThread.isAlive) {
commitLog.add(epoch, CommitMetadata())
val offset =
continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
committedOffsets ++= Seq(continuousSources(0) -> offset)
continuousSources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset])
sources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
committedOffsets ++= Seq(sources(0) -> offset)
sources(0).commit(offset.asInstanceOf[v2.reader.streaming.Offset])
} else {
return
}

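One small step in runContinuous above is worth calling out: for each relation, the start offset is either the offset recovered from the offset log (deserialized back through the stream) or, on the first run, the stream's own initial offset. An illustrative helper expressing just that choice (not a Spark API):

import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, Offset}

object StartOffsetResolution {
  // Mirrors the startOffset resolution in runContinuous: replay the logged offset
  // if there is one, otherwise fall back to the stream's initial offset.
  def resolve(stream: ContinuousStream, loggedOffsetJson: Option[String]): Offset =
    loggedOffsetJson.map(stream.deserializeOffset).getOrElse(stream.initialOffset())
}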
@ -22,23 +22,22 @@ import org.json4s.jackson.Serialization
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.{RateStreamOffset, SimpleStreamingScanConfig, SimpleStreamingScanConfigBuilder, ValueRunTimeMsPair}
import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
import org.apache.spark.sql.types.StructType
case class RateStreamPartitionOffset(
partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset
class RateStreamContinuousReadSupport(options: DataSourceOptions) extends ContinuousReadSupport {
class RateStreamContinuousStream(
rowsPerSecond: Long,
numPartitions: Int,
options: DataSourceOptions) extends ContinuousStream {
implicit val defaultFormats: DefaultFormats = DefaultFormats
val creationTime = System.currentTimeMillis()
val numPartitions = options.get(RateStreamProvider.NUM_PARTITIONS).orElse("5").toInt
val rowsPerSecond = options.get(RateStreamProvider.ROWS_PER_SECOND).orElse("6").toLong
val perPartitionRate = rowsPerSecond.toDouble / numPartitions.toDouble
override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
@ -54,18 +53,10 @@ class RateStreamContinuousReadSupport(options: DataSourceOptions) extends Contin
RateStreamOffset(Serialization.read[Map[Int, ValueRunTimeMsPair]](json))
}
override def fullSchema(): StructType = RateStreamProvider.SCHEMA
override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
new SimpleStreamingScanConfigBuilder(fullSchema(), start)
}
override def initialOffset: Offset = createInitialOffset(numPartitions, creationTime)
override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
val startOffset = config.asInstanceOf[SimpleStreamingScanConfig].start
val partitionStartMap = startOffset match {
override def planInputPartitions(start: Offset): Array[InputPartition] = {
val partitionStartMap = start match {
case off: RateStreamOffset => off.partitionToValueAndRunTimeMs
case off =>
throw new IllegalArgumentException(
@ -91,8 +82,7 @@ class RateStreamContinuousReadSupport(options: DataSourceOptions) extends Contin
}.toArray
}
override def createContinuousReaderFactory(
config: ScanConfig): ContinuousPartitionReaderFactory = {
override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
RateStreamContinuousReaderFactory
}

@ -31,37 +31,29 @@ import org.json4s.jackson.Serialization
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{Offset => _, _}
import org.apache.spark.sql.execution.streaming.sources.TextSocketReader
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.RpcUtils
/**
* A ContinuousReadSupport that reads text lines through a TCP socket, designed only for tutorials
* and debugging. This ContinuousReadSupport will *not* work in production applications due to
* A [[ContinuousStream]] that reads text lines through a TCP socket, designed only for tutorials
* and debugging. This ContinuousStream will *not* work in production applications due to
* multiple reasons, including no support for fault recovery.
*
* The driver maintains a socket connection to the host-port, keeps the received messages in
* buckets and serves the messages to the executors via a RPC endpoint.
*/
class TextSocketContinuousReadSupport(options: DataSourceOptions)
extends ContinuousReadSupport with Logging {
class TextSocketContinuousStream(
host: String, port: Int, numPartitions: Int, options: DataSourceOptions)
extends ContinuousStream with Logging {
implicit val defaultFormats: DefaultFormats = DefaultFormats
private val host: String = options.get("host").get()
private val port: Int = options.get("port").get().toInt
assert(SparkSession.getActiveSession.isDefined)
private val spark = SparkSession.getActiveSession.get
private val numPartitions = spark.sparkContext.defaultParallelism
@GuardedBy("this")
private var socket: Socket = _
@ -101,21 +93,9 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions)
startOffset
}
override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
new SimpleStreamingScanConfigBuilder(fullSchema(), start)
}
override def fullSchema(): StructType = {
if (includeTimestamp) {
TextSocketReader.SCHEMA_TIMESTAMP
} else {
TextSocketReader.SCHEMA_REGULAR
}
}
override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
val startOffset = config.asInstanceOf[SimpleStreamingScanConfig]
.start.asInstanceOf[TextSocketOffset]
override def planInputPartitions(start: Offset): Array[InputPartition] = {
val startOffset = start.asInstanceOf[TextSocketOffset]
recordEndpoint.setStartOffsets(startOffset.offsets)
val endpointName = s"TextSocketContinuousReaderEndpoint-${java.util.UUID.randomUUID()}"
endpointRef = recordEndpoint.rpcEnv.setupEndpoint(endpointName, recordEndpoint)
@ -140,8 +120,7 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions)
}.toArray
}
override def createContinuousReaderFactory(
config: ScanConfig): ContinuousPartitionReaderFactory = {
override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
TextSocketReaderFactory
}
@ -197,7 +176,7 @@ class TextSocketContinuousReadSupport(options: DataSourceOptions)
logWarning(s"Stream closed by $host:$port")
return
}
TextSocketContinuousReadSupport.this.synchronized {
TextSocketContinuousStream.this.synchronized {
currentOffset += 1
val newData = (line,
Timestamp.valueOf(

@ -23,7 +23,7 @@ import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.util.RpcUtils
@ -83,14 +83,14 @@ private[sql] object EpochCoordinatorRef extends Logging {
*/
def create(
writeSupport: StreamingWriteSupport,
readSupport: ContinuousReadSupport,
stream: ContinuousStream,
query: ContinuousExecution,
epochCoordinatorId: String,
startEpoch: Long,
session: SparkSession,
env: SparkEnv): RpcEndpointRef = synchronized {
val coordinator = new EpochCoordinator(
writeSupport, readSupport, query, startEpoch, session, env.rpcEnv)
writeSupport, stream, query, startEpoch, session, env.rpcEnv)
val ref = env.rpcEnv.setupEndpoint(endpointName(epochCoordinatorId), coordinator)
logInfo("Registered EpochCoordinator endpoint")
ref
@ -116,7 +116,7 @@ private[sql] object EpochCoordinatorRef extends Logging {
*/
private[continuous] class EpochCoordinator(
writeSupport: StreamingWriteSupport,
readSupport: ContinuousReadSupport,
stream: ContinuousStream,
query: ContinuousExecution,
startEpoch: Long,
session: SparkSession,
@ -220,7 +220,7 @@ private[continuous] class EpochCoordinator(
partitionOffsets.collect { case ((e, _), o) if e == epoch => o }
if (thisEpochOffsets.size == numReaderPartitions) {
logDebug(s"Epoch $epoch has offsets reported from all partitions: $thisEpochOffsets")
query.addOffset(epoch, readSupport, thisEpochOffsets.toSeq)
query.addOffset(epoch, stream, thisEpochOffsets.toSeq)
resolveCommitsAtEpoch(epoch)
}
}

@ -33,9 +33,9 @@ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUti
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.{DataSourceOptions, SupportsMicroBatchRead, Table, TableProvider}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream, Offset => OffsetV2}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@ -68,7 +68,15 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Bas
def fullSchema(): StructType = encoder.schema
protected def logicalPlan: LogicalPlan
protected val logicalPlan: LogicalPlan = {
StreamingRelationV2(
MemoryStreamTableProvider,
"memory",
new MemoryStreamTable(this),
Map.empty,
attributes,
None)(sqlContext.sparkSession)
}
def addData(data: TraversableOnce[A]): Offset
}
@ -81,7 +89,8 @@ object MemoryStreamTableProvider extends TableProvider {
}
}
class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table with SupportsMicroBatchRead {
class MemoryStreamTable(val stream: MemoryStreamBase[_]) extends Table
with SupportsMicroBatchRead with SupportsContinuousRead {
override def name(): String = "MemoryStreamDataSource"
@ -101,7 +110,11 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w
override def readSchema(): StructType = stream.fullSchema()
override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
stream.asInstanceOf[MemoryStream[_]]
stream.asInstanceOf[MicroBatchStream]
}
override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
stream.asInstanceOf[ContinuousStream]
}
}
@ -113,16 +126,6 @@ class MemoryStreamScanBuilder(stream: MemoryStreamBase[_]) extends ScanBuilder w
case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
extends MemoryStreamBase[A](sqlContext) with MicroBatchStream with Logging {
protected val logicalPlan: LogicalPlan = {
StreamingRelationV2(
MemoryStreamTableProvider,
"memory",
new MemoryStreamTable(this),
Map.empty,
attributes,
None)(sqlContext.sparkSession)
}
protected val output = logicalPlan.output
/**

@ -30,8 +30,7 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.{Encoder, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.streaming.{Offset => _, _}
import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, ScanConfig, ScanConfigBuilder}
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming._
import org.apache.spark.util.RpcUtils
@ -44,15 +43,10 @@ import org.apache.spark.util.RpcUtils
* the specified offset within the list, or null if that offset doesn't yet have a record.
*/
class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPartitions: Int = 2)
extends MemoryStreamBase[A](sqlContext)
with ContinuousReadSupportProvider with ContinuousReadSupport {
extends MemoryStreamBase[A](sqlContext) with ContinuousStream {
private implicit val formats = Serialization.formats(NoTypeHints)
protected val logicalPlan =
// TODO: don't pass null as table after finish API refactor for continuous stream.
StreamingRelationV2(this, "memory", null, Map(), attributes, None)(sqlContext.sparkSession)
// ContinuousReader implementation
@GuardedBy("this")
@ -87,13 +81,9 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
)
}
override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = {
new SimpleStreamingScanConfigBuilder(fullSchema(), start)
}
override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
val startOffset = config.asInstanceOf[SimpleStreamingScanConfig]
.start.asInstanceOf[ContinuousMemoryStreamOffset]
override def planInputPartitions(start: Offset): Array[InputPartition] = {
val startOffset = start.asInstanceOf[ContinuousMemoryStreamOffset]
synchronized {
val endpointName = s"ContinuousMemoryStreamRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
endpointRef =
@ -105,8 +95,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
}
}
override def createContinuousReaderFactory(
config: ScanConfig): ContinuousPartitionReaderFactory = {
override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
ContinuousMemoryStreamReaderFactory
}
@ -115,12 +104,6 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext, numPa
}
override def commit(end: Offset): Unit = {}
// ContinuousReadSupportProvider implementation
// This is necessary because of how StreamTest finds the source for AddDataMemory steps.
override def createContinuousReadSupport(
checkpointLocation: String,
options: DataSourceOptions): ContinuousReadSupport = this
}
object ContinuousMemoryStream {

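Because ContinuousMemoryStream now gets its logical plan from the shared MemoryStreamBase/MemoryStreamTable path (with SupportsContinuousRead) instead of a hand-built StreamingRelationV2 with a null table, it can be driven like any other source. A hedged usage sketch; the ContinuousMemoryStream[Int] factory, its package, and console-sink support under a continuous trigger are assumptions based on this commit rather than guaranteed public API:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream
import org.apache.spark.sql.streaming.Trigger

object ContinuousMemoryStreamExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("continuous-memory-stream")
      .getOrCreate()
    import spark.implicits._
    implicit val sqlContext: org.apache.spark.sql.SQLContext = spark.sqlContext

    // An in-memory source that is itself the ContinuousStream after this refactor.
    val input = ContinuousMemoryStream[Int]
    input.addData(1, 2, 3)

    val query = input.toDS()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination(5000) // let a few epochs run, then shut down
    query.stop()
    spark.stop()
  }
}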
@ -19,11 +19,11 @@ package org.apache.spark.sql.execution.streaming.sources
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReadSupport
import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousStream
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.types._
/**
@ -41,7 +41,7 @@ import org.apache.spark.sql.types._
* be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.
*/
class RateStreamProvider extends DataSourceV2
with TableProvider with ContinuousReadSupportProvider with DataSourceRegister {
with TableProvider with DataSourceRegister {
import RateStreamProvider._
override def getTable(options: DataSourceOptions): Table = {
@ -68,12 +68,6 @@ class RateStreamProvider extends DataSourceV2
new RateStreamTable(rowsPerSecond, rampUpTimeSeconds, numPartitions)
}
override def createContinuousReadSupport(
checkpointLocation: String,
options: DataSourceOptions): ContinuousReadSupport = {
new RateStreamContinuousReadSupport(options)
}
override def shortName(): String = "rate"
}
@ -81,7 +75,7 @@ class RateStreamTable(
rowsPerSecond: Long,
rampUpTimeSeconds: Long,
numPartitions: Int)
extends Table with SupportsMicroBatchRead {
extends Table with SupportsMicroBatchRead with SupportsContinuousRead {
override def name(): String = {
s"RateStream(rowsPerSecond=$rowsPerSecond, rampUpTimeSeconds=$rampUpTimeSeconds, " +
@ -98,6 +92,10 @@ class RateStreamTable(
new RateStreamMicroBatchStream(
rowsPerSecond, rampUpTimeSeconds, numPartitions, options, checkpointLocation)
}
override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
new RateStreamContinuousStream(rowsPerSecond, numPartitions, options)
}
}
}
}

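With RateStreamTable now mixing in SupportsContinuousRead and building a RateStreamContinuousStream from toContinuousStream, the rate source flows through the ordinary table/scan path in continuous mode. A minimal end-to-end sketch using only public APIs (option values are arbitrary):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object RateContinuousExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("rate-continuous")
      .getOrCreate()

    // The continuous trigger makes the planner call toContinuousStream on the
    // rate table's scan, yielding a RateStreamContinuousStream.
    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .option("numPartitions", "2")
      .load()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination(10000)
    query.stop()
    spark.stop()
  }
}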
@ -26,7 +26,6 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.ListBuffer
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.LongOffset
@ -40,7 +39,8 @@ import org.apache.spark.unsafe.types.UTF8String
* and debugging. This MicroBatchReadSupport will *not* work in production applications due to
* multiple reasons, including no support for fault recovery.
*/
class TextSocketMicroBatchStream(host: String, port: Int, options: DataSourceOptions)
class TextSocketMicroBatchStream(
host: String, port: Int, numPartitions: Int, options: DataSourceOptions)
extends MicroBatchStream with Logging {
@GuardedBy("this")
@ -124,10 +124,6 @@ class TextSocketMicroBatchStream(host: String, port: Int, options: DataSourceOpt
batches.slice(sliceStart, sliceEnd)
}
assert(SparkSession.getActiveSession.isDefined)
val spark = SparkSession.getActiveSession.get
val numPartitions = spark.sparkContext.defaultParallelism
val slices = Array.fill(numPartitions)(new ListBuffer[(UTF8String, Long)])
rawList.zipWithIndex.foreach { case (r, idx) =>
slices(idx % numPartitions).append(r)

@ -24,16 +24,15 @@ import scala.util.{Failure, Success, Try}
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousReadSupport
import org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousStream
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.{Scan, ScanBuilder}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, MicroBatchStream}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
class TextSocketSourceProvider extends DataSourceV2
with TableProvider with ContinuousReadSupportProvider
with DataSourceRegister with Logging {
with TableProvider with DataSourceRegister with Logging {
private def checkParameters(params: DataSourceOptions): Unit = {
logWarning("The socket source should not be used for production applications! " +
@ -58,22 +57,16 @@ class TextSocketSourceProvider extends DataSourceV2
new TextSocketTable(
options.get("host").get,
options.getInt("port", -1),
options.getInt("numPartitions", SparkSession.active.sparkContext.defaultParallelism),
options.getBoolean("includeTimestamp", false))
}
override def createContinuousReadSupport(
checkpointLocation: String,
options: DataSourceOptions): ContinuousReadSupport = {
checkParameters(options)
new TextSocketContinuousReadSupport(options)
}
/** String that represents the format that this data source provider uses. */
override def shortName(): String = "socket"
}
class TextSocketTable(host: String, port: Int, includeTimestamp: Boolean)
extends Table with SupportsMicroBatchRead {
class TextSocketTable(host: String, port: Int, numPartitions: Int, includeTimestamp: Boolean)
extends Table with SupportsMicroBatchRead with SupportsContinuousRead {
override def name(): String = s"Socket[$host:$port]"
@ -90,7 +83,11 @@ class TextSocketTable(host: String, port: Int, includeTimestamp: Boolean)
override def readSchema(): StructType = schema()
override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
new TextSocketMicroBatchStream(host, port, options)
new TextSocketMicroBatchStream(host, port, numPartitions, options)
}
override def toContinuousStream(checkpointLocation: String): ContinuousStream = {
new TextSocketContinuousStream(host, port, numPartitions, options)
}
}
}

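The socket source gets the same treatment, with one user-visible difference: numPartitions is now read from the options (defaulting to the session's default parallelism) and passed into both streams, instead of being looked up inside the reader. A hedged sketch of a continuous socket query; it assumes a line-oriented TCP server such as `nc -lk 9999` is running locally:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object SocketContinuousExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("socket-continuous")
      .getOrCreate()

    val query = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .option("numPartitions", "2") // forwarded to TextSocketTable per this commit
      .load()
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination(10000)
    query.stop()
    spark.stop()
  }
}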
@ -30,9 +30,7 @@ import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
/**
* Interface used to load a streaming `Dataset` from external storage systems (e.g. file systems,
@ -183,39 +181,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
case _ => provider.getTable(dsOptions)
}
table match {
case s: SupportsMicroBatchRead =>
case _: SupportsMicroBatchRead | _: SupportsContinuousRead =>
Dataset.ofRows(
sparkSession,
StreamingRelationV2(
provider, source, s, options,
table.schema.toAttributes, v1Relation)(sparkSession))
case _ if ds.isInstanceOf[ContinuousReadSupportProvider] =>
val provider = ds.asInstanceOf[ContinuousReadSupportProvider]
var tempReadSupport: ContinuousReadSupport = null
val schema = try {
val tmpCheckpointPath = Utils.createTempDir(namePrefix = s"tempCP").getCanonicalPath
tempReadSupport = if (userSpecifiedSchema.isDefined) {
provider.createContinuousReadSupport(
userSpecifiedSchema.get, tmpCheckpointPath, dsOptions)
} else {
provider.createContinuousReadSupport(tmpCheckpointPath, dsOptions)
}
tempReadSupport.fullSchema()
} finally {
// Stop tempReader to avoid side-effect thing
if (tempReadSupport != null) {
tempReadSupport.stop()
tempReadSupport = null
}
}
Dataset.ofRows(
sparkSession,
// TODO: do not pass null as table after finish the API refactor for continuous
// stream.
StreamingRelationV2(
provider, source, table = null, options,
schema.toAttributes, v1Relation)(sparkSession))
provider, source, table, options, table.schema.toAttributes, v1Relation)(
sparkSession))
// fallback to v1
case _ => Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource))

@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relati
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.streaming.Offset
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.util.ManualClock
@ -308,30 +308,17 @@ class RateStreamProviderSuite extends StreamTest {
"rate source does not support user-specified schema"))
}
test("continuous in registry") {
DataSource.lookupDataSource("rate", spark.sqlContext.conf).
getConstructor().newInstance() match {
case ds: ContinuousReadSupportProvider =>
val readSupport = ds.createContinuousReadSupport(
"", DataSourceOptions.empty())
assert(readSupport.isInstanceOf[RateStreamContinuousReadSupport])
case _ =>
throw new IllegalStateException("Could not find read support for continuous rate")
}
}
test("continuous data") {
val readSupport = new RateStreamContinuousReadSupport(
new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
val config = readSupport.newScanConfigBuilder(readSupport.initialOffset).build()
val tasks = readSupport.planInputPartitions(config)
val readerFactory = readSupport.createContinuousReaderFactory(config)
assert(tasks.size == 2)
val stream = new RateStreamContinuousStream(
rowsPerSecond = 20, numPartitions = 2, options = DataSourceOptions.empty())
val partitions = stream.planInputPartitions(stream.initialOffset)
val readerFactory = stream.createContinuousReaderFactory()
assert(partitions.size == 2)
val data = scala.collection.mutable.ListBuffer[InternalRow]()
tasks.foreach {
partitions.foreach {
case t: RateStreamContinuousInputPartition =>
val startTimeMs = readSupport.initialOffset()
val startTimeMs = stream.initialOffset()
.asInstanceOf[RateStreamOffset]
.partitionToValueAndRunTimeMs(t.partitionIndex)
.runTimeMs

@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
@ -294,25 +295,25 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
serverThread = new ServerThread()
serverThread.start()
val readSupport = new TextSocketContinuousReadSupport(
new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
"port" -> serverThread.port.toString).asJava))
val scanConfig = readSupport.newScanConfigBuilder(readSupport.initialOffset()).build()
val tasks = readSupport.planInputPartitions(scanConfig)
assert(tasks.size == 2)
val stream = new TextSocketContinuousStream(
host = "localhost",
port = serverThread.port,
numPartitions = 2,
options = DataSourceOptions.empty())
val partitions = stream.planInputPartitions(stream.initialOffset())
assert(partitions.length == 2)
val numRecords = 10
val data = scala.collection.mutable.ListBuffer[Int]()
val offsets = scala.collection.mutable.ListBuffer[Int]()
val readerFactory = readSupport.createContinuousReaderFactory(scanConfig)
val readerFactory = stream.createContinuousReaderFactory()
import org.scalatest.time.SpanSugar._
failAfter(5 seconds) {
// inject rows, read and check the data and offsets
for (i <- 0 until numRecords) {
serverThread.enqueue(i.toString)
}
tasks.foreach {
partitions.foreach {
case t: TextSocketContinuousInputPartition =>
val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
for (i <- 0 until numRecords / 2) {
@ -330,15 +331,15 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
data.clear()
case _ => throw new IllegalStateException("Unexpected task type")
}
assert(readSupport.startOffset.offsets == List(3, 3))
readSupport.commit(TextSocketOffset(List(5, 5)))
assert(readSupport.startOffset.offsets == List(5, 5))
assert(stream.startOffset.offsets == List(3, 3))
stream.commit(TextSocketOffset(List(5, 5)))
assert(stream.startOffset.offsets == List(5, 5))
}
def commitOffset(partition: Int, offset: Int): Unit = {
val offsetsToCommit = readSupport.startOffset.offsets.updated(partition, offset)
readSupport.commit(TextSocketOffset(offsetsToCommit))
assert(readSupport.startOffset.offsets == offsetsToCommit)
val offsetsToCommit = stream.startOffset.offsets.updated(partition, offset)
stream.commit(TextSocketOffset(offsetsToCommit))
assert(stream.startOffset.offsets == offsetsToCommit)
}
}
@ -346,13 +347,15 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
serverThread = new ServerThread()
serverThread.start()
val readSupport = new TextSocketContinuousReadSupport(
new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
"port" -> serverThread.port.toString).asJava))
val stream = new TextSocketContinuousStream(
host = "localhost",
port = serverThread.port,
numPartitions = 2,
options = DataSourceOptions.empty())
readSupport.startOffset = TextSocketOffset(List(5, 5))
stream.startOffset = TextSocketOffset(List(5, 5))
assertThrows[IllegalStateException] {
readSupport.commit(TextSocketOffset(List(6, 6)))
stream.commit(TextSocketOffset(List(6, 6)))
}
}
@ -360,21 +363,21 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before
serverThread = new ServerThread()
serverThread.start()
val readSupport = new TextSocketContinuousReadSupport(
new DataSourceOptions(Map("numPartitions" -> "2", "host" -> "localhost",
"includeTimestamp" -> "true",
"port" -> serverThread.port.toString).asJava))
val scanConfig = readSupport.newScanConfigBuilder(readSupport.initialOffset()).build()
val tasks = readSupport.planInputPartitions(scanConfig)
assert(tasks.size == 2)
val stream = new TextSocketContinuousStream(
host = "localhost",
port = serverThread.port,
numPartitions = 2,
options = new DataSourceOptions(Map("includeTimestamp" -> "true").asJava))
val partitions = stream.planInputPartitions(stream.initialOffset())
assert(partitions.size == 2)
val numRecords = 4
// inject rows, read and check the data and offsets
for (i <- 0 until numRecords) {
serverThread.enqueue(i.toString)
}
val readerFactory = readSupport.createContinuousReaderFactory(scanConfig)
tasks.foreach {
val readerFactory = stream.createContinuousReaderFactory()
partitions.foreach {
case t: TextSocketContinuousInputPartition =>
val r = readerFactory.createReader(t).asInstanceOf[TextSocketContinuousPartitionReader]
for (_ <- 0 until numRecords / 2) {

@ -24,7 +24,7 @@ import test.org.apache.spark.sql.sources.v2._
import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanExec}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2Relation}
import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec}
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.functions._
@ -40,14 +40,14 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
private def getBatch(query: DataFrame): AdvancedBatch = {
query.queryExecution.executedPlan.collect {
case d: DataSourceV2ScanExec =>
case d: BatchScanExec =>
d.batch.asInstanceOf[AdvancedBatch]
}.head
}
private def getJavaBatch(query: DataFrame): JavaAdvancedDataSourceV2.AdvancedBatch = {
query.queryExecution.executedPlan.collect {
case d: DataSourceV2ScanExec =>
case d: BatchScanExec =>
d.batch.asInstanceOf[JavaAdvancedDataSourceV2.AdvancedBatch]
}.head
}
@ -309,7 +309,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
assert(logical.canonicalized.output.length == logicalNumOutput)
val physical = df.queryExecution.executedPlan.collect {
case d: DataSourceV2ScanExec => d
case d: BatchScanExec => d
}.head
assert(physical.canonicalized.output.length == physicalNumOutput)
}

@ -495,7 +495,7 @@ class StreamSuite extends StreamTest {
// `extended = false` only displays the physical plan.
assert("StreamingDataSourceV2Relation".r
.findAllMatchIn(explainWithoutExtended).size === 0)
assert("ScanV2".r
assert("BatchScan".r
.findAllMatchIn(explainWithoutExtended).size === 1)
// Use "StateStoreRestore" to verify that it does output a streaming physical plan
assert(explainWithoutExtended.contains("StateStoreRestore"))
@ -505,7 +505,7 @@ class StreamSuite extends StreamTest {
// plan.
assert("StreamingDataSourceV2Relation".r
.findAllMatchIn(explainWithExtended).size === 3)
assert("ScanV2".r
assert("BatchScan".r
.findAllMatchIn(explainWithExtended).size === 1)
// Use "StateStoreRestore" to verify that it does output a streaming physical plan
assert(explainWithExtended.contains("StateStoreRestore"))
@ -548,17 +548,17 @@ class StreamSuite extends StreamTest {
val explainWithoutExtended = q.explainInternal(false)
// `extended = false` only displays the physical plan.
assert("Streaming RelationV2 ContinuousMemoryStream".r
assert("StreamingDataSourceV2Relation".r
.findAllMatchIn(explainWithoutExtended).size === 0)
assert("ScanV2 ContinuousMemoryStream".r
assert("ContinuousScan".r
.findAllMatchIn(explainWithoutExtended).size === 1)
val explainWithExtended = q.explainInternal(true)
// `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical
// plan.
assert("Streaming RelationV2 ContinuousMemoryStream".r
assert("StreamingDataSourceV2Relation".r
.findAllMatchIn(explainWithExtended).size === 3)
assert("ScanV2 ContinuousMemoryStream".r
assert("ContinuousScan".r
.findAllMatchIn(explainWithExtended).size === 1)
} finally {
q.stop()

@ -39,12 +39,11 @@ import org.apache.spark.sql.catalyst.encoders.{encoderFor, ExpressionEncoder, Ro
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.AllTuples
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.datasources.v2.{OldStreamingDataSourceV2Relation, StreamingDataSourceV2Relation}
import org.apache.spark.sql.execution.datasources.v2.StreamingDataSourceV2Relation
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, EpochCoordinatorRef, IncrementAndGetEpoch}
import org.apache.spark.sql.execution.streaming.sources.MemorySinkV2
import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReadSupport
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.{Clock, SystemClock, Utils}
@ -692,16 +691,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
case r: StreamingExecutionRelation => r.source
// v2 source
case r: StreamingDataSourceV2Relation => r.stream
case r: OldStreamingDataSourceV2Relation => r.readSupport
// We can add data to memory stream before starting it. Then the input plan has
// not been processed by the streaming engine and contains `StreamingRelationV2`.
case r: StreamingRelationV2 if r.sourceName == "memory" =>
// TODO: remove this null hack after finish API refactor for continuous stream.
if (r.table == null) {
r.dataSource.asInstanceOf[ContinuousReadSupport]
} else {
r.table.asInstanceOf[MemoryStreamTable].stream
}
r.table.asInstanceOf[MemoryStreamTable].stream
}
.zipWithIndex
.find(_._1 == source)

@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.reader.{InputPartition, ScanConfig}
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
import org.apache.spark.sql.types.StructType
@ -911,7 +911,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
testStream(df, useV2Sink = true)(
StartStream(trigger = Trigger.Continuous(100)),
AssertOnQuery(_.logicalPlan.toJSON.contains("ContinuousExecutionRelation"))
AssertOnQuery(_.logicalPlan.toJSON.contains("StreamingDataSourceV2Relation"))
)
}

@ -27,7 +27,7 @@ import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousReadSupport, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReader, ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.types.{DataType, IntegerType, StructType}
@ -44,7 +44,7 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
super.beforeEach()
epochEndpoint = EpochCoordinatorRef.create(
mock[StreamingWriteSupport],
mock[ContinuousReadSupport],
mock[ContinuousStream],
mock[ContinuousExecution],
coordinatorId,
startEpoch,

@ -41,7 +41,7 @@ class ContinuousSuiteBase extends StreamTest {
case s: ContinuousExecution =>
assert(numTriggers >= 2, "must wait for at least 2 triggers to ensure query is initialized")
val reader = s.lastExecution.executedPlan.collectFirst {
case ContinuousScanExec(_, _, _, _, r: RateStreamContinuousReadSupport, _) => r
case ContinuousScanExec(_, _, r: RateStreamContinuousStream, _) => r
}.get
val deltaMs = numTriggers * 1000 + 300

@ -27,7 +27,7 @@ import org.apache.spark._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.LocalSparkSession
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWriteSupport
import org.apache.spark.sql.test.TestSparkSession
@ -45,7 +45,7 @@ class EpochCoordinatorSuite
private var orderVerifier: InOrder = _
override def beforeEach(): Unit = {
val reader = mock[ContinuousReadSupport]
val stream = mock[ContinuousStream]
writeSupport = mock[StreamingWriteSupport]
query = mock[ContinuousExecution]
orderVerifier = inOrder(writeSupport, query)
@ -53,7 +53,7 @@ class EpochCoordinatorSuite
spark = new TestSparkSession()
epochCoordinator
= EpochCoordinatorRef.create(writeSupport, reader, query, "test", 1, spark, SparkEnv.get)
= EpochCoordinatorRef.create(writeSupport, stream, query, "test", 1, spark, SparkEnv.get)
}
test("single epoch") {

@ -31,33 +31,23 @@ import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, T
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
class FakeDataStream extends MicroBatchStream {
class FakeDataStream extends MicroBatchStream with ContinuousStream {
override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
override def commit(end: Offset): Unit = {}
override def stop(): Unit = {}
override def initialOffset(): Offset = RateStreamOffset(Map())
override def latestOffset(): Offset = RateStreamOffset(Map())
override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
throw new IllegalStateException("fake source - cannot actually read")
}
override def planInputPartitions(start: Offset): Array[InputPartition] = {
throw new IllegalStateException("fake source - cannot actually read")
}
override def createReaderFactory(): PartitionReaderFactory = {
throw new IllegalStateException("fake source - cannot actually read")
}
}
case class FakeReadSupport() extends ContinuousReadSupport {
override def deserializeOffset(json: String): Offset = RateStreamOffset(Map())
override def commit(end: Offset): Unit = {}
override def stop(): Unit = {}
override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
override def fullSchema(): StructType = StructType(Seq())
override def initialOffset(): Offset = RateStreamOffset(Map())
override def newScanConfigBuilder(start: Offset): ScanConfigBuilder = null
override def createContinuousReaderFactory(
config: ScanConfig): ContinuousPartitionReaderFactory = {
throw new IllegalStateException("fake source - cannot actually read")
}
override def planInputPartitions(config: ScanConfig): Array[InputPartition] = {
override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory = {
throw new IllegalStateException("fake source - cannot actually read")
}
}
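
A minimal sketch (not part of this patch) of the full `ContinuousStream` surface these fakes now implement after the refactor; `NoopOffset` and `NoopContinuousStream` are invented names for illustration, and the overrides mirror the signatures exercised by `FakeDataStream` above.

```scala
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming._

// Placeholder offset for the sketch; a real stream serializes meaningful state.
case class NoopOffset() extends Offset {
  override def json(): String = "{}"
}

// Illustrative no-op stream: the smallest ContinuousStream a test double needs
// after the rename from ContinuousReadSupport.
class NoopContinuousStream extends ContinuousStream {
  override def initialOffset(): Offset = NoopOffset()
  override def deserializeOffset(json: String): Offset = NoopOffset()
  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = NoopOffset()
  override def planInputPartitions(start: Offset): Array[InputPartition] = Array.empty
  override def createContinuousReaderFactory(): ContinuousPartitionReaderFactory =
    throw new IllegalStateException("no-op stream - cannot actually read")
  override def commit(end: Offset): Unit = {}
  override def stop(): Unit = {}
}
```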
@ -66,21 +56,19 @@ class FakeScanBuilder extends ScanBuilder with Scan {
override def build(): Scan = this
override def readSchema(): StructType = StructType(Seq())
override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = new FakeDataStream
override def toContinuousStream(checkpointLocation: String): ContinuousStream = new FakeDataStream
}
class FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
}
trait FakeContinuousReadSupportProvider extends ContinuousReadSupportProvider {
override def createContinuousReadSupport(
checkpointLocation: String,
options: DataSourceOptions): ContinuousReadSupport = {
LastReadOptions.options = options
FakeReadSupport()
}
trait FakeContinuousReadTable extends Table with SupportsContinuousRead {
override def name(): String = "fake"
override def schema(): StructType = StructType(Seq())
override def newScanBuilder(options: DataSourceOptions): ScanBuilder = new FakeScanBuilder
}
trait FakeStreamingWriteSupportProvider extends StreamingWriteSupportProvider {
@ -111,27 +99,34 @@ class FakeReadMicroBatchOnly
class FakeReadContinuousOnly
extends DataSourceRegister
with TableProvider
with FakeContinuousReadSupportProvider
with SessionConfigSupport {
override def shortName(): String = "fake-read-continuous-only"
override def keyPrefix: String = shortName()
override def getTable(options: DataSourceOptions): Table = new Table {
override def schema(): StructType = StructType(Seq())
override def name(): String = "fake"
override def getTable(options: DataSourceOptions): Table = {
LastReadOptions.options = options
new FakeContinuousReadTable {}
}
}
class FakeReadBothModes extends DataSourceRegister
with TableProvider with FakeContinuousReadSupportProvider {
class FakeReadBothModes extends DataSourceRegister with TableProvider {
override def shortName(): String = "fake-read-microbatch-continuous"
override def getTable(options: DataSourceOptions): Table = new FakeMicroBatchReadTable {}
override def getTable(options: DataSourceOptions): Table = {
new Table with FakeMicroBatchReadTable with FakeContinuousReadTable {}
}
}
class FakeReadNeitherMode extends DataSourceRegister {
class FakeReadNeitherMode extends DataSourceRegister with TableProvider {
override def shortName(): String = "fake-read-neither-mode"
override def getTable(options: DataSourceOptions): Table = {
new Table {
override def name(): String = "fake"
override def schema(): StructType = StructType(Nil)
}
}
}
class FakeWriteSupportProvider
@ -324,33 +319,25 @@ class StreamingDataSourceV2Suite extends StreamTest {
for ((read, write, trigger) <- cases) {
testQuietly(s"stream with read format $read, write format $write, trigger $trigger") {
val readSource = DataSource.lookupDataSource(read, spark.sqlContext.conf).
getConstructor().newInstance()
val table = DataSource.lookupDataSource(read, spark.sqlContext.conf).getConstructor()
.newInstance().asInstanceOf[TableProvider].getTable(DataSourceOptions.empty())
val writeSource = DataSource.lookupDataSource(write, spark.sqlContext.conf).
getConstructor().newInstance()
def isMicroBatch(ds: Any): Boolean = ds match {
case provider: TableProvider =>
val table = provider.getTable(DataSourceOptions.empty())
table.isInstanceOf[SupportsMicroBatchRead]
case _ => false
}
(readSource, writeSource, trigger) match {
(table, writeSource, trigger) match {
// Valid microbatch queries.
case (_: TableProvider, _: StreamingWriteSupportProvider, t)
if isMicroBatch(readSource) && !t.isInstanceOf[ContinuousTrigger] =>
case (_: SupportsMicroBatchRead, _: StreamingWriteSupportProvider, t)
if !t.isInstanceOf[ContinuousTrigger] =>
testPositiveCase(read, write, trigger)
// Valid continuous queries.
case (_: ContinuousReadSupportProvider, _: StreamingWriteSupportProvider,
case (_: SupportsContinuousRead, _: StreamingWriteSupportProvider,
_: ContinuousTrigger) =>
testPositiveCase(read, write, trigger)
// Invalid - can't read at all
case (r, _, _)
if !r.isInstanceOf[TableProvider]
&& !r.isInstanceOf[ContinuousReadSupportProvider] =>
case (r, _, _) if !r.isInstanceOf[SupportsMicroBatchRead] &&
!r.isInstanceOf[SupportsContinuousRead] =>
testNegativeCase(read, write, trigger,
s"Data source $read does not support streamed reading")
@ -361,14 +348,13 @@ class StreamingDataSourceV2Suite extends StreamTest {
// Invalid - trigger is continuous but reader is not
case (r, _: StreamingWriteSupportProvider, _: ContinuousTrigger)
if !r.isInstanceOf[ContinuousReadSupportProvider] =>
if !r.isInstanceOf[SupportsContinuousRead] =>
testNegativeCase(read, write, trigger,
s"Data source $read does not support continuous processing")
// Invalid - trigger is microbatch but reader is not
case (r, _, t)
if !isMicroBatch(r) &&
!t.isInstanceOf[ContinuousTrigger] =>
case (r, _, t) if !r.isInstanceOf[SupportsMicroBatchRead] &&
!t.isInstanceOf[ContinuousTrigger] =>
testPostCreationNegativeCase(read, write, trigger,
s"Data source $read does not support microbatch processing")
}
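
For reference, a hedged sketch (not from this commit) of the capability check the rewritten match above relies on: the `Table` returned by `TableProvider.getTable` advertises streaming support only through the `SupportsMicroBatchRead` and `SupportsContinuousRead` marker interfaces. The helper and object names below are made up for the example; import paths reflect the packages used at the time of this commit.

```scala
import org.apache.spark.sql.sources.v2.{SupportsContinuousRead, SupportsMicroBatchRead, Table}

object StreamModeCheck {
  // Illustrative helper mirroring how the test classifies a table by its
  // marker traits rather than by provider-level interfaces.
  def supportedStreamModes(table: Table): Set[String] = {
    var modes = Set.empty[String]
    if (table.isInstanceOf[SupportsMicroBatchRead]) modes += "micro-batch"
    if (table.isInstanceOf[SupportsContinuousRead]) modes += "continuous"
    modes
  }
}
```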