[SPARK-24073][SQL] Rename DataReaderFactory to InputPartition.

## What changes were proposed in this pull request?

Renames:
* `DataReaderFactory` to `InputPartition`
* `DataReader` to `InputPartitionReader`
* `createDataReaderFactories` to `planInputPartitions`
* `createUnsafeDataReaderFactories` to `planUnsafeInputPartitions`
* `createBatchDataReaderFactories` to `planBatchInputPartitions`

This fixes the changes in SPARK-23219, which renamed ReadTask to
DataReaderFactory. The intent of that change was to make the read and
write APIs match (the write side uses DataWriterFactory), but the
underlying problem is that the two classes are not equivalent.

ReadTask/DataReader function like Iterable/Iterator: one InputPartition
describes a specific partition of the data to be read, in contrast to
DataWriterFactory, where the same factory instance is used in all write
tasks. An InputPartition's purpose is to manage the lifecycle of its
associated reader, now called InputPartitionReader, with an explicit
create operation to mirror the close operation. This was no longer clear
from the API because DataReaderFactory appeared to be more generic than
it is, and it was not obvious why a set of factories is produced for a
single read.
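
For illustration only (this sketch is not part of the patch, and the `RangeInputPartition`/`RangeReader` names are hypothetical), a minimal partition and reader under the new names look roughly like this, with `InputPartition` in the Iterable role and `InputPartitionReader` in the Iterator role:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}

// Serializable description of one partition; sent to an executor.
class RangeInputPartition(start: Int, end: Int) extends InputPartition[Row] {
  // Explicit create, mirroring the reader's explicit close.
  override def createPartitionReader(): InputPartitionReader[Row] =
    new RangeReader(start, end)
}

// Created, consumed, and closed on the executor; does the actual reading.
class RangeReader(start: Int, end: Int) extends InputPartitionReader[Row] {
  private var current = start - 1
  override def next(): Boolean = { current += 1; current < end }
  override def get(): Row = Row(current)
  override def close(): Unit = {}
}
```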

## How was this patch tested?

Existing tests, which have been updated to use the new names.

Author: Ryan Blue <blue@apache.org>

Closes #21145 from rdblue/SPARK-24073-revert-data-reader-factory-rename.
Authored by Ryan Blue on 2018-05-09 21:48:54 -07:00; committed by gatorsmile
commit 62d01391fe (parent 9341c951e8)
39 changed files with 273 additions and 264 deletions


@ -29,7 +29,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
/**
@ -86,7 +86,7 @@ class KafkaContinuousReader(
KafkaSourceOffset(JsonUtils.partitionOffsets(json))
}
override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
import scala.collection.JavaConverters._
val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
@ -108,7 +108,7 @@ class KafkaContinuousReader(
case (topicPartition, start) =>
KafkaContinuousDataReaderFactory(
topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
.asInstanceOf[DataReaderFactory[UnsafeRow]]
.asInstanceOf[InputPartition[UnsafeRow]]
}.asJava
}
@ -161,18 +161,18 @@ case class KafkaContinuousDataReaderFactory(
startOffset: Long,
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends ContinuousDataReaderFactory[UnsafeRow] {
failOnDataLoss: Boolean) extends ContinuousInputPartition[UnsafeRow] {
override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[UnsafeRow] = {
override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[UnsafeRow] = {
val kafkaOffset = offset.asInstanceOf[KafkaSourcePartitionOffset]
require(kafkaOffset.topicPartition == topicPartition,
s"Expected topicPartition: $topicPartition, but got: ${kafkaOffset.topicPartition}")
new KafkaContinuousDataReader(
new KafkaContinuousInputPartitionReader(
topicPartition, kafkaOffset.partitionOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
}
override def createDataReader(): KafkaContinuousDataReader = {
new KafkaContinuousDataReader(
override def createPartitionReader(): KafkaContinuousInputPartitionReader = {
new KafkaContinuousInputPartitionReader(
topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
}
}
@ -187,12 +187,12 @@ case class KafkaContinuousDataReaderFactory(
* @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
* are skipped.
*/
class KafkaContinuousDataReader(
class KafkaContinuousInputPartitionReader(
topicPartition: TopicPartition,
startOffset: Long,
kafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
failOnDataLoss: Boolean) extends ContinuousInputPartitionReader[UnsafeRow] {
private val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams, useCache = false)
private val converter = new KafkaRecordToUnsafeRowConverter


@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
import org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanUnsafeRow}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.UninterruptibleThread
@ -101,7 +101,7 @@ private[kafka010] class KafkaMicroBatchReader(
}
}
override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
// Find the new partitions, and get their earliest offsets
val newPartitions = endPartitionOffsets.keySet.diff(startPartitionOffsets.keySet)
val newPartitionInitialOffsets = kafkaOffsetReader.fetchEarliestOffsets(newPartitions.toSeq)
@ -146,7 +146,7 @@ private[kafka010] class KafkaMicroBatchReader(
new KafkaMicroBatchDataReaderFactory(
range, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
}
factories.map(_.asInstanceOf[DataReaderFactory[UnsafeRow]]).asJava
factories.map(_.asInstanceOf[InputPartition[UnsafeRow]]).asJava
}
override def getStartOffset: Offset = {
@ -299,27 +299,28 @@ private[kafka010] class KafkaMicroBatchReader(
}
}
/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */
/** A [[InputPartition]] for reading Kafka data in a micro-batch streaming query. */
private[kafka010] case class KafkaMicroBatchDataReaderFactory(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean) extends DataReaderFactory[UnsafeRow] {
reuseKafkaConsumer: Boolean) extends InputPartition[UnsafeRow] {
override def preferredLocations(): Array[String] = offsetRange.preferredLoc.toArray
override def createDataReader(): DataReader[UnsafeRow] = new KafkaMicroBatchDataReader(
offsetRange, executorKafkaParams, pollTimeoutMs, failOnDataLoss, reuseKafkaConsumer)
override def createPartitionReader(): InputPartitionReader[UnsafeRow] =
new KafkaMicroBatchInputPartitionReader(offsetRange, executorKafkaParams, pollTimeoutMs,
failOnDataLoss, reuseKafkaConsumer)
}
/** A [[DataReader]] for reading Kafka data in a micro-batch streaming query. */
private[kafka010] case class KafkaMicroBatchDataReader(
/** A [[InputPartitionReader]] for reading Kafka data in a micro-batch streaming query. */
private[kafka010] case class KafkaMicroBatchInputPartitionReader(
offsetRange: KafkaOffsetRange,
executorKafkaParams: ju.Map[String, Object],
pollTimeoutMs: Long,
failOnDataLoss: Boolean,
reuseKafkaConsumer: Boolean) extends DataReader[UnsafeRow] with Logging {
reuseKafkaConsumer: Boolean) extends InputPartitionReader[UnsafeRow] with Logging {
private val consumer = KafkaDataConsumer.acquire(
offsetRange.topicPartition, executorKafkaParams, reuseKafkaConsumer)


@ -31,6 +31,7 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousInputPartitionReader
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@ -149,7 +150,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
}
/**
* Creates a [[org.apache.spark.sql.sources.v2.reader.streaming.ContinuousDataReader]] to read
* Creates a [[ContinuousInputPartitionReader]] to read
* Kafka data in a continuous streaming query.
*/
override def createContinuousReader(


@ -678,7 +678,7 @@ class KafkaMicroBatchV2SourceSuite extends KafkaMicroBatchSourceSuiteBase {
Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 0L))),
Optional.of[OffsetV2](KafkaSourceOffset(Map(tp -> 100L)))
)
val factories = reader.createUnsafeRowReaderFactories().asScala
val factories = reader.planUnsafeInputPartitions().asScala
.map(_.asInstanceOf[KafkaMicroBatchDataReaderFactory])
withClue(s"minPartitions = $minPartitions generated factories $factories\n\t") {
assert(factories.size == numPartitionsGenerated)


@ -34,7 +34,7 @@ public interface MicroBatchReadSupport extends DataSourceV2 {
* streaming query.
*
* The execution engine will create a micro-batch reader at the start of a streaming query,
* alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and
* alternate calls to setOffsetRange and planInputPartitions for each batch to process, and
* then call stop() when the execution is complete. Note that a single query may have multiple
* executions due to restart or failure recovery.
*


@ -21,15 +21,15 @@ import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset;
/**
* A mix-in interface for {@link DataReaderFactory}. Continuous data reader factories can
* implement this interface to provide creating {@link DataReader} with particular offset.
* A mix-in interface for {@link InputPartition}. Continuous input partitions can
* implement this interface to provide creating {@link InputPartitionReader} with particular offset.
*/
@InterfaceStability.Evolving
public interface ContinuousDataReaderFactory<T> extends DataReaderFactory<T> {
public interface ContinuousInputPartition<T> extends InputPartition<T> {
/**
* Create a DataReader with particular offset as its startOffset.
*
* @param offset offset want to set as the DataReader's startOffset.
*/
DataReader<T> createDataReaderWithOffset(PartitionOffset offset);
InputPartitionReader<T> createContinuousReader(PartitionOffset offset);
}


@ -31,8 +31,8 @@ import org.apache.spark.sql.types.StructType;
* {@link ReadSupport#createReader(DataSourceOptions)} or
* {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
* It can mix in various query optimization interfaces to speed up the data scan. The actual scan
* logic is delegated to {@link DataReaderFactory}s that are returned by
* {@link #createDataReaderFactories()}.
* logic is delegated to {@link InputPartition}s that are returned by
* {@link #planInputPartitions()}.
*
* There are mainly 3 kinds of query optimizations:
* 1. Operators push-down. E.g., filter push-down, required columns push-down(aka column
@ -65,8 +65,8 @@ public interface DataSourceReader {
StructType readSchema();
/**
* Returns a list of reader factories. Each factory is responsible for creating a data reader to
* output data for one RDD partition. That means the number of factories returned here is same as
* Returns a list of read tasks. Each task is responsible for creating a data reader to
* output data for one RDD partition. That means the number of tasks returned here is same as
* the number of RDD partitions this scan outputs.
*
* Note that, this may not be a full scan if the data source reader mixes in other optimization
@ -76,5 +76,5 @@ public interface DataSourceReader {
* If this method fails (by throwing an exception), the action would fail and no Spark job was
* submitted.
*/
List<DataReaderFactory<Row>> createDataReaderFactories();
List<InputPartition<Row>> planInputPartitions();
}


@ -22,20 +22,20 @@ import java.io.Serializable;
import org.apache.spark.annotation.InterfaceStability;
/**
* A reader factory returned by {@link DataSourceReader#createDataReaderFactories()} and is
* An input partition returned by {@link DataSourceReader#planInputPartitions()} and is
* responsible for creating the actual data reader. The relationship between
* {@link DataReaderFactory} and {@link DataReader}
* {@link InputPartition} and {@link InputPartitionReader}
* is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
*
* Note that, the reader factory will be serialized and sent to executors, then the data reader
* will be created on executors and do the actual reading. So {@link DataReaderFactory} must be
* serializable and {@link DataReader} doesn't need to be.
* Note that input partitions will be serialized and sent to executors, then the partition reader
* will be created on executors and do the actual reading. So {@link InputPartition} must be
* serializable and {@link InputPartitionReader} doesn't need to be.
*/
@InterfaceStability.Evolving
public interface DataReaderFactory<T> extends Serializable {
public interface InputPartition<T> extends Serializable {
/**
* The preferred locations where the data reader returned by this reader factory can run faster,
* The preferred locations where the data reader returned by this partition can run faster,
* but Spark does not guarantee to run the data reader on these locations.
* The implementations should make sure that it can be run on any location.
* The location is a string representing the host name.
@ -57,5 +57,5 @@ public interface DataReaderFactory<T> extends Serializable {
* If this method fails (by throwing an exception), the corresponding Spark task would fail and
* get retried until hitting the maximum retry times.
*/
DataReader<T> createDataReader();
InputPartitionReader<T> createPartitionReader();
}


@ -23,7 +23,7 @@ import java.io.IOException;
import org.apache.spark.annotation.InterfaceStability;
/**
* A data reader returned by {@link DataReaderFactory#createDataReader()} and is responsible for
* A data reader returned by {@link InputPartition#createPartitionReader()} and is responsible for
* outputting data for a RDD partition.
*
* Note that, Currently the type `T` can only be {@link org.apache.spark.sql.Row} for normal data
@ -31,7 +31,7 @@ import org.apache.spark.annotation.InterfaceStability;
* readers that mix in {@link SupportsScanUnsafeRow}.
*/
@InterfaceStability.Evolving
public interface DataReader<T> extends Closeable {
public interface InputPartitionReader<T> extends Closeable {
/**
* Proceed to next record, returns false if there is no more records.


@ -24,7 +24,7 @@ import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;
* A mix in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to report data partitioning and try to avoid shuffle at Spark side.
*
* Note that, when the reader creates exactly one {@link DataReaderFactory}, Spark may avoid
* Note that, when the reader creates exactly one {@link InputPartition}, Spark may avoid
* adding a shuffle even if the reader does not implement this interface.
*/
@InterfaceStability.Evolving


@ -30,22 +30,22 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
@InterfaceStability.Evolving
public interface SupportsScanColumnarBatch extends DataSourceReader {
@Override
default List<DataReaderFactory<Row>> createDataReaderFactories() {
default List<InputPartition<Row>> planInputPartitions() {
throw new IllegalStateException(
"createDataReaderFactories not supported by default within SupportsScanColumnarBatch.");
"planInputPartitions not supported by default within SupportsScanColumnarBatch.");
}
/**
* Similar to {@link DataSourceReader#createDataReaderFactories()}, but returns columnar data
* Similar to {@link DataSourceReader#planInputPartitions()}, but returns columnar data
* in batches.
*/
List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories();
List<InputPartition<ColumnarBatch>> planBatchInputPartitions();
/**
* Returns true if the concrete data source reader can read data in batch according to the scan
* properties like required columns, pushes filters, etc. It's possible that the implementation
* can only support some certain columns with certain types. Users can overwrite this method and
* {@link #createDataReaderFactories()} to fallback to normal read path under some conditions.
* {@link #planInputPartitions()} to fallback to normal read path under some conditions.
*/
default boolean enableBatchRead() {
return true;


@ -33,14 +33,14 @@ import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
public interface SupportsScanUnsafeRow extends DataSourceReader {
@Override
default List<DataReaderFactory<Row>> createDataReaderFactories() {
default List<InputPartition<Row>> planInputPartitions() {
throw new IllegalStateException(
"createDataReaderFactories not supported by default within SupportsScanUnsafeRow");
"planInputPartitions not supported by default within SupportsScanUnsafeRow");
}
/**
* Similar to {@link DataSourceReader#createDataReaderFactories()},
* Similar to {@link DataSourceReader#planInputPartitions()},
* but returns data in unsafe row format.
*/
List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories();
List<InputPartition<UnsafeRow>> planUnsafeInputPartitions();
}


@ -18,12 +18,12 @@
package org.apache.spark.sql.sources.v2.reader.partitioning;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
/**
* A concrete implementation of {@link Distribution}. Represents a distribution where records that
* share the same values for the {@link #clusteredColumns} will be produced by the same
* {@link DataReader}.
* {@link InputPartitionReader}.
*/
@InterfaceStability.Evolving
public class ClusteredDistribution implements Distribution {


@ -18,13 +18,13 @@
package org.apache.spark.sql.sources.v2.reader.partitioning;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
/**
* An interface to represent data distribution requirement, which specifies how the records should
* be distributed among the data partitions(one {@link DataReader} outputs data for one partition).
* be distributed among the data partitions(one {@link InputPartitionReader} outputs data for one partition).
* Note that this interface has nothing to do with the data ordering inside one
* partition(the output records of a single {@link DataReader}).
* partition(the output records of a single {@link InputPartitionReader}).
*
* The instance of this interface is created and provided by Spark, then consumed by
* {@link Partitioning#satisfy(Distribution)}. This means data source developers don't need to


@ -18,7 +18,7 @@
package org.apache.spark.sql.sources.v2.reader.partitioning;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
/**
@ -31,7 +31,7 @@ import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;
public interface Partitioning {
/**
* Returns the number of partitions(i.e., {@link DataReaderFactory}s) the data source outputs.
* Returns the number of partitions(i.e., {@link InputPartition}s) the data source outputs.
*/
int numPartitions();


@ -18,13 +18,13 @@
package org.apache.spark.sql.sources.v2.reader.streaming;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
/**
* A variation on {@link DataReader} for use with streaming in continuous processing mode.
* A variation on {@link InputPartitionReader} for use with streaming in continuous processing mode.
*/
@InterfaceStability.Evolving
public interface ContinuousDataReader<T> extends DataReader<T> {
public interface ContinuousInputPartitionReader<T> extends InputPartitionReader<T> {
/**
* Get the offset of the current record, or the start offset if no records have been read.
*


@ -27,7 +27,7 @@ import java.util.Optional;
* A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to allow reading in a continuous processing mode stream.
*
* Implementations must ensure each reader factory output is a {@link ContinuousDataReader}.
* Implementations must ensure each partition reader is a {@link ContinuousInputPartitionReader}.
*
* Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
* DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
@ -35,7 +35,7 @@ import java.util.Optional;
@InterfaceStability.Evolving
public interface ContinuousReader extends BaseStreamingSource, DataSourceReader {
/**
* Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
* Merge partitioned offsets coming from {@link ContinuousInputPartitionReader} instances for each
* partition to a single global offset.
*/
Offset mergeOffsets(PartitionOffset[] offsets);
@ -47,7 +47,7 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceReader
Offset deserializeOffset(String json);
/**
* Set the desired start offset for reader factories created from this reader. The scan will
* Set the desired start offset for partitions created from this reader. The scan will
* start from the first record after the provided offset, or from an implementation-defined
* inferred starting point if no offset is provided.
*/
@ -61,8 +61,8 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceReader
Offset getStartOffset();
/**
* The execution engine will call this method in every epoch to determine if new reader
* factories need to be generated, which may be required if for example the underlying
* The execution engine will call this method in every epoch to determine if new input
* partitions need to be generated, which may be required if for example the underlying
* source system has had partitions added or removed.
*
* If true, the query will be shut down and restarted with a new reader.


@ -33,7 +33,7 @@ import java.util.Optional;
@InterfaceStability.Evolving
public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource {
/**
* Set the desired offset range for reader factories created from this reader. Reader factories
* Set the desired offset range for input partitions created from this reader. Partition readers
* will generate only data within (`start`, `end`]; that is, from the first record after `start`
* to the record with offset `end`.
*


@ -22,14 +22,14 @@ import scala.reflect.ClassTag
import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
import org.apache.spark.sql.sources.v2.reader.InputPartition
class DataSourceRDDPartition[T : ClassTag](val index: Int, val readerFactory: DataReaderFactory[T])
class DataSourceRDDPartition[T : ClassTag](val index: Int, val inputPartition: InputPartition[T])
extends Partition with Serializable
class DataSourceRDD[T: ClassTag](
sc: SparkContext,
@transient private val readerFactories: Seq[DataReaderFactory[T]])
@transient private val readerFactories: Seq[InputPartition[T]])
extends RDD[T](sc, Nil) {
override protected def getPartitions: Array[Partition] = {
@ -39,7 +39,8 @@ class DataSourceRDD[T: ClassTag](
}
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
val reader = split.asInstanceOf[DataSourceRDDPartition[T]].readerFactory.createDataReader()
val reader = split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition
.createPartitionReader()
context.addTaskCompletionListener(_ => reader.close())
val iter = new Iterator[T] {
private[this] var valuePrepared = false
@ -63,6 +64,6 @@ class DataSourceRDD[T: ClassTag](
}
override def getPreferredLocations(split: Partition): Seq[String] = {
split.asInstanceOf[DataSourceRDDPartition[T]].readerFactory.preferredLocations()
split.asInstanceOf[DataSourceRDDPartition[T]].inputPartition.preferredLocations()
}
}


@ -59,13 +59,13 @@ case class DataSourceV2ScanExec(
}
override def outputPartitioning: physical.Partitioning = reader match {
case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchReaderFactories.size == 1 =>
case r: SupportsScanColumnarBatch if r.enableBatchRead() && batchPartitions.size == 1 =>
SinglePartition
case r: SupportsScanColumnarBatch if !r.enableBatchRead() && readerFactories.size == 1 =>
case r: SupportsScanColumnarBatch if !r.enableBatchRead() && partitions.size == 1 =>
SinglePartition
case r if !r.isInstanceOf[SupportsScanColumnarBatch] && readerFactories.size == 1 =>
case r if !r.isInstanceOf[SupportsScanColumnarBatch] && partitions.size == 1 =>
SinglePartition
case s: SupportsReportPartitioning =>
@ -75,19 +75,19 @@ case class DataSourceV2ScanExec(
case _ => super.outputPartitioning
}
private lazy val readerFactories: Seq[DataReaderFactory[UnsafeRow]] = reader match {
case r: SupportsScanUnsafeRow => r.createUnsafeRowReaderFactories().asScala
private lazy val partitions: Seq[InputPartition[UnsafeRow]] = reader match {
case r: SupportsScanUnsafeRow => r.planUnsafeInputPartitions().asScala
case _ =>
reader.createDataReaderFactories().asScala.map {
new RowToUnsafeRowDataReaderFactory(_, reader.readSchema()): DataReaderFactory[UnsafeRow]
reader.planInputPartitions().asScala.map {
new RowToUnsafeRowInputPartition(_, reader.readSchema()): InputPartition[UnsafeRow]
}
}
private lazy val batchReaderFactories: Seq[DataReaderFactory[ColumnarBatch]] = reader match {
private lazy val batchPartitions: Seq[InputPartition[ColumnarBatch]] = reader match {
case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
assert(!reader.isInstanceOf[ContinuousReader],
"continuous stream reader does not support columnar read yet.")
r.createBatchDataReaderFactories().asScala
r.planBatchInputPartitions().asScala
}
private lazy val inputRDD: RDD[InternalRow] = reader match {
@ -95,19 +95,18 @@ case class DataSourceV2ScanExec(
EpochCoordinatorRef.get(
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
sparkContext.env)
.askSync[Unit](SetReaderPartitions(readerFactories.size))
.askSync[Unit](SetReaderPartitions(partitions.size))
new ContinuousDataSourceRDD(
sparkContext,
sqlContext.conf.continuousStreamingExecutorQueueSize,
sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
readerFactories)
.asInstanceOf[RDD[InternalRow]]
partitions).asInstanceOf[RDD[InternalRow]]
case r: SupportsScanColumnarBatch if r.enableBatchRead() =>
new DataSourceRDD(sparkContext, batchReaderFactories).asInstanceOf[RDD[InternalRow]]
new DataSourceRDD(sparkContext, batchPartitions).asInstanceOf[RDD[InternalRow]]
case _ =>
new DataSourceRDD(sparkContext, readerFactories).asInstanceOf[RDD[InternalRow]]
new DataSourceRDD(sparkContext, partitions).asInstanceOf[RDD[InternalRow]]
}
override def inputRDDs(): Seq[RDD[InternalRow]] = Seq(inputRDD)
@ -132,19 +131,22 @@ case class DataSourceV2ScanExec(
}
}
class RowToUnsafeRowDataReaderFactory(rowReaderFactory: DataReaderFactory[Row], schema: StructType)
extends DataReaderFactory[UnsafeRow] {
class RowToUnsafeRowInputPartition(partition: InputPartition[Row], schema: StructType)
extends InputPartition[UnsafeRow] {
override def preferredLocations: Array[String] = rowReaderFactory.preferredLocations
override def preferredLocations: Array[String] = partition.preferredLocations
override def createDataReader: DataReader[UnsafeRow] = {
new RowToUnsafeDataReader(
rowReaderFactory.createDataReader, RowEncoder.apply(schema).resolveAndBind())
override def createPartitionReader: InputPartitionReader[UnsafeRow] = {
new RowToUnsafeInputPartitionReader(
partition.createPartitionReader, RowEncoder.apply(schema).resolveAndBind())
}
}
class RowToUnsafeDataReader(val rowReader: DataReader[Row], encoder: ExpressionEncoder[Row])
extends DataReader[UnsafeRow] {
class RowToUnsafeInputPartitionReader(
val rowReader: InputPartitionReader[Row],
encoder: ExpressionEncoder[Row])
extends InputPartitionReader[UnsafeRow] {
override def next: Boolean = rowReader.next


@ -21,14 +21,14 @@ import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeInputPartitionReader}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, PartitionOffset}
import org.apache.spark.util.{NextIterator, ThreadUtils}
class ContinuousDataSourceRDDPartition(
val index: Int,
val readerFactory: DataReaderFactory[UnsafeRow])
val inputPartition: InputPartition[UnsafeRow])
extends Partition with Serializable {
// This is semantically a lazy val - it's initialized once the first time a call to
@ -51,12 +51,12 @@ class ContinuousDataSourceRDD(
sc: SparkContext,
dataQueueSize: Int,
epochPollIntervalMs: Long,
@transient private val readerFactories: Seq[DataReaderFactory[UnsafeRow]])
@transient private val readerFactories: Seq[InputPartition[UnsafeRow]])
extends RDD[UnsafeRow](sc, Nil) {
override protected def getPartitions: Array[Partition] = {
readerFactories.zipWithIndex.map {
case (readerFactory, index) => new ContinuousDataSourceRDDPartition(index, readerFactory)
case (inputPartition, index) => new ContinuousDataSourceRDDPartition(index, inputPartition)
}.toArray
}
@ -75,7 +75,7 @@ class ContinuousDataSourceRDD(
if (partition.queueReader == null) {
partition.queueReader =
new ContinuousQueuedDataReader(
partition.readerFactory, context, dataQueueSize, epochPollIntervalMs)
partition.inputPartition, context, dataQueueSize, epochPollIntervalMs)
}
partition.queueReader
@ -96,17 +96,17 @@ class ContinuousDataSourceRDD(
}
override def getPreferredLocations(split: Partition): Seq[String] = {
split.asInstanceOf[ContinuousDataSourceRDDPartition].readerFactory.preferredLocations()
split.asInstanceOf[ContinuousDataSourceRDDPartition].inputPartition.preferredLocations()
}
}
object ContinuousDataSourceRDD {
private[continuous] def getContinuousReader(
reader: DataReader[UnsafeRow]): ContinuousDataReader[_] = {
reader: InputPartitionReader[UnsafeRow]): ContinuousInputPartitionReader[_] = {
reader match {
case r: ContinuousDataReader[UnsafeRow] => r
case wrapped: RowToUnsafeDataReader =>
wrapped.rowReader.asInstanceOf[ContinuousDataReader[Row]]
case r: ContinuousInputPartitionReader[UnsafeRow] => r
case wrapped: RowToUnsafeInputPartitionReader =>
wrapped.rowReader.asInstanceOf[ContinuousInputPartitionReader[Row]]
case _ =>
throw new IllegalStateException(s"Unknown continuous reader type ${reader.getClass}")
}


@ -18,15 +18,14 @@
package org.apache.spark.sql.execution.streaming.continuous
import java.io.Closeable
import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.util.control.NonFatal
import org.apache.spark.{Partition, SparkEnv, SparkException, TaskContext}
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
import org.apache.spark.util.ThreadUtils
@ -38,11 +37,11 @@ import org.apache.spark.util.ThreadUtils
* offsets across epochs. Each compute() should call the next() method here until null is returned.
*/
class ContinuousQueuedDataReader(
factory: DataReaderFactory[UnsafeRow],
partition: InputPartition[UnsafeRow],
context: TaskContext,
dataQueueSize: Int,
epochPollIntervalMs: Long) extends Closeable {
private val reader = factory.createDataReader()
private val reader = partition.createPartitionReader()
// Important sequencing - we must get our starting point before the provider threads start running
private var currentOffset: PartitionOffset =
@ -132,7 +131,7 @@ class ContinuousQueuedDataReader(
/**
* The data component of [[ContinuousQueuedDataReader]]. Pushes (row, offset) to the queue when
* a new row arrives to the [[DataReader]].
* a new row arrives to the [[InputPartitionReader]].
*/
class DataReaderThread extends Thread(
s"continuous-reader--${context.partitionId()}--" +


@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeM
import org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
case class RateStreamPartitionOffset(
@ -67,7 +67,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
override def getStartOffset(): Offset = offset
override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] = {
override def planInputPartitions(): java.util.List[InputPartition[Row]] = {
val partitionStartMap = offset match {
case off: RateStreamOffset => off.partitionToValueAndRunTimeMs
case off =>
@ -91,7 +91,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)
i,
numPartitions,
perPartitionRate)
.asInstanceOf[DataReaderFactory[Row]]
.asInstanceOf[InputPartition[Row]]
}.asJava
}
@ -119,13 +119,13 @@ case class RateStreamContinuousDataReaderFactory(
partitionIndex: Int,
increment: Long,
rowsPerSecond: Double)
extends ContinuousDataReaderFactory[Row] {
extends ContinuousInputPartition[Row] {
override def createDataReaderWithOffset(offset: PartitionOffset): DataReader[Row] = {
override def createContinuousReader(offset: PartitionOffset): InputPartitionReader[Row] = {
val rateStreamOffset = offset.asInstanceOf[RateStreamPartitionOffset]
require(rateStreamOffset.partition == partitionIndex,
s"Expected partitionIndex: $partitionIndex, but got: ${rateStreamOffset.partition}")
new RateStreamContinuousDataReader(
new RateStreamContinuousInputPartitionReader(
rateStreamOffset.currentValue,
rateStreamOffset.currentTimeMs,
partitionIndex,
@ -133,18 +133,18 @@ case class RateStreamContinuousDataReaderFactory(
rowsPerSecond)
}
override def createDataReader(): DataReader[Row] =
new RateStreamContinuousDataReader(
override def createPartitionReader(): InputPartitionReader[Row] =
new RateStreamContinuousInputPartitionReader(
startValue, startTimeMs, partitionIndex, increment, rowsPerSecond)
}
class RateStreamContinuousDataReader(
class RateStreamContinuousInputPartitionReader(
startValue: Long,
startTimeMs: Long,
partitionIndex: Int,
increment: Long,
rowsPerSecond: Double)
extends ContinuousDataReader[Row] {
extends ContinuousInputPartitionReader[Row] {
private var nextReadTime: Long = startTimeMs
private val readTimeIncrement: Long = (1000 / rowsPerSecond).toLong


@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, SupportsScanUnsafeRow}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader, SupportsScanUnsafeRow}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset => OffsetV2}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
@ -139,7 +139,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
if (endOffset.offset == -1) null else endOffset
}
override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
synchronized {
// Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
val startOrdinal = startOffset.offset.toInt + 1
@ -156,7 +156,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
logDebug(generateDebugString(newBlocks.flatten, startOrdinal, endOrdinal))
newBlocks.map { block =>
new MemoryStreamDataReaderFactory(block).asInstanceOf[DataReaderFactory[UnsafeRow]]
new MemoryStreamDataReaderFactory(block).asInstanceOf[InputPartition[UnsafeRow]]
}.asJava
}
}
@ -202,9 +202,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
class MemoryStreamDataReaderFactory(records: Array[UnsafeRow])
extends DataReaderFactory[UnsafeRow] {
override def createDataReader(): DataReader[UnsafeRow] = {
new DataReader[UnsafeRow] {
extends InputPartition[UnsafeRow] {
override def createPartitionReader(): InputPartitionReader[UnsafeRow] = {
new InputPartitionReader[UnsafeRow] {
private var currentIndex = -1
override def next(): Boolean = {


@ -34,8 +34,8 @@ import org.apache.spark.sql.{Encoder, Row, SQLContext}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryStream.GetRecord
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions}
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.RpcUtils
@ -99,7 +99,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
)
}
override def createDataReaderFactories(): ju.List[DataReaderFactory[Row]] = {
override def planInputPartitions(): ju.List[InputPartition[Row]] = {
synchronized {
val endpointName = s"ContinuousMemoryStreamRecordEndpoint-${java.util.UUID.randomUUID()}-$id"
endpointRef =
@ -108,7 +108,7 @@ class ContinuousMemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
startOffset.partitionNums.map {
case (part, index) =>
new ContinuousMemoryStreamDataReaderFactory(
endpointName, part, index): DataReaderFactory[Row]
endpointName, part, index): InputPartition[Row]
}.toList.asJava
}
}
@ -160,9 +160,9 @@ object ContinuousMemoryStream {
class ContinuousMemoryStreamDataReaderFactory(
driverEndpointName: String,
partition: Int,
startOffset: Int) extends DataReaderFactory[Row] {
override def createDataReader: ContinuousMemoryStreamDataReader =
new ContinuousMemoryStreamDataReader(driverEndpointName, partition, startOffset)
startOffset: Int) extends InputPartition[Row] {
override def createPartitionReader: ContinuousMemoryStreamInputPartitionReader =
new ContinuousMemoryStreamInputPartitionReader(driverEndpointName, partition, startOffset)
}
/**
@ -170,10 +170,10 @@ class ContinuousMemoryStreamDataReaderFactory(
*
* Polls the driver endpoint for new records.
*/
class ContinuousMemoryStreamDataReader(
class ContinuousMemoryStreamInputPartitionReader(
driverEndpointName: String,
partition: Int,
startOffset: Int) extends ContinuousDataReader[Row] {
startOffset: Int) extends ContinuousInputPartitionReader[Row] {
private val endpoint = RpcUtils.makeDriverRef(
driverEndpointName,
SparkEnv.get.conf,


@ -134,7 +134,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
LongOffset(json.toLong)
}
override def createDataReaderFactories(): java.util.List[DataReaderFactory[Row]] = {
override def planInputPartitions(): java.util.List[InputPartition[Row]] = {
val startSeconds = LongOffset.convert(start).map(_.offset).getOrElse(0L)
val endSeconds = LongOffset.convert(end).map(_.offset).getOrElse(0L)
assert(startSeconds <= endSeconds, s"startSeconds($startSeconds) > endSeconds($endSeconds)")
@ -169,7 +169,7 @@ class RateStreamMicroBatchReader(options: DataSourceOptions, checkpointLocation:
(0 until numPartitions).map { p =>
new RateStreamMicroBatchDataReaderFactory(
p, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
: DataReaderFactory[Row]
: InputPartition[Row]
}.toList.asJava
}
@ -188,19 +188,20 @@ class RateStreamMicroBatchDataReaderFactory(
rangeStart: Long,
rangeEnd: Long,
localStartTimeMs: Long,
relativeMsPerValue: Double) extends DataReaderFactory[Row] {
relativeMsPerValue: Double) extends InputPartition[Row] {
override def createDataReader(): DataReader[Row] = new RateStreamMicroBatchDataReader(
partitionId, numPartitions, rangeStart, rangeEnd, localStartTimeMs, relativeMsPerValue)
override def createPartitionReader(): InputPartitionReader[Row] =
new RateStreamMicroBatchInputPartitionReader(partitionId, numPartitions, rangeStart, rangeEnd,
localStartTimeMs, relativeMsPerValue)
}
class RateStreamMicroBatchDataReader(
class RateStreamMicroBatchInputPartitionReader(
partitionId: Int,
numPartitions: Int,
rangeStart: Long,
rangeEnd: Long,
localStartTimeMs: Long,
relativeMsPerValue: Double) extends DataReader[Row] {
relativeMsPerValue: Double) extends InputPartitionReader[Row] {
private var count = 0
override def next(): Boolean = {


@ -33,7 +33,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.LongOffset
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory}
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, Offset}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
@ -140,7 +140,7 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR
}
}
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
override def planInputPartitions(): JList[InputPartition[Row]] = {
assert(startOffset != null && endOffset != null,
"start offset and end offset should already be set before create read tasks.")
@ -165,21 +165,22 @@ class TextSocketMicroBatchReader(options: DataSourceOptions) extends MicroBatchR
(0 until numPartitions).map { i =>
val slice = slices(i)
new DataReaderFactory[Row] {
override def createDataReader(): DataReader[Row] = new DataReader[Row] {
private var currentIdx = -1
new InputPartition[Row] {
override def createPartitionReader(): InputPartitionReader[Row] =
new InputPartitionReader[Row] {
private var currentIdx = -1
override def next(): Boolean = {
currentIdx += 1
currentIdx < slice.size
override def next(): Boolean = {
currentIdx += 1
currentIdx < slice.size
}
override def get(): Row = {
Row(slice(currentIdx)._1, slice(currentIdx)._2)
}
override def close(): Unit = {}
}
override def get(): Row = {
Row(slice(currentIdx)._1, slice(currentIdx)._2)
}
override def close(): Unit = {}
}
}
}.toList.asJava
}


@ -79,8 +79,8 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
}
@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
List<DataReaderFactory<Row>> res = new ArrayList<>();
public List<InputPartition<Row>> planInputPartitions() {
List<InputPartition<Row>> res = new ArrayList<>();
Integer lowerBound = null;
for (Filter filter : filters) {
@ -94,33 +94,33 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
}
if (lowerBound == null) {
res.add(new JavaAdvancedDataReaderFactory(0, 5, requiredSchema));
res.add(new JavaAdvancedDataReaderFactory(5, 10, requiredSchema));
res.add(new JavaAdvancedInputPartition(0, 5, requiredSchema));
res.add(new JavaAdvancedInputPartition(5, 10, requiredSchema));
} else if (lowerBound < 4) {
res.add(new JavaAdvancedDataReaderFactory(lowerBound + 1, 5, requiredSchema));
res.add(new JavaAdvancedDataReaderFactory(5, 10, requiredSchema));
res.add(new JavaAdvancedInputPartition(lowerBound + 1, 5, requiredSchema));
res.add(new JavaAdvancedInputPartition(5, 10, requiredSchema));
} else if (lowerBound < 9) {
res.add(new JavaAdvancedDataReaderFactory(lowerBound + 1, 10, requiredSchema));
res.add(new JavaAdvancedInputPartition(lowerBound + 1, 10, requiredSchema));
}
return res;
}
}
static class JavaAdvancedDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
static class JavaAdvancedInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
private int start;
private int end;
private StructType requiredSchema;
JavaAdvancedDataReaderFactory(int start, int end, StructType requiredSchema) {
JavaAdvancedInputPartition(int start, int end, StructType requiredSchema) {
this.start = start;
this.end = end;
this.requiredSchema = requiredSchema;
}
@Override
public DataReader<Row> createDataReader() {
return new JavaAdvancedDataReaderFactory(start - 1, end, requiredSchema);
public InputPartitionReader<Row> createPartitionReader() {
return new JavaAdvancedInputPartition(start - 1, end, requiredSchema);
}
@Override


@ -42,14 +42,14 @@ public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
}
@Override
public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
public List<InputPartition<ColumnarBatch>> planBatchInputPartitions() {
return java.util.Arrays.asList(
new JavaBatchDataReaderFactory(0, 50), new JavaBatchDataReaderFactory(50, 90));
new JavaBatchInputPartition(0, 50), new JavaBatchInputPartition(50, 90));
}
}
static class JavaBatchDataReaderFactory
implements DataReaderFactory<ColumnarBatch>, DataReader<ColumnarBatch> {
static class JavaBatchInputPartition
implements InputPartition<ColumnarBatch>, InputPartitionReader<ColumnarBatch> {
private int start;
private int end;
@ -59,13 +59,13 @@ public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
private OnHeapColumnVector j;
private ColumnarBatch batch;
JavaBatchDataReaderFactory(int start, int end) {
JavaBatchInputPartition(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public DataReader<ColumnarBatch> createDataReader() {
public InputPartitionReader<ColumnarBatch> createPartitionReader() {
this.i = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
this.j = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
ColumnVector[] vectors = new ColumnVector[2];


@ -43,10 +43,10 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
}
@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
public List<InputPartition<Row>> planInputPartitions() {
return java.util.Arrays.asList(
new SpecificDataReaderFactory(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
new SpecificDataReaderFactory(new int[]{2, 4, 4}, new int[]{6, 2, 2}));
new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6}),
new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2}));
}
@Override
@ -73,12 +73,12 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
}
}
static class SpecificDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
static class SpecificInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
private int[] i;
private int[] j;
private int current = -1;
SpecificDataReaderFactory(int[] i, int[] j) {
SpecificInputPartition(int[] i, int[] j) {
assert i.length == j.length;
this.i = i;
this.j = j;
@ -101,7 +101,7 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
}
@Override
public DataReader<Row> createDataReader() {
public InputPartitionReader<Row> createPartitionReader() {
return this;
}
}


@ -24,7 +24,7 @@ import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.types.StructType;
public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
@ -42,7 +42,7 @@ public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWi
}
@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
public List<InputPartition<Row>> planInputPartitions() {
return java.util.Collections.emptyList();
}
}


@ -25,8 +25,8 @@ import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader;
import org.apache.spark.sql.sources.v2.reader.InputPartition;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.types.StructType;
@ -41,25 +41,25 @@ public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
}
@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
public List<InputPartition<Row>> planInputPartitions() {
return java.util.Arrays.asList(
new JavaSimpleDataReaderFactory(0, 5),
new JavaSimpleDataReaderFactory(5, 10));
new JavaSimpleInputPartition(0, 5),
new JavaSimpleInputPartition(5, 10));
}
}
static class JavaSimpleDataReaderFactory implements DataReaderFactory<Row>, DataReader<Row> {
static class JavaSimpleInputPartition implements InputPartition<Row>, InputPartitionReader<Row> {
private int start;
private int end;
JavaSimpleDataReaderFactory(int start, int end) {
JavaSimpleInputPartition(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public DataReader<Row> createDataReader() {
return new JavaSimpleDataReaderFactory(start - 1, end);
public InputPartitionReader<Row> createPartitionReader() {
return new JavaSimpleInputPartition(start - 1, end);
}
@Override


@ -38,20 +38,20 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
}
@Override
public List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories() {
public List<InputPartition<UnsafeRow>> planUnsafeInputPartitions() {
return java.util.Arrays.asList(
new JavaUnsafeRowDataReaderFactory(0, 5),
new JavaUnsafeRowDataReaderFactory(5, 10));
new JavaUnsafeRowInputPartition(0, 5),
new JavaUnsafeRowInputPartition(5, 10));
}
}
static class JavaUnsafeRowDataReaderFactory
implements DataReaderFactory<UnsafeRow>, DataReader<UnsafeRow> {
static class JavaUnsafeRowInputPartition
implements InputPartition<UnsafeRow>, InputPartitionReader<UnsafeRow> {
private int start;
private int end;
private UnsafeRow row;
JavaUnsafeRowDataReaderFactory(int start, int end) {
JavaUnsafeRowInputPartition(int start, int end) {
this.start = start;
this.end = end;
this.row = new UnsafeRow(2);
@ -59,8 +59,8 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
}
@Override
public DataReader<UnsafeRow> createDataReader() {
return new JavaUnsafeRowDataReaderFactory(start - 1, end);
public InputPartitionReader<UnsafeRow> createPartitionReader() {
return new JavaUnsafeRowInputPartition(start - 1, end);
}
@Override


@ -142,9 +142,9 @@ class RateSourceSuite extends StreamTest {
val startOffset = LongOffset(0L)
val endOffset = LongOffset(1L)
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
val tasks = reader.createDataReaderFactories()
val tasks = reader.planInputPartitions()
assert(tasks.size == 1)
val dataReader = tasks.get(0).createDataReader()
val dataReader = tasks.get(0).createPartitionReader()
val data = ArrayBuffer[Row]()
while (dataReader.next()) {
data.append(dataReader.get())
@ -159,11 +159,11 @@ class RateSourceSuite extends StreamTest {
val startOffset = LongOffset(0L)
val endOffset = LongOffset(1L)
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
val tasks = reader.createDataReaderFactories()
val tasks = reader.planInputPartitions()
assert(tasks.size == 11)
val readData = tasks.asScala
.map(_.createDataReader())
.map(_.createPartitionReader())
.flatMap { reader =>
val buf = scala.collection.mutable.ListBuffer[Row]()
while (reader.next()) buf.append(reader.get())
@ -304,7 +304,7 @@ class RateSourceSuite extends StreamTest {
val reader = new RateStreamContinuousReader(
new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
reader.setStartOffset(Optional.empty())
val tasks = reader.createDataReaderFactories()
val tasks = reader.planInputPartitions()
assert(tasks.size == 2)
val data = scala.collection.mutable.ListBuffer[Row]()
@ -314,7 +314,7 @@ class RateSourceSuite extends StreamTest {
.asInstanceOf[RateStreamOffset]
.partitionToValueAndRunTimeMs(t.partitionIndex)
.runTimeMs
val r = t.createDataReader().asInstanceOf[RateStreamContinuousDataReader]
val r = t.createPartitionReader().asInstanceOf[RateStreamContinuousInputPartitionReader]
for (rowIndex <- 0 to 9) {
r.next()
data.append(r.get())


@ -346,8 +346,8 @@ class SimpleSinglePartitionSource extends DataSourceV2 with ReadSupport {
class Reader extends DataSourceReader {
override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5))
override def planInputPartitions(): JList[InputPartition[Row]] = {
java.util.Arrays.asList(new SimpleInputPartition(0, 5))
}
}
@ -359,20 +359,21 @@ class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
class Reader extends DataSourceReader {
override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
java.util.Arrays.asList(new SimpleDataReaderFactory(0, 5), new SimpleDataReaderFactory(5, 10))
override def planInputPartitions(): JList[InputPartition[Row]] = {
java.util.Arrays.asList(new SimpleInputPartition(0, 5), new SimpleInputPartition(5, 10))
}
}
override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}
class SimpleDataReaderFactory(start: Int, end: Int)
extends DataReaderFactory[Row]
with DataReader[Row] {
class SimpleInputPartition(start: Int, end: Int)
extends InputPartition[Row]
with InputPartitionReader[Row] {
private var current = start - 1
override def createDataReader(): DataReader[Row] = new SimpleDataReaderFactory(start, end)
override def createPartitionReader(): InputPartitionReader[Row] =
new SimpleInputPartition(start, end)
override def next(): Boolean = {
current += 1
@ -413,21 +414,21 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
requiredSchema
}
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
override def planInputPartitions(): JList[InputPartition[Row]] = {
val lowerBound = filters.collect {
case GreaterThan("i", v: Int) => v
}.headOption
val res = new ArrayList[DataReaderFactory[Row]]
val res = new ArrayList[InputPartition[Row]]
if (lowerBound.isEmpty) {
res.add(new AdvancedDataReaderFactory(0, 5, requiredSchema))
res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
res.add(new AdvancedInputPartition(0, 5, requiredSchema))
res.add(new AdvancedInputPartition(5, 10, requiredSchema))
} else if (lowerBound.get < 4) {
res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 5, requiredSchema))
res.add(new AdvancedDataReaderFactory(5, 10, requiredSchema))
res.add(new AdvancedInputPartition(lowerBound.get + 1, 5, requiredSchema))
res.add(new AdvancedInputPartition(5, 10, requiredSchema))
} else if (lowerBound.get < 9) {
res.add(new AdvancedDataReaderFactory(lowerBound.get + 1, 10, requiredSchema))
res.add(new AdvancedInputPartition(lowerBound.get + 1, 10, requiredSchema))
}
res
@ -437,13 +438,13 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}
class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType)
extends DataReaderFactory[Row] with DataReader[Row] {
class AdvancedInputPartition(start: Int, end: Int, requiredSchema: StructType)
extends InputPartition[Row] with InputPartitionReader[Row] {
private var current = start - 1
override def createDataReader(): DataReader[Row] = {
new AdvancedDataReaderFactory(start, end, requiredSchema)
override def createPartitionReader(): InputPartitionReader[Row] = {
new AdvancedInputPartition(start, end, requiredSchema)
}
override def close(): Unit = {}
@ -468,24 +469,24 @@ class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
class Reader extends DataSourceReader with SupportsScanUnsafeRow {
override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
override def createUnsafeRowReaderFactories(): JList[DataReaderFactory[UnsafeRow]] = {
java.util.Arrays.asList(new UnsafeRowDataReaderFactory(0, 5),
new UnsafeRowDataReaderFactory(5, 10))
override def planUnsafeInputPartitions(): JList[InputPartition[UnsafeRow]] = {
java.util.Arrays.asList(new UnsafeRowInputPartitionReader(0, 5),
new UnsafeRowInputPartitionReader(5, 10))
}
}
override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}
class UnsafeRowDataReaderFactory(start: Int, end: Int)
extends DataReaderFactory[UnsafeRow] with DataReader[UnsafeRow] {
class UnsafeRowInputPartitionReader(start: Int, end: Int)
extends InputPartition[UnsafeRow] with InputPartitionReader[UnsafeRow] {
private val row = new UnsafeRow(2)
row.pointTo(new Array[Byte](8 * 3), 8 * 3)
private var current = start - 1
override def createDataReader(): DataReader[UnsafeRow] = this
override def createPartitionReader(): InputPartitionReader[UnsafeRow] = this
override def next(): Boolean = {
current += 1
@ -503,7 +504,7 @@ class UnsafeRowDataReaderFactory(start: Int, end: Int)
class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
class Reader(val readSchema: StructType) extends DataSourceReader {
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
override def planInputPartitions(): JList[InputPartition[Row]] =
java.util.Collections.emptyList()
}
@ -516,16 +517,17 @@ class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {
class Reader extends DataSourceReader with SupportsScanColumnarBatch {
override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
override def createBatchDataReaderFactories(): JList[DataReaderFactory[ColumnarBatch]] = {
java.util.Arrays.asList(new BatchDataReaderFactory(0, 50), new BatchDataReaderFactory(50, 90))
override def planBatchInputPartitions(): JList[InputPartition[ColumnarBatch]] = {
java.util.Arrays.asList(
new BatchInputPartitionReader(0, 50), new BatchInputPartitionReader(50, 90))
}
}
override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}
class BatchDataReaderFactory(start: Int, end: Int)
extends DataReaderFactory[ColumnarBatch] with DataReader[ColumnarBatch] {
class BatchInputPartitionReader(start: Int, end: Int)
extends InputPartition[ColumnarBatch] with InputPartitionReader[ColumnarBatch] {
private final val BATCH_SIZE = 20
private lazy val i = new OnHeapColumnVector(BATCH_SIZE, IntegerType)
@ -534,7 +536,7 @@ class BatchDataReaderFactory(start: Int, end: Int)
private var current = start
override def createDataReader(): DataReader[ColumnarBatch] = this
override def createPartitionReader(): InputPartitionReader[ColumnarBatch] = this
override def next(): Boolean = {
i.reset()
@ -568,11 +570,11 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
class Reader extends DataSourceReader with SupportsReportPartitioning {
override def readSchema(): StructType = new StructType().add("a", "int").add("b", "int")
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
override def planInputPartitions(): JList[InputPartition[Row]] = {
// Note that we don't have same value of column `a` across partitions.
java.util.Arrays.asList(
new SpecificDataReaderFactory(Array(1, 1, 3), Array(4, 4, 6)),
new SpecificDataReaderFactory(Array(2, 4, 4), Array(6, 2, 2)))
new SpecificInputPartitionReader(Array(1, 1, 3), Array(4, 4, 6)),
new SpecificInputPartitionReader(Array(2, 4, 4), Array(6, 2, 2)))
}
override def outputPartitioning(): Partitioning = new MyPartitioning
@ -590,14 +592,14 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}
class SpecificDataReaderFactory(i: Array[Int], j: Array[Int])
extends DataReaderFactory[Row]
with DataReader[Row] {
class SpecificInputPartitionReader(i: Array[Int], j: Array[Int])
extends InputPartition[Row]
with InputPartitionReader[Row] {
assert(i.length == j.length)
private var current = -1
override def createDataReader(): DataReader[Row] = this
override def createPartitionReader(): InputPartitionReader[Row] = this
override def next(): Boolean = {
current += 1
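
Putting the renamed pieces together, a self-contained batch read path now looks roughly like the sketch below. The class names (`RangeDataSourceV2`, `RangeInputPartition`) are hypothetical stand-ins for the test sources above; the API calls themselves (`planInputPartitions`, `createPartitionReader`, `createReader`) are the ones this patch introduces.

```scala
import java.util.{Arrays => JArrays, List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.StructType

// One class plays both roles, as in the tests above: the InputPartition is created on the
// driver and serialized to executors, where createPartitionReader() builds the row iterator.
class RangeInputPartition(start: Int, end: Int)
  extends InputPartition[Row] with InputPartitionReader[Row] {

  private var current = start - 1

  override def createPartitionReader(): InputPartitionReader[Row] =
    new RangeInputPartition(start, end)

  override def next(): Boolean = { current += 1; current < end }
  override def get(): Row = Row(current, -current)
  override def close(): Unit = {}
}

class RangeDataSourceV2 extends DataSourceV2 with ReadSupport {
  class Reader extends DataSourceReader {
    override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")

    // planInputPartitions replaces createDataReaderFactories: one entry per partition to read.
    override def planInputPartitions(): JList[InputPartition[Row]] =
      JArrays.asList(new RangeInputPartition(0, 5), new RangeInputPartition(5, 10))
  }

  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}
```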


@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.SerializableConfiguration
@ -45,7 +45,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
class Reader(path: String, conf: Configuration) extends DataSourceReader {
override def readSchema(): StructType = schema
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
override def planInputPartitions(): JList[InputPartition[Row]] = {
val dataPath = new Path(path)
val fs = dataPath.getFileSystem(conf)
if (fs.exists(dataPath)) {
@ -54,9 +54,9 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
name.startsWith("_") || name.startsWith(".")
}.map { f =>
val serializableConf = new SerializableConfiguration(conf)
new SimpleCSVDataReaderFactory(
new SimpleCSVInputPartitionReader(
f.getPath.toUri.toString,
serializableConf): DataReaderFactory[Row]
serializableConf): InputPartition[Row]
}.toList.asJava
} else {
Collections.emptyList()
@ -156,14 +156,14 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
}
}
class SimpleCSVDataReaderFactory(path: String, conf: SerializableConfiguration)
extends DataReaderFactory[Row] with DataReader[Row] {
class SimpleCSVInputPartitionReader(path: String, conf: SerializableConfiguration)
extends InputPartition[Row] with InputPartitionReader[Row] {
@transient private var lines: Iterator[String] = _
@transient private var currentLine: String = _
@transient private var inputStream: FSDataInputStream = _
override def createDataReader(): DataReader[Row] = {
override def createPartitionReader(): InputPartitionReader[Row] = {
val filePath = new Path(path)
val fs = filePath.getFileSystem(conf.value)
inputStream = fs.open(filePath)
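
The writable-source test above illustrates the lifecycle the rename is meant to make explicit: the partition object holds only serializable state (a path and a `SerializableConfiguration`), while the file handle is opened in `createPartitionReader()` on the executor and released in `close()`. Below is a hedged sketch of that shape; `TextFilePartition` is a hypothetical name reading one long per line, not the suite's actual CSV format.

```scala
import org.apache.hadoop.fs.{FSDataInputStream, Path}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.util.SerializableConfiguration

// Hypothetical per-file partition: only path and conf cross the wire; I/O state is transient
// and created lazily when the reader is instantiated on the executor.
class TextFilePartition(path: String, conf: SerializableConfiguration)
  extends InputPartition[Row] with InputPartitionReader[Row] {

  @transient private var lines: Iterator[String] = _
  @transient private var currentLine: String = _
  @transient private var inputStream: FSDataInputStream = _

  override def createPartitionReader(): InputPartitionReader[Row] = {
    val filePath = new Path(path)
    val fs = filePath.getFileSystem(conf.value)
    inputStream = fs.open(filePath)
    lines = scala.io.Source.fromInputStream(inputStream).getLines()
    this
  }

  override def next(): Boolean = {
    if (lines.hasNext) { currentLine = lines.next(); true } else false
  }

  override def get(): Row = Row(currentLine.trim.toLong)

  override def close(): Unit = {
    if (inputStream != null) inputStream.close()
  }
}
```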


@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2}
import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
import org.apache.spark.sql.types.StructType
@ -227,10 +227,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
// getBatch should take 100 ms the first time it is called
override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
override def planUnsafeInputPartitions(): ju.List[InputPartition[UnsafeRow]] = {
synchronized {
clock.waitTillTime(1350)
super.createUnsafeRowReaderFactories()
super.planUnsafeInputPartitions()
}
}
}
@ -290,13 +290,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
AdvanceManualClock(100), // time = 1150 to unblock getEndOffset
AssertClockTime(1150),
AssertStreamExecThreadIsWaitingForTime(1350), // will block on createReadTasks that needs 1350
// will block on planInputPartitions that needs 1350
AssertStreamExecThreadIsWaitingForTime(1350),
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message === "Processing new data"),
AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
AdvanceManualClock(200), // time = 1350 to unblock createReadTasks
AdvanceManualClock(200), // time = 1350 to unblock planInputPartitions
AssertClockTime(1350),
AssertStreamExecThreadIsWaitingForTime(1500), // will block on map task that needs 1500
AssertOnQuery(_.status.isDataAvailable === true),


@ -27,8 +27,8 @@ import org.apache.spark.{SparkEnv, SparkFunSuite, TaskContext}
import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv}
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, ContinuousReader, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.types.{DataType, IntegerType}
@ -72,8 +72,8 @@ class ContinuousQueuedDataReaderSuite extends StreamTest with MockitoSugar {
*/
private def setup(): (BlockingQueue[UnsafeRow], ContinuousQueuedDataReader) = {
val queue = new ArrayBlockingQueue[UnsafeRow](1024)
val factory = new DataReaderFactory[UnsafeRow] {
override def createDataReader() = new ContinuousDataReader[UnsafeRow] {
val factory = new InputPartition[UnsafeRow] {
override def createPartitionReader() = new ContinuousInputPartitionReader[UnsafeRow] {
var index = -1
var curr: UnsafeRow = _
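
To round out the anonymous-factory pattern used in this suite, here is a hedged sketch of a queue-backed continuous reader. `QueueBackedPartition` and `IndexOffset` are hypothetical names; the sketch assumes `ContinuousInputPartitionReader` extends the base reader with a `getOffset(): PartitionOffset` method, consistent with the streaming imports above.

```scala
import java.util.concurrent.BlockingQueue

import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.sources.v2.reader.{InputPartition, InputPartitionReader}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousInputPartitionReader, PartitionOffset}

// Hypothetical offset type for the sketch; the suite above uses its own test offsets.
case class IndexOffset(index: Long) extends PartitionOffset

// Minimal in-memory partition for continuous-mode tests: the reader blocks on a queue
// and reports how many rows it has handed out via getOffset().
class QueueBackedPartition(queue: BlockingQueue[UnsafeRow]) extends InputPartition[UnsafeRow] {
  override def createPartitionReader(): InputPartitionReader[UnsafeRow] =
    new ContinuousInputPartitionReader[UnsafeRow] {
      private var index = -1L
      private var curr: UnsafeRow = _

      override def next(): Boolean = {
        curr = queue.take() // blocks until the test enqueues a row
        index += 1
        true
      }

      override def get(): UnsafeRow = curr
      override def getOffset(): PartitionOffset = IndexOffset(index)
      override def close(): Unit = {}
    }
}
```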


@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
import org.apache.spark.sql.sources.v2.reader.InputPartition
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.{OutputMode, StreamTest, Trigger}
@ -44,7 +44,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader {
def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
def setStartOffset(start: Optional[Offset]): Unit = {}
def createDataReaderFactories(): java.util.ArrayList[DataReaderFactory[Row]] = {
def planInputPartitions(): java.util.ArrayList[InputPartition[Row]] = {
throw new IllegalStateException("fake source - cannot actually read")
}
}