[SPARK-30019][SQL] Add the owner property to v2 table
### What changes were proposed in this pull request? Add the `owner` property to v2 tables. It is reserved by `TableCatalog` and indicates the table's owner. ### Why are the changes needed? To enhance ownership management in the catalog API. ### Does this PR introduce any user-facing change? Yes. It adds one reserved property, `owner`, which is no longer allowed in OPTIONS/TBLPROPERTIES unless the legacy behavior is turned on. ### How was this patch tested? Added unit tests. Closes #27249 from yaooqinn/SPARK-30019. Authored-by: Kent Yao <yaooqinn@hotmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
parent
14bc2a2162
commit
24efa43826
|
@ -317,7 +317,7 @@ license: |
|
|||
no
|
||||
</td>
|
||||
<td>
|
||||
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it.
|
||||
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
|
@ -331,7 +331,21 @@ license: |
|
|||
no
|
||||
</td>
|
||||
<td>
|
||||
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it.
|
||||
For databases, please use the ALTER DATABASE ... SET OWNER syntax to modify it
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
owner
|
||||
</td>
|
||||
<td>
|
||||
no
|
||||
</td>
|
||||
<td>
|
||||
yes
|
||||
</td>
|
||||
<td>
|
||||
For tables, it is determined by the user who runs Spark and creates the table.
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
|
|
|
@ -56,10 +56,16 @@ public interface TableCatalog extends CatalogPlugin {
|
|||
*/
|
||||
String PROP_PROVIDER = "provider";
|
||||
|
||||
/**
|
||||
* A property to specify the owner of the table.
|
||||
*/
|
||||
String PROP_OWNER = "owner";
|
||||
|
||||
/**
|
||||
* The list of reserved table properties.
|
||||
*/
|
||||
List<String> RESERVED_PROPERTIES = Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER);
|
||||
List<String> RESERVED_PROPERTIES =
|
||||
Arrays.asList(PROP_COMMENT, PROP_LOCATION, PROP_PROVIDER, PROP_OWNER);
|
||||
|
||||
/**
|
||||
* List the tables in a namespace from the catalog.
|
||||
|
|
|
@ -2680,6 +2680,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
|
|||
throw new ParseException(s"$PROP_LOCATION is a reserved table property, please use" +
|
||||
s" the LOCATION clause to specify it.", ctx)
|
||||
case (PROP_LOCATION, _) => false
|
||||
case (PROP_OWNER, _) if !legacyOn =>
|
||||
throw new ParseException(s"$PROP_OWNER is a reserved table property, it will be" +
|
||||
s" set to the current user by default.", ctx)
|
||||
case (PROP_OWNER, _) => false
|
||||
case _ => true
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.TableChange._
|
|||
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
|
||||
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
|
||||
import org.apache.spark.sql.util.CaseInsensitiveStringMap
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
private[sql] object CatalogV2Util {
|
||||
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
|
||||
|
@ -275,6 +276,10 @@ private[sql] object CatalogV2Util {
|
|||
location.map(TableCatalog.PROP_LOCATION -> _)
|
||||
}
|
||||
|
||||
def withDefaultOwnership(properties: Map[String, String]): Map[String, String] = {
|
||||
properties ++ Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())
|
||||
}
|
||||
|
||||
def getTableProviderCatalog(
|
||||
provider: SupportsCatalogOptions,
|
||||
catalogManager: CatalogManager,
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
|
|||
import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
|
||||
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
|
||||
import org.apache.spark.sql.catalyst.plans.logical._
|
||||
import org.apache.spark.sql.connector.catalog.{StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
|
||||
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, TableCapability, TableCatalog, TableChange}
|
||||
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
|
||||
import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, ProjectExec, RowDataSourceScanExec, SparkPlan}
|
||||
import org.apache.spark.sql.execution.datasources.DataSourceStrategy
|
||||
|
@ -114,31 +114,37 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
|
|||
WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
|
||||
|
||||
case CreateV2Table(catalog, ident, schema, parts, props, ifNotExists) =>
|
||||
CreateTableExec(catalog, ident, schema, parts, props, ifNotExists) :: Nil
|
||||
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
|
||||
CreateTableExec(catalog, ident, schema, parts, propsWithOwner, ifNotExists) :: Nil
|
||||
|
||||
case CreateTableAsSelect(catalog, ident, parts, query, props, options, ifNotExists) =>
|
||||
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
|
||||
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
|
||||
catalog match {
|
||||
case staging: StagingTableCatalog =>
|
||||
AtomicCreateTableAsSelectExec(
|
||||
staging, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
|
||||
AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
|
||||
propsWithOwner, writeOptions, ifNotExists) :: Nil
|
||||
case _ =>
|
||||
CreateTableAsSelectExec(
|
||||
catalog, ident, parts, query, planLater(query), props, writeOptions, ifNotExists) :: Nil
|
||||
CreateTableAsSelectExec(catalog, ident, parts, query, planLater(query),
|
||||
propsWithOwner, writeOptions, ifNotExists) :: Nil
|
||||
}
|
||||
|
||||
case RefreshTable(catalog, ident) =>
|
||||
RefreshTableExec(catalog, ident) :: Nil
|
||||
|
||||
case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
|
||||
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
|
||||
catalog match {
|
||||
case staging: StagingTableCatalog =>
|
||||
AtomicReplaceTableExec(staging, ident, schema, parts, props, orCreate = orCreate) :: Nil
|
||||
AtomicReplaceTableExec(
|
||||
staging, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
|
||||
case _ =>
|
||||
ReplaceTableExec(catalog, ident, schema, parts, props, orCreate = orCreate) :: Nil
|
||||
ReplaceTableExec(
|
||||
catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
|
||||
}
|
||||
|
||||
case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
|
||||
val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
|
||||
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
|
||||
catalog match {
|
||||
case staging: StagingTableCatalog =>
|
||||
|
@ -148,7 +154,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
|
|||
parts,
|
||||
query,
|
||||
planLater(query),
|
||||
props,
|
||||
propsWithOwner,
|
||||
writeOptions,
|
||||
orCreate = orCreate) :: Nil
|
||||
case _ =>
|
||||
|
@ -158,7 +164,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
|
|||
parts,
|
||||
query,
|
||||
planLater(query),
|
||||
props,
|
||||
propsWithOwner,
|
||||
writeOptions,
|
||||
orCreate = orCreate) :: Nil
|
||||
}
|
||||
|
|
|
@ -125,10 +125,12 @@ class V2SessionCatalog(catalog: SessionCatalog, conf: SQLConf)
|
|||
val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
|
||||
val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
|
||||
val comment = properties.get(TableCatalog.PROP_COMMENT)
|
||||
val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
|
||||
|
||||
try {
|
||||
catalog.alterTable(
|
||||
catalogTable.copy(properties = properties, schema = schema, comment = comment))
|
||||
catalogTable
|
||||
.copy(properties = properties, schema = schema, owner = owner, comment = comment))
|
||||
} catch {
|
||||
case _: NoSuchTableException =>
|
||||
throw new NoSuchTableException(ident)
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
|
|||
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, YearsTransform}
|
||||
import org.apache.spark.sql.test.SharedSparkSession
|
||||
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with BeforeAndAfter {
|
||||
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
|
||||
|
@ -37,6 +38,8 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
spark.sessionState.catalogManager.catalog(name).asTableCatalog
|
||||
}
|
||||
|
||||
private val defaultOwnership = Map(TableCatalog.PROP_OWNER -> Utils.getCurrentUserName())
|
||||
|
||||
before {
|
||||
spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
|
||||
|
||||
|
@ -234,7 +237,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
assert(table.name === "testcat.table_name")
|
||||
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties.isEmpty)
|
||||
assert(table.properties == defaultOwnership.asJava)
|
||||
}
|
||||
|
||||
test("Create: with using") {
|
||||
|
@ -249,7 +252,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
assert(table.name === "testcat.table_name")
|
||||
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties === Map("provider" -> "foo").asJava)
|
||||
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
|
||||
}
|
||||
|
||||
test("Create: with property") {
|
||||
|
@ -264,7 +267,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
assert(table.name === "testcat.table_name")
|
||||
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties === Map("prop" -> "value").asJava)
|
||||
assert(table.properties === (Map("prop" -> "value") ++ defaultOwnership).asJava)
|
||||
}
|
||||
|
||||
test("Create: identity partitioned table") {
|
||||
|
@ -279,7 +282,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
assert(table.name === "testcat.table_name")
|
||||
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
|
||||
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
|
||||
assert(table.properties.isEmpty)
|
||||
assert(table.properties == defaultOwnership.asJava)
|
||||
}
|
||||
|
||||
test("Create: partitioned by years(ts)") {
|
||||
|
@ -368,7 +371,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
assert(table.name === "testcat.table_name")
|
||||
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
|
||||
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
|
||||
assert(table.properties === Map("provider" -> "foo").asJava)
|
||||
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
|
||||
}
|
||||
|
||||
test("Replace: basic behavior") {
|
||||
|
@ -386,7 +389,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
assert(table.name === "testcat.table_name")
|
||||
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
|
||||
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
|
||||
assert(table.properties === Map("provider" -> "foo").asJava)
|
||||
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
|
||||
|
||||
spark.table("source2")
|
||||
.withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
|
||||
|
@ -405,7 +408,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
.add("data", StringType)
|
||||
.add("even_or_odd", StringType))
|
||||
assert(replaced.partitioning.isEmpty)
|
||||
assert(replaced.properties.isEmpty)
|
||||
assert(replaced.properties === defaultOwnership.asJava)
|
||||
}
|
||||
|
||||
test("Replace: partitioned table") {
|
||||
|
@ -422,7 +425,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
assert(table.name === "testcat.table_name")
|
||||
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties === Map("provider" -> "foo").asJava)
|
||||
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
|
||||
|
||||
spark.table("source2")
|
||||
.withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
|
||||
|
@ -441,7 +444,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
.add("data", StringType)
|
||||
.add("even_or_odd", StringType))
|
||||
assert(replaced.partitioning === Seq(IdentityTransform(FieldReference("id"))))
|
||||
assert(replaced.properties.isEmpty)
|
||||
assert(replaced.properties === defaultOwnership.asJava)
|
||||
}
|
||||
|
||||
test("Replace: fail if table does not exist") {
|
||||
|
@ -465,7 +468,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
assert(replaced.name === "testcat.table_name")
|
||||
assert(replaced.schema === new StructType().add("id", LongType).add("data", StringType))
|
||||
assert(replaced.partitioning.isEmpty)
|
||||
assert(replaced.properties.isEmpty)
|
||||
assert(replaced.properties === defaultOwnership.asJava)
|
||||
}
|
||||
|
||||
test("CreateOrReplace: table exists") {
|
||||
|
@ -483,7 +486,7 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
assert(table.name === "testcat.table_name")
|
||||
assert(table.schema === new StructType().add("id", LongType).add("data", StringType))
|
||||
assert(table.partitioning === Seq(IdentityTransform(FieldReference("id"))))
|
||||
assert(table.properties === Map("provider" -> "foo").asJava)
|
||||
assert(table.properties === (Map("provider" -> "foo") ++ defaultOwnership).asJava)
|
||||
|
||||
spark.table("source2")
|
||||
.withColumn("even_or_odd", when(($"id" % 2) === 0, "even").otherwise("odd"))
|
||||
|
@ -502,6 +505,6 @@ class DataFrameWriterV2Suite extends QueryTest with SharedSparkSession with Befo
|
|||
.add("data", StringType)
|
||||
.add("even_or_odd", StringType))
|
||||
assert(replaced.partitioning.isEmpty)
|
||||
assert(replaced.properties.isEmpty)
|
||||
assert(replaced.properties === defaultOwnership.asJava)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
|
|||
|
||||
import org.apache.spark.SparkException
|
||||
import org.apache.spark.sql.AnalysisException
|
||||
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
|
||||
import org.apache.spark.sql.connector.catalog.Table
|
||||
import org.apache.spark.sql.test.SharedSparkSession
|
||||
import org.apache.spark.sql.types._
|
||||
|
@ -945,7 +946,7 @@ trait AlterTableTests extends SharedSparkSession {
|
|||
|
||||
assert(table.name === fullTableName(t))
|
||||
assert(table.properties ===
|
||||
Map("provider" -> v2Format, "location" -> "s3://bucket/path").asJava)
|
||||
withDefaultOwnership(Map("provider" -> v2Format, "location" -> "s3://bucket/path")).asJava)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -971,7 +972,8 @@ trait AlterTableTests extends SharedSparkSession {
|
|||
val table = getTableMetadata(t)
|
||||
|
||||
assert(table.name === fullTableName(t))
|
||||
assert(table.properties === Map("provider" -> v2Format, "test" -> "34").asJava)
|
||||
assert(table.properties ===
|
||||
withDefaultOwnership(Map("provider" -> v2Format, "test" -> "34")).asJava)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -983,15 +985,15 @@ trait AlterTableTests extends SharedSparkSession {
|
|||
val table = getTableMetadata(t)
|
||||
|
||||
assert(table.name === fullTableName(t))
|
||||
assert(table.properties === Map("provider" -> v2Format, "test" -> "34").asJava)
|
||||
assert(table.properties ===
|
||||
withDefaultOwnership(Map("provider" -> v2Format, "test" -> "34")).asJava)
|
||||
|
||||
sql(s"ALTER TABLE $t UNSET TBLPROPERTIES ('test')")
|
||||
|
||||
val updated = getTableMetadata(t)
|
||||
|
||||
assert(updated.name === fullTableName(t))
|
||||
assert(updated.properties === Map("provider" -> v2Format).asJava)
|
||||
assert(updated.properties === withDefaultOwnership(Map("provider" -> v2Format)).asJava)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableExceptio
|
|||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
import org.apache.spark.sql.connector.catalog._
|
||||
import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
|
||||
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
|
||||
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
|
||||
import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
|
||||
import org.apache.spark.sql.sources.SimpleScanSource
|
||||
|
@ -41,6 +42,7 @@ class DataSourceV2SQLSuite
|
|||
private val v2Source = classOf[FakeV2Provider].getName
|
||||
override protected val v2Format = v2Source
|
||||
override protected val catalogAndNamespace = "testcat.ns1.ns2."
|
||||
private val defaultUser: String = Utils.getCurrentUserName()
|
||||
|
||||
private def catalog(name: String): CatalogPlugin = {
|
||||
spark.sessionState.catalogManager.catalog(name)
|
||||
|
@ -94,7 +96,7 @@ class DataSourceV2SQLSuite
|
|||
|
||||
assert(table.name == "testcat.table_name")
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table.schema == new StructType()
|
||||
.add("id", LongType, nullable = false)
|
||||
.add("data", StringType))
|
||||
|
@ -160,6 +162,7 @@ class DataSourceV2SQLSuite
|
|||
Array("Comment", "this is a test table", ""),
|
||||
Array("Location", "/tmp/testcat/table_name", ""),
|
||||
Array("Provider", "foo", ""),
|
||||
Array(TableCatalog.PROP_OWNER.capitalize, defaultUser, ""),
|
||||
Array("Table Properties", "[bar=baz]", "")))
|
||||
|
||||
}
|
||||
|
@ -172,7 +175,7 @@ class DataSourceV2SQLSuite
|
|||
|
||||
assert(table.name == "default.table_name")
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties == Map("provider" -> v2Source).asJava)
|
||||
assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava)
|
||||
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
|
||||
|
||||
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
|
||||
|
@ -187,7 +190,7 @@ class DataSourceV2SQLSuite
|
|||
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
|
||||
assert(table.name == "testcat.table_name")
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
|
||||
|
||||
// run a second create query that should fail
|
||||
|
@ -201,7 +204,7 @@ class DataSourceV2SQLSuite
|
|||
val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
|
||||
assert(table2.name == "testcat.table_name")
|
||||
assert(table2.partitioning.isEmpty)
|
||||
assert(table2.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table2.schema == new StructType().add("id", LongType).add("data", StringType))
|
||||
|
||||
// check that the table is still empty
|
||||
|
@ -218,7 +221,7 @@ class DataSourceV2SQLSuite
|
|||
|
||||
assert(table.name == "testcat.table_name")
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
|
||||
|
||||
spark.sql("CREATE TABLE IF NOT EXISTS testcat.table_name (id bigint, data string) USING bar")
|
||||
|
@ -227,7 +230,7 @@ class DataSourceV2SQLSuite
|
|||
val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
|
||||
assert(table2.name == "testcat.table_name")
|
||||
assert(table2.partitioning.isEmpty)
|
||||
assert(table2.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table2.schema == new StructType().add("id", LongType).add("data", StringType))
|
||||
|
||||
// check that the table is still empty
|
||||
|
@ -244,7 +247,7 @@ class DataSourceV2SQLSuite
|
|||
|
||||
assert(table.name == "testcat.table_name")
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
|
||||
|
||||
// check that the table is empty
|
||||
|
@ -266,7 +269,7 @@ class DataSourceV2SQLSuite
|
|||
|
||||
assert(table.name == identifier)
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table.schema == new StructType()
|
||||
.add("id", LongType)
|
||||
.add("data", StringType))
|
||||
|
@ -293,7 +296,7 @@ class DataSourceV2SQLSuite
|
|||
assert(replacedTable != originalTable, "Table should have been replaced.")
|
||||
assert(replacedTable.name == identifier)
|
||||
assert(replacedTable.partitioning.isEmpty)
|
||||
assert(replacedTable.properties == Map("provider" -> "foo").asJava)
|
||||
assert(replacedTable.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(replacedTable.schema == new StructType().add("id", LongType))
|
||||
|
||||
val rdd = spark.sparkContext.parallelize(replacedTable.asInstanceOf[InMemoryTable].rows)
|
||||
|
@ -431,7 +434,7 @@ class DataSourceV2SQLSuite
|
|||
|
||||
assert(table.name == "default.table_name")
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties == Map("provider" -> v2Source).asJava)
|
||||
assert(table.properties == withDefaultOwnership(Map("provider" -> v2Source)).asJava)
|
||||
assert(table.schema == new StructType()
|
||||
.add("id", LongType)
|
||||
.add("data", StringType))
|
||||
|
@ -448,7 +451,7 @@ class DataSourceV2SQLSuite
|
|||
val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
|
||||
assert(table.name == "testcat.table_name")
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table.schema == new StructType()
|
||||
.add("id", LongType)
|
||||
.add("data", StringType))
|
||||
|
@ -468,7 +471,7 @@ class DataSourceV2SQLSuite
|
|||
val table2 = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
|
||||
assert(table2.name == "testcat.table_name")
|
||||
assert(table2.partitioning.isEmpty)
|
||||
assert(table2.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table2.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table2.schema == new StructType()
|
||||
.add("id", LongType)
|
||||
.add("data", StringType))
|
||||
|
@ -486,7 +489,7 @@ class DataSourceV2SQLSuite
|
|||
|
||||
assert(table.name == "testcat.table_name")
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table.schema == new StructType()
|
||||
.add("id", LongType)
|
||||
.add("data", StringType))
|
||||
|
@ -517,7 +520,7 @@ class DataSourceV2SQLSuite
|
|||
|
||||
assert(table.name == "testcat.table_name")
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table.schema == new StructType()
|
||||
.add("id", LongType)
|
||||
.add("data", StringType))
|
||||
|
@ -557,7 +560,7 @@ class DataSourceV2SQLSuite
|
|||
|
||||
assert(table.name == identifier)
|
||||
assert(table.partitioning.isEmpty)
|
||||
assert(table.properties == Map("provider" -> "foo").asJava)
|
||||
assert(table.properties == withDefaultOwnership(Map("provider" -> "foo")).asJava)
|
||||
assert(table.schema == new StructType().add("i", "int"))
|
||||
|
||||
val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
|
||||
|
@ -1059,7 +1062,7 @@ class DataSourceV2SQLSuite
|
|||
Row("Namespace Name", "ns2"),
|
||||
Row("Description", "test namespace"),
|
||||
Row("Location", "/tmp/ns_test"),
|
||||
Row("Owner Name", Utils.getCurrentUserName()),
|
||||
Row("Owner Name", defaultUser),
|
||||
Row("Owner Type", "USER")
|
||||
))
|
||||
}
|
||||
|
@ -1075,7 +1078,7 @@ class DataSourceV2SQLSuite
|
|||
Row("Namespace Name", "ns2"),
|
||||
Row("Description", "test namespace"),
|
||||
Row("Location", "/tmp/ns_test"),
|
||||
Row("Owner Name", Utils.getCurrentUserName()),
|
||||
Row("Owner Name", defaultUser),
|
||||
Row("Owner Type", "USER"),
|
||||
Row("Properties", "((a,b),(b,a),(c,c))")
|
||||
))
|
||||
|
@ -1123,7 +1126,7 @@ class DataSourceV2SQLSuite
|
|||
Row("Namespace Name", "ns2"),
|
||||
Row("Description", "test namespace"),
|
||||
Row("Location", "/tmp/ns_test_2"),
|
||||
Row("Owner Name", Utils.getCurrentUserName()),
|
||||
Row("Owner Name", defaultUser),
|
||||
Row("Owner Type", "USER")
|
||||
))
|
||||
}
|
||||
|
@ -1923,22 +1926,23 @@ class DataSourceV2SQLSuite
|
|||
test("SHOW TBLPROPERTIES: v2 table") {
|
||||
val t = "testcat.ns1.ns2.tbl"
|
||||
withTable(t) {
|
||||
val owner = "andrew"
|
||||
val user = "andrew"
|
||||
val status = "new"
|
||||
val provider = "foo"
|
||||
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING $provider " +
|
||||
s"TBLPROPERTIES ('owner'='$owner', 'status'='$status')")
|
||||
s"TBLPROPERTIES ('user'='$user', 'status'='$status')")
|
||||
|
||||
val properties = sql(s"SHOW TBLPROPERTIES $t")
|
||||
val properties = sql(s"SHOW TBLPROPERTIES $t").orderBy("key")
|
||||
|
||||
val schema = new StructType()
|
||||
.add("key", StringType, nullable = false)
|
||||
.add("value", StringType, nullable = false)
|
||||
|
||||
val expected = Seq(
|
||||
Row("owner", owner),
|
||||
Row(TableCatalog.PROP_OWNER, defaultUser),
|
||||
Row("provider", provider),
|
||||
Row("status", status),
|
||||
Row("provider", provider))
|
||||
Row("user", user))
|
||||
|
||||
assert(properties.schema === schema)
|
||||
assert(expected === properties.collect())
|
||||
|
@ -1948,11 +1952,11 @@ class DataSourceV2SQLSuite
|
|||
test("SHOW TBLPROPERTIES(key): v2 table") {
|
||||
val t = "testcat.ns1.ns2.tbl"
|
||||
withTable(t) {
|
||||
val owner = "andrew"
|
||||
val user = "andrew"
|
||||
val status = "new"
|
||||
val provider = "foo"
|
||||
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING $provider " +
|
||||
s"TBLPROPERTIES ('owner'='$owner', 'status'='$status')")
|
||||
s"TBLPROPERTIES ('user'='$user', 'status'='$status')")
|
||||
|
||||
val properties = sql(s"SHOW TBLPROPERTIES $t ('status')")
|
||||
|
||||
|
@ -1967,7 +1971,7 @@ class DataSourceV2SQLSuite
|
|||
withTable(t) {
|
||||
val nonExistingKey = "nonExistingKey"
|
||||
spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo " +
|
||||
s"TBLPROPERTIES ('owner'='andrew', 'status'='new')")
|
||||
s"TBLPROPERTIES ('user'='andrew', 'status'='new')")
|
||||
|
||||
val properties = sql(s"SHOW TBLPROPERTIES $t ('$nonExistingKey')")
|
||||
|
||||
|
|
|
@ -40,8 +40,8 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
|
|||
import org.apache.spark.sql.catalyst.catalog._
|
||||
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._
|
||||
import org.apache.spark.sql.catalyst.expressions._
|
||||
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
|
||||
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
|
||||
import org.apache.spark.sql.connector.catalog.TableCatalog
|
||||
import org.apache.spark.sql.execution.command.DDLUtils
|
||||
import org.apache.spark.sql.execution.datasources.{PartitioningUtils, SourceOptions}
|
||||
import org.apache.spark.sql.hive.client.HiveClient
|
||||
|
@ -635,12 +635,16 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
|
|||
k.startsWith(CREATED_SPARK_VERSION)
|
||||
}
|
||||
val newTableProps = propsFromOldTable ++ tableDefinition.properties + partitionProviderProp
|
||||
|
||||
// Add old table's owner if we need to restore
|
||||
val owner = Option(tableDefinition.owner).filter(_.nonEmpty).getOrElse(oldTableDef.owner)
|
||||
val newDef = tableDefinition.copy(
|
||||
storage = newStorage,
|
||||
schema = oldTableDef.schema,
|
||||
partitionColumnNames = oldTableDef.partitionColumnNames,
|
||||
bucketSpec = oldTableDef.bucketSpec,
|
||||
properties = newTableProps)
|
||||
properties = newTableProps,
|
||||
owner = owner)
|
||||
|
||||
client.alterTable(newDef)
|
||||
}
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
|
|||
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
|
||||
import org.apache.spark.sql.catalyst.catalog._
|
||||
import org.apache.spark.sql.catalyst.parser.ParseException
|
||||
import org.apache.spark.sql.connector.catalog.CatalogManager
|
||||
import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog}
|
||||
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.{PROP_OWNER_NAME, PROP_OWNER_TYPE}
|
||||
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
|
||||
import org.apache.spark.sql.functions._
|
||||
|
@ -418,13 +418,23 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
|
|||
catalog.reset()
|
||||
}
|
||||
}
|
||||
|
||||
test("Table Ownership") {
|
||||
val catalog = spark.sessionState.catalog
|
||||
try {
|
||||
sql(s"CREATE TABLE spark_30019(k int)")
|
||||
assert(sql(s"DESCRIBE TABLE EXTENDED spark_30019").where("col_name='Owner'")
|
||||
.collect().head.getString(1) === Utils.getCurrentUserName())
|
||||
} finally {
|
||||
catalog.reset()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class HiveDDLSuite
|
||||
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
|
||||
import testImplicits._
|
||||
val hiveFormats = Seq("PARQUET", "ORC", "TEXTFILE", "SEQUENCEFILE", "RCFILE", "AVRO")
|
||||
private val reversedProperties = Seq("ownerName", "ownerType")
|
||||
|
||||
override def afterEach(): Unit = {
|
||||
try {
|
||||
|
|
Loading…
Reference in a new issue