From 24efa43826298c3bf0a01945670efb70c22207d9 Mon Sep 17 00:00:00 2001
From: Kent Yao
Date: Tue, 21 Jan 2020 10:37:49 +0800
Subject: [PATCH] [SPARK-30019][SQL] Add the owner property to v2 table

### What changes were proposed in this pull request?

Add an `owner` property to v2 tables. It is reserved by `TableCatalog` and indicates the table's owner.

### Why are the changes needed?

This enhances ownership management in the catalog API.

### Does this PR introduce any user-facing change?

Yes. It adds one reserved property, `owner`, which is no longer allowed in OPTIONS/TBLPROPERTIES unless the legacy property behavior is enabled.

### How was this patch tested?

Added unit tests.

Closes #27249 from yaooqinn/SPARK-30019.

Authored-by: Kent Yao
Signed-off-by: Wenchen Fan
---
 docs/sql-migration-guide.md                   | 18 +++++-
 .../sql/connector/catalog/TableCatalog.java   |  8 ++-
 .../sql/catalyst/parser/AstBuilder.scala      |  4 ++
 .../sql/connector/catalog/CatalogV2Util.scala |  5 ++
 .../datasources/v2/DataSourceV2Strategy.scala | 26 +++++----
 .../datasources/v2/V2SessionCatalog.scala     |  4 +-
 .../spark/sql/DataFrameWriterV2Suite.scala    | 27 +++++----
 .../spark/sql/connector/AlterTableTests.scala | 12 ++--
 .../sql/connector/DataSourceV2SQLSuite.scala  | 56 ++++++++++---------
 .../spark/sql/hive/HiveExternalCatalog.scala  |  8 ++-
 .../sql/hive/execution/HiveDDLSuite.scala     | 14 ++++-
 11 files changed, 121 insertions(+), 61 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 7e0def60e5..2ddbdb9d88 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -317,7 +317,7 @@ license: |
     no
-    For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it.
+    For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it
@@ -331,7 +331,21 @@ license: |
     no
-    For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it.
+    For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it
+
+
+
+
+    owner
+
+
+    no
+
+
+    yes
+
+
+    For tables, it is determined by the user who runs Spark and creates the table.
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 32c6f8f2cd..591e1c631b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -56,10 +56,16 @@ public interface TableCatalog extends CatalogPlugin {
    */
   String PROP_PROVIDER = "provider";
 
+  /**
+   * A property to specify the owner of the table.
+   */
+  String PROP_OWNER = "owner";
+
   /**
    * The list of reserved table properties.
    */
-  List<String> RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER);
+  List<String> RESERVED_PROPERTIES =
+      Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER);
 
   /**
    * List the tables in a namespace from the catalog.
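Before the remaining diffs, a minimal sketch (not part of the patch) of the user-facing behavior this change introduces. It assumes an in-memory v2 catalog registered under the name `testcat`, the way the test suites below set one up; the table name `t` and the value `'andrew'` are hypothetical, and the error text comes from the `AstBuilder.scala` change that follows:

```scala
// Register a v2 catalog for the sketch, as DataFrameWriterV2Suite's `before`
// block does (the catalog name "testcat" is just an example):
spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)

// 1. `owner` is now reserved: specifying it in TBLPROPERTIES is rejected at
//    parse time, unless the legacy property handling is enabled.
spark.sql(
  "CREATE TABLE testcat.t (id BIGINT) USING foo TBLPROPERTIES ('owner'='andrew')")
// ParseException: owner is a reserved table property, it will be set to the
// current user by default.

// 2. When the property is omitted, the owner defaults to the current user.
spark.sql("CREATE TABLE testcat.t (id BIGINT) USING foo")
spark.sql("SHOW TBLPROPERTIES testcat.t").show()
// The output now contains a row (owner, <current user>), where the value is
// what Utils.getCurrentUserName() returns.
```

The default is injected at planning time in `DataSourceV2Strategy` through the new `CatalogV2Util.withDefaultOwnership` helper, so CREATE TABLE, CTAS, REPLACE TABLE, and RTAS all pick it up uniformly, and `V2SessionCatalog`/`HiveExternalCatalog` preserve it across ALTER TABLE.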
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d09c53ed91..2050ec3399 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2680,6 +2680,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
         throw new ParseException(s"$PROP_LOCATION is a reserved table property, please use" +
           s" the LOCATION clause to specify it.", ctx)
       case (PROP_LOCATION, _) => false
+      case (PROP_OWNER, _) if !legacyOn =>
+        throw new ParseException(s"$PROP_OWNER is a reserved table property, it will be" +
+          s" set to the current user by default.", ctx)
+      case (PROP_OWNER, _) => false
       case _ => true
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 3ee22548ca..a4c7b4c3a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.util.Utils
 
 private[sql] object CatalogV2Util {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -275,6 +276,10 @@ private[sql] object CatalogV2Util {
     location.map(TableCatalog.PROP_LOCATION -> _)
   }
 
+  def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = {
+    properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())
+  }
+
   def getTableProviderCatalog(
       provider: SupportsCatalogOptions,
       catalogManager: CatalogManager,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 7169a43735..c6d8a12a94 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
@@ -114,31 +114,37 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
 
     case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) =>
-      CreateTableExec(catalog, ident, schema, parts, props, ifNotExists) :: Nil
+      val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
+      CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil
 
     case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
+      val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
       val writeOptions = new CaseInsensitiveStringMap(options.asJava)
       catalog match {
         case staging: StagingTableCatalog =>
-          AtomicCreateTableAsSelectExec(
-            staging, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
+          AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
+            propsWithOwner, writeOptions, ifNotExists) :: Nil
         case _ =>
-          CreateTableAsSelectExec(
-            catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
+          CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
+            propsWithOwner, writeOptions, ifNotExists) :: Nil
       }
 
     case RefreshTable(catalog, ident) =>
       RefreshTableExec(catalog, ident) :: Nil
 
     case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
+      val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
       catalog match {
        case staging: StagingTableCatalog =>
-          AtomicReplaceTableExec(staging, ident, schema, parts, props, orCreate = orCreate) :: Nil
+          AtomicReplaceTableExec(
+            staging, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
        case _ =>
-          ReplaceTableExec(catalog, ident, schema, parts, props, orCreate = orCreate) :: Nil
+          ReplaceTableExec(
+            catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
      }
 
    case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
+      val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
      val writeOptions = new CaseInsensitiveStringMap(options.asJava)
      catalog match {
        case staging: StagingTableCatalog =>
@@ -148,7 +154,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
             parts,
             query,
             planLater(query),
-            props,
+            propsWithOwner,
             writeOptions,
             orCreate = orCreate) :: Nil
         case _ =>
@@ -158,7 +164,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
             parts,
             query,
             planLater(query),
-            props,
+            propsWithOwner,
             writeOptions,
             orCreate = orCreate) :: Nil
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index ddb2926eb6..8eea1cf9c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -125,10 +125,12 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
     val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
     val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
     val comment = properties.get(TableCatalog.PROP_COMMENT)
+    val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
 
     try {
       catalog.alterTable(
-        catalogTable.copy(properties = properties, schema = schema, comment = comment))
+        catalogTable
+          .copy(properties = properties, schema = schema, owner = owner, comment = comment))
     } catch {
       case _: NoSuchTableException =>
         throw new NoSuchTableException(ident)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
index ce0a5f21fd..4e6381aea3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWriterV2Suite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
 import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
+import org.apache.spark.util.Utils
 
 class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with BeforeAndAfter {
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -37,6 +38,8 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     spark.sessionState.catalogManager.catalog(name).asTableCatalog
   }
 
+  private val defaultOwnership = Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())
+
   before {
     spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
@@ -234,7 +237,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.name === "testcat.table_name")
     assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
     assert(table.partitioning.isEmpty)
-    assert(table.properties.isEmpty)
+    assert(table.properties == defaultOwnership.asJava)
   }
 
   test("Create: with using") {
@@ -249,7 +252,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.name === "testcat.table_name")
     assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
     assert(table.partitioning.isEmpty)
-    assert(table.properties === Map("provider" -> "foo").asJava)
+    assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
   }
 
   test("Create: with property") {
@@ -264,7 +267,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.name === "testcat.table_name")
     assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
     assert(table.partitioning.isEmpty)
-    assert(table.properties === Map("prop" -> "value").asJava)
+    assert(table.properties === (Map("prop" -> "value") ++ defaultOwnership).asJava)
   }
 
   test("Create: identity partitioned table") {
@@ -279,7 +282,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.name === "testcat.table_name")
     assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
     assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
-    assert(table.properties.isEmpty)
+    assert(table.properties == defaultOwnership.asJava)
   }
 
   test("Create: partitioned by years(ts)") {
@@ -368,7 +371,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.name === "testcat.table_name")
     assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
     assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
-    assert(table.properties === Map("provider" -> "foo").asJava)
+    assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
   }
 
   test("Replace: basic behavior") {
@@ -386,7 +389,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.name === "testcat.table_name")
     assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
     assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
-    assert(table.properties === Map("provider" -> "foo").asJava)
+    assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
 
     spark.table("source2")
       .withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
@@ -405,7 +408,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       .add("data", StringType)
       .add("even_or_odd", StringType))
     assert(replaced.partitioning.isEmpty)
-    assert(replaced.properties.isEmpty)
+    assert(replaced.properties === defaultOwnership.asJava)
   }
 
   test("Replace: partitioned table") {
@@ -422,7 +425,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.name === "testcat.table_name")
     assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
     assert(table.partitioning.isEmpty)
-    assert(table.properties === Map("provider" -> "foo").asJava)
+    assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
 
     spark.table("source2")
       .withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
@@ -441,7 +444,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       .add("data", StringType)
       .add("even_or_odd", StringType))
     assert(replaced.partitioning === Seq(IdentityTransform(FieldReference("id"))))
-    assert(replaced.properties.isEmpty)
+    assert(replaced.properties === defaultOwnership.asJava)
   }
 
   test("Replace: fail if table does not exist") {
@@ -465,7 +468,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(replaced.name === "testcat.table_name")
     assert(replaced.schema === new StructType().add("id", LongType).add("data", StringType))
     assert(replaced.partitioning.isEmpty)
-    assert(replaced.properties.isEmpty)
+    assert(replaced.properties === defaultOwnership.asJava)
   }
 
   test("CreateOrReplace: table exists") {
@@ -483,7 +486,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
     assert(table.name === "testcat.table_name")
     assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
     assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
-    assert(table.properties === Map("provider" -> "foo").asJava)
+    assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
 
     spark.table("source2")
       .withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
@@ -502,6 +505,6 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
       .add("data", StringType)
       .add("even_or_odd", StringType))
     assert(replaced.partitioning.isEmpty)
-    assert(replaced.properties.isEmpty)
+    assert(replaced.properties === defaultOwnership.asJava)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index d304d5b2ca..2fc5020c39 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
 import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -945,7 +946,7 @@ trait AlterTableTests extends SharedSparkSession {
 
       assert(table.name === fullTableName(t))
       assert(table.properties ===
-        Map("provider" -> v2Format, "location" -> "s3://bucket/path").asJava)
+        withDefaultOwnership(Map("provider" -> v2Format, "location" -> "s3://bucket/path")).asJava)
     }
   }
@@ -971,7 +972,8 @@ trait AlterTableTests extends SharedSparkSession {
       val table = getTableMetadata(t)
 
       assert(table.name === fullTableName(t))
-      assert(table.properties === Map("provider" -> v2Format, "test" -> "34").asJava)
+      assert(table.properties ===
+        withDefaultOwnership(Map("provider" -> v2Format, "test" -> "34")).asJava)
     }
   }
@@ -983,15 +985,15 @@ trait AlterTableTests extends SharedSparkSession {
       val table = getTableMetadata(t)
 
       assert(table.name === fullTableName(t))
-      assert(table.properties === Map("provider" -> v2Format, "test" -> "34").asJava)
+      assert(table.properties ===
+        withDefaultOwnership(Map("provider" -> v2Format, "test" -> "34")).asJava)
 
       sql(s"ALTER TABLE $t UNSET TBLPROPERTIES ('test')")
 
       val updated = getTableMetadata(t)
 
       assert(updated.name === fullTableName(t))
-      assert(updated.properties === Map("provider" -> v2Format).asJava)
+      assert(updated.properties === withDefaultOwnership(Map("provider" -> v2Format)).asJava)
     }
   }
-
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 4c5b1d95b1..e65030f715 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
+import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.sources.SimpleScanSource
@@ -41,6 +42,7 @@ class DataSourceV2SQLSuite
   private val v2Source = classOf[FakeV2Provider].getName
   override protected val v2Format = v2Source
   override protected val catalogAndNamespace = "testcat.ns1.ns2."
+  private val defaultUser: String = Utils.getCurrentUserName()
 
   private def catalog(name: String): CatalogPlugin = {
     spark.sessionState.catalogManager.catalog(name)
@@ -94,7 +96,7 @@ class DataSourceV2SQLSuite
 
     assert(table.name == "testcat.table_name")
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> "foo").asJava)
+    assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table.schema == new StructType()
       .add("id", LongType, nullable = false)
       .add("data", StringType))
@@ -160,6 +162,7 @@ class DataSourceV2SQLSuite
       Array("Comment", "this is a test table", ""),
       Array("Location", "/tmp/testcat/table_name", ""),
       Array("Provider", "foo", ""),
+      Array(TableCatalog.PROP_OWNER.capitalize, defaultUser, ""),
       Array("Table Properties", "[bar=baz]", "")))
   }
 
@@ -172,7 +175,7 @@ class DataSourceV2SQLSuite
 
     assert(table.name == "default.table_name")
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> v2Source).asJava)
+    assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava)
     assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
 
     val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
@@ -187,7 +190,7 @@ class DataSourceV2SQLSuite
     val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
     assert(table.name == "testcat.table_name")
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> "foo").asJava)
+    assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
 
     // run a second create query that should fail
@@ -201,7 +204,7 @@ class DataSourceV2SQLSuite
     val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
     assert(table2.name == "testcat.table_name")
     assert(table2.partitioning.isEmpty)
-    assert(table2.properties == Map("provider" -> "foo").asJava)
+    assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table2.schema == new StructType().add("id", LongType).add("data", StringType))
 
     // check that the table is still empty
@@ -218,7 +221,7 @@ class DataSourceV2SQLSuite
 
     assert(table.name == "testcat.table_name")
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> "foo").asJava)
+    assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
 
     spark.sql("CREATE TABLE IF NOT EXISTS testcat.table_name (id bigint, data string) USING bar")
@@ -227,7 +230,7 @@ class DataSourceV2SQLSuite
     val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
     assert(table2.name == "testcat.table_name")
     assert(table2.partitioning.isEmpty)
-    assert(table2.properties == Map("provider" -> "foo").asJava)
+    assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table2.schema == new StructType().add("id", LongType).add("data", StringType))
 
     // check that the table is still empty
@@ -244,7 +247,7 @@ class DataSourceV2SQLSuite
 
     assert(table.name == "testcat.table_name")
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> "foo").asJava)
+    assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
 
     // check that the table is empty
@@ -266,7 +269,7 @@ class DataSourceV2SQLSuite
 
     assert(table.name == identifier)
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> "foo").asJava)
+    assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table.schema == new StructType()
       .add("id", LongType)
       .add("data", StringType))
@@ -293,7 +296,7 @@ class DataSourceV2SQLSuite
       assert(replacedTable != originalTable, "Table should have been replaced.")
       assert(replacedTable.name == identifier)
       assert(replacedTable.partitioning.isEmpty)
-      assert(replacedTable.properties == Map("provider" -> "foo").asJava)
+      assert(replacedTable.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
       assert(replacedTable.schema == new StructType().add("id", LongType))
 
       val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
@@ -431,7 +434,7 @@ class DataSourceV2SQLSuite
 
     assert(table.name == "default.table_name")
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> v2Source).asJava)
+    assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava)
     assert(table.schema == new StructType()
       .add("id", LongType)
       .add("data", StringType))
@@ -448,7 +451,7 @@ class DataSourceV2SQLSuite
     val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
     assert(table.name == "testcat.table_name")
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> "foo").asJava)
+    assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table.schema == new StructType()
       .add("id", LongType)
       .add("data", StringType))
@@ -468,7 +471,7 @@ class DataSourceV2SQLSuite
     val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
     assert(table2.name == "testcat.table_name")
     assert(table2.partitioning.isEmpty)
-    assert(table2.properties == Map("provider" -> "foo").asJava)
+    assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table2.schema == new StructType()
       .add("id", LongType)
       .add("data", StringType))
@@ -486,7 +489,7 @@ class DataSourceV2SQLSuite
 
     assert(table.name == "testcat.table_name")
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> "foo").asJava)
+    assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table.schema == new StructType()
       .add("id", LongType)
       .add("data", StringType))
@@ -517,7 +520,7 @@ class DataSourceV2SQLSuite
 
     assert(table.name == "testcat.table_name")
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> "foo").asJava)
+    assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table.schema == new StructType()
       .add("id", LongType)
       .add("data", StringType))
@@ -557,7 +560,7 @@ class DataSourceV2SQLSuite
 
     assert(table.name == identifier)
     assert(table.partitioning.isEmpty)
-    assert(table.properties == Map("provider" -> "foo").asJava)
+    assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
     assert(table.schema == new StructType().add("i", "int"))
 
     val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
@@ -1059,7 +1062,7 @@ class DataSourceV2SQLSuite
       Row("Namespace Name", "ns2"),
       Row("Description", "test namespace"),
       Row("Location", "/tmp/ns_test"),
-      Row("Owner Name", Utils.getCurrentUserName()),
+      Row("Owner Name", defaultUser),
       Row("Owner Type", "USER")
     ))
   }
@@ -1075,7 +1078,7 @@ class DataSourceV2SQLSuite
       Row("Namespace Name", "ns2"),
       Row("Description", "test namespace"),
       Row("Location", "/tmp/ns_test"),
-      Row("Owner Name", Utils.getCurrentUserName()),
+      Row("Owner Name", defaultUser),
       Row("Owner Type", "USER"),
       Row("Properties", "((a,b),(b,a),(c,c))")
     ))
@@ -1123,7 +1126,7 @@ class DataSourceV2SQLSuite
       Row("Namespace Name", "ns2"),
       Row("Description", "test namespace"),
       Row("Location", "/tmp/ns_test_2"),
-      Row("Owner Name", Utils.getCurrentUserName()),
+      Row("Owner Name", defaultUser),
       Row("Owner Type", "USER")
     ))
   }
@@ -1923,22 +1926,23 @@ class DataSourceV2SQLSuite
   test("SHOW TBLPROPERTIES: v2 table") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
-      val owner = "andrew"
+      val user = "andrew"
       val status = "new"
       val provider = "foo"
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING $provider " +
-        s"TBLPROPERTIES ('owner'='$owner', 'status'='$status')")
+        s"TBLPROPERTIES ('user'='$user', 'status'='$status')")
 
-      val properties = sql(s"SHOW TBLPROPERTIES $t")
+      val properties = sql(s"SHOW TBLPROPERTIES $t").orderBy("key")
 
       val schema = new StructType()
         .add("key", StringType, nullable = false)
         .add("value", StringType, nullable = false)
 
       val expected = Seq(
-        Row("owner", owner),
+        Row(TableCatalog.PROP_OWNER, defaultUser),
+        Row("provider", provider),
         Row("status", status),
-        Row("provider", provider))
+        Row("user", user))
 
       assert(properties.schema === schema)
       assert(expected === properties.collect())
@@ -1948,11 +1952,11 @@ class DataSourceV2SQLSuite
   test("SHOW TBLPROPERTIES(key): v2 table") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
-      val owner = "andrew"
+      val user = "andrew"
       val status = "new"
       val provider = "foo"
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING $provider " +
-        s"TBLPROPERTIES ('owner'='$owner', 'status'='$status')")
+        s"TBLPROPERTIES ('user'='$user', 'status'='$status')")
 
       val properties = sql(s"SHOW TBLPROPERTIES $t ('status')")
 
@@ -1967,7 +1971,7 @@ class DataSourceV2SQLSuite
     withTable(t) {
       val nonExistingKey = "nonExistingKey"
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo " +
-        s"TBLPROPERTIES ('owner'='andrew', 'status'='new')")
+        s"TBLPROPERTIES ('user'='andrew', 'status'='new')")
 
       val properties = sql(s"SHOW TBLPROPERTIES $t ('$nonExistingKey')")
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 03874d005a..ca292f65ef 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -40,8 +40,8 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.sql.connector.catalog.TableCatalog
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
 import org.apache.spark.sql.hive.client.HiveClient
@@ -635,12 +635,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       k.startsWith(CREATED_SPARK_VERSION)
     }
     val newTableProps = propsFromOldTable ++ tableDefinition.properties + partitionProviderProp
+
+    // Preserve the old table's owner if the new table definition does not specify one
+    val owner = Option(tableDefinition.owner).filter(_.nonEmpty).getOrElse(oldTableDef.owner)
     val newDef = tableDefinition.copy(
       storage = newStorage,
       schema = oldTableDef.schema,
       partitionColumnNames = oldTableDef.partitionColumnNames,
       bucketSpec = oldTableDef.bucketSpec,
-      properties = newTableProps)
+      properties = newTableProps,
+      owner = owner)
 
     client.alterTable(newDef)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index b3f7fc4d05..59eadb8448 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog}
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE}
 import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
 import org.apache.spark.sql.functions._
@@ -418,13 +418,23 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
       catalog.reset()
     }
   }
+
+  test("Table Ownership") {
+    val catalog = spark.sessionState.catalog
+    try {
+      sql(s"CREATE TABLE spark_30019(k int)")
+      assert(sql(s"DESCRIBE TABLE EXTENDED spark_30019").where("col_name='Owner'")
+        .collect().head.getString(1) === Utils.getCurrentUserName())
+    } finally {
+      catalog.reset()
+    }
+  }
 }
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
   import testImplicits._
 
   val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO")
-  private val reversedProperties = Seq("ownerName", "ownerType")
 
   override def afterEach(): Unit = {
     try {