[SPARK-25129][SQL] Make the mapping of com.databricks.spark.avro to built-in module configurable

## What changes were proposed in this pull request?

In https://issues.apache.org/jira/browse/SPARK-24924, the data source provider com.databricks.spark.avro was mapped to the new package org.apache.spark.sql.avro.

As per the discussion in the [Jira](https://issues.apache.org/jira/browse/SPARK-24924) and PR #22119, we should make the mapping configurable.
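
For illustration, a minimal sketch of the intended behavior (the input path is hypothetical, and the built-in external Avro module is assumed to be deployed):

```scala
// By default the legacy provider name is mapped to the built-in external Avro module.
val df = spark.read
  .format("com.databricks.spark.avro")   // resolves to org.apache.spark.sql.avro.AvroFileFormat
  .load("/path/to/events.avro")          // hypothetical path

// With the new flag turned off, the mapping is disabled: Spark looks for the
// third-party spark-avro package instead and fails if it is not on the classpath.
spark.conf.set("spark.sql.legacy.replaceDatabricksSparkAvro.enabled", "false")
spark.read
  .format("com.databricks.spark.avro")
  .load("/path/to/events.avro")          // AnalysisException: Failed to find data source
```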

This PR also improves the error message shown when the Avro or Kafka data source cannot be found.
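
A rough sketch of the improved messages, assuming neither external module is on the classpath (paths are hypothetical; the message text comes from this patch):

```scala
// Avro without the external module deployed:
// spark.read.format("avro").load("/path/to/events.avro")
// => AnalysisException: Failed to find data source: avro. Avro is built-in but external data
//    source module since Spark 2.4. Please deploy the application as per the deployment
//    section of "Apache Avro Data Source Guide".

// Kafka without the Kafka SQL module deployed:
// spark.readStream.format("kafka").load()
// => AnalysisException: Failed to find data source: kafka. Please deploy the application as
//    per the deployment section of "Structured Streaming + Kafka Integration Guide".
```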

## How was this patch tested?

Unit tests (AvroSuite and ResolvedDataSourceSuite).

Closes #22133 from gengliangwang/configurable_avro_mapping.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
Commit ac0174e55a, parent 6c5cb85856, 2018-08-21. 4 changed files with 52 additions and 3 deletions.

external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala

@@ -77,10 +77,19 @@ class AvroSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
   }
 
   test("resolve avro data source") {
-    Seq("avro", "com.databricks.spark.avro").foreach { provider =>
+    val databricksAvro = "com.databricks.spark.avro"
+    // By default the backward compatibility for com.databricks.spark.avro is enabled.
+    Seq("avro", "org.apache.spark.sql.avro.AvroFileFormat", databricksAvro).foreach { provider =>
       assert(DataSource.lookupDataSource(provider, spark.sessionState.conf) ===
         classOf[org.apache.spark.sql.avro.AvroFileFormat])
     }
+
+    withSQLConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED.key -> "false") {
+      val message = intercept[AnalysisException] {
+        DataSource.lookupDataSource(databricksAvro, spark.sessionState.conf)
+      }.getMessage
+      assert(message.contains(s"Failed to find data source: $databricksAvro"))
+    }
   }
 
   test("reading from multiple paths") {

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -1469,6 +1469,13 @@ object SQLConf {
     .checkValues((1 to 9).toSet + Deflater.DEFAULT_COMPRESSION)
     .createWithDefault(Deflater.DEFAULT_COMPRESSION)
 
+  val LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED =
+    buildConf("spark.sql.legacy.replaceDatabricksSparkAvro.enabled")
+      .doc("If it is set to true, the data source provider com.databricks.spark.avro is mapped " +
+        "to the built-in but external Avro data source module for backward compatibility.")
+      .booleanConf
+      .createWithDefault(true)
+
   val LEGACY_SETOPS_PRECEDENCE_ENABLED =
     buildConf("spark.sql.legacy.setopsPrecedence.enabled")
       .internal()
@@ -1881,6 +1888,9 @@ class SQLConf extends Serializable with Logging {
 
   def avroDeflateLevel: Int = getConf(SQLConf.AVRO_DEFLATE_LEVEL)
 
+  def replaceDatabricksSparkAvroEnabled: Boolean =
+    getConf(SQLConf.LEGACY_REPLACE_DATABRICKS_SPARK_AVRO_ENABLED)
+
   def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
 
   def parallelFileListingInStatsComputation: Boolean =

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala

@@ -571,7 +571,6 @@ object DataSource extends Logging {
     val nativeOrc = classOf[OrcFileFormat].getCanonicalName
     val socket = classOf[TextSocketSourceProvider].getCanonicalName
     val rate = classOf[RateStreamProvider].getCanonicalName
-    val avro = "org.apache.spark.sql.avro.AvroFileFormat"
 
     Map(
       "org.apache.spark.sql.jdbc" -> jdbc,
@@ -593,7 +592,6 @@
       "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
       "org.apache.spark.ml.source.libsvm" -> libsvm,
       "com.databricks.spark.csv" -> csv,
-      "com.databricks.spark.avro" -> avro,
       "org.apache.spark.sql.execution.streaming.TextSocketSourceProvider" -> socket,
       "org.apache.spark.sql.execution.streaming.RateSourceProvider" -> rate
     )
@@ -616,6 +614,8 @@
       case name if name.equalsIgnoreCase("orc") &&
          conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
        "org.apache.spark.sql.hive.orc.OrcFileFormat"
+      case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
+        "org.apache.spark.sql.avro.AvroFileFormat"
       case name => name
     }
     val provider2 = s"$provider1.DefaultSource"
@@ -637,6 +637,18 @@
                  "Hive built-in ORC data source must be used with Hive support enabled. " +
                  "Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
                  "'native'")
+              } else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
+                provider1 == "com.databricks.spark.avro" ||
+                provider1 == "org.apache.spark.sql.avro") {
+                throw new AnalysisException(
+                  s"Failed to find data source: $provider1. Avro is built-in but external data " +
+                    "source module since Spark 2.4. Please deploy the application as per " +
+                    "the deployment section of \"Apache Avro Data Source Guide\".")
+              } else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
+                throw new AnalysisException(
+                  s"Failed to find data source: $provider1. Please deploy the application as " +
+                    "per the deployment section of " +
+                    "\"Structured Streaming + Kafka Integration Guide\".")
               } else {
                 throw new ClassNotFoundException(
                   s"Failed to find data source: $provider1. Please find packages at " +

sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala

@@ -76,6 +76,24 @@ class ResolvedDataSourceSuite extends SparkFunSuite with SharedSQLContext {
       classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
   }
 
+  test("avro: show deploy guide for loading the external avro module") {
+    Seq("avro", "org.apache.spark.sql.avro").foreach { provider =>
+      val message = intercept[AnalysisException] {
+        getProvidingClass(provider)
+      }.getMessage
+      assert(message.contains(s"Failed to find data source: $provider"))
+      assert(message.contains("Please deploy the application as per the deployment section of"))
+    }
+  }
+
+  test("kafka: show deploy guide for loading the external kafka module") {
+    val message = intercept[AnalysisException] {
+      getProvidingClass("kafka")
+    }.getMessage
+    assert(message.contains("Failed to find data source: kafka"))
+    assert(message.contains("Please deploy the application as per the deployment section of"))
+  }
+
   test("error message for unknown data sources") {
     val error = intercept[ClassNotFoundException] {
       getProvidingClass("asfdwefasdfasdf")