[SPARK-28747][SQL] merge the two data source v2 fallback configs

## What changes were proposed in this pull request?

Currently we have two configs to specify which v2 sources should fall back to the v1 code path: one for the read path and one for the write path.

However, these two configs are awkward to work with:
1. For `CREATE TABLE USING format`, should this count as the read path or the write path?
2. For `V2SessionCatalog.loadTable`, we need to return `UnresolvedTable` if the source is DS v1 or needs to fall back to the v1 code path. At that point, however, we don't yet know whether the returned table will be used for reading or writing.

File source v2 currently has no new features or performance improvements over v1; the fallback config is just a safeguard in case there are bugs in the v2 implementations. There is little benefit in supporting separate read-path and write-path fallbacks.

This PR merges the two configs into one.
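As a rough illustration (not part of the patch itself; the config is marked `internal()`, so it is an escape hatch rather than a public knob):

```scala
// Hedged sketch; `spark` is an active SparkSession.

// Before this patch: two separate fallback lists, one per path.
spark.conf.set("spark.sql.sources.read.useV1SourceList", "csv,json")
spark.conf.set("spark.sql.sources.write.useV1SourceList", "csv,json")

// After this patch: one list disables the Data Source V2 code path for both reads and writes.
spark.conf.set("spark.sql.sources.useV1SourceList", "csv,json")
```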

## How was this patch tested?

Existing tests.

Closes #25465 from cloud-fan/merge-conf.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Wenchen Fan committed on 2019-08-27 20:47:24 +08:00
parent c02c86e4e8
commit cb06209fc9
43 changed files with 158 additions and 231 deletions

@@ -354,14 +354,12 @@ class AvroV1LogicalTypeSuite extends AvroLogicalTypeSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "avro")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "avro")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "avro")
 }

 class AvroV2LogicalTypeSuite extends AvroLogicalTypeSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")
 }

@@ -1498,14 +1498,12 @@ class AvroV1Suite extends AvroSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "avro")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "avro")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "avro")
 }

 class AvroV2Suite extends AvroSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")
 }

@@ -179,7 +179,7 @@ class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTe
   }

   test("failOnDataLoss=false should not return duplicated records: batch v1") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "kafka") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "kafka") {
       verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = false) { (df, table) =>
         df.write.saveAsTable(table)
       }
@@ -187,7 +187,7 @@ class KafkaDontFailOnDataLossSuite extends StreamTest with KafkaMissingOffsetsTe
   }

   test("failOnDataLoss=false should not return duplicated records: batch v2") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
       verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = false) { (df, table) =>
         df.write.saveAsTable(table)
       }

@@ -46,7 +46,7 @@ abstract class KafkaRelationSuiteBase extends QueryTest with SharedSparkSession
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "kafka")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "kafka")

   protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}"

@@ -385,7 +385,7 @@ class KafkaRelationSuiteV1 extends KafkaRelationSuiteBase {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "kafka")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "kafka")

   test("V1 Source is used when set through SQLConf") {
     val topic = newTopic()
@@ -400,7 +400,7 @@ class KafkaRelationSuiteV2 extends KafkaRelationSuiteBase {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")

   test("V2 Source is used when set through SQLConf") {
     val topic = newTopic()

@@ -427,7 +427,7 @@ class KafkaSinkBatchSuiteV1 extends KafkaSinkBatchSuiteBase {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "kafka")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "kafka")

   test("batch - unsupported save modes") {
     testUnsupportedSaveModes((mode) => s"Save mode ${mode.name} not allowed for Kafka")
@@ -438,7 +438,7 @@ class KafkaSinkBatchSuiteV2 extends KafkaSinkBatchSuiteBase {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")

   test("batch - unsupported save modes") {
     testUnsupportedSaveModes((mode) => s"cannot be written with ${mode.name} mode")

@@ -1589,22 +1589,14 @@ object SQLConf {
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefault(100)

-  val USE_V1_SOURCE_READER_LIST = buildConf("spark.sql.sources.read.useV1SourceList")
+  val USE_V1_SOURCE_LIST = buildConf("spark.sql.sources.useV1SourceList")
     .internal()
-    .doc("A comma-separated list of data source short names or fully qualified data source" +
-      " register class names for which data source V2 read paths are disabled. Reads from these" +
-      " sources will fall back to the V1 sources.")
+    .doc("A comma-separated list of data source short names or fully qualified data source " +
+      "implementation class names for which Data Source V2 code path is disabled. These data " +
+      "sources will fallback to Data Source V1 code path.")
     .stringConf
     .createWithDefault("")

-  val USE_V1_SOURCE_WRITER_LIST = buildConf("spark.sql.sources.write.useV1SourceList")
-    .internal()
-    .doc("A comma-separated list of data source short names or fully qualified data source" +
-      " register class names for which data source V2 write paths are disabled. Writes from these" +
-      " sources will fall back to the V1 sources.")
-    .stringConf
-    .createWithDefault("csv,json,orc,text,parquet")
-
   val DISABLED_V2_STREAMING_WRITERS = buildConf("spark.sql.streaming.disabledV2Writers")
     .doc("A comma-separated list of fully qualified data source register class names for which" +
       " StreamWriteSupport is disabled. Writes to these sources will fall back to the V1 Sinks.")
@@ -2352,10 +2344,6 @@ class SQLConf extends Serializable with Logging {
   def continuousStreamingExecutorPollIntervalMs: Long =
     getConf(CONTINUOUS_STREAMING_EXECUTOR_POLL_INTERVAL_MS)

-  def useV1SourceReaderList: String = getConf(USE_V1_SOURCE_READER_LIST)
-
-  def useV1SourceWriterList: String = getConf(USE_V1_SOURCE_WRITER_LIST)
-
   def disabledV2StreamingWriters: String = getConf(DISABLED_V2_STREAMING_WRITERS)

   def disabledV2StreamingMicroBatchReaders: String =

@@ -38,7 +38,6 @@ import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.TextInputJsonDataSource
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2Utils}
-import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.StructType
@@ -204,16 +203,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         "read files of Hive data source directly.")
     }

-    val useV1Sources =
-      sparkSession.sessionState.conf.useV1SourceReaderList.toLowerCase(Locale.ROOT).split(",")
-    val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
-    val shouldUseV1Source = cls.newInstance() match {
-      case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true
-      case _ => useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT))
-    }
-
-    if (!shouldUseV1Source && classOf[TableProvider].isAssignableFrom(cls)) {
-      val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+    DataSource.lookupDataSourceV2(source, sparkSession.sessionState.conf).map { provider =>
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         source = provider, conf = sparkSession.sessionState.conf)
       val pathsOption = if (paths.isEmpty) {
@@ -236,9 +226,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
         case _ => loadV1Source(paths: _*)
       }
-    } else {
-      loadV1Source(paths: _*)
-    }
+    }.getOrElse(loadV1Source(paths: _*))
   }

   private def loadV1Source(paths: String*) = {

@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2._
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
-import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.sources.v2.internal.UnresolvedTable
@@ -251,37 +251,21 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     assertNotBucketed("save")

-    val session = df.sparkSession
-    val cls = DataSource.lookupDataSource(source, session.sessionState.conf)
-    val canUseV2 = canUseV2Source(session, cls) && partitioningColumns.isEmpty
-
-    // In Data Source V2 project, partitioning is still under development.
-    // Here we fallback to V1 if partitioning columns are specified.
-    // TODO(SPARK-26778): use V2 implementations when partitioning feature is supported.
-    if (canUseV2) {
-      val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
+    val maybeV2Provider = lookupV2Provider()
+    if (maybeV2Provider.isDefined) {
+      if (partitioningColumns.nonEmpty) {
+        throw new AnalysisException(
+          "Cannot write data to TableProvider implementation if partition columns are specified.")
+      }
+
+      val provider = maybeV2Provider.get
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
-        provider, session.sessionState.conf)
+        provider, df.sparkSession.sessionState.conf)
       val options = sessionOptions ++ extraOptions
       val dsOptions = new CaseInsensitiveStringMap(options.asJava)

       import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._
       provider.getTable(dsOptions) match {
-        // TODO (SPARK-27815): To not break existing tests, here we treat file source as a special
-        // case, and pass the save mode to file source directly. This hack should be removed.
-        case table: FileTable =>
-          val write = table.newWriteBuilder(dsOptions).asInstanceOf[FileWriteBuilder]
-            .mode(modeForDSV1) // should not change default mode for file source.
-            .withQueryId(UUID.randomUUID().toString)
-            .withInputDataSchema(df.logicalPlan.schema)
-            .buildForBatch()
-          // The returned `Write` can be null, which indicates that we can skip writing.
-          if (write != null) {
-            runCommand(df.sparkSession, "save") {
-              WriteToDataSourceV2(write, df.logicalPlan)
-            }
-          }
         case table: SupportsWrite if table.supports(BATCH_WRITE) =>
           lazy val relation = DataSourceV2Relation.create(table, dsOptions)
           modeForDSV2 match {
@@ -368,8 +352,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     }

     val session = df.sparkSession
-    val provider = DataSource.lookupDataSource(source, session.sessionState.conf)
-    val canUseV2 = canUseV2Source(session, provider)
+    val canUseV2 = lookupV2Provider().isDefined
     val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog

     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
@@ -503,8 +486,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

     val session = df.sparkSession
-    val provider = DataSource.lookupDataSource(source, session.sessionState.conf)
-    val canUseV2 = canUseV2Source(session, provider)
+    val canUseV2 = lookupV2Provider().isDefined
     val sessionCatalogOpt = session.sessionState.analyzer.sessionCatalog

     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
@@ -849,14 +831,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {

   private def modeForDSV2 = mode.getOrElse(SaveMode.Append)

-  private def canUseV2Source(session: SparkSession, providerClass: Class[_]): Boolean = {
-    val useV1Sources =
-      session.sessionState.conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",")
-    val shouldUseV1Source = providerClass.newInstance() match {
-      case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => true
-      case _ => useV1Sources.contains(providerClass.getCanonicalName.toLowerCase(Locale.ROOT))
-    }
-    !shouldUseV1Source && classOf[TableProvider].isAssignableFrom(providerClass)
+  private def lookupV2Provider(): Option[TableProvider] = {
+    DataSource.lookupDataSourceV2(source, df.sparkSession.sessionState.conf) match {
+      // TODO(SPARK-28396): File source v2 write path is currently broken.
+      case Some(_: FileDataSourceV2) => None
+      case other => other
+    }
   }

   ///////////////////////////////////////////////////////////////////////////////////////

@@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.v2.TableProvider
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructField, StructType}
 import org.apache.spark.sql.util.SchemaUtils
@@ -711,6 +712,24 @@ object DataSource extends Logging {
     }
   }

+  /**
+   * Returns an optional [[TableProvider]] instance for the given provider. It returns None if
+   * there is no corresponding Data Source V2 implementation, or the provider is configured to
+   * fallback to Data Source V1 code path.
+   */
+  def lookupDataSourceV2(provider: String, conf: SQLConf): Option[TableProvider] = {
+    val useV1Sources = conf.getConf(SQLConf.USE_V1_SOURCE_LIST).toLowerCase(Locale.ROOT)
+      .split(",").map(_.trim)
+    val cls = lookupDataSource(provider, conf)
+    cls.newInstance() match {
+      case d: DataSourceRegister if useV1Sources.contains(d.shortName()) => None
+      case t: TableProvider
+          if !useV1Sources.contains(cls.getCanonicalName.toLowerCase(Locale.ROOT)) =>
+        Some(t)
+      case _ => None
+    }
+  }
+
   /**
    * Checks and returns files in all the paths.
    */
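The new `DataSource.lookupDataSourceV2` above is the single place where the merged `spark.sql.sources.useV1SourceList` is consulted; both `DataFrameReader` and `DataFrameWriter` now branch on its result. A minimal caller-side sketch, assuming Spark-internal access (`DataSource` and `sessionState` are `private[sql]`) and an active `spark` session:

    import org.apache.spark.sql.execution.datasources.DataSource

    // Some(TableProvider) only when a v2 implementation exists and the source is not
    // listed in spark.sql.sources.useV1SourceList; None means "use the v1 code path".
    val maybeV2 = DataSource.lookupDataSourceV2("parquet", spark.sessionState.conf)
    val useV2Path = maybeV2.isDefined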

@@ -17,8 +17,6 @@
 package org.apache.spark.sql.execution.datasources

-import java.util.Locale
-
 import scala.collection.mutable

 import org.apache.spark.sql.{AnalysisException, SaveMode}
@@ -31,8 +29,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV
 import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
+import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.sources.v2.TableProvider
 import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}

 case class DataSourceResolution(
@@ -48,7 +46,7 @@ case class DataSourceResolution(
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case CreateTableStatement(
         AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties,
-        V1WriteProvider(provider), options, location, comment, ifNotExists) =>
+        V1Provider(provider), options, location, comment, ifNotExists) =>
       // the source is v1, the identifier has no catalog, and there is no default v2 catalog
       val tableDesc = buildCatalogTable(table, schema, partitionCols, bucketSpec, properties,
         provider, options, location, comment, ifNotExists)
@@ -71,7 +69,7 @@ case class DataSourceResolution(
     case CreateTableAsSelectStatement(
         AsTableIdentifier(table), query, partitionCols, bucketSpec, properties,
-        V1WriteProvider(provider), options, location, comment, ifNotExists) =>
+        V1Provider(provider), options, location, comment, ifNotExists) =>
       // the source is v1, the identifier has no catalog, and there is no default v2 catalog
       val tableDesc = buildCatalogTable(table, new StructType, partitionCols, bucketSpec,
         properties, provider, options, location, comment, ifNotExists)
@@ -106,14 +104,14 @@ case class DataSourceResolution(
     case ReplaceTableStatement(
         AsTableIdentifier(table), schema, partitionCols, bucketSpec, properties,
-        V1WriteProvider(provider), options, location, comment, orCreate) =>
+        V1Provider(provider), options, location, comment, orCreate) =>
       throw new AnalysisException(
         s"Replacing tables is not supported using the legacy / v1 Spark external catalog" +
         s" API. Write provider name: $provider, identifier: $table.")

     case ReplaceTableAsSelectStatement(
         AsTableIdentifier(table), query, partitionCols, bucketSpec, properties,
-        V1WriteProvider(provider), options, location, comment, orCreate) =>
+        V1Provider(provider), options, location, comment, orCreate) =>
       throw new AnalysisException(
         s"Replacing tables is not supported using the legacy / v1 Spark external catalog" +
         s" API. Write provider name: $provider, identifier: $table.")
@@ -205,21 +203,13 @@ case class DataSourceResolution(
     }
   }

-  object V1WriteProvider {
-    private val v1WriteOverrideSet =
-      conf.useV1SourceWriterList.toLowerCase(Locale.ROOT).split(",").toSet
-
+  object V1Provider {
     def unapply(provider: String): Option[String] = {
-      if (v1WriteOverrideSet.contains(provider.toLowerCase(Locale.ROOT))) {
-        Some(provider)
-      } else {
-        lazy val providerClass = DataSource.lookupDataSource(provider, conf)
-        provider match {
-          case _ if classOf[TableProvider].isAssignableFrom(providerClass) =>
-            None
-          case _ =>
-            Some(provider)
-        }
+      DataSource.lookupDataSourceV2(provider, conf) match {
+        // TODO(SPARK-28396): Currently file source v2 can't work with tables.
+        case Some(_: FileDataSourceV2) => Some(provider)
+        case Some(_) => None
+        case _ => Some(provider)
       }
     }
   }

@@ -799,7 +799,7 @@ class DataFrameSuite extends QueryTest with SharedSparkSession {
   test("inputFiles") {
     Seq("csv", "").foreach { useV1List =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1List) {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
         withTempDir { dir =>
           val df = Seq((1, 22)).toDF("a", "b")

@@ -347,7 +347,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
         msg.toLowerCase(Locale.ROOT).contains(msg2))
     }

-    withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> useV1List) {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
       // write path
       Seq("csv", "json", "parquet", "orc").foreach { format =>
         val msg = intercept[AnalysisException] {
@@ -388,8 +388,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
     def errorMessage(format: String): String = {
       s"$format data source does not support null data type."
     }
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1List,
-      SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> useV1List) {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1List) {
       withTempDir { dir =>
         val tempDir = new File(dir, "files").getCanonicalPath
@@ -476,7 +475,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
   test("SPARK-25237 compute correct input metrics in FileScanRDD") {
     // TODO: Test CSV V2 as well after it implements [[SupportsReportStatistics]].
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "csv") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "csv") {
       withTempPath { p =>
         val path = p.getAbsolutePath
         spark.range(1000).repartition(1).write.csv(path)
@@ -500,7 +499,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
   test("Do not use cache on overwrite") {
     Seq("", "orc").foreach { useV1SourceReaderList =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1SourceReaderList) {
        withTempDir { dir =>
          val path = dir.toString
          spark.range(1000).write.mode("overwrite").orc(path)
@@ -516,7 +515,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
   test("Do not use cache on append") {
     Seq("", "orc").foreach { useV1SourceReaderList =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1SourceReaderList) {
        withTempDir { dir =>
          val path = dir.toString
          spark.range(1000).write.mode("append").orc(path)
@@ -532,7 +531,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
   test("UDF input_file_name()") {
     Seq("", "orc").foreach { useV1SourceReaderList =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1SourceReaderList) {
        withTempPath { dir =>
          val path = dir.getCanonicalPath
          spark.range(10).write.orc(path)
@@ -660,7 +659,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
   test("sizeInBytes should be the total size of all files") {
     Seq("orc", "").foreach { useV1SourceReaderList =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1SourceReaderList) {
        withTempDir { dir =>
          dir.delete()
          spark.range(1000).write.orc(dir.toString)
@@ -711,7 +710,7 @@ class FileBasedDataSourceSuite extends QueryTest with SharedSparkSession {
   }

   test("File table location should include both values of option `path` and `paths`") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
       withTempPaths(3) { paths =>
         paths.zipWithIndex.foreach { case (path, index) =>
           Seq(index).toDF("a").write.mode("overwrite").parquet(path.getCanonicalPath)

@@ -65,7 +65,7 @@ class MetadataCacheV1Suite extends MetadataCacheSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "orc")

   test("SPARK-16337 temporary view refresh") {
     withTempView("view_refresh") { withTempPath { (location: File) =>
@@ -123,5 +123,5 @@ class MetadataCacheV2Suite extends MetadataCacheSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")
 }

@@ -2999,7 +2999,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
     }

     Seq("orc", "parquet").foreach { format =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
         withTempPath { dir =>
           spark.range(10).map(i => (i, i.toString)).toDF("id", "s")
             .write
@@ -3024,7 +3024,7 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
   }

   test("SPARK-26709: OptimizeMetadataOnlyQuery does not handle empty records correctly") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
       Seq(true, false).foreach { enableOptimizeMetadataOnlyQuery =>
         withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key ->
           enableOptimizeMetadataOnlyQuery.toString) {

@@ -69,7 +69,7 @@ abstract class DataSourceScanRedactionTest extends QueryTest with SharedSparkSes
  */
 class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest {
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "orc")
+    .set(SQLConf.USE_V1_SOURCE_LIST.key, "orc")

   override protected def getRootPath(df: DataFrame): Path =
     df.queryExecution.sparkPlan.find(_.isInstanceOf[FileSourceScanExec]).get
@@ -121,7 +121,7 @@ class DataSourceScanExecRedactionSuite extends DataSourceScanRedactionTest {
 class DataSourceV2ScanExecRedactionSuite extends DataSourceScanRedactionTest {
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set(SQLConf.USE_V1_SOURCE_READER_LIST.key, "")
+    .set(SQLConf.USE_V1_SOURCE_LIST.key, "")

   override protected def getRootPath(df: DataFrame): Path =
     df.queryExecution.sparkPlan.find(_.isInstanceOf[BatchScanExec]).get

@@ -133,7 +133,7 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSparkSession {
     // This test case is only for file source V1. As the rule OptimizeMetadataOnlyQuery is disabled
     // by default, we can skip testing file source v2 in current stage.
     withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true",
-      SQLConf.USE_V1_SOURCE_READER_LIST.key -> "json") {
+      SQLConf.USE_V1_SOURCE_LIST.key -> "json") {
       withTempPath { path =>
         val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
         Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")

@@ -172,7 +172,7 @@ class PlannerSuite extends SharedSparkSession {
   }

   test("SPARK-11390 explain should print PushedFilters of PhysicalRDD") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
       withTempPath { file =>
         val path = file.getCanonicalPath
         testData.write.parquet(path)

@@ -33,7 +33,7 @@ class SameResultSuite extends QueryTest with SharedSparkSession {
   import testImplicits._

   test("FileSourceScanExec: different orders of data filters and partition filters") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
       withTempPath { path =>
         val tmpDir = path.getCanonicalPath
         spark.range(10)
@@ -52,7 +52,7 @@ class SameResultSuite extends QueryTest with SharedSparkSession {
   }

   test("FileScan: different orders of data filters and partition filters") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
       Seq("orc", "json", "csv", "parquet").foreach { format =>
         withTempPath { path =>
           val tmpDir = path.getCanonicalPath
@@ -85,7 +85,7 @@ class SameResultSuite extends QueryTest with SharedSparkSession {
   }

   test("TextScan") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
       withTempPath { path =>
         val tmpDir = path.getCanonicalPath
         spark.range(10)

@@ -37,7 +37,7 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
   }

   test("SPARK-23731 plans should be canonicalizable after being (de)serialized") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
       withTempPath { path =>
         spark.range(1).write.parquet(path.getAbsolutePath)
         val df = spark.read.parquet(path.getAbsolutePath)
@@ -56,7 +56,7 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
   }

   test("SPARK-27418 BatchScanExec should be canonicalizable after being (de)serialized") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
       withTempPath { path =>
         spark.range(1).write.parquet(path.getAbsolutePath)
         val df = spark.read.parquet(path.getAbsolutePath)
@@ -75,7 +75,7 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
   }

   test("SPARK-25357 SparkPlanInfo of FileScan contains nonEmpty metadata") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
       withTempPath { path =>
         spark.range(5).write.parquet(path.getAbsolutePath)
         val f = spark.read.parquet(path.getAbsolutePath)

@@ -36,8 +36,7 @@ object OrcNestedSchemaPruningBenchmark extends NestedSchemaPruningBenchmark {
   override val benchmarkName: String = "Nested Schema Pruning Benchmark For ORC v1"

   override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "orc",
-      SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "orc") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "orc") {
       super.runBenchmarkSuite(mainArgs)
     }
   }

@@ -507,7 +507,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSparkSession {
       withSQLConf(
         SQLConf.CBO_ENABLED.key -> "true",
         SQLConf.DEFAULT_DATA_SOURCE_NAME.key -> "orc",
-        SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReaderList) {
+        SQLConf.USE_V1_SOURCE_LIST.key -> useV1SourceReaderList) {
         withTempPath { workDir =>
           withTable("table1") {
             val workDirPath = workDir.getAbsolutePath

@@ -32,15 +32,15 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat,
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DropTable, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceResolution}
-import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.internal.SQLConf.DEFAULT_V2_CATALOG
+import org.apache.spark.sql.sources.v2.InMemoryTableProvider
 import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

 class PlanResolutionSuite extends AnalysisTest {
   import CatalystSqlParser._

-  private val orc2 = classOf[OrcDataSourceV2].getName
+  private val v2Format = classOf[InMemoryTableProvider].getName

   private val testCat: TableCatalog = {
     val newCatalog = new TestTableCatalog
@@ -427,7 +427,7 @@ class PlanResolutionSuite extends AnalysisTest {
        | id bigint,
        | description string,
        | point struct<x: double, y: double>)
-       |USING $orc2
+       |USING $v2Format
        |COMMENT 'This is the staging page view table'
        |LOCATION '/user/external/page_view'
        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
@@ -436,7 +436,7 @@ class PlanResolutionSuite extends AnalysisTest {
     val expectedProperties = Map(
       "p1" -> "v1",
       "p2" -> "v2",
-      "provider" -> orc2,
+      "provider" -> v2Format,
       "location" -> "/user/external/page_view",
       "comment" -> "This is the staging page view table")
@@ -530,7 +530,7 @@ class PlanResolutionSuite extends AnalysisTest {
     val sql =
       s"""
        |CREATE TABLE IF NOT EXISTS mydb.page_view
-       |USING $orc2
+       |USING $v2Format
        |COMMENT 'This is the staging page view table'
        |LOCATION '/user/external/page_view'
        |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
@@ -540,7 +540,7 @@ class PlanResolutionSuite extends AnalysisTest {
     val expectedProperties = Map(
       "p1" -> "v1",
       "p2" -> "v2",
-      "provider" -> orc2,
+      "provider" -> v2Format,
       "location" -> "/user/external/page_view",
       "comment" -> "This is the staging page view table")

@@ -415,7 +415,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
   test("[SPARK-16818] partition pruned file scans implement sameResult correctly") {
     Seq("orc", "").foreach { useV1ReaderList =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1ReaderList) {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1ReaderList) {
        withTempPath { path =>
          val tempDir = path.getCanonicalPath
          spark.range(100)

@@ -2063,7 +2063,7 @@ class CSVSuite extends QueryTest with SharedSparkSession with TestCsvData {
   test("SPARK-27873: disabling enforceSchema should not fail columnNameOfCorruptRecord") {
     Seq("csv", "").foreach { reader =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> reader) {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> reader) {
        withTempPath { path =>
          val df = Seq(("0", "2013-111-11")).toDF("a", "b")
          df.write

@@ -256,8 +256,7 @@ class OrcV1PartitionDiscoverySuite extends OrcPartitionDiscoveryTest with Shared
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "orc")

   test("read partitioned table - partition key included in orc file") {
     withTempDir { base =>

@@ -708,6 +708,5 @@ class OrcV1QuerySuite extends OrcQuerySuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "orc")
 }

@@ -32,8 +32,7 @@ class OrcV1FilterSuite extends OrcFilterSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "orc")

   override def checkFilterPredicate(
       df: DataFrame,

@@ -29,6 +29,5 @@ class OrcV1SchemaPruningSuite extends SchemaPruningSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "orc")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "orc")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "orc")
 }

@@ -32,7 +32,7 @@ class OrcV2SchemaPruningSuite extends SchemaPruningSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")

   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =

@@ -1397,8 +1397,7 @@ class ParquetV1FilterSuite extends ParquetFilterSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "parquet")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "parquet")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "parquet")

   override def checkFilterPredicate(
       df: DataFrame,
@@ -1458,7 +1457,7 @@ class ParquetV2FilterSuite extends ParquetFilterSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")

   override def checkFilterPredicate(
       df: DataFrame,

@@ -1047,8 +1047,7 @@ class ParquetV1PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "parquet")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "parquet")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "parquet")

   test("read partitioned table - partition key included in Parquet file") {
     withTempDir { base =>
@@ -1195,7 +1194,7 @@ class ParquetV2PartitionDiscoverySuite extends ParquetPartitionDiscoverySuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")

   test("read partitioned table - partition key included in Parquet file") {
     withTempDir { base =>

@@ -911,8 +911,7 @@ class ParquetV1QuerySuite extends ParquetQuerySuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "parquet")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "parquet")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "parquet")

   test("returning batch for wide table") {
     withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "10") {
@@ -945,7 +944,7 @@ class ParquetV2QuerySuite extends ParquetQuerySuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")

   test("returning batch for wide table") {
     withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "10") {

@@ -36,8 +36,7 @@ class ParquetV1SchemaPruningSuite extends ParquetSchemaPruningSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "parquet")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "parquet")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "parquet")
 }

 class ParquetV2SchemaPruningSuite extends ParquetSchemaPruningSuite {
@@ -45,7 +44,7 @@ class ParquetV2SchemaPruningSuite extends ParquetSchemaPruningSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")

   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =

@@ -437,7 +437,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     assert(res2 === (150L, 0L, 150L) :: (0L, 150L, 10L) :: Nil)

     // TODO: test file source V2 as well when its statistics is correctly computed.
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
       withTempDir { tempDir =>
         val dir = new File(tempDir, "pqS").getCanonicalPath

@@ -92,7 +92,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
   }

   test("Python UDF should not break column pruning/filter pushdown -- Parquet V1") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
       withTempPath { f =>
         spark.range(10).select($"id".as("a"), $"id".as("b"))
           .write.parquet(f.getCanonicalPath)
@@ -129,7 +129,7 @@ class ExtractPythonUDFsSuite extends SparkPlanTest with SharedSparkSession {
   }

   test("Python UDF should not break column pruning/filter pushdown -- Parquet V2") {
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "") {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
       withTempPath { f =>
         spark.range(10).select($"id".as("a"), $"id".as("b"))
           .write.parquet(f.getCanonicalPath)

@@ -547,7 +547,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   test("Throw exception on unsafe cast with strict casting policy") {
     withSQLConf(
-      SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "parquet",
+      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
       SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.STRICT.toString) {
       withTable("t") {
         sql("create table t(i int, d double) using parquet")

View file

@@ -24,7 +24,6 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog
-import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG
 import org.apache.spark.sql.sources.v2.internal.UnresolvedTable
@@ -36,7 +35,6 @@ class DataSourceV2SQLSuite
   import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
-  private val orc2 = classOf[OrcDataSourceV2].getName
   private val v2Source = classOf[FakeV2Provider].getName
   override protected val v2Format = v2Source
   override protected val catalogAndNamespace = "testcat.ns1.ns2."
@@ -63,7 +61,7 @@ class DataSourceV2SQLSuite
     spark.conf.set(
       "spark.sql.catalog.testcat_atomic", classOf[TestStagingInMemoryCatalog].getName)
     spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName)
-    spark.conf.set(V2_SESSION_CATALOG.key, classOf[TestInMemoryTableCatalog].getName)
+    spark.conf.set(V2_SESSION_CATALOG.key, classOf[InMemoryTableSessionCatalog].getName)
     val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
     df.createOrReplaceTempView("source")
@@ -143,14 +141,14 @@ class DataSourceV2SQLSuite
   }
   test("CreateTable: use v2 plan and session catalog when provider is v2") {
-    spark.sql(s"CREATE TABLE table_name (id bigint, data string) USING $orc2")
+    spark.sql(s"CREATE TABLE table_name (id bigint, data string) USING $v2Source")
     val testCatalog = catalog("session").asTableCatalog
     val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
-    assert(table.name == "session.table_name")
+    assert(table.name == "default.table_name")
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> orc2).asJava)
+    assert(table.properties == Map("provider" -> v2Source).asJava)
     assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
     val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
@@ -341,7 +339,7 @@ class DataSourceV2SQLSuite
   }
   test("ReplaceTable: Erases the table contents and changes the metadata.") {
-    spark.sql(s"CREATE TABLE testcat.table_name USING $orc2 AS SELECT id, data FROM source")
+    spark.sql(s"CREATE TABLE testcat.table_name USING $v2Source AS SELECT id, data FROM source")
     val testCatalog = catalog("testcat").asTableCatalog
     val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
@@ -360,9 +358,16 @@ class DataSourceV2SQLSuite
   test("ReplaceTableAsSelect: CREATE OR REPLACE new table has same behavior as CTAS.") {
     Seq("testcat", "testcat_atomic").foreach { catalogName =>
-      spark.sql(s"CREATE TABLE $catalogName.created USING $orc2 AS SELECT id, data FROM source")
       spark.sql(
-        s"CREATE OR REPLACE TABLE $catalogName.replaced USING $orc2 AS SELECT id, data FROM source")
+        s"""
+           |CREATE TABLE $catalogName.created USING $v2Source
+           |AS SELECT id, data FROM source
+         """.stripMargin)
+      spark.sql(
+        s"""
+           |CREATE OR REPLACE TABLE $catalogName.replaced USING $v2Source
+           |AS SELECT id, data FROM source
+         """.stripMargin)
       val testCatalog = catalog(catalogName).asTableCatalog
       val createdTable = testCatalog.loadTable(Identifier.of(Array(), "created"))
@@ -376,33 +381,33 @@ class DataSourceV2SQLSuite
   test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table does not exist.") {
     Seq("testcat", "testcat_atomic").foreach { catalog =>
-      spark.sql(s"CREATE TABLE $catalog.created USING $orc2 AS SELECT id, data FROM source")
+      spark.sql(s"CREATE TABLE $catalog.created USING $v2Source AS SELECT id, data FROM source")
       intercept[CannotReplaceMissingTableException] {
-        spark.sql(s"REPLACE TABLE $catalog.replaced USING $orc2 AS SELECT id, data FROM source")
+        spark.sql(s"REPLACE TABLE $catalog.replaced USING $v2Source AS SELECT id, data FROM source")
       }
     }
   }
   test("ReplaceTableAsSelect: REPLACE TABLE throws exception if table is dropped before commit.") {
     import TestInMemoryTableCatalog._
-    spark.sql(s"CREATE TABLE testcat_atomic.created USING $orc2 AS SELECT id, data FROM source")
+    spark.sql(s"CREATE TABLE testcat_atomic.created USING $v2Source AS SELECT id, data FROM source")
     intercept[CannotReplaceMissingTableException] {
       spark.sql(s"REPLACE TABLE testcat_atomic.replaced" +
-        s" USING $orc2" +
+        s" USING $v2Source" +
         s" TBLPROPERTIES (`$SIMULATE_DROP_BEFORE_REPLACE_PROPERTY`=true)" +
         s" AS SELECT id, data FROM source")
     }
   }
   test("CreateTableAsSelect: use v2 plan and session catalog when provider is v2") {
-    spark.sql(s"CREATE TABLE table_name USING $orc2 AS SELECT id, data FROM source")
+    spark.sql(s"CREATE TABLE table_name USING $v2Source AS SELECT id, data FROM source")
     val testCatalog = catalog("session").asTableCatalog
     val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
-    assert(table.name == "session.table_name")
+    assert(table.name == "default.table_name")
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> orc2).asJava)
+    assert(table.properties == Map("provider" -> v2Source).asJava)
     assert(table.schema == new StructType()
       .add("id", LongType)
       .add("data", StringType))


@@ -82,28 +82,28 @@ class DummyWriteOnlyFileTable extends Table with SupportsWrite {
 class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
-  private val dummyParquetReaderV2 = classOf[DummyReadOnlyFileDataSourceV2].getName
-  private val dummyParquetWriterV2 = classOf[DummyWriteOnlyFileDataSourceV2].getName
+  private val dummyReadOnlyFileSourceV2 = classOf[DummyReadOnlyFileDataSourceV2].getName
+  private val dummyWriteOnlyFileSourceV2 = classOf[DummyWriteOnlyFileDataSourceV2].getName
   test("Fall back to v1 when writing to file with read only FileDataSourceV2") {
     val df = spark.range(10).toDF()
     withTempPath { file =>
       val path = file.getCanonicalPath
       // Writing file should fall back to v1 and succeed.
-      df.write.format(dummyParquetReaderV2).save(path)
+      df.write.format(dummyReadOnlyFileSourceV2).save(path)
       // Validate write result with [[ParquetFileFormat]].
       checkAnswer(spark.read.parquet(path), df)
       // Dummy File reader should fail as expected.
       val exception = intercept[AnalysisException] {
-        spark.read.format(dummyParquetReaderV2).load(path).collect()
+        spark.read.format(dummyReadOnlyFileSourceV2).load(path).collect()
       }
       assert(exception.message.equals("Dummy file reader"))
     }
   }
-  test("Fall back read path to v1 with configuration USE_V1_SOURCE_READER_LIST") {
+  test("Fall back read path to v1 with configuration USE_V1_SOURCE_LIST") {
     val df = spark.range(10).toDF()
     withTempPath { file =>
       val path = file.getCanonicalPath
@@ -111,19 +111,19 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
       Seq(
         "foo,parquet,bar",
         "ParQuet,bar,foo",
-        s"foobar,$dummyParquetReaderV2"
+        s"foobar,$dummyReadOnlyFileSourceV2"
       ).foreach { fallbackReaders =>
-        withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> fallbackReaders) {
+        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> fallbackReaders) {
           // Reading file should fall back to v1 and succeed.
-          checkAnswer(spark.read.format(dummyParquetReaderV2).load(path), df)
+          checkAnswer(spark.read.format(dummyReadOnlyFileSourceV2).load(path), df)
           checkAnswer(sql(s"SELECT * FROM parquet.`$path`"), df)
         }
       }
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> "foo,bar") {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "foo,bar") {
         // Dummy File reader should fail as DISABLED_V2_FILE_DATA_SOURCE_READERS doesn't include it.
         val exception = intercept[AnalysisException] {
-          spark.read.format(dummyParquetReaderV2).load(path).collect()
+          spark.read.format(dummyReadOnlyFileSourceV2).load(path).collect()
         }
         assert(exception.message.equals("Dummy file reader"))
       }
@@ -134,51 +134,24 @@ class FileDataSourceV2FallBackSuite extends QueryTest with SharedSparkSession {
     val df = spark.range(10).toDF()
     withTempPath { file =>
       val path = file.getCanonicalPath
-      // Dummy File writer should fail as expected.
-      withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "") {
-        val exception = intercept[AnalysisException] {
-          df.write.format(dummyParquetWriterV2).save(path)
-        }
-        assert(exception.message.equals("Dummy file writer"))
-      }
       df.write.parquet(path)
       // Fallback reads to V1
-      checkAnswer(spark.read.format(dummyParquetWriterV2).load(path), df)
+      checkAnswer(spark.read.format(dummyWriteOnlyFileSourceV2).load(path), df)
     }
   }
-  test("Fall back write path to v1 with configuration USE_V1_SOURCE_WRITER_LIST") {
+  test("Always fall back write path to v1") {
     val df = spark.range(10).toDF()
-    Seq(
-      "foo,parquet,bar",
-      "ParQuet,bar,foo",
-      s"foobar,$dummyParquetWriterV2"
-    ).foreach { fallbackWriters =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> fallbackWriters) {
-        withTempPath { file =>
-          val path = file.getCanonicalPath
-          // Writes should fall back to v1 and succeed.
-          df.write.format(dummyParquetWriterV2).save(path)
-          checkAnswer(spark.read.parquet(path), df)
-        }
-      }
-    }
-    withSQLConf(SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "foo,bar") {
-      withTempPath { file =>
-        val path = file.getCanonicalPath
-        // Dummy File reader should fail as USE_V1_SOURCE_READER_LIST doesn't include it.
-        val exception = intercept[AnalysisException] {
-          df.write.format(dummyParquetWriterV2).save(path)
-        }
-        assert(exception.message.equals("Dummy file writer"))
-      }
-    }
+    withTempPath { path =>
+      // Writes should fall back to v1 and succeed.
+      df.write.format(dummyWriteOnlyFileSourceV2).save(path.getCanonicalPath)
+      checkAnswer(spark.read.parquet(path.getCanonicalPath), df)
+    }
   }
   test("Fallback Parquet V2 to V1") {
     Seq("parquet", classOf[ParquetDataSourceV2].getCanonicalName).foreach { format =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> format,
-        SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> format) {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> format) {
         val commands = ArrayBuffer.empty[(String, LogicalPlan)]
         val errors = ArrayBuffer.empty[(String, Throwable)]
         val listener = new QueryExecutionListener {
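
The `withSQLConf` blocks above save and restore the setting around each assertion. A sketch of the same pattern written against a plain SparkSession named `spark` (the config key and the temp path here are assumptions for illustration): set the merged list, exercise the v1 path, then restore the previous value.

```scala
// Minimal save-and-restore sketch around the merged fallback list.
val key = "spark.sql.sources.useV1SourceList"
val previous = spark.conf.getOption(key)
spark.conf.set(key, "parquet")
try {
  val out = "/tmp/v1_fallback_roundtrip"               // hypothetical local path
  spark.range(5).write.mode("overwrite").parquet(out)  // v1 write path
  assert(spark.read.parquet(out).count() == 5)         // v1 read path
} finally {
  previous match {
    case Some(value) => spark.conf.set(key, value)     // restore the old list
    case None => spark.conf.unset(key)                 // or fall back to the default
  }
}
```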


@@ -479,8 +479,7 @@ class FileStreamSinkV1Suite extends FileStreamSinkSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "csv,json,orc,text,parquet")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "csv,json,orc,text,parquet")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "csv,json,orc,text,parquet")
   override def checkQueryExecution(df: DataFrame): Unit = {
     // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has
@@ -530,8 +529,7 @@ class FileStreamSinkV2Suite extends FileStreamSinkSuite {
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
-      .set(SQLConf.USE_V1_SOURCE_READER_LIST, "")
-      .set(SQLConf.USE_V1_SOURCE_WRITER_LIST, "")
+      .set(SQLConf.USE_V1_SOURCE_LIST, "")
   override def checkQueryExecution(df: DataFrame): Unit = {
     // Verify that MetadataLogFileIndex is being used and the correct partitioning schema has


@@ -219,8 +219,8 @@ class StreamSuite extends StreamTest {
     }
     val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
-    Seq("", "parquet").foreach { useV1SourceReader =>
-      withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> useV1SourceReader) {
+    Seq("", "parquet").foreach { useV1Source =>
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1Source) {
         assertDF(df)
         assertDF(df)
       }


@@ -308,7 +308,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
   test("Throw exception on unsafe table insertion with strict casting policy") {
     withSQLConf(
-      SQLConf.USE_V1_SOURCE_WRITER_LIST.key -> "parquet",
+      SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
       SQLConf.STORE_ASSIGNMENT_POLICY.key -> SQLConf.StoreAssignmentPolicy.STRICT.toString) {
       withTable("t") {
         sql("create table t(i int, d double) using parquet")


@@ -2365,7 +2365,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       // This test case is only for file source V1. As the rule OptimizeMetadataOnlyQuery is
       // disabled by default, we can skip testing file source v2 in current stage.
       withSQLConf(SQLConf.OPTIMIZER_METADATA_ONLY.key -> enableOptimizeMetadataOnlyQuery.toString,
-        SQLConf.USE_V1_SOURCE_READER_LIST.key -> "parquet") {
+        SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
         withTable("t") {
           sql("CREATE TABLE t (col1 INT, p1 INT) USING PARQUET PARTITIONED BY (p1)")
           sql("INSERT INTO TABLE t PARTITION (p1 = 5) SELECT ID FROM range(1, 1)")


@@ -818,7 +818,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       assert(preferredLocations.distinct.length == 2)
     }
-    withSQLConf(SQLConf.USE_V1_SOURCE_READER_LIST.key -> dataSourceName) {
+    withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> dataSourceName) {
       checkLocality()
       withSQLConf(SQLConf.PARALLEL_PARTITION_DISCOVERY_THRESHOLD.key -> "0") {