From 9aff6f3b1915523432b1921fdd30fa015ed5d670 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 2 Jun 2016 13:22:43 -0700 Subject: [PATCH] [SPARK-15515][SQL] Error Handling in Running SQL Directly On Files #### What changes were proposed in this pull request? This PR is to address the following issues: - **ISSUE 1:** For ORC source format, we are reporting the strange error message when we did not enable Hive support: ```SQL SQL Example: select id from `org.apache.spark.sql.hive.orc`.`file_path` Error Message: Table or view not found: `org.apache.spark.sql.hive.orc`.`file_path` ``` Instead, we should issue the error message like: ``` Expected Error Message: The ORC data source must be used with Hive support enabled ``` - **ISSUE 2:** For the Avro format, we report the strange error message like: The example query is like ```SQL SQL Example: select id from `avro`.`file_path` select id from `com.databricks.spark.avro`.`file_path` Error Message: Table or view not found: `com.databricks.spark.avro`.`file_path` ``` The desired message should be like: ``` Expected Error Message: Failed to find data source: avro. Please use Spark package http://spark-packages.org/package/databricks/spark-avro" ``` - ~~**ISSUE 3:** Unable to detect incompatibility libraries for Spark 2.0 in Data Source Resolution. We report a strange error message:~~ **Update**: The latest code changes contains - For JDBC format, we added an extra checking in the rule `ResolveRelations` of `Analyzer`. Without the PR, Spark will return the error message like: `Option 'url' not specified`. Now, we are reporting `Unsupported data source type for direct query on files: jdbc` - Make data source format name case incensitive so that error handling behaves consistent with the normal cases. - Added the test cases for all the supported formats. #### How was this patch tested? Added test cases to cover all the above issues Author: gatorsmile Author: xiaoli Author: Xiao Li Closes #13283 from gatorsmile/runSQLAgainstFile. --- .../execution/datasources/DataSource.scala | 34 +++++------- .../sql/execution/datasources/rules.scala | 14 ++++- .../org/apache/spark/sql/SQLQuerySuite.scala | 53 ++++++++++++++++--- .../sql/sources/DDLSourceLoadSuite.scala | 5 +- .../sql/sources/ResolvedDataSourceSuite.scala | 14 ++++- .../sql/hive/execution/SQLQuerySuite.scala | 48 ++++++++++++++++- 6 files changed, 134 insertions(+), 34 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 93f1ad01bf..5f17fdf946 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -132,28 +132,20 @@ case class DataSource( // Found the data source using fully qualified path dataSource case Failure(error) => - if (error.isInstanceOf[ClassNotFoundException]) { - val className = error.getMessage - if (spark2RemovedClasses.contains(className)) { - throw new ClassNotFoundException(s"$className is removed in Spark 2.0. " + - "Please check if your library is compatible with Spark 2.0") - } - } - if (provider.startsWith("org.apache.spark.sql.hive.orc")) { - throw new ClassNotFoundException( - "The ORC data source must be used with Hive support enabled.", error) + if (provider.toLowerCase == "orc" || + provider.startsWith("org.apache.spark.sql.hive.orc")) { + throw new AnalysisException( + "The ORC data source must be used with Hive support enabled") + } else if (provider.toLowerCase == "avro" || + provider == "com.databricks.spark.avro") { + throw new AnalysisException( + s"Failed to find data source: ${provider.toLowerCase}. Please use Spark " + + "package http://spark-packages.org/package/databricks/spark-avro") } else { - if (provider == "avro" || provider == "com.databricks.spark.avro") { - throw new ClassNotFoundException( - s"Failed to find data source: $provider. Please use Spark package " + - "http://spark-packages.org/package/databricks/spark-avro", - error) - } else { - throw new ClassNotFoundException( - s"Failed to find data source: $provider. Please find packages at " + - "http://spark-packages.org", - error) - } + throw new ClassNotFoundException( + s"Failed to find data source: $provider. Please find packages at " + + "http://spark-packages.org", + error) } } } catch { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index b622f85941..9afd715016 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.datasources +import scala.util.control.NonFatal + import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.catalog.SessionCatalog @@ -28,7 +30,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation} /** - * Try to replaces [[UnresolvedRelation]]s with [[ResolvedDataSource]]. + * Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]]. */ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { @@ -38,6 +40,16 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo sparkSession, paths = u.tableIdentifier.table :: Nil, className = u.tableIdentifier.database.get) + + val notSupportDirectQuery = try { + !classOf[FileFormat].isAssignableFrom(dataSource.providingClass) + } catch { + case NonFatal(e) => false + } + if (notSupportDirectQuery) { + throw new AnalysisException("Unsupported data source type for direct query on files: " + + s"${u.tableIdentifier.database.get}") + } val plan = LogicalRelation(dataSource.resolveRelation()) u.alias.map(a => SubqueryAlias(u.alias.get, plan)).getOrElse(plan) } catch { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 1a7f6ebbb2..4fcd6bc0d9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1838,20 +1838,61 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { df) }) - val e1 = intercept[AnalysisException] { + var e = intercept[AnalysisException] { sql("select * from in_valid_table") } - assert(e1.message.contains("Table or view not found")) + assert(e.message.contains("Table or view not found")) - val e2 = intercept[AnalysisException] { + e = intercept[AnalysisException] { sql("select * from no_db.no_table").show() } - assert(e2.message.contains("Table or view not found")) + assert(e.message.contains("Table or view not found")) - val e3 = intercept[AnalysisException] { + e = intercept[AnalysisException] { sql("select * from json.invalid_file") } - assert(e3.message.contains("Path does not exist")) + assert(e.message.contains("Path does not exist")) + + e = intercept[AnalysisException] { + sql(s"select id from `org.apache.spark.sql.hive.orc`.`file_path`") + } + assert(e.message.contains("The ORC data source must be used with Hive support enabled")) + + e = intercept[AnalysisException] { + sql(s"select id from `com.databricks.spark.avro`.`file_path`") + } + assert(e.message.contains("Failed to find data source: com.databricks.spark.avro. " + + "Please use Spark package http://spark-packages.org/package/databricks/spark-avro")) + + // data source type is case insensitive + e = intercept[AnalysisException] { + sql(s"select id from Avro.`file_path`") + } + assert(e.message.contains("Failed to find data source: avro. Please use Spark package " + + "http://spark-packages.org/package/databricks/spark-avro")) + + e = intercept[AnalysisException] { + sql(s"select id from avro.`file_path`") + } + assert(e.message.contains("Failed to find data source: avro. Please use Spark package " + + "http://spark-packages.org/package/databricks/spark-avro")) + + e = intercept[AnalysisException] { + sql(s"select id from `org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`") + } + assert(e.message.contains("Table or view not found: " + + "`org.apache.spark.sql.sources.HadoopFsRelationProvider`.`file_path`")) + + e = intercept[AnalysisException] { + sql(s"select id from `Jdbc`.`file_path`") + } + assert(e.message.contains("Unsupported data source type for direct query on files: Jdbc")) + + e = intercept[AnalysisException] { + sql(s"select id from `org.apache.spark.sql.execution.datasources.jdbc`.`file_path`") + } + assert(e.message.contains("Unsupported data source type for direct query on files: " + + "org.apache.spark.sql.execution.datasources.jdbc")) } test("SortMergeJoin returns wrong results when using UnsafeRows") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala index f07c33042a..85ba33e58a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DDLSourceLoadSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.sources -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{AnalysisException, SQLContext} import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types.{StringType, StructField, StructType} @@ -42,9 +42,10 @@ class DDLSourceLoadSuite extends DataSourceTest with SharedSQLContext { } test("should fail to load ORC without Hive Support") { - intercept[ClassNotFoundException] { + val e = intercept[AnalysisException] { spark.read.format("orc").load() } + assert(e.message.contains("The ORC data source must be used with Hive support enabled")) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala index 320aaea1e4..5ea1f32433 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/ResolvedDataSourceSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.sources import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.execution.datasources.DataSource class ResolvedDataSourceSuite extends SparkFunSuite { @@ -60,13 +61,22 @@ class ResolvedDataSourceSuite extends SparkFunSuite { classOf[org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat]) } + test("csv") { + assert( + getProvidingClass("csv") === + classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat]) + assert( + getProvidingClass("com.databricks.spark.csv") === + classOf[org.apache.spark.sql.execution.datasources.csv.CSVFileFormat]) + } + test("error message for unknown data sources") { - val error1 = intercept[ClassNotFoundException] { + val error1 = intercept[AnalysisException] { getProvidingClass("avro") } assert(error1.getMessage.contains("spark-packages")) - val error2 = intercept[ClassNotFoundException] { + val error2 = intercept[AnalysisException] { getProvidingClass("com.databricks.spark.avro") } assert(error2.getMessage.contains("spark-packages")) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index b5691450ca..24de223cf8 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -1247,11 +1247,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { } } - test("run sql directly on files") { + test("run sql directly on files - parquet") { val df = spark.range(100).toDF() withTempPath(f => { df.write.parquet(f.getCanonicalPath) - checkAnswer(sql(s"select id from parquet.`${f.getCanonicalPath}`"), + // data source type is case insensitive + checkAnswer(sql(s"select id from Parquet.`${f.getCanonicalPath}`"), df) checkAnswer(sql(s"select id from `org.apache.spark.sql.parquet`.`${f.getCanonicalPath}`"), df) @@ -1260,6 +1261,49 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton { }) } + test("run sql directly on files - orc") { + val df = spark.range(100).toDF() + withTempPath(f => { + df.write.orc(f.getCanonicalPath) + // data source type is case insensitive + checkAnswer(sql(s"select id from ORC.`${f.getCanonicalPath}`"), + df) + checkAnswer(sql(s"select id from `org.apache.spark.sql.hive.orc`.`${f.getCanonicalPath}`"), + df) + checkAnswer(sql(s"select a.id from orc.`${f.getCanonicalPath}` as a"), + df) + }) + } + + test("run sql directly on files - csv") { + val df = spark.range(100).toDF() + withTempPath(f => { + df.write.csv(f.getCanonicalPath) + // data source type is case insensitive + checkAnswer(sql(s"select cast(_c0 as int) id from CSV.`${f.getCanonicalPath}`"), + df) + checkAnswer( + sql(s"select cast(_c0 as int) id from `com.databricks.spark.csv`.`${f.getCanonicalPath}`"), + df) + checkAnswer(sql(s"select cast(a._c0 as int) id from csv.`${f.getCanonicalPath}` as a"), + df) + }) + } + + test("run sql directly on files - json") { + val df = spark.range(100).toDF() + withTempPath(f => { + df.write.json(f.getCanonicalPath) + // data source type is case insensitive + checkAnswer(sql(s"select id from jsoN.`${f.getCanonicalPath}`"), + df) + checkAnswer(sql(s"select id from `org.apache.spark.sql.json`.`${f.getCanonicalPath}`"), + df) + checkAnswer(sql(s"select a.id from json.`${f.getCanonicalPath}` as a"), + df) + }) + } + test("SPARK-8976 Wrong Result for Rollup #1") { checkAnswer(sql( "SELECT count(*) AS cnt, key % 5, grouping_id() FROM src GROUP BY key%5 WITH ROLLUP"),