[SPARK-15515][SQL] Error Handling in Running SQL Directly On Files
#### What changes were proposed in this pull request? This PR is to address the following issues: - **ISSUE 1:** For ORC source format, we are reporting the strange error message when we did not enable Hive support: ```SQL SQL Example: select id from `org.apache.spark.sql.hive.orc`.`file_path` Error Message: Table or view not found: `org.apache.spark.sql.hive.orc`.`file_path` ``` Instead, we should issue the error message like: ``` Expected Error Message: The ORC data source must be used with Hive support enabled ``` - **ISSUE 2:** For the Avro format, we report the strange error message like: The example query is like ```SQL SQL Example: select id from `avro`.`file_path` select id from `com.databricks.spark.avro`.`file_path` Error Message: Table or view not found: `com.databricks.spark.avro`.`file_path` ``` The desired message should be like: ``` Expected Error Message: Failed to find data source: avro. Please use Spark package http://spark-packages.org/package/databricks/spark-avro" ``` - ~~**ISSUE 3:** Unable to detect incompatibility libraries for Spark 2.0 in Data Source Resolution. We report a strange error message:~~ **Update**: The latest code changes contains - For JDBC format, we added an extra checking in the rule `ResolveRelations` of `Analyzer`. Without the PR, Spark will return the error message like: `Option 'url' not specified`. Now, we are reporting `Unsupported data source type for direct query on files: jdbc` - Make data source format name case incensitive so that error handling behaves consistent with the normal cases. - Added the test cases for all the supported formats. #### How was this patch tested? Added test cases to cover all the above issues Author: gatorsmile <gatorsmile@gmail.com> Author: xiaoli <lixiao1983@gmail.com> Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local> Closes #13283 from gatorsmile/runSQLAgainstFile.
This commit is contained in:
parent
8900c8d8ff
commit
9aff6f3b19
|
@ -132,28 +132,20 @@ case class DataSource(
|
|||
// Found the data source using fully qualified path
|
||||
dataSource
|
||||
case Failure(error) =>
|
||||
if (error.isInstanceOf[ClassNotFoundException]) {
|
||||
val className = error.getMessage
|
||||
if (spark2RemovedClasses.contains(className)) {
|
||||
throw new ClassNotFoundException(s"$className is removed in Spark 2.0. " +
|
||||
"Please check if your library is compatible with Spark 2.0")
|
||||
}
|
||||
}
|
||||
if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
|
||||
throw new ClassNotFoundException(
|
||||
"The ORC data source must be used with Hive support enabled.", error)
|
||||
if (provider.toLowerCase == "orc" ||
|
||||
provider.startsWith("org.apache.spark.sql.hive.orc")) {
|
||||
throw new AnalysisException(
|
||||
"The ORC data source must be used with Hive support enabled")
|
||||
} else if (provider.toLowerCase == "avro" ||
|
||||
provider == "com.databricks.spark.avro") {
|
||||
throw new AnalysisException(
|
||||
s"Failed to find data source: ${provider.toLowerCase}. Please use Spark " +
|
||||
"package http://spark-packages.org/package/databricks/spark-avro")
|
||||
} else {
|
||||
if (provider == "avro" || provider == "com.databricks.spark.avro") {
|
||||
throw new ClassNotFoundException(
|
||||
s"Failed to find data source: $provider. Please use Spark package " +
|
||||
"http://spark-packages.org/package/databricks/spark-avro",
|
||||
error)
|
||||
} else {
|
||||
throw new ClassNotFoundException(
|
||||
s"Failed to find data source: $provider. Please find packages at " +
|
||||
"http://spark-packages.org",
|
||||
error)
|
||||
}
|
||||
throw new ClassNotFoundException(
|
||||
s"Failed to find data source: $provider. Please find packages at " +
|
||||
"http://spark-packages.org",
|
||||
error)
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
|
|
|
@ -17,6 +17,8 @@
|
|||
|
||||
package org.apache.spark.sql.execution.datasources
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
|
||||
import org.apache.spark.sql.catalyst.analysis._
|
||||
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
|
||||
|
@ -28,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf
|
|||
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
|
||||
|
||||
/**
|
||||
* Try to replaces [[UnresolvedRelation]]s with [[ResolvedDataSource]].
|
||||
* Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]].
|
||||
*/
|
||||
private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] {
|
||||
def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
|
||||
|
@ -38,6 +40,16 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo
|
|||
sparkSession,
|
||||
paths = u.tableIdentifier.table :: Nil,
|
||||
className = u.tableIdentifier.database.get)
|
||||
|
||||
val notSupportDirectQuery = try {
|
||||
!classOf[FileFormat].isAssignableFrom(dataSource.providingClass)
|
||||
} catch {
|
||||
case NonFatal(e) => false
|
||||
}
|
||||
if (notSupportDirectQuery) {
|
||||
throw new AnalysisException("Unsupported data source type for direct query on files: " +
|
||||
s"${u.tableIdentifier.database.get}")
|
||||
}
|
||||
val plan = LogicalRelation(dataSource.resolveRelation())
|
||||
u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan)
|
||||
} catch {
|
||||
|
|
|
@ -1838,20 +1838,61 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
|
|||
df)
|
||||
})
|
||||
|
||||
val e1 = intercept[AnalysisException] {
|
||||
var e = intercept[AnalysisException] {
|
||||
sql("select * from in_valid_table")
|
||||
}
|
||||
assert(e1.message.contains("Table or view not found"))
|
||||
assert(e.message.contains("Table or view not found"))
|
||||
|
||||
val e2 = intercept[AnalysisException] {
|
||||
e = intercept[AnalysisException] {
|
||||
sql("select * from no_db.no_table").show()
|
||||
}
|
||||
assert(e2.message.contains("Table or view not found"))
|
||||
assert(e.message.contains("Table or view not found"))
|
||||
|
||||
val e3 = intercept[AnalysisException] {
|
||||
e = intercept[AnalysisException] {
|
||||
sql("select * from json.invalid_file")
|
||||
}
|
||||
assert(e3.message.contains("Path does not exist"))
|
||||
assert(e.message.contains("Path does not exist"))
|
||||
|
||||
e = intercept[AnalysisException] {
|
||||
sql(s"select id from `org.apache.spark.sql.hive.orc`.`file_path`")
|
||||
}
|
||||
assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
|
||||
|
||||
e = intercept[AnalysisException] {
|
||||
sql(s"select id from `com.databricks.spark.avro`.`file_path`")
|
||||
}
|
||||
assert(e.message.contains("Failed to find data source: com.databricks.spark.avro. " +
|
||||
"Please use Spark package http://spark-packages.org/package/databricks/spark-avro"))
|
||||
|
||||
// data source type is case insensitive
|
||||
e = intercept[AnalysisException] {
|
||||
sql(s"select id from Avro.`file_path`")
|
||||
}
|
||||
assert(e.message.contains("Failed to find data source: avro. Please use Spark package " +
|
||||
"http://spark-packages.org/package/databricks/spark-avro"))
|
||||
|
||||
e = intercept[AnalysisException] {
|
||||
sql(s"select id from avro.`file_path`")
|
||||
}
|
||||
assert(e.message.contains("Failed to find data source: avro. Please use Spark package " +
|
||||
"http://spark-packages.org/package/databricks/spark-avro"))
|
||||
|
||||
e = intercept[AnalysisException] {
|
||||
sql(s"select id from `org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`")
|
||||
}
|
||||
assert(e.message.contains("Table or view not found: " +
|
||||
"`org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`"))
|
||||
|
||||
e = intercept[AnalysisException] {
|
||||
sql(s"select id from `Jdbc`.`file_path`")
|
||||
}
|
||||
assert(e.message.contains("Unsupported data source type for direct query on files: Jdbc"))
|
||||
|
||||
e = intercept[AnalysisException] {
|
||||
sql(s"select id from `org.apache.spark.sql.execution.datasources.jdbc`.`file_path`")
|
||||
}
|
||||
assert(e.message.contains("Unsupported data source type for direct query on files: " +
|
||||
"org.apache.spark.sql.execution.datasources.jdbc"))
|
||||
}
|
||||
|
||||
test("SortMergeJoin returns wrong results when using UnsafeRows") {
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.spark.sql.sources
|
||||
|
||||
import org.apache.spark.sql.SQLContext
|
||||
import org.apache.spark.sql.{AnalysisException, SQLContext}
|
||||
import org.apache.spark.sql.test.SharedSQLContext
|
||||
import org.apache.spark.sql.types.{StringType, StructField, StructType}
|
||||
|
||||
|
@ -42,9 +42,10 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext {
|
|||
}
|
||||
|
||||
test("should fail to load ORC without Hive Support") {
|
||||
intercept[ClassNotFoundException] {
|
||||
val e = intercept[AnalysisException] {
|
||||
spark.read.format("orc").load()
|
||||
}
|
||||
assert(e.message.contains("The ORC data source must be used with Hive support enabled"))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.spark.sql.sources
|
||||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.execution.datasources.DataSource
|
||||
|
||||
class ResolvedDataSourceSuite extends SparkFunSuite {
|
||||
|
@ -60,13 +61,22 @@ class ResolvedDataSourceSuite extends SparkFunSuite {
|
|||
classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat])
|
||||
}
|
||||
|
||||
test("csv") {
|
||||
assert(
|
||||
getProvidingClass("csv") ===
|
||||
classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
|
||||
assert(
|
||||
getProvidingClass("com.databricks.spark.csv") ===
|
||||
classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat])
|
||||
}
|
||||
|
||||
test("error message for unknown data sources") {
|
||||
val error1 = intercept[ClassNotFoundException] {
|
||||
val error1 = intercept[AnalysisException] {
|
||||
getProvidingClass("avro")
|
||||
}
|
||||
assert(error1.getMessage.contains("spark-packages"))
|
||||
|
||||
val error2 = intercept[ClassNotFoundException] {
|
||||
val error2 = intercept[AnalysisException] {
|
||||
getProvidingClass("com.databricks.spark.avro")
|
||||
}
|
||||
assert(error2.getMessage.contains("spark-packages"))
|
||||
|
|
|
@ -1247,11 +1247,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
|
|||
}
|
||||
}
|
||||
|
||||
test("run sql directly on files") {
|
||||
test("run sql directly on files - parquet") {
|
||||
val df = spark.range(100).toDF()
|
||||
withTempPath(f => {
|
||||
df.write.parquet(f.getCanonicalPath)
|
||||
checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"),
|
||||
// data source type is case insensitive
|
||||
checkAnswer(sql(s"select id from Parquet.`${f.getCanonicalPath}`"),
|
||||
df)
|
||||
checkAnswer(sql(s"select id from `org.apache.spark.sql.parquet`.`${f.getCanonicalPath}`"),
|
||||
df)
|
||||
|
@ -1260,6 +1261,49 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
|
|||
})
|
||||
}
|
||||
|
||||
test("run sql directly on files - orc") {
|
||||
val df = spark.range(100).toDF()
|
||||
withTempPath(f => {
|
||||
df.write.orc(f.getCanonicalPath)
|
||||
// data source type is case insensitive
|
||||
checkAnswer(sql(s"select id from ORC.`${f.getCanonicalPath}`"),
|
||||
df)
|
||||
checkAnswer(sql(s"select id from `org.apache.spark.sql.hive.orc`.`${f.getCanonicalPath}`"),
|
||||
df)
|
||||
checkAnswer(sql(s"select a.id from orc.`${f.getCanonicalPath}` as a"),
|
||||
df)
|
||||
})
|
||||
}
|
||||
|
||||
test("run sql directly on files - csv") {
|
||||
val df = spark.range(100).toDF()
|
||||
withTempPath(f => {
|
||||
df.write.csv(f.getCanonicalPath)
|
||||
// data source type is case insensitive
|
||||
checkAnswer(sql(s"select cast(_c0 as int) id from CSV.`${f.getCanonicalPath}`"),
|
||||
df)
|
||||
checkAnswer(
|
||||
sql(s"select cast(_c0 as int) id from `com.databricks.spark.csv`.`${f.getCanonicalPath}`"),
|
||||
df)
|
||||
checkAnswer(sql(s"select cast(a._c0 as int) id from csv.`${f.getCanonicalPath}` as a"),
|
||||
df)
|
||||
})
|
||||
}
|
||||
|
||||
test("run sql directly on files - json") {
|
||||
val df = spark.range(100).toDF()
|
||||
withTempPath(f => {
|
||||
df.write.json(f.getCanonicalPath)
|
||||
// data source type is case insensitive
|
||||
checkAnswer(sql(s"select id from jsoN.`${f.getCanonicalPath}`"),
|
||||
df)
|
||||
checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"),
|
||||
df)
|
||||
checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"),
|
||||
df)
|
||||
})
|
||||
}
|
||||
|
||||
test("SPARK-8976 Wrong Result for Rollup #1") {
|
||||
checkAnswer(sql(
|
||||
"SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"),
|
||||
|
|
Loading…
Reference in a new issue