diff --git a/docs/sql-keywords.md b/docs/sql-keywords.md
index 08be6b62a8..16ce35d55e 100644
--- a/docs/sql-keywords.md
+++ b/docs/sql-keywords.md
@@ -179,6 +179,7 @@ Below is a list of all the keywords in Spark SQL.
   <tr><td>MONTH</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
   <tr><td>MONTHS</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
   <tr><td>MSCK</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
+  <tr><td>NAMESPACES</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
   <tr><td>NATURAL</td><td>reserved</td><td>strict-non-reserved</td><td>reserved</td></tr>
   <tr><td>NO</td><td>non-reserved</td><td>non-reserved</td><td>reserved</td></tr>
   <tr><td>NOT</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index f16ac6df8c..70c0d0e505 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -92,6 +92,8 @@ statement
     | DROP database (IF EXISTS)? db=errorCapturingIdentifier
         (RESTRICT | CASCADE)?                                          #dropDatabase
     | SHOW DATABASES (LIKE? pattern=STRING)?                           #showDatabases
+    | SHOW NAMESPACES ((FROM | IN) multipartIdentifier)?
+        (LIKE? pattern=STRING)?                                        #showNamespaces
     | createTableHeader ('(' colTypeList ')')? tableProvider
         ((OPTIONS options=tablePropertyList) |
         (PARTITIONED BY partitioning=transformList) |
@@ -1006,6 +1008,7 @@ ansiNonReserved
     | MINUTES
     | MONTHS
     | MSCK
+    | NAMESPACES
     | NO
     | NULLS
     | OF
@@ -1255,6 +1258,7 @@ nonReserved
     | MONTH
     | MONTHS
     | MSCK
+    | NAMESPACES
     | NO
     | NOT
     | NULL
@@ -1515,6 +1519,7 @@ MINUTES: 'MINUTES';
 MONTH: 'MONTH';
 MONTHS: 'MONTHS';
 MSCK: 'MSCK';
+NAMESPACES: 'NAMESPACES';
 NATURAL: 'NATURAL';
 NO: 'NO';
 NOT: 'NOT' | '!';
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala
index f512cd5e23..87070fb2ad 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalog/v2/CatalogV2Implicits.scala
@@ -68,6 +68,14 @@ object CatalogV2Implicits {
       case _ =>
         throw new AnalysisException(s"Cannot use catalog ${plugin.name}: not a TableCatalog")
     }
+
+    def asNamespaceCatalog: SupportsNamespaces = plugin match {
+      case namespaceCatalog: SupportsNamespaces =>
+        namespaceCatalog
+      case _ =>
+        throw new AnalysisException(
+          s"Cannot use catalog ${plugin.name}: does not support namespaces")
+    }
   }

   implicit class NamespaceHelper(namespace: Array[String]) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 90a533735a..25bfaa8901 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -2260,6 +2260,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }

+  /**
+   * Create a [[ShowNamespacesStatement]] command.
+   */
+  override def visitShowNamespaces(ctx: ShowNamespacesContext): LogicalPlan = withOrigin(ctx) {
+    ShowNamespacesStatement(
+      Option(ctx.multipartIdentifier).map(visitMultipartIdentifier),
+      Option(ctx.pattern).map(string))
+  }
+
   /**
    * Create a table, returning a [[CreateTableStatement]] logical plan.
    *
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 0be61cf147..aa613d2649 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.plans.logical

-import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange}
+import org.apache.spark.sql.catalog.v2.{Identifier, SupportsNamespaces, TableCatalog, TableChange}
 import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, ColumnChange}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.AliasIdentifier
@@ -560,6 +560,17 @@ object OverwritePartitionsDynamic {
   }
 }

+/**
+ * The logical plan of the SHOW NAMESPACES command that works for v2 catalogs.
+ */
+case class ShowNamespaces(
+    catalog: SupportsNamespaces,
+    namespace: Option[Seq[String]],
+    pattern: Option[String]) extends Command {
+  override val output: Seq[Attribute] = Seq(
+    AttributeReference("namespace", StringType, nullable = false)())
+}
+
 case class DescribeTable(table: NamedRelation, isExtended: Boolean) extends Command {

   override def children: Seq[LogicalPlan] = Seq(table)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ShowNamespacesStatement.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ShowNamespacesStatement.scala
new file mode 100644
index 0000000000..95d48a8e80
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/sql/ShowNamespacesStatement.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.sql
+
+/**
+ * A SHOW NAMESPACES statement, as parsed from SQL.
+ */
+case class ShowNamespacesStatement(namespace: Option[Seq[String]], pattern: Option[String])
+  extends ParsedStatement
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 9a0a326ecd..2e5ff8c3ce 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalog.v2.expressions.{ApplyTransform, BucketTransf
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedRelation, UnresolvedStar}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String

@@ -779,6 +779,21 @@ class DDLParserSuite extends AnalysisTest {
       ShowTablesStatement(Some(Seq("tbl")), Some("*dog*")))
   }

