diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java index 171777777d..2e60487287 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java @@ -53,6 +53,16 @@ public interface SupportsNamespaces extends CatalogPlugin { */ String PROP_COMMENT = "comment"; + /** + * A property to specify the owner of the namespace. + */ + String PROP_OWNER_NAME = "ownerName"; + + /** + * A property to specify the type of the namespace's owner. + */ + String PROP_OWNER_TYPE = "ownerType"; + /** * The list of reserved namespace properties. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 6b1c35094e..98a53b59fa 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, N import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE} import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -143,8 +144,9 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac // Note: alter properties here because Hive does not support altering other fields catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true"))) val newDb1 = catalog.getDatabase("db1") - assert(db1.properties.isEmpty) - assert(newDb1.properties.size == 2) + val reversedProperties = Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE) + assert((db1.properties -- reversedProperties).isEmpty) + assert((newDb1.properties -- reversedProperties).size == 2) assert(newDb1.properties.get("k") == Some("v3")) assert(newDb1.properties.get("good") == Some("true")) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index f334ba5690..a21aaa2429 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -18,11 +18,12 @@ package org.apache.spark.sql.catalyst.catalog import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalyst.{AliasIdentifier, FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View} +import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -207,8 +208,9 @@ abstract class SessionCatalogSuite extends AnalysisTest { // Note: alter properties here because Hive does not support altering other fields catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true"))) val newDb1 = catalog.getDatabaseMetadata("db1") - assert(db1.properties.isEmpty) - assert(newDb1.properties.size == 2) + val reversedProperties = Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE) + assert((db1.properties -- reversedProperties).isEmpty) + assert((newDb1.properties -- reversedProperties).size == 2) assert(newDb1.properties.get("k") == Some("v3")) assert(newDb1.properties.get("good") == Some("true")) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 3645d38b3b..bdba10eb48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -21,6 +21,7 @@ import java.util.Locale import java.util.concurrent.TimeUnit._ import scala.collection.{GenMap, GenSeq} +import scala.collection.JavaConverters._ import scala.collection.parallel.ForkJoinTaskSupport import scala.collection.parallel.immutable.ParVector import scala.util.control.NonFatal @@ -37,6 +38,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils} import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter @@ -172,19 +174,23 @@ case class DescribeDatabaseCommand( override def run(sparkSession: SparkSession): Seq[Row] = { val dbMetadata: CatalogDatabase = sparkSession.sessionState.catalog.getDatabaseMetadata(databaseName) + val allDbProperties = dbMetadata.properties val result = Row("Database Name", dbMetadata.name) :: Row("Description", dbMetadata.description) :: - Row("Location", CatalogUtils.URIToString(dbMetadata.locationUri)) :: Nil + Row("Location", CatalogUtils.URIToString(dbMetadata.locationUri)):: + Row("Owner Name", allDbProperties.getOrElse(PROP_OWNER_NAME, "")) :: + Row("Owner Type", allDbProperties.getOrElse(PROP_OWNER_TYPE, "")) :: Nil if (extended) { - val properties = - if (dbMetadata.properties.isEmpty) { + val properties = allDbProperties -- Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE) + val propertiesStr = + if (properties.isEmpty) { "" } else { - dbMetadata.properties.toSeq.mkString("(", ", ", ")") + properties.toSeq.mkString("(", ", ", ")") } - result :+ Row("Properties", properties) + result :+ Row("Properties", propertiesStr) } else { result } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index f777fa44b5..c762f25d62 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -22,7 +22,6 @@ import java.net.URI import java.util.Locale import org.apache.hadoop.fs.Path -import org.scalatest.BeforeAndAfterEach import org.apache.spark.internal.config import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD @@ -31,6 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec +import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils} @@ -185,6 +185,8 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession { abstract class DDLSuite extends QueryTest with SQLTestUtils { + protected val reversedProperties = Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE) + protected def isUsingHiveMetastore: Boolean = { spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive" } @@ -328,7 +330,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { try { sql(s"CREATE DATABASE $dbName") val db1 = catalog.getDatabaseMetadata(dbName) - assert(db1 == CatalogDatabase( + assert(db1.copy(properties = db1.properties -- reversedProperties) == CatalogDatabase( dbName, "", getDBPath(dbName), @@ -351,7 +353,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { sql(s"CREATE DATABASE $dbName Location '$path'") val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) val expPath = makeQualifiedPath(tmpDir.toString) - assert(db1 == CatalogDatabase( + assert(db1.copy(properties = db1.properties -- reversedProperties) == CatalogDatabase( dbNameWithoutBackTicks, "", expPath, @@ -374,7 +376,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { val dbNameWithoutBackTicks = cleanIdentifier(dbName) sql(s"CREATE DATABASE $dbName") val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks) - assert(db1 == CatalogDatabase( + assert(db1.copy(properties = db1.properties -- reversedProperties) == CatalogDatabase( dbNameWithoutBackTicks, "", getDBPath(dbNameWithoutBackTicks), @@ -747,7 +749,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { sql(s"CREATE DATABASE $dbName") checkAnswer( - sql(s"DESCRIBE DATABASE EXTENDED $dbName"), + sql(s"DESCRIBE DATABASE EXTENDED $dbName").toDF("key", "value") + .where("key not like 'Owner%'"), // filter for consistency with in-memory catalog Row("Database Name", dbNameWithoutBackTicks) :: Row("Description", "") :: Row("Location", CatalogUtils.URIToString(location)) :: @@ -756,7 +759,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')") checkAnswer( - sql(s"DESCRIBE DATABASE EXTENDED $dbName"), + sql(s"DESCRIBE DATABASE EXTENDED $dbName").toDF("key", "value") + .where("key not like 'Owner%'"), // filter for consistency with in-memory catalog Row("Database Name", dbNameWithoutBackTicks) :: Row("Description", "") :: Row("Location", CatalogUtils.URIToString(location)) :: @@ -765,7 +769,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils { sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')") checkAnswer( - sql(s"DESCRIBE DATABASE EXTENDED $dbName"), + sql(s"DESCRIBE DATABASE EXTENDED $dbName").toDF("key", "value") + .where("key not like 'Owner%'"), // filter for consistency with in-memory catalog Row("Database Name", dbNameWithoutBackTicks) :: Row("Description", "") :: Row("Location", CatalogUtils.URIToString(location)) :: diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index d4342e4271..700c0884dd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -32,8 +32,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType} -import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable} -import org.apache.hadoop.hive.metastore.api.{FieldSchema, Order, SerDeInfo, StorageDescriptor} +import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable, _} import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable} import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC @@ -54,6 +53,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException} +import org.apache.spark.sql.connector.catalog.SupportsNamespaces._ import org.apache.spark.sql.execution.QueryExecutionException import org.apache.spark.sql.hive.HiveExternalCatalog import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX} @@ -355,13 +355,8 @@ private[hive] class HiveClientImpl( override def createDatabase( database: CatalogDatabase, ignoreIfExists: Boolean): Unit = withHiveState { - client.createDatabase( - new HiveDatabase( - database.name, - database.description, - CatalogUtils.URIToString(database.locationUri), - Option(database.properties).map(_.asJava).orNull), - ignoreIfExists) + val hiveDb = toHiveDatabase(database, true) + client.createDatabase(hiveDb, ignoreIfExists) } override def dropDatabase( @@ -379,22 +374,38 @@ private[hive] class HiveClientImpl( s"Hive ${version.fullVersion} does not support altering database location") } } - client.alterDatabase( + val hiveDb = toHiveDatabase(database, false) + client.alterDatabase(database.name, hiveDb) + } + + private def toHiveDatabase(database: CatalogDatabase, isCreate: Boolean): HiveDatabase = { + val props = database.properties + val hiveDb = new HiveDatabase( database.name, - new HiveDatabase( - database.name, - database.description, - CatalogUtils.URIToString(database.locationUri), - Option(database.properties).map(_.asJava).orNull)) + database.description, + CatalogUtils.URIToString(database.locationUri), + (props -- Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE)).asJava) + props.get(PROP_OWNER_NAME).orElse(if (isCreate) Some(userName) else None).foreach { ownerName => + shim.setDatabaseOwnerName(hiveDb, ownerName) + } + props.get(PROP_OWNER_TYPE).orElse(if (isCreate) Some(PrincipalType.USER.name) else None) + .foreach { ownerType => + shim.setDatabaseOwnerType(hiveDb, ownerType) + } + hiveDb } override def getDatabase(dbName: String): CatalogDatabase = withHiveState { Option(client.getDatabase(dbName)).map { d => + val paras = Option(d.getParameters).map(_.asScala.toMap).getOrElse(Map()) ++ + Map(PROP_OWNER_NAME -> shim.getDatabaseOwnerName(d), + PROP_OWNER_TYPE -> shim.getDatabaseOwnerType(d)) + CatalogDatabase( name = d.getName, description = Option(d.getDescription).getOrElse(""), locationUri = CatalogUtils.stringToURI(d.getLocationUri), - properties = Option(d.getParameters).map(_.asScala.toMap).orNull) + properties = paras) }.getOrElse(throw new NoSuchDatabaseException(dbName)) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index 586fbbefad..01ddaf2ee8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -29,8 +29,7 @@ import scala.util.control.NonFatal import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.IMetaStoreClient -import org.apache.hadoop.hive.metastore.api.{EnvironmentContext, Function => HiveFunction, FunctionType} -import org.apache.hadoop.hive.metastore.api.{MetaException, PrincipalType, ResourceType, ResourceUri} +import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri} import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.io.AcidUtils import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table} @@ -154,6 +153,14 @@ private[client] sealed abstract class Shim { deleteData: Boolean, purge: Boolean): Unit + def getDatabaseOwnerName(db: Database): String + + def setDatabaseOwnerName(db: Database, owner: String): Unit + + def getDatabaseOwnerType(db: Database): String + + def setDatabaseOwnerType(db: Database, ownerType: String): Unit + protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = { val method = findMethod(klass, name, args: _*) require(Modifier.isStatic(method.getModifiers()), @@ -456,6 +463,14 @@ private[client] class Shim_v0_12 extends Shim with Logging { def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = { Seq.empty[String] } + + override def getDatabaseOwnerName(db: Database): String = "" + + override def setDatabaseOwnerName(db: Database, owner: String): Unit = {} + + override def getDatabaseOwnerType(db: Database): String = "" + + override def setDatabaseOwnerType(db: Database, ownerType: String): Unit = {} } private[client] class Shim_v0_13 extends Shim_v0_12 { @@ -493,6 +508,28 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { "getResults", classOf[JList[Object]]) + private lazy val getDatabaseOwnerNameMethod = + findMethod( + classOf[Database], + "getOwnerName") + + private lazy val setDatabaseOwnerNameMethod = + findMethod( + classOf[Database], + "setOwnerName", + classOf[String]) + + private lazy val getDatabaseOwnerTypeMethod = + findMethod( + classOf[Database], + "getOwnerType") + + private lazy val setDatabaseOwnerTypeMethod = + findMethod( + classOf[Database], + "setOwnerType", + classOf[PrincipalType]) + override def setCurrentSessionState(state: SessionState): Unit = setCurrentSessionStateMethod.invoke(null, state) @@ -809,6 +846,22 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { } } + override def getDatabaseOwnerName(db: Database): String = { + Option(getDatabaseOwnerNameMethod.invoke(db)).map(_.asInstanceOf[String]).getOrElse("") + } + + override def setDatabaseOwnerName(db: Database, owner: String): Unit = { + setDatabaseOwnerNameMethod.invoke(db, owner) + } + + override def getDatabaseOwnerType(db: Database): String = { + Option(getDatabaseOwnerTypeMethod.invoke(db)) + .map(_.asInstanceOf[PrincipalType].name()).getOrElse("") + } + + override def setDatabaseOwnerType(db: Database, ownerType: String): Unit = { + setDatabaseOwnerTypeMethod.invoke(db, PrincipalType.valueOf(ownerType)) + } } private[client] class Shim_v0_14 extends Shim_v0_13 { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index e379704188..e07978ae18 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.SparkFunSuite import org.apache.spark.internal.Logging @@ -170,6 +171,34 @@ class VersionsSuite extends SparkFunSuite with Logging { client.createDatabase(tempDB, ignoreIfExists = true) } + test(s"$version: create/get/alter database should pick right user name as owner") { + if (version != "0.12") { + val currentUser = UserGroupInformation.getCurrentUser.getUserName + val ownerName = "SPARK_29425" + val db1 = "SPARK_29425_1" + val db2 = "SPARK_29425_2" + val ownerProps = Map("ownerName" -> ownerName) + + // create database with owner + val dbWithOwner = CatalogDatabase(db1, "desc", Utils.createTempDir().toURI, ownerProps) + client.createDatabase(dbWithOwner, ignoreIfExists = true) + val getDbWithOwner = client.getDatabase(db1) + assert(getDbWithOwner.properties("ownerName") === ownerName) + // alter database without owner + client.alterDatabase(getDbWithOwner.copy(properties = Map())) + assert(client.getDatabase(db1).properties("ownerName") === "") + + // create database without owner + val dbWithoutOwner = CatalogDatabase(db2, "desc", Utils.createTempDir().toURI, Map()) + client.createDatabase(dbWithoutOwner, ignoreIfExists = true) + val getDbWithoutOwner = client.getDatabase(db2) + assert(getDbWithoutOwner.properties("ownerName") === currentUser) + // alter database with owner + client.alterDatabase(getDbWithoutOwner.copy(properties = ownerProps)) + assert(client.getDatabase(db2).properties("ownerName") === ownerName) + } + } + test(s"$version: createDatabase with null description") { withTempDir { tmpDir => val dbWithNullDesc = diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 56f424d978..0684d66558 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.parser.ParseException +import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE} import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils} import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HiveExternalCatalog @@ -372,12 +373,45 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA assert(table.provider == Some("org.apache.spark.sql.hive.orc")) } } + + private def checkOwner(db: String, expected: String): Unit = { + val owner = sql(s"DESCRIBE DATABASE EXTENDED $db") + .where("database_description_item='Owner Name'") + .collect().head.getString(1) + assert(owner === expected) + } + + test("Database Ownership") { + val catalog = spark.sessionState.catalog + try { + val db1 = "spark_29425_1" + val db2 = "spark_29425_2" + val owner = "spark_29425" + + sql(s"CREATE DATABASE $db1") + checkOwner(db1, Utils.getCurrentUserName()) + sql(s"ALTER DATABASE $db1 SET DBPROPERTIES ('a'='a')") + checkOwner(db1, Utils.getCurrentUserName()) + + // TODO: Specify ownership should be forbidden after we implement `SET OWNER` syntax + sql(s"CREATE DATABASE $db2 WITH DBPROPERTIES('ownerName'='$owner')") + checkOwner(db2, owner) + sql(s"ALTER DATABASE $db2 SET DBPROPERTIES ('a'='a')") + checkOwner(db2, owner) + // TODO: Changing ownership should be forbidden after we implement `SET OWNER` syntax + sql(s"ALTER DATABASE $db2 SET DBPROPERTIES ('ownerName'='a')") + checkOwner(db2, "a") + } finally { + catalog.reset() + } + } } class HiveDDLSuite extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach { import testImplicits._ val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO") + private val reversedProperties = Seq("ownerName", "ownerType") override def afterEach(): Unit = { try { @@ -1112,7 +1146,8 @@ class HiveDDLSuite sql(s"CREATE DATABASE $dbName Location '${tmpDir.toURI.getPath.stripSuffix("/")}'") val db1 = catalog.getDatabaseMetadata(dbName) val dbPath = new URI(tmpDir.toURI.toString.stripSuffix("/")) - assert(db1 == CatalogDatabase(dbName, "", dbPath, Map.empty)) + assert(db1.copy(properties = db1.properties -- Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE)) === + CatalogDatabase(dbName, "", dbPath, Map.empty)) sql("USE db1") sql(s"CREATE TABLE $tabName as SELECT 1") @@ -1150,7 +1185,8 @@ class HiveDDLSuite val expectedDBLocation = s"file:${dbPath.toUri.getPath.stripSuffix("/")}/$dbName.db" val expectedDBUri = CatalogUtils.stringToURI(expectedDBLocation) val db1 = catalog.getDatabaseMetadata(dbName) - assert(db1 == CatalogDatabase( + assert(db1.copy(properties = db1.properties -- Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE)) == + CatalogDatabase( dbName, "", expectedDBUri,