[SPARK-23260][SPARK-23262][SQL] several data source v2 naming cleanup

## What changes were proposed in this pull request?

All other classes in the reader/writer packages don't have `V2` in their names, and the streaming reader/writer don't have `V2` either. It's more consistent to remove `V2` from `DataSourceV2Reader` and `DataSourceV2Writer`.

Also rename `DataSourceV2Options` to remove the `V2`; we should only have `V2` in the root interface, `DataSourceV2`.

This PR also fixes some places where a mix-in interface didn't extend the interface it is meant to mix into.
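
For context, a source written against the renamed API looks roughly like the sketch below. It is modeled on the `SimpleDataSourceV2` test source touched by this PR; the `TinyDataSourceV2`/`TinyDataReaderFactory` names are illustrative, and the `DataReader` methods (`next`/`get`/`close`) are assumed from the existing v2 reader API rather than changed here:

```scala
import java.util.{ArrayList, List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.StructType

// A trivial source that returns the rows (0, 0) .. (9, -9).
class TinyDataSourceV2 extends DataSourceV2 with ReadSupport {

  class Reader extends DataSourceReader {
    override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")

    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
      val factories = new ArrayList[DataReaderFactory[Row]]()
      factories.add(new TinyDataReaderFactory(0, 10))
      factories
    }
  }

  // ReadSupport now extends DataSourceV2, so the source only implements createReader.
  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}

class TinyDataReaderFactory(start: Int, end: Int) extends DataReaderFactory[Row] {
  override def createDataReader(): DataReader[Row] = new DataReader[Row] {
    private var current = start - 1

    override def next(): Boolean = { current += 1; current < end }
    override def get(): Row = Row(current, -current)
    override def close(): Unit = ()
  }
}
```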

## How was this patch tested?

Existing tests.
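
For reference, the kind of end-to-end path those suites exercise can be sketched as follows (run inside an existing `SparkSession`; `TinyDataSourceV2` is the hypothetical source sketched above):

```scala
// Load a v2 source by its fully qualified class name and show its rows.
val df = spark.read.format(classOf[TinyDataSourceV2].getName).load()
df.show()  // prints rows (0, 0) .. (9, -9)
```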

Author: Wenchen Fan <wenchen@databricks.com>

Closes #20427 from cloud-fan/ds-v2.
Wenchen Fan 2018-01-30 19:43:17 +08:00
parent 5056877e8b
commit 0a9ac0248b
55 changed files with 176 additions and 176 deletions

View file

@ -41,7 +41,7 @@ import org.apache.spark.unsafe.types.UTF8String
* @param offsetReader a reader used to get kafka offsets. Note that the actual data will be
* read by per-task consumers generated later.
* @param kafkaParams String params for per-task Kafka consumers.
* @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which
* @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceOptions]] params which
* are not Kafka consumer params.
* @param metadataPath Path to a directory this reader can use for writing metadata.
* @param initialOffsets The Kafka offsets to start reading data at.

View file

@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
@ -109,7 +109,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
override def createContinuousReader(
schema: Optional[StructType],
metadataPath: String,
options: DataSourceV2Options): KafkaContinuousReader = {
options: DataSourceOptions): KafkaContinuousReader = {
val parameters = options.asMap().asScala.toMap
validateStreamOptions(parameters)
// Each running query should use its own group id. Otherwise, the query may be only assigned
@ -227,7 +227,7 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister
queryId: String,
schema: StructType,
mode: OutputMode,
options: DataSourceV2Options): StreamWriter = {
options: DataSourceOptions): StreamWriter = {
import scala.collection.JavaConverters._
val spark = SparkSession.getActiveSession.get

View file

@ -29,18 +29,18 @@ import org.apache.spark.annotation.InterfaceStability;
* data source options.
*/
@InterfaceStability.Evolving
public class DataSourceV2Options {
public class DataSourceOptions {
private final Map<String, String> keyLowerCasedMap;
private String toLowerCase(String key) {
return key.toLowerCase(Locale.ROOT);
}
public static DataSourceV2Options empty() {
return new DataSourceV2Options(new HashMap<>());
public static DataSourceOptions empty() {
return new DataSourceOptions(new HashMap<>());
}
public DataSourceV2Options(Map<String, String> originalMap) {
public DataSourceOptions(Map<String, String> originalMap) {
keyLowerCasedMap = new HashMap<>(originalMap.size());
for (Map.Entry<String, String> entry : originalMap.entrySet()) {
keyLowerCasedMap.put(toLowerCase(entry.getKey()), entry.getValue());

View file

@ -18,17 +18,17 @@
package org.apache.spark.sql.sources.v2;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
/**
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability and scan the data from the data source.
*/
@InterfaceStability.Evolving
public interface ReadSupport {
public interface ReadSupport extends DataSourceV2 {
/**
* Creates a {@link DataSourceV2Reader} to scan the data from this data source.
* Creates a {@link DataSourceReader} to scan the data from this data source.
*
* If this method fails (by throwing an exception), the action would fail and no Spark job was
* submitted.
@ -36,5 +36,5 @@ public interface ReadSupport {
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
DataSourceV2Reader createReader(DataSourceV2Options options);
DataSourceReader createReader(DataSourceOptions options);
}

View file

@ -18,7 +18,7 @@
package org.apache.spark.sql.sources.v2;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.types.StructType;
/**
@ -30,10 +30,10 @@ import org.apache.spark.sql.types.StructType;
* supports both schema inference and user-specified schema.
*/
@InterfaceStability.Evolving
public interface ReadSupportWithSchema {
public interface ReadSupportWithSchema extends DataSourceV2 {
/**
* Create a {@link DataSourceV2Reader} to scan the data from this data source.
* Create a {@link DataSourceReader} to scan the data from this data source.
*
* If this method fails (by throwing an exception), the action would fail and no Spark job was
* submitted.
@ -45,5 +45,5 @@ public interface ReadSupportWithSchema {
* @param options the options for the returned data source reader, which is an immutable
* case-insensitive string-to-string map.
*/
DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options);
DataSourceReader createReader(StructType schema, DataSourceOptions options);
}

View file

@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability;
* session.
*/
@InterfaceStability.Evolving
public interface SessionConfigSupport {
public interface SessionConfigSupport extends DataSourceV2 {
/**
* Key prefix of the session configs to propagate. Spark will extract all session configs that

View file

@ -21,7 +21,7 @@ import java.util.Optional;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.types.StructType;
/**
@ -29,17 +29,17 @@ import org.apache.spark.sql.types.StructType;
* provide data writing ability and save the data to the data source.
*/
@InterfaceStability.Evolving
public interface WriteSupport {
public interface WriteSupport extends DataSourceV2 {
/**
* Creates an optional {@link DataSourceV2Writer} to save the data to this data source. Data
* Creates an optional {@link DataSourceWriter} to save the data to this data source. Data
* sources can return None if there is no writing needed to be done according to the save mode.
*
* If this method fails (by throwing an exception), the action would fail and no Spark job was
* submitted.
*
* @param jobId A unique string for the writing job. It's possible that there are many writing
* jobs running at the same time, and the returned {@link DataSourceV2Writer} can
* jobs running at the same time, and the returned {@link DataSourceWriter} can
* use this job id to distinguish itself from other jobs.
* @param schema the schema of the data to be written.
* @param mode the save mode which determines what to do when the data are already in this data
@ -47,6 +47,6 @@ public interface WriteSupport {
* @param options the options for the returned data source writer, which is an immutable
* case-insensitive string-to-string map.
*/
Optional<DataSourceV2Writer> createWriter(
String jobId, StructType schema, SaveMode mode, DataSourceV2Options options);
Optional<DataSourceWriter> createWriter(
String jobId, StructType schema, SaveMode mode, DataSourceOptions options);
}

View file

@ -22,7 +22,7 @@ import java.io.Serializable;
import org.apache.spark.annotation.InterfaceStability;
/**
* A reader factory returned by {@link DataSourceV2Reader#createDataReaderFactories()} and is
* A reader factory returned by {@link DataSourceReader#createDataReaderFactories()} and is
* responsible for creating the actual data reader. The relationship between
* {@link DataReaderFactory} and {@link DataReader}
* is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.

View file

@ -21,14 +21,15 @@ import java.util.List;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
import org.apache.spark.sql.types.StructType;
/**
* A data source reader that is returned by
* {@link org.apache.spark.sql.sources.v2.ReadSupport#createReader(
* org.apache.spark.sql.sources.v2.DataSourceV2Options)} or
* {@link org.apache.spark.sql.sources.v2.ReadSupportWithSchema#createReader(
* StructType, org.apache.spark.sql.sources.v2.DataSourceV2Options)}.
* {@link ReadSupport#createReader(DataSourceOptions)} or
* {@link ReadSupportWithSchema#createReader(StructType, DataSourceOptions)}.
* It can mix in various query optimization interfaces to speed up the data scan. The actual scan
* logic is delegated to {@link DataReaderFactory}s that are returned by
* {@link #createDataReaderFactories()}.
@ -52,7 +53,7 @@ import org.apache.spark.sql.types.StructType;
* issues the scan request and does the actual data reading.
*/
@InterfaceStability.Evolving
public interface DataSourceV2Reader {
public interface DataSourceReader {
/**
* Returns the actual schema of this data source reader, which may be different from the physical

View file

@ -21,7 +21,7 @@ import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.catalyst.expressions.Expression;
/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to push down arbitrary expressions as predicates to the data source.
* This is an experimental and unstable interface as {@link Expression} is not public and may get
* changed in the future Spark versions.
@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.Expression;
* process this interface.
*/
@InterfaceStability.Unstable
public interface SupportsPushDownCatalystFilters {
public interface SupportsPushDownCatalystFilters extends DataSourceReader {
/**
* Pushes down filters, and returns unsupported filters.

View file

@ -21,7 +21,7 @@ import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.Filter;
/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to push down filters to the data source and reduce the size of the data to be read.
*
* Note that, if data source readers implement both this interface and
@ -29,7 +29,7 @@ import org.apache.spark.sql.sources.Filter;
* {@link SupportsPushDownCatalystFilters}.
*/
@InterfaceStability.Evolving
public interface SupportsPushDownFilters {
public interface SupportsPushDownFilters extends DataSourceReader {
/**
* Pushes down filters, and returns unsupported filters.

View file

@ -21,12 +21,12 @@ import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.types.StructType;
/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to push down required columns to the data source and only read these columns during
* scan to reduce the size of the data to be read.
*/
@InterfaceStability.Evolving
public interface SupportsPushDownRequiredColumns {
public interface SupportsPushDownRequiredColumns extends DataSourceReader {
/**
* Applies column pruning w.r.t. the given requiredSchema.
@ -35,7 +35,7 @@ public interface SupportsPushDownRequiredColumns {
* also OK to do the pruning partially, e.g., a data source may not be able to prune nested
* fields, and only prune top-level columns.
*
* Note that, data source readers should update {@link DataSourceV2Reader#readSchema()} after
* Note that, data source readers should update {@link DataSourceReader#readSchema()} after
* applying column pruning.
*/
void pruneColumns(StructType requiredSchema);

View file

@ -20,11 +20,11 @@ package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.InterfaceStability;
/**
* A mix in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* A mix in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to report data partitioning and try to avoid shuffle at Spark side.
*/
@InterfaceStability.Evolving
public interface SupportsReportPartitioning {
public interface SupportsReportPartitioning extends DataSourceReader {
/**
* Returns the output data partitioning that this reader guarantees.

View file

@ -20,11 +20,11 @@ package org.apache.spark.sql.sources.v2.reader;
import org.apache.spark.annotation.InterfaceStability;
/**
* A mix in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* A mix in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to report statistics to Spark.
*/
@InterfaceStability.Evolving
public interface SupportsReportStatistics {
public interface SupportsReportStatistics extends DataSourceReader {
/**
* Returns the basic statistics of this data source.

View file

@ -24,11 +24,11 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.vectorized.ColumnarBatch;
/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to output {@link ColumnarBatch} and make the scan faster.
*/
@InterfaceStability.Evolving
public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
public interface SupportsScanColumnarBatch extends DataSourceReader {
@Override
default List<DataReaderFactory<Row>> createDataReaderFactories() {
throw new IllegalStateException(
@ -36,7 +36,7 @@ public interface SupportsScanColumnarBatch extends DataSourceV2Reader {
}
/**
* Similar to {@link DataSourceV2Reader#createDataReaderFactories()}, but returns columnar data
* Similar to {@link DataSourceReader#createDataReaderFactories()}, but returns columnar data
* in batches.
*/
List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories();

View file

@ -24,13 +24,13 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to output {@link UnsafeRow} directly and avoid the row copy at Spark side.
* This is an experimental and unstable interface, as {@link UnsafeRow} is not public and may get
* changed in the future Spark versions.
*/
@InterfaceStability.Unstable
public interface SupportsScanUnsafeRow extends DataSourceV2Reader {
public interface SupportsScanUnsafeRow extends DataSourceReader {
@Override
default List<DataReaderFactory<Row>> createDataReaderFactories() {
@ -39,7 +39,7 @@ public interface SupportsScanUnsafeRow extends DataSourceV2Reader {
}
/**
* Similar to {@link DataSourceV2Reader#createDataReaderFactories()},
* Similar to {@link DataSourceReader#createDataReaderFactories()},
* but returns data in unsafe row format.
*/
List<DataReaderFactory<UnsafeRow>> createUnsafeRowReaderFactories();

View file

@ -21,7 +21,7 @@ import java.util.Optional;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
import org.apache.spark.sql.types.StructType;
@ -44,5 +44,5 @@ public interface ContinuousReadSupport extends DataSourceV2 {
ContinuousReader createContinuousReader(
Optional<StructType> schema,
String checkpointLocation,
DataSourceV2Options options);
DataSourceOptions options);
}

View file

@ -20,8 +20,8 @@ package org.apache.spark.sql.sources.v2.streaming;
import java.util.Optional;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
import org.apache.spark.sql.types.StructType;
@ -50,5 +50,5 @@ public interface MicroBatchReadSupport extends DataSourceV2 {
MicroBatchReader createMicroBatchReader(
Optional<StructType> schema,
String checkpointLocation,
DataSourceV2Options options);
DataSourceOptions options);
}

View file

@ -19,10 +19,10 @@ package org.apache.spark.sql.sources.v2.streaming;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;
@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType;
* provide data writing ability for structured streaming.
*/
@InterfaceStability.Evolving
public interface StreamWriteSupport extends BaseStreamingSink {
public interface StreamWriteSupport extends DataSourceV2, BaseStreamingSink {
/**
* Creates an optional {@link StreamWriter} to save the data to this data source. Data
@ -39,7 +39,7 @@ public interface StreamWriteSupport extends BaseStreamingSink {
*
* @param queryId A unique string for the writing query. It's possible that there are many
* writing queries running at the same time, and the returned
* {@link DataSourceV2Writer} can use this id to distinguish itself from others.
* {@link DataSourceWriter} can use this id to distinguish itself from others.
* @param schema the schema of the data to be written.
* @param mode the output mode which determines what successive epoch output means to this
* sink, please refer to {@link OutputMode} for more details.
@ -50,5 +50,5 @@ public interface StreamWriteSupport extends BaseStreamingSink {
String queryId,
StructType schema,
OutputMode mode,
DataSourceV2Options options);
DataSourceOptions options);
}

View file

@ -19,12 +19,12 @@ package org.apache.spark.sql.sources.v2.streaming.reader;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import java.util.Optional;
/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to allow reading in a continuous processing mode stream.
*
* Implementations must ensure each reader factory output is a {@link ContinuousDataReader}.
@ -33,7 +33,7 @@ import java.util.Optional;
* DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
*/
@InterfaceStability.Evolving
public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reader {
public interface ContinuousReader extends BaseStreamingSource, DataSourceReader {
/**
* Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
* partition to a single global offset.

View file

@ -18,20 +18,20 @@
package org.apache.spark.sql.sources.v2.streaming.reader;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
import java.util.Optional;
/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
* interface to indicate they allow micro-batch streaming reads.
*
* Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
* DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
*/
@InterfaceStability.Evolving
public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
public interface MicroBatchReader extends DataSourceReader, BaseStreamingSource {
/**
* Set the desired offset range for reader factories created from this reader. Reader factories
* will generate only data within (`start`, `end`]; that is, from the first record after `start`

View file

@ -18,19 +18,19 @@
package org.apache.spark.sql.sources.v2.streaming.writer;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
/**
* A {@link DataSourceV2Writer} for use with structured streaming. This writer handles commits and
* A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and
* aborts relative to an epoch ID determined by the execution engine.
*
* {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
* and so must reset any internal state after a successful commit.
*/
@InterfaceStability.Evolving
public interface StreamWriter extends DataSourceV2Writer {
public interface StreamWriter extends DataSourceWriter {
/**
* Commits this writing job for the specified epoch with a list of commit messages. The commit
* messages are collected from successful data writers and are produced by

View file

@ -20,16 +20,16 @@ package org.apache.spark.sql.sources.v2.writer;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.WriteSupport;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;
/**
* A data source writer that is returned by
* {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}/
* {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceOptions)}/
* {@link org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport#createStreamWriter(
* String, StructType, OutputMode, DataSourceV2Options)}.
* String, StructType, OutputMode, DataSourceOptions)}.
* It can mix in various writing optimization interfaces to speed up the data saving. The actual
* writing logic is delegated to {@link DataWriter}.
*
@ -52,7 +52,7 @@ import org.apache.spark.sql.types.StructType;
* Please refer to the documentation of commit/abort methods for detailed specifications.
*/
@InterfaceStability.Evolving
public interface DataSourceV2Writer {
public interface DataSourceWriter {
/**
* Creates a writer factory which will be serialized and sent to executors.

View file

@ -33,11 +33,11 @@ import org.apache.spark.annotation.InterfaceStability;
*
* If this data writer succeeds(all records are successfully written and {@link #commit()}
* succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
* {@link DataSourceV2Writer#commit(WriterCommitMessage[])} with commit messages from other data
* {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
* writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
* exception will be sent to the driver side, and Spark will retry this writing task for some times,
* each time {@link DataWriterFactory#createDataWriter(int, int)} gets a different `attemptNumber`,
* and finally call {@link DataSourceV2Writer#abort(WriterCommitMessage[])} if all retry fail.
* and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} if all retry fail.
*
* Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
* takes too long to finish. Different from retried tasks, which are launched one by one after the
@ -69,11 +69,11 @@ public interface DataWriter<T> {
/**
* Commits this writer after all records are written successfully, returns a commit message which
* will be sent back to driver side and passed to
* {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
* {@link DataSourceWriter#commit(WriterCommitMessage[])}.
*
* The written data should only be visible to data source readers after
* {@link DataSourceV2Writer#commit(WriterCommitMessage[])} succeeds, which means this method
* should still "hide" the written data and ask the {@link DataSourceV2Writer} at driver side to
* {@link DataSourceWriter#commit(WriterCommitMessage[])} succeeds, which means this method
* should still "hide" the written data and ask the {@link DataSourceWriter} at driver side to
* do the final commit via {@link WriterCommitMessage}.
*
* If this method fails (by throwing an exception), {@link #abort()} will be called and this
@ -91,7 +91,7 @@ public interface DataWriter<T> {
* failed.
*
* If this method fails(by throwing an exception), the underlying data source may have garbage
* that need to be cleaned by {@link DataSourceV2Writer#abort(WriterCommitMessage[])} or manually,
* that need to be cleaned by {@link DataSourceWriter#abort(WriterCommitMessage[])} or manually,
* but these garbage should not be visible to data source readers.
*
* @throws IOException if failure happens during disk/network IO like writing files.

View file

@ -22,7 +22,7 @@ import java.io.Serializable;
import org.apache.spark.annotation.InterfaceStability;
/**
* A factory of {@link DataWriter} returned by {@link DataSourceV2Writer#createWriterFactory()},
* A factory of {@link DataWriter} returned by {@link DataSourceWriter#createWriterFactory()},
* which is responsible for creating and initializing the actual data writer at executor side.
*
* Note that, the writer factory will be serialized and sent to executors, then the data writer

View file

@ -22,14 +22,14 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
/**
* A mix-in interface for {@link DataSourceV2Writer}. Data source writers can implement this
* A mix-in interface for {@link DataSourceWriter}. Data source writers can implement this
* interface to write {@link InternalRow} directly and avoid the row conversion at Spark side.
* This is an experimental and unstable interface, as {@link InternalRow} is not public and may get
* changed in the future Spark versions.
*/
@InterfaceStability.Unstable
public interface SupportsWriteInternalRow extends DataSourceV2Writer {
public interface SupportsWriteInternalRow extends DataSourceWriter {
@Override
default DataWriterFactory<Row> createWriterFactory() {

View file

@ -23,10 +23,10 @@ import org.apache.spark.annotation.InterfaceStability;
/**
* A commit message returned by {@link DataWriter#commit()} and will be sent back to the driver side
* as the input parameter of {@link DataSourceV2Writer#commit(WriterCommitMessage[])}.
* as the input parameter of {@link DataSourceWriter#commit(WriterCommitMessage[])}.
*
* This is an empty interface, data sources should define their own message class and use it in
* their {@link DataWriter#commit()} and {@link DataSourceV2Writer#commit(WriterCommitMessage[])}
* their {@link DataWriter#commit()} and {@link DataSourceWriter#commit(WriterCommitMessage[])}
* implementations.
*/
@InterfaceStability.Evolving

View file

@ -186,7 +186,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
if (classOf[DataSourceV2].isAssignableFrom(cls)) {
val ds = cls.newInstance()
val options = new DataSourceV2Options((extraOptions ++
val options = new DataSourceOptions((extraOptions ++
DataSourceV2Utils.extractSessionConfigs(
ds = ds.asInstanceOf[DataSourceV2],
conf = sparkSession.sessionState.conf)).asJava)

View file

@ -243,7 +243,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val ds = cls.newInstance()
ds match {
case ws: WriteSupport =>
val options = new DataSourceV2Options((extraOptions ++
val options = new DataSourceOptions((extraOptions ++
DataSourceV2Utils.extractSessionConfigs(
ds = ds.asInstanceOf[DataSourceV2],
conf = df.sparkSession.sessionState.conf)).asJava)

View file

@ -35,7 +35,7 @@ trait DataSourceReaderHolder {
/**
* The held data source reader.
*/
def reader: DataSourceV2Reader
def reader: DataSourceReader
/**
* The metadata of this data source reader that can be used for equality test.

View file

@ -23,7 +23,7 @@ import org.apache.spark.sql.sources.v2.reader._
case class DataSourceV2Relation(
fullOutput: Seq[AttributeReference],
reader: DataSourceV2Reader) extends LeafNode with DataSourceReaderHolder {
reader: DataSourceReader) extends LeafNode with DataSourceReaderHolder {
override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2Relation]
@ -41,12 +41,12 @@ case class DataSourceV2Relation(
*/
class StreamingDataSourceV2Relation(
fullOutput: Seq[AttributeReference],
reader: DataSourceV2Reader) extends DataSourceV2Relation(fullOutput, reader) {
reader: DataSourceReader) extends DataSourceV2Relation(fullOutput, reader) {
override def isStreaming: Boolean = true
}
object DataSourceV2Relation {
def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
def apply(reader: DataSourceReader): DataSourceV2Relation = {
new DataSourceV2Relation(reader.readSchema().toAttributes, reader)
}
}

View file

@ -36,7 +36,7 @@ import org.apache.spark.sql.types.StructType
*/
case class DataSourceV2ScanExec(
fullOutput: Seq[AttributeReference],
@transient reader: DataSourceV2Reader)
@transient reader: DataSourceReader)
extends LeafExecNode with DataSourceReaderHolder with ColumnarBatchScan {
override def canEqual(other: Any): Boolean = other.isInstanceOf[DataSourceV2ScanExec]

View file

@ -35,7 +35,7 @@ import org.apache.spark.util.Utils
/**
* The logical plan for writing data into data source v2.
*/
case class WriteToDataSourceV2(writer: DataSourceV2Writer, query: LogicalPlan) extends LogicalPlan {
case class WriteToDataSourceV2(writer: DataSourceWriter, query: LogicalPlan) extends LogicalPlan {
override def children: Seq[LogicalPlan] = Seq(query)
override def output: Seq[Attribute] = Nil
}
@ -43,7 +43,7 @@ case class WriteToDataSourceV2(writer: DataSourceV2Writer, query: LogicalPlan) e
/**
* The physical plan for writing data into data source v2.
*/
case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan) extends SparkPlan {
case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) extends SparkPlan {
override def children: Seq[SparkPlan] = Seq(query)
override def output: Seq[Attribute] = Nil

View file

@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
@ -89,7 +89,7 @@ class MicroBatchExecution(
val reader = source.createMicroBatchReader(
Optional.empty(), // user specified schema
metadataPath,
new DataSourceV2Options(options.asJava))
new DataSourceOptions(options.asJava))
nextSourceId += 1
StreamingExecutionRelation(reader, output)(sparkSession)
})
@ -447,7 +447,7 @@ class MicroBatchExecution(
s"$runId",
newAttributePlan.schema,
outputMode,
new DataSourceV2Options(extraOptions.asJava))
new DataSourceOptions(extraOptions.asJava))
if (writer.isInstanceOf[SupportsWriteInternalRow]) {
WriteToDataSourceV2(
new InternalRowMicroBatchWriter(currentBatchId, writer), newAttributePlan)

View file

@ -111,7 +111,7 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister
override def createContinuousReader(
schema: Optional[StructType],
checkpointLocation: String,
options: DataSourceV2Options): ContinuousReader = {
options: DataSourceOptions): ContinuousReader = {
new RateStreamContinuousReader(options)
}

View file

@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
@ -40,7 +40,7 @@ class ConsoleSinkProvider extends DataSourceV2
queryId: String,
schema: StructType,
mode: OutputMode,
options: DataSourceV2Options): StreamWriter = {
options: DataSourceOptions): StreamWriter = {
new ConsoleWriter(schema, options)
}

View file

@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, PartitionOffset}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
@ -160,7 +160,7 @@ class ContinuousExecution(
dataSource.createContinuousReader(
java.util.Optional.empty[StructType](),
metadataPath,
new DataSourceV2Options(extraReaderOptions.asJava))
new DataSourceOptions(extraReaderOptions.asJava))
}
uniqueSources = continuousSources.distinct
@ -198,7 +198,7 @@ class ContinuousExecution(
s"$runId",
triggerLogicalPlan.schema,
outputMode,
new DataSourceV2Options(extraOptions.asJava))
new DataSourceOptions(extraOptions.asJava))
val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
val reader = withSink.collect {

View file

@ -23,19 +23,18 @@ import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamOffset, ValueRunTimeMsPair}
import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.{LongType, StructField, StructType, TimestampType}
import org.apache.spark.sql.types.StructType
case class RateStreamPartitionOffset(
partition: Int, currentValue: Long, currentTimeMs: Long) extends PartitionOffset
class RateStreamContinuousReader(options: DataSourceV2Options)
class RateStreamContinuousReader(options: DataSourceOptions)
extends ContinuousReader {
implicit val defaultFormats: DefaultFormats = DefaultFormats

View file

@ -19,13 +19,13 @@ package org.apache.spark.sql.execution.streaming.sources
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType
/** Common methods used to create writes for the the console sink */
class ConsoleWriter(schema: StructType, options: DataSourceV2Options)
class ConsoleWriter(schema: StructType, options: DataSourceOptions)
extends StreamWriter with Logging {
// Number of rows to display, by default 20 rows

View file

@ -20,14 +20,14 @@ package org.apache.spark.sql.execution.streaming.sources
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage}
/**
* A [[DataSourceV2Writer]] used to hook V2 stream writers into a microbatch plan. It implements
* A [[DataSourceWriter]] used to hook V2 stream writers into a microbatch plan. It implements
* the non-streaming interface, forwarding the batch ID determined at construction to a wrapped
* streaming writer.
*/
class MicroBatchWriter(batchId: Long, writer: StreamWriter) extends DataSourceV2Writer {
class MicroBatchWriter(batchId: Long, writer: StreamWriter) extends DataSourceWriter {
override def commit(messages: Array[WriterCommitMessage]): Unit = {
writer.commit(batchId, messages)
}
@ -38,7 +38,7 @@ class MicroBatchWriter(batchId: Long, writer: StreamWriter) extends DataSourceV2
}
class InternalRowMicroBatchWriter(batchId: Long, writer: StreamWriter)
extends DataSourceV2Writer with SupportsWriteInternalRow {
extends DataSourceWriter with SupportsWriteInternalRow {
override def commit(messages: Array[WriterCommitMessage]): Unit = {
writer.commit(batchId, messages)
}

View file

@ -21,11 +21,11 @@ import scala.collection.mutable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, DataWriterFactory, WriterCommitMessage}
/**
* A simple [[DataWriterFactory]] whose tasks just pack rows into the commit message for delivery
* to a [[org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer]] on the driver.
* to a [[DataSourceWriter]] on the driver.
*
* Note that, because it sends all rows to the driver, this factory will generally be unsuitable
* for production-quality sinks. It's intended for use in tests.

View file

@ -29,7 +29,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.streaming.{RateStreamOffset, ValueRunTimeMsPair}
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.MicroBatchReadSupport
import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset}
@ -44,14 +44,14 @@ class RateSourceProviderV2 extends DataSourceV2 with MicroBatchReadSupport with
override def createMicroBatchReader(
schema: Optional[StructType],
checkpointLocation: String,
options: DataSourceV2Options): MicroBatchReader = {
options: DataSourceOptions): MicroBatchReader = {
new RateStreamMicroBatchReader(options)
}
override def shortName(): String = "ratev2"
}
class RateStreamMicroBatchReader(options: DataSourceV2Options)
class RateStreamMicroBatchReader(options: DataSourceOptions)
extends MicroBatchReader {
implicit val defaultFormats: DefaultFormats = DefaultFormats

View file

@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Statistics}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.{Append, Complete, Update}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer._
@ -45,7 +45,7 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with Logging {
queryId: String,
schema: StructType,
mode: OutputMode,
options: DataSourceV2Options): StreamWriter = {
options: DataSourceOptions): StreamWriter = {
new MemoryStreamWriter(this, mode)
}
@ -114,7 +114,7 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with Logging {
case class MemoryWriterCommitMessage(partition: Int, data: Seq[Row]) extends WriterCommitMessage {}
class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode)
extends DataSourceV2Writer with Logging {
extends DataSourceWriter with Logging {
override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode)

View file

@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.{StreamingRelation, StreamingRelationV2}
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@ -158,7 +158,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
}
val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf).newInstance()
val options = new DataSourceV2Options(extraOptions.asJava)
val options = new DataSourceOptions(extraOptions.asJava)
// We need to generate the V1 data source so we can pass it to the V2 relation as a shim.
// We can't be sure at this point whether we'll actually want to use V2, since we don't know the
// writer or whether the query is continuous.

View file

@ -24,15 +24,15 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.sources.Filter;
import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.types.StructType;
public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
class Reader implements DataSourceV2Reader, SupportsPushDownRequiredColumns,
class Reader implements DataSourceReader, SupportsPushDownRequiredColumns,
SupportsPushDownFilters {
private StructType requiredSchema = new StructType().add("i", "int").add("j", "int");
@ -131,7 +131,7 @@ public class JavaAdvancedDataSourceV2 implements DataSourceV2, ReadSupport {
@Override
public DataSourceV2Reader createReader(DataSourceV2Options options) {
public DataSourceReader createReader(DataSourceOptions options) {
return new Reader();
}
}

View file

@ -21,8 +21,8 @@ import java.io.IOException;
import java.util.List;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.types.DataTypes;
@ -33,7 +33,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
class Reader implements DataSourceV2Reader, SupportsScanColumnarBatch {
class Reader implements DataSourceReader, SupportsScanColumnarBatch {
private final StructType schema = new StructType().add("i", "int").add("j", "int");
@Override
@ -108,7 +108,7 @@ public class JavaBatchDataSourceV2 implements DataSourceV2, ReadSupport {
@Override
public DataSourceV2Reader createReader(DataSourceV2Options options) {
public DataSourceReader createReader(DataSourceOptions options) {
return new Reader();
}
}

View file

@ -23,15 +23,15 @@ import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.types.StructType;
public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
class Reader implements DataSourceV2Reader, SupportsReportPartitioning {
class Reader implements DataSourceReader, SupportsReportPartitioning {
private final StructType schema = new StructType().add("a", "int").add("b", "int");
@Override
@ -104,7 +104,7 @@ public class JavaPartitionAwareDataSource implements DataSourceV2, ReadSupport {
}
@Override
public DataSourceV2Reader createReader(DataSourceV2Options options) {
public DataSourceReader createReader(DataSourceOptions options) {
return new Reader();
}
}

View file

@ -20,16 +20,16 @@ package test.org.apache.spark.sql.sources.v2;
import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupportWithSchema;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.types.StructType;
public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWithSchema {
class Reader implements DataSourceV2Reader {
class Reader implements DataSourceReader {
private final StructType schema;
Reader(StructType schema) {
@ -48,7 +48,7 @@ public class JavaSchemaRequiredDataSource implements DataSourceV2, ReadSupportWi
}
@Override
public DataSourceV2Reader createReader(StructType schema, DataSourceV2Options options) {
public DataSourceReader createReader(StructType schema, DataSourceOptions options) {
return new Reader(schema);
}
}

View file

@ -23,16 +23,16 @@ import java.util.List;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
import org.apache.spark.sql.types.StructType;
public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
class Reader implements DataSourceV2Reader {
class Reader implements DataSourceReader {
private final StructType schema = new StructType().add("i", "int").add("j", "int");
@Override
@ -80,7 +80,7 @@ public class JavaSimpleDataSourceV2 implements DataSourceV2, ReadSupport {
}
@Override
public DataSourceV2Reader createReader(DataSourceV2Options options) {
public DataSourceReader createReader(DataSourceOptions options) {
return new Reader();
}
}

View file

@ -21,15 +21,15 @@ import java.io.IOException;
import java.util.List;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.ReadSupport;
import org.apache.spark.sql.sources.v2.reader.*;
import org.apache.spark.sql.types.StructType;
public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
class Reader implements DataSourceV2Reader, SupportsScanUnsafeRow {
class Reader implements DataSourceReader, SupportsScanUnsafeRow {
private final StructType schema = new StructType().add("i", "int").add("j", "int");
@Override
@ -83,7 +83,7 @@ public class JavaUnsafeRowDataSourceV2 implements DataSourceV2, ReadSupport {
}
@Override
public DataSourceV2Reader createReader(DataSourceV2Options options) {
public DataSourceReader createReader(DataSourceOptions options) {
return new Reader();
}
}

View file

@ -26,7 +26,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.execution.streaming.sources.{RateStreamBatchTask, RateStreamMicroBatchReader, RateStreamSourceV2}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.util.ManualClock
@ -49,7 +49,7 @@ class RateSourceV2Suite extends StreamTest {
test("microbatch in registry") {
DataSource.lookupDataSource("ratev2", spark.sqlContext.conf).newInstance() match {
case ds: MicroBatchReadSupport =>
val reader = ds.createMicroBatchReader(Optional.empty(), "", DataSourceV2Options.empty())
val reader = ds.createMicroBatchReader(Optional.empty(), "", DataSourceOptions.empty())
assert(reader.isInstanceOf[RateStreamMicroBatchReader])
case _ =>
throw new IllegalStateException("Could not find v2 read support for rate")
@ -76,14 +76,14 @@ class RateSourceV2Suite extends StreamTest {
test("microbatch - numPartitions propagated") {
val reader = new RateStreamMicroBatchReader(
new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
new DataSourceOptions(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
reader.setOffsetRange(Optional.empty(), Optional.empty())
val tasks = reader.createDataReaderFactories()
assert(tasks.size == 11)
}
test("microbatch - set offset") {
val reader = new RateStreamMicroBatchReader(DataSourceV2Options.empty())
val reader = new RateStreamMicroBatchReader(DataSourceOptions.empty())
val startOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 1000))))
val endOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 2000))))
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
@ -93,7 +93,7 @@ class RateSourceV2Suite extends StreamTest {
test("microbatch - infer offsets") {
val reader = new RateStreamMicroBatchReader(
new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "100").asJava))
new DataSourceOptions(Map("numPartitions" -> "1", "rowsPerSecond" -> "100").asJava))
reader.clock.waitTillTime(reader.clock.getTimeMillis() + 100)
reader.setOffsetRange(Optional.empty(), Optional.empty())
reader.getStartOffset() match {
@ -114,7 +114,7 @@ class RateSourceV2Suite extends StreamTest {
test("microbatch - predetermined batch size") {
val reader = new RateStreamMicroBatchReader(
new DataSourceV2Options(Map("numPartitions" -> "1", "rowsPerSecond" -> "20").asJava))
new DataSourceOptions(Map("numPartitions" -> "1", "rowsPerSecond" -> "20").asJava))
val startOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(0, 1000))))
val endOffset = RateStreamOffset(Map((0, ValueRunTimeMsPair(20, 2000))))
reader.setOffsetRange(Optional.of(startOffset), Optional.of(endOffset))
@ -125,7 +125,7 @@ class RateSourceV2Suite extends StreamTest {
test("microbatch - data read") {
val reader = new RateStreamMicroBatchReader(
new DataSourceV2Options(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
new DataSourceOptions(Map("numPartitions" -> "11", "rowsPerSecond" -> "33").asJava))
val startOffset = RateStreamSourceV2.createInitialOffset(11, reader.creationTimeMs)
val endOffset = RateStreamOffset(startOffset.partitionToValueAndRunTimeMs.toSeq.map {
case (part, ValueRunTimeMsPair(currentVal, currentReadTime)) =>
@ -150,7 +150,7 @@ class RateSourceV2Suite extends StreamTest {
test("continuous in registry") {
DataSource.lookupDataSource("rate", spark.sqlContext.conf).newInstance() match {
case ds: ContinuousReadSupport =>
val reader = ds.createContinuousReader(Optional.empty(), "", DataSourceV2Options.empty())
val reader = ds.createContinuousReader(Optional.empty(), "", DataSourceOptions.empty())
assert(reader.isInstanceOf[RateStreamContinuousReader])
case _ =>
throw new IllegalStateException("Could not find v2 read support for rate")
@ -159,7 +159,7 @@ class RateSourceV2Suite extends StreamTest {
test("continuous data") {
val reader = new RateStreamContinuousReader(
new DataSourceV2Options(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
reader.setOffset(Optional.empty())
val tasks = reader.createDataReaderFactories()
assert(tasks.size == 2)

View file

@ -22,24 +22,24 @@ import scala.collection.JavaConverters._
import org.apache.spark.SparkFunSuite
/**
* A simple test suite to verify `DataSourceV2Options`.
* A simple test suite to verify `DataSourceOptions`.
*/
class DataSourceV2OptionsSuite extends SparkFunSuite {
class DataSourceOptionsSuite extends SparkFunSuite {
test("key is case-insensitive") {
val options = new DataSourceV2Options(Map("foo" -> "bar").asJava)
val options = new DataSourceOptions(Map("foo" -> "bar").asJava)
assert(options.get("foo").get() == "bar")
assert(options.get("FoO").get() == "bar")
assert(!options.get("abc").isPresent)
}
test("value is case-sensitive") {
val options = new DataSourceV2Options(Map("foo" -> "bAr").asJava)
val options = new DataSourceOptions(Map("foo" -> "bAr").asJava)
assert(options.get("foo").get == "bAr")
}
test("getInt") {
val options = new DataSourceV2Options(Map("numFOo" -> "1", "foo" -> "bar").asJava)
val options = new DataSourceOptions(Map("numFOo" -> "1", "foo" -> "bar").asJava)
assert(options.getInt("numFOO", 10) == 1)
assert(options.getInt("numFOO2", 10) == 10)
@ -49,7 +49,7 @@ class DataSourceV2OptionsSuite extends SparkFunSuite {
}
test("getBoolean") {
val options = new DataSourceV2Options(
val options = new DataSourceOptions(
Map("isFoo" -> "true", "isFOO2" -> "false", "foo" -> "bar").asJava)
assert(options.getBoolean("isFoo", false))
assert(!options.getBoolean("isFoo2", true))
@ -59,7 +59,7 @@ class DataSourceV2OptionsSuite extends SparkFunSuite {
}
test("getLong") {
val options = new DataSourceV2Options(Map("numFoo" -> "9223372036854775807",
val options = new DataSourceOptions(Map("numFoo" -> "9223372036854775807",
"foo" -> "bar").asJava)
assert(options.getLong("numFOO", 0L) == 9223372036854775807L)
assert(options.getLong("numFoo2", -1L) == -1L)
@ -70,7 +70,7 @@ class DataSourceV2OptionsSuite extends SparkFunSuite {
}
test("getDouble") {
val options = new DataSourceV2Options(Map("numFoo" -> "922337.1",
val options = new DataSourceOptions(Map("numFoo" -> "922337.1",
"foo" -> "bar").asJava)
assert(options.getDouble("numFOO", 0d) == 922337.1d)
assert(options.getDouble("numFoo2", -1.02d) == -1.02d)
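
The hunks above show only the constructor lines that changed. A consolidated sketch of the lookup behaviour the suite asserts, with invented keys and values, is:

import scala.collection.JavaConverters._

import org.apache.spark.sql.sources.v2.DataSourceOptions

// Keys are matched case-insensitively, values keep their original casing,
// and the typed getters fall back to the supplied default for missing keys.
val options = new DataSourceOptions(
  Map("path" -> "/tmp/Example", "numPartitions" -> "4", "isStreaming" -> "true").asJava)

assert(options.get("PATH").get() == "/tmp/Example")
assert(options.getInt("numpartitions", 1) == 4)
assert(options.getBoolean("ISSTREAMING", false))
assert(options.getLong("missingKey", -1L) == -1L)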


@ -201,7 +201,7 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
class Reader extends DataSourceV2Reader {
class Reader extends DataSourceReader {
override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
@ -209,7 +209,7 @@ class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {
}
}
override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}
class SimpleDataReaderFactory(start: Int, end: Int)
@ -233,7 +233,7 @@ class SimpleDataReaderFactory(start: Int, end: Int)
class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
class Reader extends DataSourceV2Reader
class Reader extends DataSourceReader
with SupportsPushDownRequiredColumns with SupportsPushDownFilters {
var requiredSchema = new StructType().add("i", "int").add("j", "int")
@ -275,7 +275,7 @@ class AdvancedDataSourceV2 extends DataSourceV2 with ReadSupport {
}
}
override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}
class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType)
@ -306,7 +306,7 @@ class AdvancedDataReaderFactory(start: Int, end: Int, requiredSchema: StructType
class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
class Reader extends DataSourceV2Reader with SupportsScanUnsafeRow {
class Reader extends DataSourceReader with SupportsScanUnsafeRow {
override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
override def createUnsafeRowReaderFactories(): JList[DataReaderFactory[UnsafeRow]] = {
@ -315,7 +315,7 @@ class UnsafeRowDataSourceV2 extends DataSourceV2 with ReadSupport {
}
}
override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}
class UnsafeRowDataReaderFactory(start: Int, end: Int)
@ -343,18 +343,18 @@ class UnsafeRowDataReaderFactory(start: Int, end: Int)
class SchemaRequiredDataSource extends DataSourceV2 with ReadSupportWithSchema {
class Reader(val readSchema: StructType) extends DataSourceV2Reader {
class Reader(val readSchema: StructType) extends DataSourceReader {
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
java.util.Collections.emptyList()
}
override def createReader(schema: StructType, options: DataSourceV2Options): DataSourceV2Reader =
override def createReader(schema: StructType, options: DataSourceOptions): DataSourceReader =
new Reader(schema)
}
class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {
class Reader extends DataSourceV2Reader with SupportsScanColumnarBatch {
class Reader extends DataSourceReader with SupportsScanColumnarBatch {
override def readSchema(): StructType = new StructType().add("i", "int").add("j", "int")
override def createBatchDataReaderFactories(): JList[DataReaderFactory[ColumnarBatch]] = {
@ -362,7 +362,7 @@ class BatchDataSourceV2 extends DataSourceV2 with ReadSupport {
}
}
override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}
class BatchDataReaderFactory(start: Int, end: Int)
@ -406,7 +406,7 @@ class BatchDataReaderFactory(start: Int, end: Int)
class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
class Reader extends DataSourceV2Reader with SupportsReportPartitioning {
class Reader extends DataSourceReader with SupportsReportPartitioning {
override def readSchema(): StructType = new StructType().add("a", "int").add("b", "int")
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
@ -428,7 +428,7 @@ class PartitionAwareDataSource extends DataSourceV2 with ReadSupport {
}
}
override def createReader(options: DataSourceV2Options): DataSourceV2Reader = new Reader
override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}
class SpecificDataReaderFactory(i: Array[Int], j: Array[Int])
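
The test sources above are spread across many hunks. As one self-contained reference for the renamed read-side interfaces, a minimal source could look like the following (the class and column names are invented; only the interface names come from the patch):

import java.util.{Collections, List => JList}

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.StructType

class TinyDataSourceV2 extends DataSourceV2 with ReadSupport {

  class Reader extends DataSourceReader {
    override def readSchema(): StructType = new StructType().add("i", "int")

    // No partitions: loading this source yields an empty result with the schema above.
    override def createDataReaderFactories(): JList[DataReaderFactory[Row]] =
      Collections.emptyList()
  }

  override def createReader(options: DataSourceOptions): DataSourceReader = new Reader
}

Loading it by class name, for example `spark.read.format(classOf[TinyDataSourceV2].getName).load()`, should yield an empty DataFrame with a single int column.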


@ -28,7 +28,7 @@ import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceV2Reader}
import org.apache.spark.sql.sources.v2.reader.{DataReader, DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.SerializableConfiguration
@ -42,7 +42,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
private val schema = new StructType().add("i", "long").add("j", "long")
class Reader(path: String, conf: Configuration) extends DataSourceV2Reader {
class Reader(path: String, conf: Configuration) extends DataSourceReader {
override def readSchema(): StructType = schema
override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = {
@ -64,7 +64,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
}
}
class Writer(jobId: String, path: String, conf: Configuration) extends DataSourceV2Writer {
class Writer(jobId: String, path: String, conf: Configuration) extends DataSourceWriter {
override def createWriterFactory(): DataWriterFactory[Row] = {
new SimpleCSVDataWriterFactory(path, jobId, new SerializableConfiguration(conf))
}
@ -104,7 +104,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
}
}
override def createReader(options: DataSourceV2Options): DataSourceV2Reader = {
override def createReader(options: DataSourceOptions): DataSourceReader = {
val path = new Path(options.get("path").get())
val conf = SparkContext.getActive.get.hadoopConfiguration
new Reader(path.toUri.toString, conf)
@ -114,7 +114,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
jobId: String,
schema: StructType,
mode: SaveMode,
options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
options: DataSourceOptions): Optional[DataSourceWriter] = {
assert(DataType.equalsStructurally(schema.asNullable, this.schema.asNullable))
assert(!SparkContext.getActive.get.conf.getBoolean("spark.speculation", false))
@ -141,7 +141,7 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
}
private def createWriter(
jobId: String, path: Path, conf: Configuration, internal: Boolean): DataSourceV2Writer = {
jobId: String, path: Path, conf: Configuration, internal: Boolean): DataSourceWriter = {
val pathStr = path.toUri.toString
if (internal) {
new InternalRowWriter(jobId, pathStr, conf)
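
The same renames apply on the write side. A sketch of a sink that accepts rows and silently drops them, using the renamed interfaces with the signatures exercised by the suite above (class names invented):

import java.util.Optional

import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, WriteSupport}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.types.StructType

class NoopSink extends DataSourceV2 with WriteSupport {

  class Writer extends DataSourceWriter {
    override def createWriterFactory(): DataWriterFactory[Row] = new NoopWriterFactory
    override def commit(messages: Array[WriterCommitMessage]): Unit = {}
    override def abort(messages: Array[WriterCommitMessage]): Unit = {}
  }

  override def createWriter(
      jobId: String,
      schema: StructType,
      mode: SaveMode,
      options: DataSourceOptions): Optional[DataSourceWriter] = Optional.of(new Writer)
}

// Kept top level so it stays serializable without capturing the enclosing source.
class NoopWriterFactory extends DataWriterFactory[Row] {
  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] =
    new DataWriter[Row] {
      override def write(record: Row): Unit = ()               // drop the row
      override def commit(): WriterCommitMessage = new WriterCommitMessage {}
      override def abort(): Unit = ()
    }
}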


@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, Streami
import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.sources.v2.DataSourceV2Options
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory
import org.apache.spark.sql.sources.v2.streaming._
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
@ -54,14 +54,14 @@ trait FakeMicroBatchReadSupport extends MicroBatchReadSupport {
override def createMicroBatchReader(
schema: Optional[StructType],
checkpointLocation: String,
options: DataSourceV2Options): MicroBatchReader = FakeReader()
options: DataSourceOptions): MicroBatchReader = FakeReader()
}
trait FakeContinuousReadSupport extends ContinuousReadSupport {
override def createContinuousReader(
schema: Optional[StructType],
checkpointLocation: String,
options: DataSourceV2Options): ContinuousReader = FakeReader()
options: DataSourceOptions): ContinuousReader = FakeReader()
}
trait FakeStreamWriteSupport extends StreamWriteSupport {
@ -69,7 +69,7 @@ trait FakeStreamWriteSupport extends StreamWriteSupport {
queryId: String,
schema: StructType,
mode: OutputMode,
options: DataSourceV2Options): StreamWriter = {
options: DataSourceOptions): StreamWriter = {
throw new IllegalStateException("fake sink - cannot actually write")
}
}
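
A concrete test source would then mix these fakes into something registrable. A hypothetical combination (the class name and short name are invented):

import org.apache.spark.sql.sources.DataSourceRegister

// Bundles all three fake capabilities under one short name. To be discoverable it
// would need a META-INF/services entry for DataSourceRegister, or it can simply be
// referenced by its fully qualified class name in .format(...).
class FakeEverythingSource extends DataSourceRegister
  with FakeMicroBatchReadSupport
  with FakeContinuousReadSupport
  with FakeStreamWriteSupport {

  override def shortName(): String = "fake-everything"
}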