+  test("show namespaces") {
+    comparePlans(
+      parsePlan("SHOW NAMESPACES"),
+      ShowNamespacesStatement(None, None))
+    comparePlans(
+      parsePlan("SHOW NAMESPACES FROM testcat.ns1.ns2"),
+      ShowNamespacesStatement(Some(Seq("testcat", "ns1", "ns2")), None))
+    comparePlans(
+      parsePlan("SHOW NAMESPACES IN testcat.ns1.ns2"),
+      ShowNamespacesStatement(Some(Seq("testcat", "ns1", "ns2")), None))
+    comparePlans(
+      parsePlan("SHOW NAMESPACES IN testcat.ns1 LIKE '*pattern*'"),
+      ShowNamespacesStatement(Some(Seq("testcat", "ns1")), Some("*pattern*")))
+  }
+
   private case class TableSpec(
       name: Seq[String],
       schema: Option[StructType],
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala
index cff09f7550..8b14ad0d37 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTableCatalog.scala
@@ -30,12 +30,9 @@ import org.apache.spark.sql.sources.v2.Table
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap

-class InMemoryTableCatalog extends TableCatalog with SupportsNamespaces {
+class BasicInMemoryTableCatalog extends TableCatalog {
   import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

-  protected val namespaces: util.Map[List[String], Map[String, String]] =
-    new ConcurrentHashMap[List[String], Map[String, String]]()
-
   protected val tables: util.Map[Identifier, InMemoryTable] =
     new ConcurrentHashMap[Identifier, InMemoryTable]()

@@ -112,6 +109,13 @@ class InMemoryTableCatalog extends TableCatalog with SupportsNamespaces {
   def clearTables(): Unit = {
     tables.clear()
   }
+}
+
+class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamespaces {
+  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
+  protected val namespaces: util.Map[List[String], Map[String, String]] =
+    new ConcurrentHashMap[List[String], Map[String, String]]()

   private def allNamespaces: Seq[Seq[String]] = {
     (tables.keySet.asScala.map(_.namespace.toSeq) ++ namespaces.keySet.asScala).toSeq.distinct
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
index 43bee695bd..68d32059bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceResolution.scala
@@ -20,15 +20,15 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable

 import org.apache.spark.sql.{AnalysisException, SaveMode}
-import org.apache.spark.sql.catalog.v2.{CatalogManager, Identifier, LookupCatalog, TableCatalog}
+import org.apache.spark.sql.catalog.v2.{CatalogManager, Identifier, LookupCatalog, SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.catalog.v2.expressions.Transform
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, UnresolvedCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, ShowTables, SubqueryAlias}
-import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowTablesStatement}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, Filter, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, ShowNamespaces, ShowTables, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowTablesCommand}
+import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableSetLocationCommand, AlterTableSetPropertiesCommand, AlterTableUnsetPropertiesCommand, DescribeColumnCommand, DescribeTableCommand, DropTableCommand, ShowDatabasesCommand, ShowTablesCommand}
 import org.apache.spark.sql.execution.datasources.v2.FileDataSourceV2
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{HIVE_TYPE_STRING, HiveStringType, MetadataBuilder, StructField, StructType}
@@ -169,6 +169,24 @@ case class DataSourceResolution(
       val aliased = delete.tableAlias.map(SubqueryAlias(_, relation)).getOrElse(relation)
       DeleteFromTable(aliased, delete.condition)

+    case ShowNamespacesStatement(None, pattern) =>
+      defaultCatalog match {
+        case Some(catalog) =>
+          ShowNamespaces(catalog.asNamespaceCatalog, None, pattern)
+        case None =>
+          throw new AnalysisException("No default v2 catalog is set.")
+      }
+
+    case ShowNamespacesStatement(Some(namespace), pattern) =>
+      val CatalogNamespace(maybeCatalog, ns) = namespace
+      maybeCatalog match {
+        case Some(catalog) =>
+          ShowNamespaces(catalog.asNamespaceCatalog, Some(ns), pattern)
+        case None =>
+          throw new AnalysisException(
+            s"No v2 catalog is available for ${namespace.quoted}")
+      }
+
     case ShowTablesStatement(None, pattern) =>
       defaultCatalog match {
         case Some(catalog) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f629f36642..10dcf402ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.{AnalysisException, Strategy}
 import org.apache.spark.sql.catalog.v2.StagingTableCatalog
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, AttributeSet, Expression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, ShowTables}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterTable, AppendData, CreateTableAsSelect, CreateV2Table, DeleteFromTable, DescribeTable, DropTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, Repartition, ReplaceTable, ReplaceTableAsSelect, ShowNamespaces, ShowTables}
 import org.apache.spark.sql.execution.{FilterExec, ProjectExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.streaming.continuous.{ContinuousCoalesceExec, WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -291,6 +291,9 @@ object DataSourceV2Strategy extends Strategy with PredicateHelper {
     case AlterTable(catalog, ident, _, changes) =>
       AlterTableExec(catalog, ident, changes) :: Nil

+    case r: ShowNamespaces =>
+      ShowNamespacesExec(r.output, r.catalog, r.namespace, r.pattern) :: Nil
+
     case r : ShowTables =>
       ShowTablesExec(r.output, r.catalog, r.namespace, r.pattern) :: Nil

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
new file mode 100644
index 0000000000..f70b943fd2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowNamespacesExec.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.v2
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalog.v2.CatalogV2Implicits.NamespaceHelper
+import org.apache.spark.sql.catalog.v2.SupportsNamespaces
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.encoders.RowEncoder
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
+import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.execution.LeafExecNode
+
+/**
+ * Physical plan node for showing namespaces.
+ */
+case class ShowNamespacesExec(
+    output: Seq[Attribute],
+    catalog: SupportsNamespaces,
+    namespace: Option[Seq[String]],
+    pattern: Option[String])
+  extends LeafExecNode {
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val namespaces = namespace.map { ns =>
+      if (ns.nonEmpty) {
+        catalog.listNamespaces(ns.toArray)
+      } else {
+        catalog.listNamespaces()
+      }
+    }
+      .getOrElse(catalog.listNamespaces())
+
+    val rows = new ArrayBuffer[InternalRow]()
+    val encoder = RowEncoder(schema).resolveAndBind()
+
+    namespaces.map(_.quoted).foreach { ns =>
+      if (pattern.map(StringUtils.filterPattern(Seq(ns), _).nonEmpty).getOrElse(true)) {
+        rows += encoder
+          .toRow(new GenericRowWithSchema(Array(ns), schema))
+          .copy()
+      }
+    }
+
+    sparkContext.parallelize(rows, 1)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
index de5b4692b8..4bfbefef44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2SQLSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableCatalog}
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchDatabaseException, NoSuchTableException, TableAlreadyExistsException}
-import org.apache.spark.sql.connector.{InMemoryTable, InMemoryTableCatalog, StagingInMemoryTableCatalog}
+import org.apache.spark.sql.connector.{BasicInMemoryTableCatalog, InMemoryTable, InMemoryTableCatalog, StagingInMemoryTableCatalog}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG
 import org.apache.spark.sql.sources.v2.internal.V1Table
@@ -746,6 +746,92 @@ class DataSourceV2SQLSuite
     assert(expected === df.collect())
   }

+  test("ShowNamespaces: show root namespaces with default v2 catalog") {
+    spark.conf.set("spark.sql.default.catalog", "testcat")
+
+    testShowNamespaces("SHOW NAMESPACES", Seq())
+
+    spark.sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo")
+    spark.sql("CREATE TABLE testcat.ns1.ns1_1.table (id bigint) USING foo")
+    spark.sql("CREATE TABLE testcat.ns2.table (id bigint) USING foo")
+
+    testShowNamespaces("SHOW NAMESPACES", Seq("ns1", "ns2"))
+    testShowNamespaces("SHOW NAMESPACES LIKE '*1*'", Seq("ns1"))
+  }
+
+  test("ShowNamespaces: show namespaces with v2 catalog") {
+    spark.sql("CREATE TABLE testcat.ns1.table (id bigint) USING foo")
+    spark.sql("CREATE TABLE testcat.ns1.ns1_1.table (id bigint) USING foo")
+    spark.sql("CREATE TABLE testcat.ns1.ns1_2.table (id bigint) USING foo")
+    spark.sql("CREATE TABLE testcat.ns2.table (id bigint) USING foo")
+    spark.sql("CREATE TABLE testcat.ns2.ns2_1.table (id bigint) USING foo")
+
+    // Look up only with catalog name, which should list root namespaces.
+    testShowNamespaces("SHOW NAMESPACES IN testcat", Seq("ns1", "ns2"))
+
+    // Look up sub-namespaces.
+    testShowNamespaces("SHOW NAMESPACES IN testcat.ns1", Seq("ns1.ns1_1", "ns1.ns1_2"))
+    testShowNamespaces("SHOW NAMESPACES IN testcat.ns1 LIKE '*2*'", Seq("ns1.ns1_2"))
+    testShowNamespaces("SHOW NAMESPACES IN testcat.ns2", Seq("ns2.ns2_1"))
+
+    // Try to look up namespaces that do not exist.
+ testShowNamespaces("SHOW NAMESPACES IN testcat.ns3", Seq()) + testShowNamespaces("SHOW NAMESPACES IN testcat.ns1.ns3", Seq()) + } + + test("ShowNamespaces: default v2 catalog is not set") { + spark.sql("CREATE TABLE testcat.ns.table (id bigint) USING foo") + + val exception = intercept[AnalysisException] { + sql("SHOW NAMESPACES") + } + + assert(exception.getMessage.contains("No default v2 catalog is set")) + } + + test("ShowNamespaces: default v2 catalog doesn't support namespace") { + spark.conf.set( + "spark.sql.catalog.testcat_no_namspace", + classOf[BasicInMemoryTableCatalog].getName) + spark.conf.set("spark.sql.default.catalog", "testcat_no_namspace") + + val exception = intercept[AnalysisException] { + sql("SHOW NAMESPACES") + } + + assert(exception.getMessage.contains("does not support namespaces")) + } + + test("ShowNamespaces: v2 catalog doesn't support namespace") { + spark.conf.set( + "spark.sql.catalog.testcat_no_namspace", + classOf[BasicInMemoryTableCatalog].getName) + + val exception = intercept[AnalysisException] { + sql("SHOW NAMESPACES in testcat_no_namspace") + } + + assert(exception.getMessage.contains("does not support namespaces")) + } + + test("ShowNamespaces: no v2 catalog is available") { + val exception = intercept[AnalysisException] { + sql("SHOW NAMESPACES in dummy") + } + + assert(exception.getMessage.contains("No v2 catalog is available")) + } + + private def testShowNamespaces( + sqlText: String, + expected: Seq[String]): Unit = { + val schema = new StructType().add("namespace", StringType, nullable = false) + + val df = spark.sql(sqlText) + assert(df.schema === schema) + assert(df.collect().map(_.getAs[String](0)).sorted === expected.sorted) + } + test("tableCreation: partition column case insensitive resolution") { val testCatalog = catalog("testcat").asTableCatalog val sessionCatalog = catalog("session").asTableCatalog