[SPARK-29425][SQL] The ownership of a database should be respected

### What changes were proposed in this pull request?

Preserve the owner of a database when executing ALTER DATABASE commands.

### Why are the changes needed?

Spark inadvertently drops the owner of a database when executing database DDLs such as ALTER DATABASE ... SET DBPROPERTIES, because the owner is not carried through the catalog metadata.
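
For illustration, a minimal spark-shell sketch against a Hive-backed session (illustrative only, not part of the change; the exact owner value depends on the environment):

```scala
// Illustrative only (spark-shell with a Hive metastore), showing the intended behavior after this patch.
spark.sql("CREATE DATABASE db1")
// DESCRIBE DATABASE now reports "Owner Name" / "Owner Type"; the owner defaults to the creating user.
spark.sql("DESCRIBE DATABASE EXTENDED db1").show(truncate = false)
spark.sql("ALTER DATABASE db1 SET DBPROPERTIES ('a'='a')")
// Before this patch, the ALTER above silently dropped the owner stored in the metastore;
// with the patch, the owner is preserved.
spark.sql("DESCRIBE DATABASE EXTENDED db1").show(truncate = false)
```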

### Does this PR introduce any user-facing change?

NO

### How was this patch tested?

Added and modified unit tests.

Closes #26080 from yaooqinn/SPARK-29425.

Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Kent Yao 2019-12-05 16:14:27 +08:00 committed by Wenchen Fan
parent 0ab922c1eb
commit 332e252a14
9 changed files with 191 additions and 37 deletions


@@ -53,6 +53,16 @@ public interface SupportsNamespaces extends CatalogPlugin {
*/
String PROP_COMMENT = "comment";
/**
* A property to specify the owner of the namespace.
*/
String PROP_OWNER_NAME = "ownerName";
/**
* A property to specify the type of the namespace's owner.
*/
String PROP_OWNER_TYPE = "ownerType";
/**
* The list of reserved namespace properties.
*/
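
For context, these constants are intended to be looked up in a namespace's metadata map; a minimal sketch of a caller reading them (hypothetical helper, not part of this diff):

```scala
import org.apache.spark.sql.connector.catalog.SupportsNamespaces
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE}

// Hypothetical helper: read a namespace's owner name and type, defaulting to empty strings.
def ownerOf(catalog: SupportsNamespaces, namespace: Array[String]): (String, String) = {
  val metadata = catalog.loadNamespaceMetadata(namespace)
  (metadata.getOrDefault(PROP_OWNER_NAME, ""), metadata.getOrDefault(PROP_OWNER_TYPE, ""))
}
```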


@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, N
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -143,8 +144,9 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
// Note: alter properties here because Hive does not support altering other fields
catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
val newDb1 = catalog.getDatabase("db1")
assert(db1.properties.isEmpty)
assert(newDb1.properties.size == 2)
val reversedProperties = Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE)
assert((db1.properties -- reversedProperties).isEmpty)
assert((newDb1.properties -- reversedProperties).size == 2)
assert(newDb1.properties.get("k") == Some("v3"))
assert(newDb1.properties.get("good") == Some("true"))
}


@@ -18,11 +18,12 @@
package org.apache.spark.sql.catalyst.catalog
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{AliasIdentifier, FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias, View}
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -207,8 +208,9 @@ abstract class SessionCatalogSuite extends AnalysisTest {
// Note: alter properties here because Hive does not support altering other fields
catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true")))
val newDb1 = catalog.getDatabaseMetadata("db1")
assert(db1.properties.isEmpty)
assert(newDb1.properties.size == 2)
val reversedProperties = Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE)
assert((db1.properties -- reversedProperties).isEmpty)
assert((newDb1.properties -- reversedProperties).size == 2)
assert(newDb1.properties.get("k") == Some("v3"))
assert(newDb1.properties.get("good") == Some("true"))
}


@@ -21,6 +21,7 @@ import java.util.Locale
import java.util.concurrent.TimeUnit._
import scala.collection.{GenMap, GenSeq}
import scala.collection.JavaConverters._
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector
import scala.util.control.NonFatal
@@ -37,6 +38,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
@@ -172,19 +174,23 @@ case class DescribeDatabaseCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
val dbMetadata: CatalogDatabase =
sparkSession.sessionState.catalog.getDatabaseMetadata(databaseName)
val allDbProperties = dbMetadata.properties
val result =
Row("Database Name", dbMetadata.name) ::
Row("Description", dbMetadata.description) ::
Row("Location", CatalogUtils.URIToString(dbMetadata.locationUri)) :: Nil
Row("Location", CatalogUtils.URIToString(dbMetadata.locationUri))::
Row("Owner Name", allDbProperties.getOrElse(PROP_OWNER_NAME, "")) ::
Row("Owner Type", allDbProperties.getOrElse(PROP_OWNER_TYPE, "")) :: Nil
if (extended) {
val properties =
if (dbMetadata.properties.isEmpty) {
val properties = allDbProperties -- Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE)
val propertiesStr =
if (properties.isEmpty) {
""
} else {
dbMetadata.properties.toSeq.mkString("(", ", ", ")")
properties.toSeq.mkString("(", ", ", ")")
}
result :+ Row("Properties", properties)
result :+ Row("Properties", propertiesStr)
} else {
result
}


@@ -22,7 +22,6 @@ import java.net.URI
import java.util.Locale
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.internal.config
import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD
@@ -31,6 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchDatabaseException, NoSuchPartitionException, NoSuchTableException, TempTableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
@@ -185,6 +185,8 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
abstract class DDLSuite extends QueryTest with SQLTestUtils {
protected val reversedProperties = Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE)
protected def isUsingHiveMetastore: Boolean = {
spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "hive"
}
@@ -328,7 +330,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
try {
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbName)
assert(db1 == CatalogDatabase(
assert(db1.copy(properties = db1.properties -- reversedProperties) == CatalogDatabase(
dbName,
"",
getDBPath(dbName),
@@ -351,7 +353,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
sql(s"CREATE DATABASE $dbName Location '$path'")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
val expPath = makeQualifiedPath(tmpDir.toString)
assert(db1 == CatalogDatabase(
assert(db1.copy(properties = db1.properties -- reversedProperties) == CatalogDatabase(
dbNameWithoutBackTicks,
"",
expPath,
@@ -374,7 +376,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
assert(db1.copy(properties = db1.properties -- reversedProperties) == CatalogDatabase(
dbNameWithoutBackTicks,
"",
getDBPath(dbNameWithoutBackTicks),
@@ -747,7 +749,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
sql(s"CREATE DATABASE $dbName")
checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
sql(s"DESCRIBE DATABASE EXTENDED $dbName").toDF("key", "value")
.where("key not like 'Owner%'"), // filter for consistency with in-memory catalog
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", CatalogUtils.URIToString(location)) ::
@@ -756,7 +759,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")
checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
sql(s"DESCRIBE DATABASE EXTENDED $dbName").toDF("key", "value")
.where("key not like 'Owner%'"), // filter for consistency with in-memory catalog
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", CatalogUtils.URIToString(location)) ::
@@ -765,7 +769,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
sql(s"DESCRIBE DATABASE EXTENDED $dbName").toDF("key", "value")
.where("key not like 'Owner%'"), // filter for consistency with in-memory catalog
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", CatalogUtils.URIToString(location)) ::


@@ -32,8 +32,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.{IMetaStoreClient, TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable}
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Order, SerDeInfo, StorageDescriptor}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, Table => MetaStoreApiTable, _}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.HIVE_COLUMN_ORDER_ASC
@@ -54,6 +53,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveExternalCatalog.{DATASOURCE_SCHEMA, DATASOURCE_SCHEMA_NUMPARTS, DATASOURCE_SCHEMA_PART_PREFIX}
@@ -355,13 +355,8 @@ private[hive] class HiveClientImpl(
override def createDatabase(
database: CatalogDatabase,
ignoreIfExists: Boolean): Unit = withHiveState {
client.createDatabase(
new HiveDatabase(
database.name,
database.description,
CatalogUtils.URIToString(database.locationUri),
Option(database.properties).map(_.asJava).orNull),
ignoreIfExists)
val hiveDb = toHiveDatabase(database, true)
client.createDatabase(hiveDb, ignoreIfExists)
}
override def dropDatabase(
@@ -379,22 +374,38 @@
s"Hive ${version.fullVersion} does not support altering database location")
}
}
client.alterDatabase(
val hiveDb = toHiveDatabase(database, false)
client.alterDatabase(database.name, hiveDb)
}
private def toHiveDatabase(database: CatalogDatabase, isCreate: Boolean): HiveDatabase = {
val props = database.properties
val hiveDb = new HiveDatabase(
database.name,
new HiveDatabase(
database.name,
database.description,
CatalogUtils.URIToString(database.locationUri),
Option(database.properties).map(_.asJava).orNull))
database.description,
CatalogUtils.URIToString(database.locationUri),
(props -- Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE)).asJava)
props.get(PROP_OWNER_NAME).orElse(if (isCreate) Some(userName) else None).foreach { ownerName =>
shim.setDatabaseOwnerName(hiveDb, ownerName)
}
props.get(PROP_OWNER_TYPE).orElse(if (isCreate) Some(PrincipalType.USER.name) else None)
.foreach { ownerType =>
shim.setDatabaseOwnerType(hiveDb, ownerType)
}
hiveDb
}
override def getDatabase(dbName: String): CatalogDatabase = withHiveState {
Option(client.getDatabase(dbName)).map { d =>
val paras = Option(d.getParameters).map(_.asScala.toMap).getOrElse(Map()) ++
Map(PROP_OWNER_NAME -> shim.getDatabaseOwnerName(d),
PROP_OWNER_TYPE -> shim.getDatabaseOwnerType(d))
CatalogDatabase(
name = d.getName,
description = Option(d.getDescription).getOrElse(""),
locationUri = CatalogUtils.stringToURI(d.getLocationUri),
properties = Option(d.getParameters).map(_.asScala.toMap).orNull)
properties = paras)
}.getOrElse(throw new NoSuchDatabaseException(dbName))
}
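
With this change the owner round-trips through the Hive client as ordinary catalog properties; a minimal usage sketch (illustrative, assuming a `client: HiveClient` handle as in the version tests below):

```scala
// Illustrative: after this patch, getDatabase surfaces the owner as regular properties.
val db = client.getDatabase("db1")
val ownerName = db.properties.getOrElse(PROP_OWNER_NAME, "")  // the creating user unless set explicitly
val ownerType = db.properties.getOrElse(PROP_OWNER_TYPE, "")  // e.g. "USER"
```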


@@ -29,8 +29,7 @@ import scala.util.control.NonFatal
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.IMetaStoreClient
import org.apache.hadoop.hive.metastore.api.{EnvironmentContext, Function => HiveFunction, FunctionType}
import org.apache.hadoop.hive.metastore.api.{MetaException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.metastore.api.{Database, EnvironmentContext, Function => HiveFunction, FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.io.AcidUtils
import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, Table}
@@ -154,6 +153,14 @@ private[client] sealed abstract class Shim {
deleteData: Boolean,
purge: Boolean): Unit
def getDatabaseOwnerName(db: Database): String
def setDatabaseOwnerName(db: Database, owner: String): Unit
def getDatabaseOwnerType(db: Database): String
def setDatabaseOwnerType(db: Database, ownerType: String): Unit
protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
val method = findMethod(klass, name, args: _*)
require(Modifier.isStatic(method.getModifiers()),
@@ -456,6 +463,14 @@ private[client] class Shim_v0_12 extends Shim with Logging {
def listFunctions(hive: Hive, db: String, pattern: String): Seq[String] = {
Seq.empty[String]
}
override def getDatabaseOwnerName(db: Database): String = ""
override def setDatabaseOwnerName(db: Database, owner: String): Unit = {}
override def getDatabaseOwnerType(db: Database): String = ""
override def setDatabaseOwnerType(db: Database, ownerType: String): Unit = {}
}
private[client] class Shim_v0_13 extends Shim_v0_12 {
@@ -493,6 +508,28 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
"getResults",
classOf[JList[Object]])
private lazy val getDatabaseOwnerNameMethod =
findMethod(
classOf[Database],
"getOwnerName")
private lazy val setDatabaseOwnerNameMethod =
findMethod(
classOf[Database],
"setOwnerName",
classOf[String])
private lazy val getDatabaseOwnerTypeMethod =
findMethod(
classOf[Database],
"getOwnerType")
private lazy val setDatabaseOwnerTypeMethod =
findMethod(
classOf[Database],
"setOwnerType",
classOf[PrincipalType])
override def setCurrentSessionState(state: SessionState): Unit =
setCurrentSessionStateMethod.invoke(null, state)
@@ -809,6 +846,22 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
}
}
override def getDatabaseOwnerName(db: Database): String = {
Option(getDatabaseOwnerNameMethod.invoke(db)).map(_.asInstanceOf[String]).getOrElse("")
}
override def setDatabaseOwnerName(db: Database, owner: String): Unit = {
setDatabaseOwnerNameMethod.invoke(db, owner)
}
override def getDatabaseOwnerType(db: Database): String = {
Option(getDatabaseOwnerTypeMethod.invoke(db))
.map(_.asInstanceOf[PrincipalType].name()).getOrElse("")
}
override def setDatabaseOwnerType(db: Database, ownerType: String): Unit = {
setDatabaseOwnerTypeMethod.invoke(db, PrincipalType.valueOf(ownerType))
}
}
private[client] class Shim_v0_14 extends Shim_v0_13 {


@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
@@ -170,6 +171,34 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.createDatabase(tempDB, ignoreIfExists = true)
}
test(s"$version: create/get/alter database should pick right user name as owner") {
if (version != "0.12") {
val currentUser = UserGroupInformation.getCurrentUser.getUserName
val ownerName = "SPARK_29425"
val db1 = "SPARK_29425_1"
val db2 = "SPARK_29425_2"
val ownerProps = Map("ownerName" -> ownerName)
// create database with owner
val dbWithOwner = CatalogDatabase(db1, "desc", Utils.createTempDir().toURI, ownerProps)
client.createDatabase(dbWithOwner, ignoreIfExists = true)
val getDbWithOwner = client.getDatabase(db1)
assert(getDbWithOwner.properties("ownerName") === ownerName)
// alter database without owner
client.alterDatabase(getDbWithOwner.copy(properties = Map()))
assert(client.getDatabase(db1).properties("ownerName") === "")
// create database without owner
val dbWithoutOwner = CatalogDatabase(db2, "desc", Utils.createTempDir().toURI, Map())
client.createDatabase(dbWithoutOwner, ignoreIfExists = true)
val getDbWithoutOwner = client.getDatabase(db2)
assert(getDbWithoutOwner.properties("ownerName") === currentUser)
// alter database with owner
client.alterDatabase(getDbWithoutOwner.copy(properties = ownerProps))
assert(client.getDatabase(db2).properties("ownerName") === ownerName)
}
}
test(s"$version: createDatabase with null description") {
withTempDir { tmpDir =>
val dbWithNullDesc =


@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE}
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.hive.HiveExternalCatalog
@@ -372,12 +373,45 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
assert(table.provider == Some("org.apache.spark.sql.hive.orc"))
}
}
private def checkOwner(db: String, expected: String): Unit = {
val owner = sql(s"DESCRIBE DATABASE EXTENDED $db")
.where("database_description_item='Owner Name'")
.collect().head.getString(1)
assert(owner === expected)
}
test("Database Ownership") {
val catalog = spark.sessionState.catalog
try {
val db1 = "spark_29425_1"
val db2 = "spark_29425_2"
val owner = "spark_29425"
sql(s"CREATE DATABASE $db1")
checkOwner(db1, Utils.getCurrentUserName())
sql(s"ALTER DATABASE $db1 SET DBPROPERTIES ('a'='a')")
checkOwner(db1, Utils.getCurrentUserName())
// TODO: Specify ownership should be forbidden after we implement `SET OWNER` syntax
sql(s"CREATE DATABASE $db2 WITH DBPROPERTIES('ownerName'='$owner')")
checkOwner(db2, owner)
sql(s"ALTER DATABASE $db2 SET DBPROPERTIES ('a'='a')")
checkOwner(db2, owner)
// TODO: Changing ownership should be forbidden after we implement `SET OWNER` syntax
sql(s"ALTER DATABASE $db2 SET DBPROPERTIES ('ownerName'='a')")
checkOwner(db2, "a")
} finally {
catalog.reset()
}
}
}
class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
import testImplicits._
val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO")
private val reversedProperties = Seq("ownerName", "ownerType")
override def afterEach(): Unit = {
try {
@@ -1112,7 +1146,8 @@ class HiveDDLSuite
sql(s"CREATE DATABASE $dbName Location '${tmpDir.toURI.getPath.stripSuffix("/")}'")
val db1 = catalog.getDatabaseMetadata(dbName)
val dbPath = new URI(tmpDir.toURI.toString.stripSuffix("/"))
assert(db1 == CatalogDatabase(dbName, "", dbPath, Map.empty))
assert(db1.copy(properties = db1.properties -- Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE)) ===
CatalogDatabase(dbName, "", dbPath, Map.empty))
sql("USE db1")
sql(s"CREATE TABLE $tabName as SELECT 1")
@@ -1150,7 +1185,8 @@ class HiveDDLSuite
val expectedDBLocation = s"file:${dbPath.toUri.getPath.stripSuffix("/")}/$dbName.db"
val expectedDBUri = CatalogUtils.stringToURI(expectedDBLocation)
val db1 = catalog.getDatabaseMetadata(dbName)
assert(db1 == CatalogDatabase(
assert(db1.copy(properties = db1.properties -- Seq(PROP_OWNER_NAME, PROP_OWNER_TYPE)) ==
CatalogDatabase(
dbName,
"",
expectedDBUri,