[SPARK-34577][SQL] Fix drop/add columns to a dataset of DESCRIBE NAMESPACE

### What changes were proposed in this pull request?
In this PR, I propose to generate "stable" output attributes for the logical node of the `DESCRIBE NAMESPACE` command, i.e. attributes that are created once per node instance instead of being re-generated (with fresh expression IDs) on every access.

### Why are the changes needed?
This fixes the issue demonstrated by the example:

```
sql(s"CREATE NAMESPACE ns")
val description = sql(s"DESCRIBE NAMESPACE ns")
description.drop("name")
```

```
[info]   org.apache.spark.sql.AnalysisException: Resolved attribute(s) name#74 missing from name#25,value#26 in operator !Project [name#74]. Attribute(s) with the same name appear in the operation: name. Please check if the right attribute(s) are used.;
[info] !Project [name#74]
[info] +- LocalRelation [name#25, value#26]
```
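
The error occurs because `DescribeNamespace.output` was a `def`: every call built fresh `AttributeReference`s with new expression IDs, so the attribute that `drop("name")` resolved against (`name#74`) no longer matched the one captured in the command's `LocalRelation` (`name#25`). A minimal sketch of that mechanism, using plain Catalyst classes (spark-catalyst on the classpath), not the actual command:

```
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType

object ExprIdSketch extends App {
  // A `def` mints a new expression ID on every call, so two reads of the
  // "same" output disagree about the attribute's identity.
  def unstable: Seq[Attribute] = Seq(AttributeReference("name", StringType)())
  assert(unstable.head.exprId != unstable.head.exprId)

  // A `val` (here: the constructor default introduced by this PR) fixes
  // the IDs once, so every consumer resolves the same attribute.
  val stable: Seq[Attribute] = Seq(AttributeReference("name", StringType)())
  assert(stable.head.exprId == stable.head.exprId)
}
```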

### Does this PR introduce _any_ user-facing change?
Yes. After this change, `drop()`/`withColumn()` on the dataset returned by `DESCRIBE NAMESPACE` work correctly. The output columns are also renamed to `info_name`/`info_value`.

### How was this patch tested?
Added a unit test (see the `DataSourceV2SQLSuite` change below).

Closes #31705 from AngersZhuuuu/SPARK-34577.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
commit db627107b7 (parent 8f1eec4d13)
Angerszhuuuu, 2021-03-04 13:22:10 +08:00; committed by Wenchen Fan
6 changed files with 47 additions and 21 deletions


```
@@ -296,14 +296,18 @@ case class DropNamespace(
  */
 case class DescribeNamespace(
     namespace: LogicalPlan,
-    extended: Boolean) extends Command {
+    extended: Boolean,
+    override val output: Seq[Attribute] = DescribeNamespace.getOutputAttr) extends Command {
   override def children: Seq[LogicalPlan] = Seq(namespace)

-  override def output: Seq[Attribute] = Seq(
-    AttributeReference("name", StringType, nullable = false,
-      new MetadataBuilder().putString("comment", "name of the column").build())(),
-    AttributeReference("value", StringType, nullable = true,
-      new MetadataBuilder().putString("comment", "value of the column").build())())
 }
+
+object DescribeNamespace {
+  def getOutputAttr: Seq[Attribute] = Seq(
+    AttributeReference("info_name", StringType, nullable = false,
+      new MetadataBuilder().putString("comment", "name of the namespace info").build())(),
+    AttributeReference("info_value", StringType, nullable = true,
+      new MetadataBuilder().putString("comment", "value of the namespace info").build())())
+}

 /**
```
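
Because `output` is now a constructor parameter (defaulted from the companion object), the attributes are materialized once per node instance, and `copy`, which tree transformations use, carries the same attribute objects along. A hypothetical REPL check against the classes in the diff above:

```
import org.apache.spark.sql.catalyst.analysis.UnresolvedNamespace

val d  = DescribeNamespace(UnresolvedNamespace(Seq("ns")), extended = false)
val d2 = d.copy(extended = true)
// `copy` defaults `output` to the current field, not to a fresh
// `getOutputAttr` call, so the expression IDs survive the transformation:
assert(d.output.map(_.exprId) == d2.output.map(_.exprId))
```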


```
@@ -179,8 +179,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case UnsetViewProperties(ResolvedView(ident, _), keys, ifExists) =>
       AlterTableUnsetPropertiesCommand(ident.asTableIdentifier, keys, ifExists, isView = true)

-    case d @ DescribeNamespace(DatabaseInSessionCatalog(db), _) =>
-      DescribeDatabaseCommand(db, d.extended)
+    case DescribeNamespace(DatabaseInSessionCatalog(db), extended, output) =>
+      val newOutput = if (conf.getConf(SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA)) {
+        assert(output.length == 2)
+        Seq(output.head.withName("database_description_item"),
+          output.last.withName("database_description_value"))
+      } else {
+        output
+      }
+      DescribeDatabaseCommand(db, extended, newOutput)

     case SetNamespaceProperties(DatabaseInSessionCatalog(db), properties) =>
       AlterDatabasePropertiesCommand(db, properties)
```
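
When `SQLConf.LEGACY_KEEP_COMMAND_OUTPUT_SCHEMA` is set, only the attribute names are rewritten via `withName`; the expression IDs are untouched, so the stability fix holds in legacy mode as well. A hedged usage sketch (assumes an active `SparkSession` named `spark` and that the flag's key is `spark.sql.legacy.keepCommandOutputSchema`):

```
spark.conf.set("spark.sql.legacy.keepCommandOutputSchema", "true")
// Pre-3.0 column names are restored for the session-catalog command:
spark.sql("DESCRIBE NAMESPACE default").columns
// expected: Array(database_description_item, database_description_value)
```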


```
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
@@ -169,7 +169,8 @@ case class AlterDatabaseSetLocationCommand(databaseName: String, location: Strin
  */
 case class DescribeDatabaseCommand(
     databaseName: String,
-    extended: Boolean)
+    extended: Boolean,
+    override val output: Seq[Attribute])
   extends RunnableCommand {

   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -195,11 +196,6 @@ case class DescribeDatabaseCommand(
     result
   }
-
-  override val output: Seq[Attribute] = {
-    AttributeReference("database_description_item", StringType, nullable = false)() ::
-      AttributeReference("database_description_value", StringType, nullable = false)() :: Nil
-  }
 }

 /**
```
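
`DescribeDatabaseCommand` no longer hard-codes its schema; it runs with whatever `output` the resolution rule above hands it. Anyone constructing the v1 command directly must now supply the attributes, for example (a hypothetical direct construction, reusing the companion helper from the first hunk):

```
val cmd = DescribeDatabaseCommand(
  databaseName = "default",
  extended = false,
  output = DescribeNamespace.getOutputAttr)
```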


```
@@ -264,8 +264,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case WriteToContinuousDataSource(writer, query) =>
       WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil

-    case desc @ DescribeNamespace(ResolvedNamespace(catalog, ns), extended) =>
-      DescribeNamespaceExec(desc.output, catalog.asNamespaceCatalog, ns, extended) :: Nil
+    case DescribeNamespace(ResolvedNamespace(catalog, ns), extended, output) =>
+      DescribeNamespaceExec(output, catalog.asNamespaceCatalog, ns, extended) :: Nil

     case DescribeRelation(r: ResolvedTable, partitionSpec, isExtended, output) =>
       if (partitionSpec.nonEmpty) {
```

```
@@ -1209,8 +1209,8 @@ class DataSourceV2SQLSuite
     val descriptionDf = sql("DESCRIBE NAMESPACE testcat.ns1.ns2")
     assert(descriptionDf.schema.map(field => (field.name, field.dataType)) ===
       Seq(
-        ("name", StringType),
-        ("value", StringType)
+        ("info_name", StringType),
+        ("info_value", StringType)
       ))
     val description = descriptionDf.collect()
     assert(description === Seq(
@@ -2671,6 +2671,25 @@ class DataSourceV2SQLSuite
     }
   }

+  test("SPARK-34577: drop/add columns to a dataset of `DESCRIBE NAMESPACE`") {
+    withNamespace("ns") {
+      sql("CREATE NAMESPACE ns")
+      val description = sql(s"DESCRIBE NAMESPACE ns")
+      val noCommentDataset = description.drop("info_name")
+      val expectedSchema = new StructType()
+        .add(
+          name = "info_value",
+          dataType = StringType,
+          nullable = true,
+          metadata = new MetadataBuilder()
+            .putString("comment", "value of the namespace info").build())
+      assert(noCommentDataset.schema === expectedSchema)
+      val isNullDataset = noCommentDataset
+        .withColumn("is_null", noCommentDataset("info_value").isNull)
+      assert(isNullDataset.schema === expectedSchema.add("is_null", BooleanType, false))
+    }
+  }
+
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")
```

```
@@ -369,11 +369,11 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
       val db = "spark_29425_1"
       sql(s"CREATE DATABASE $db")
       assert(sql(s"DESCRIBE DATABASE EXTENDED $db")
-        .where("database_description_item='Owner'")
+        .where("info_name='Owner'")
         .collect().head.getString(1) === Utils.getCurrentUserName())
       sql(s"ALTER DATABASE $db SET DBPROPERTIES('abc'='xyz')")
       assert(sql(s"DESCRIBE DATABASE EXTENDED $db")
-        .where("database_description_item='Owner'")
+        .where("info_name='Owner'")
         .collect().head.getString(1) === Utils.getCurrentUserName())
     } finally {
       catalog.reset()
```