[SPARK-16119][SQL] Support PURGE option to drop table / partition.
This option is used by Hive to directly delete the files instead of moving them to the trash. This is needed in certain configurations where moving the files does not work. For non-Hive tables and partitions, Spark already behaves as if the PURGE option was set, so there's no need to do anything there.

Hive support for PURGE was added in 0.14 (for tables) and 1.2 (for partitions), and the code reflects that: trying to use the option with older versions of Hive causes an exception to be thrown.

The change is a little noisier than I would like, because of the code needed to propagate the new flag through all the interfaces and implementations; the main changes are in the parser and in HiveShim, aside from the tests (DDLCommandSuite, VersionsSuite).

Tested by running the sql and catalyst unit tests, plus VersionsSuite, which has been updated to exercise the version-specific behavior. I also ran an internal test suite that uses PURGE and would not pass previously.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #13831 from vanzin/SPARK-16119.
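For illustration, a minimal sketch of the SQL surface this change enables (the table name and partition value are hypothetical; assumes `spark` is a SparkSession with Hive support against a metastore client new enough for PURGE, per the version notes above):

    // Hypothetical example, not part of this change's test suites.
    spark.sql("CREATE TABLE logs (msg STRING) PARTITIONED BY (day STRING)")

    // With PURGE, Hive deletes the partition's files directly instead of
    // moving them to the trash; requires Hive 1.2+ for partitions.
    spark.sql("ALTER TABLE logs DROP IF EXISTS PARTITION (day = '2016-06-21') PURGE")

    // Likewise for whole tables; requires Hive 0.14+. Against an older
    // client library the shims below throw UnsupportedOperationException.
    spark.sql("DROP TABLE logs PURGE")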
commit 7f968867ff
parent 68df47aca5
@@ -71,7 +71,7 @@ abstract class ExternalCatalog {
 
   def createTable(db: String, tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit
 
-  def dropTable(db: String, table: String, ignoreIfNotExists: Boolean): Unit
+  def dropTable(db: String, table: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
 
   def renameTable(db: String, oldName: String, newName: String): Unit
 
@@ -125,7 +125,8 @@ abstract class ExternalCatalog {
       db: String,
       table: String,
       parts: Seq[TablePartitionSpec],
-      ignoreIfNotExists: Boolean): Unit
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit
 
   /**
    * Override the specs of one or many existing table partitions, assuming they exist.
@@ -220,7 +220,8 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends ExternalCatalog {
   override def dropTable(
       db: String,
       table: String,
-      ignoreIfNotExists: Boolean): Unit = synchronized {
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit = synchronized {
     requireDbExists(db)
     if (tableExists(db, table)) {
       if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
@@ -358,7 +359,8 @@ class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends ExternalCatalog {
       db: String,
       table: String,
       partSpecs: Seq[TablePartitionSpec],
-      ignoreIfNotExists: Boolean): Unit = synchronized {
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit = synchronized {
     requireTableExists(db, table)
     val existingParts = catalog(db).tables(table).partitions
     if (!ignoreIfNotExists) {
@@ -397,7 +397,10 @@ class SessionCatalog(
    * If no database is specified, this will first attempt to drop a temporary table with
    * the same name, then, if that does not exist, drop the table from the current database.
    */
-  def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = synchronized {
+  def dropTable(
+      name: TableIdentifier,
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit = synchronized {
     val db = formatDatabaseName(name.database.getOrElse(currentDb))
     val table = formatTableName(name.table)
     if (name.database.isDefined || !tempTables.contains(table)) {
@@ -405,7 +408,7 @@ class SessionCatalog(
       // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
       // Instead, log it as an error message.
       if (tableExists(TableIdentifier(table, Option(db)))) {
-        externalCatalog.dropTable(db, table, ignoreIfNotExists = true)
+        externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
       } else if (!ignoreIfNotExists) {
         throw new NoSuchTableException(db = db, table = table)
       }
@@ -550,13 +553,14 @@ class SessionCatalog(
   def dropPartitions(
       tableName: TableIdentifier,
       specs: Seq[TablePartitionSpec],
-      ignoreIfNotExists: Boolean): Unit = {
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit = {
     requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
     val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(tableName.table)
     requireDbExists(db)
     requireTableExists(TableIdentifier(table, Option(db)))
-    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists)
+    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge)
   }
 
   /**
@@ -908,7 +912,7 @@ class SessionCatalog(
       dropDatabase(db, ignoreIfNotExists = false, cascade = true)
     }
     listTables(DEFAULT_DATABASE).foreach { table =>
-      dropTable(table, ignoreIfNotExists = false)
+      dropTable(table, ignoreIfNotExists = false, purge = false)
     }
     listFunctions(DEFAULT_DATABASE).map(_._1).foreach { func =>
       if (func.database.isDefined) {
@@ -99,8 +99,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("drop database when the database is not empty") {
     // Throw exception if there are functions left
     val catalog1 = newBasicCatalog()
-    catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
-    catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
+    catalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false)
+    catalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false, purge = false)
     intercept[AnalysisException] {
       catalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
     }
@@ -164,7 +164,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
   test("drop table") {
     val catalog = newBasicCatalog()
     assert(catalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false)
+    catalog.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false)
     assert(catalog.listTables("db2").toSet == Set("tbl2"))
   }
 
@@ -172,16 +172,16 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
     val catalog = newBasicCatalog()
     // Should always throw exception when the database does not exist
     intercept[AnalysisException] {
-      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false)
+      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = false, purge = false)
     }
     intercept[AnalysisException] {
-      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true)
+      catalog.dropTable("unknown_db", "unknown_table", ignoreIfNotExists = true, purge = false)
     }
     // Should throw exception when the table does not exist, if ignoreIfNotExists is false
     intercept[AnalysisException] {
-      catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false)
+      catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = false, purge = false)
     }
-    catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true)
+    catalog.dropTable("db2", "unknown_table", ignoreIfNotExists = true, purge = false)
   }
 
   test("rename table") {
@@ -292,13 +292,13 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false)
+      "db2", "tbl2", Seq(part1.spec), ignoreIfNotExists = false, purge = false)
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part2)))
     resetState()
     val catalog2 = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog2, "db2", "tbl2", Seq(part1, part2)))
     catalog2.dropPartitions(
-      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false)
+      "db2", "tbl2", Seq(part1.spec, part2.spec), ignoreIfNotExists = false, purge = false)
     assert(catalog2.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -306,11 +306,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false)
+        "does_not_exist", "tbl1", Seq(), ignoreIfNotExists = false, purge = false)
     }
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false)
+        "db2", "does_not_exist", Seq(), ignoreIfNotExists = false, purge = false)
     }
   }
 
@@ -318,10 +318,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
     val catalog = newBasicCatalog()
     intercept[AnalysisException] {
       catalog.dropPartitions(
-        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false)
+        "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = false, purge = false)
     }
     catalog.dropPartitions(
-      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true)
+      "db2", "tbl2", Seq(part3.spec), ignoreIfNotExists = true, purge = false)
   }
 
   test("get partition") {
@@ -561,7 +561,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(!exists(db.locationUri, "my_table"))
     assert(exists(db.locationUri, "your_table"))
 
-    catalog.dropTable("db1", "your_table", ignoreIfNotExists = false)
+    catalog.dropTable("db1", "your_table", ignoreIfNotExists = false, purge = false)
     assert(!exists(db.locationUri, "your_table"))
 
     val externalTable = CatalogTable(
@@ -600,7 +600,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(!exists(databaseDir, "tbl", "a=1", "b=2"))
     assert(exists(databaseDir, "tbl", "a=5", "b=6"))
 
-    catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false)
+    catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false,
+      purge = false)
     assert(!exists(databaseDir, "tbl", "a=3", "b=4"))
     assert(!exists(databaseDir, "tbl", "a=5", "b=6"))
 
@@ -98,8 +98,8 @@ class SessionCatalogSuite extends SparkFunSuite {
     // Throw exception if there are functions left
     val externalCatalog1 = newBasicCatalog()
     val sessionCatalog1 = new SessionCatalog(externalCatalog1)
-    externalCatalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false)
-    externalCatalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false)
+    externalCatalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false, purge = false)
+    externalCatalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false, purge = false)
     intercept[AnalysisException] {
       sessionCatalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false)
     }
@@ -217,11 +217,12 @@ class SessionCatalogSuite extends SparkFunSuite {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
     assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
-    sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false)
+    sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false,
+      purge = false)
     assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
     // Drop table without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
-    sessionCatalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false)
+    sessionCatalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false, purge = false)
     assert(externalCatalog.listTables("db2").isEmpty)
   }
 
@@ -229,15 +230,19 @@ class SessionCatalogSuite extends SparkFunSuite {
     val catalog = new SessionCatalog(newBasicCatalog())
     // Should always throw exception when the database does not exist
    intercept[NoSuchDatabaseException] {
-      catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false)
+      catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false,
+        purge = false)
     }
     intercept[NoSuchDatabaseException] {
-      catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true)
+      catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true,
+        purge = false)
     }
     intercept[NoSuchTableException] {
-      catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false)
+      catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false,
+        purge = false)
     }
-    catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true)
+    catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true,
+      purge = false)
   }
 
   test("drop temp table") {
@@ -249,16 +254,17 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
     assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
     // If database is not specified, temp table should be dropped first
-    sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+    sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
     assert(sessionCatalog.getTempTable("tbl1") == None)
     assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2"))
     // If temp table does not exist, the table in the current database should be dropped
-    sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+    sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
     assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
     // If database is specified, temp tables are never dropped
     sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false)
-    sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false)
+    sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false,
+      purge = false)
     assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
     assert(externalCatalog.listTables("db2").toSet == Set("tbl2"))
   }
@@ -394,7 +400,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
       == SubqueryAlias("tbl1", tempTable1))
     // Then, if that does not exist, look up the relation in the current database
-    sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false)
+    sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false, purge = false)
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
       == SubqueryAlias("tbl1", SimpleCatalogRelation("db2", metastoreTable1)))
   }
@@ -575,14 +581,16 @@ class SessionCatalogSuite extends SparkFunSuite {
     sessionCatalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec),
-      ignoreIfNotExists = false)
+      ignoreIfNotExists = false,
+      purge = false)
     assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2)))
     // Drop partitions without explicitly specifying database
     sessionCatalog.setCurrentDatabase("db2")
     sessionCatalog.dropPartitions(
       TableIdentifier("tbl2"),
       Seq(part2.spec),
-      ignoreIfNotExists = false)
+      ignoreIfNotExists = false,
+      purge = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
     // Drop multiple partitions at once
     sessionCatalog.createPartitions(
@@ -591,7 +599,8 @@ class SessionCatalogSuite extends SparkFunSuite {
     sessionCatalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part1.spec, part2.spec),
-      ignoreIfNotExists = false)
+      ignoreIfNotExists = false,
+      purge = false)
     assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty)
   }
 
@@ -601,13 +610,15 @@ class SessionCatalogSuite extends SparkFunSuite {
       catalog.dropPartitions(
         TableIdentifier("tbl1", Some("unknown_db")),
         Seq(),
-        ignoreIfNotExists = false)
+        ignoreIfNotExists = false,
+        purge = false)
     }
     intercept[NoSuchTableException] {
       catalog.dropPartitions(
         TableIdentifier("does_not_exist", Some("db2")),
         Seq(),
-        ignoreIfNotExists = false)
+        ignoreIfNotExists = false,
+        purge = false)
     }
   }
 
@@ -617,12 +628,14 @@ class SessionCatalogSuite extends SparkFunSuite {
       catalog.dropPartitions(
         TableIdentifier("tbl2", Some("db2")),
         Seq(part3.spec),
-        ignoreIfNotExists = false)
+        ignoreIfNotExists = false,
+        purge = false)
     }
     catalog.dropPartitions(
       TableIdentifier("tbl2", Some("db2")),
       Seq(part3.spec),
-      ignoreIfNotExists = true)
+      ignoreIfNotExists = true,
+      purge = false)
   }
 
   test("drop partitions with invalid partition spec") {
@@ -631,7 +644,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       catalog.dropPartitions(
         TableIdentifier("tbl2", Some("db2")),
         Seq(partWithMoreColumns.spec),
-        ignoreIfNotExists = false)
+        ignoreIfNotExists = false,
+        purge = false)
     }
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, b, c) must be contained within " +
@@ -640,7 +654,8 @@ class SessionCatalogSuite extends SparkFunSuite {
       catalog.dropPartitions(
         TableIdentifier("tbl2", Some("db2")),
         Seq(partWithUnknownColumns.spec),
-        ignoreIfNotExists = false)
+        ignoreIfNotExists = false,
+        purge = false)
     }
     assert(e.getMessage.contains(
       "Partition spec is invalid. The spec (a, unknown) must be contained within " +
@@ -622,13 +622,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
    * Create a [[DropTableCommand]] command.
    */
   override def visitDropTable(ctx: DropTableContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.PURGE != null) {
-      operationNotAllowed("DROP TABLE ... PURGE", ctx)
-    }
     DropTableCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.EXISTS != null,
-      ctx.VIEW != null)
+      ctx.VIEW != null,
+      ctx.PURGE != null)
   }
 
   /**
@@ -768,13 +766,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (ctx.VIEW != null) {
       operationNotAllowed("ALTER VIEW ... DROP PARTITION", ctx)
     }
-    if (ctx.PURGE != null) {
-      operationNotAllowed("ALTER TABLE ... DROP PARTITION ... PURGE", ctx)
-    }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
       ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
-      ctx.EXISTS != null)
+      ctx.EXISTS != null,
+      ctx.PURGE != null)
   }
 
   /**
@@ -179,7 +179,8 @@ case class DescribeDatabaseCommand(
 case class DropTableCommand(
     tableName: TableIdentifier,
     ifExists: Boolean,
-    isView: Boolean) extends RunnableCommand {
+    isView: Boolean,
+    purge: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
@@ -207,7 +208,7 @@ case class DropTableCommand(
         case NonFatal(e) => log.warn(e.toString, e)
       }
       catalog.refreshTable(tableName)
-      catalog.dropTable(tableName, ifExists)
+      catalog.dropTable(tableName, ifExists, purge)
     }
     Seq.empty[Row]
   }
@@ -408,7 +409,8 @@ case class AlterTableRenamePartitionCommand(
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
     specs: Seq[TablePartitionSpec],
-    ifExists: Boolean)
+    ifExists: Boolean,
+    purge: Boolean)
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -418,7 +420,7 @@ case class AlterTableDropPartitionCommand(
       throw new AnalysisException(
         "ALTER TABLE DROP PARTITIONS is not allowed for tables defined using the datasource API")
     }
-    catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists)
+    catalog.dropPartitions(tableName, specs, ignoreIfNotExists = ifExists, purge = purge)
     Seq.empty[Row]
   }
 
@@ -297,7 +297,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   override def dropTempView(viewName: String): Unit = {
     sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(viewName))
-    sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true)
+    sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true, purge = false)
   }
 
   /**
@@ -95,7 +95,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
       Row("listtablessuitetable", true) :: Nil)
 
     sqlContext.sessionState.catalog.dropTable(
-      TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
+      TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
     assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
   }
 
@@ -112,7 +112,7 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
       .collect().toSeq == Row("listtablessuitetable", true) :: Nil)
 
     sqlContext.sessionState.catalog.dropTable(
-      TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true)
+      TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge = false)
     assert(sqlContext.tables().filter("tableName = 'listtablessuitetable'").count() === 0)
   }
 
@@ -612,8 +612,7 @@ class DDLCommandSuite extends PlanTest {
 
     val parsed1_table = parser.parsePlan(sql1_table)
     val parsed2_table = parser.parsePlan(sql2_table)
-    assertUnsupported(sql1_table + " PURGE")
-    assertUnsupported(sql2_table + " PURGE")
+    val parsed1_purge = parser.parsePlan(sql1_table + " PURGE")
     assertUnsupported(sql1_view)
     assertUnsupported(sql2_view)
 
@@ -623,11 +622,14 @@ class DDLCommandSuite extends PlanTest {
       Seq(
         Map("dt" -> "2008-08-08", "country" -> "us"),
         Map("dt" -> "2009-09-09", "country" -> "uk")),
-      ifExists = true)
+      ifExists = true,
+      purge = false)
     val expected2_table = expected1_table.copy(ifExists = false)
+    val expected1_purge = expected1_table.copy(purge = true)
 
     comparePlans(parsed1_table, expected1_table)
     comparePlans(parsed2_table, expected2_table)
+    comparePlans(parsed1_purge, expected1_purge)
   }
 
   test("alter table: archive partition (not supported)") {
@@ -772,25 +774,30 @@ class DDLCommandSuite extends PlanTest {
     val tableName1 = "db.tab"
     val tableName2 = "tab"
 
-    val parsed1 = parser.parsePlan(s"DROP TABLE $tableName1")
-    val parsed2 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName1")
-    val parsed3 = parser.parsePlan(s"DROP TABLE $tableName2")
-    val parsed4 = parser.parsePlan(s"DROP TABLE IF EXISTS $tableName2")
-    assertUnsupported(s"DROP TABLE IF EXISTS $tableName2 PURGE")
+    val parsed = Seq(
+        s"DROP TABLE $tableName1",
+        s"DROP TABLE IF EXISTS $tableName1",
+        s"DROP TABLE $tableName2",
+        s"DROP TABLE IF EXISTS $tableName2",
+        s"DROP TABLE $tableName2 PURGE",
+        s"DROP TABLE IF EXISTS $tableName2 PURGE"
+      ).map(parser.parsePlan)
 
-    val expected1 =
-      DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false)
-    val expected2 =
-      DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false)
-    val expected3 =
-      DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false)
-    val expected4 =
-      DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false)
+    val expected = Seq(
+      DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = false, isView = false,
+        purge = false),
+      DropTableCommand(TableIdentifier("tab", Option("db")), ifExists = true, isView = false,
+        purge = false),
+      DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
+        purge = false),
+      DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
+        purge = false),
+      DropTableCommand(TableIdentifier("tab", None), ifExists = false, isView = false,
+        purge = true),
+      DropTableCommand(TableIdentifier("tab", None), ifExists = true, isView = false,
+        purge = true))
 
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-    comparePlans(parsed3, expected3)
-    comparePlans(parsed4, expected4)
+    parsed.zip(expected).foreach { case (p, e) => comparePlans(p, e) }
   }
 
   test("drop view") {
@@ -803,13 +810,17 @@ class DDLCommandSuite extends PlanTest {
     val parsed4 = parser.parsePlan(s"DROP VIEW IF EXISTS $viewName2")
 
     val expected1 =
-      DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true)
+      DropTableCommand(TableIdentifier("view", Option("db")), ifExists = false, isView = true,
+        purge = false)
     val expected2 =
-      DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true)
+      DropTableCommand(TableIdentifier("view", Option("db")), ifExists = true, isView = true,
+        purge = false)
     val expected3 =
-      DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true)
+      DropTableCommand(TableIdentifier("view", None), ifExists = false, isView = true,
+        purge = false)
     val expected4 =
-      DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true)
+      DropTableCommand(TableIdentifier("view", None), ifExists = true, isView = true,
+        purge = false)
 
     comparePlans(parsed1, expected1)
     comparePlans(parsed2, expected2)
@@ -352,7 +352,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }.getMessage
     assert(message.contains(s"Database '$dbName' is not empty. One or more tables exist"))
 
-    catalog.dropTable(tableIdent1, ignoreIfNotExists = false)
+    catalog.dropTable(tableIdent1, ignoreIfNotExists = false, purge = false)
 
     assert(catalog.listDatabases().contains(dbName))
     sql(s"DROP DATABASE $dbName RESTRICT")
@@ -54,7 +54,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext {
       checkAnswer(spark.table("t"), (data ++ data).map(Row.fromTuple))
     }
     spark.sessionState.catalog.dropTable(
-      TableIdentifier("tmp"), ignoreIfNotExists = true)
+      TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
   }
 
   test("overwriting") {
@@ -65,7 +65,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext {
       checkAnswer(spark.table("t"), data.map(Row.fromTuple))
     }
     spark.sessionState.catalog.dropTable(
-      TableIdentifier("tmp"), ignoreIfNotExists = true)
+      TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
   }
 
   test("SPARK-15678: not use cache on overwrite") {
@@ -62,7 +62,7 @@ class CatalogSuite
   }
 
   private def dropTable(name: String, db: Option[String] = None): Unit = {
-    sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false)
+    sessionCatalog.dropTable(TableIdentifier(name, db), ignoreIfNotExists = false, purge = false)
   }
 
   private def createFunction(name: String, db: Option[String] = None): Unit = {
@@ -192,9 +192,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configuration)
   override def dropTable(
       db: String,
       table: String,
-      ignoreIfNotExists: Boolean): Unit = withClient {
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit = withClient {
     requireDbExists(db)
-    client.dropTable(db, table, ignoreIfNotExists)
+    client.dropTable(db, table, ignoreIfNotExists, purge)
   }
 
   override def renameTable(db: String, oldName: String, newName: String): Unit = withClient {
@@ -295,9 +296,10 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configuration)
       db: String,
       table: String,
       parts: Seq[TablePartitionSpec],
-      ignoreIfNotExists: Boolean): Unit = withClient {
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit = withClient {
     requireTableExists(db, table)
-    client.dropPartitions(db, table, parts, ignoreIfNotExists)
+    client.dropPartitions(db, table, parts, ignoreIfNotExists, purge)
   }
 
   override def renamePartitions(
@@ -80,7 +80,7 @@ private[hive] trait HiveClient {
   def createTable(table: CatalogTable, ignoreIfExists: Boolean): Unit
 
   /** Drop the specified table. */
-  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit
+  def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean, purge: Boolean): Unit
 
   /** Alter a table whose name matches the one specified in `table`, assuming it exists. */
   final def alterTable(table: CatalogTable): Unit = alterTable(table.identifier.table, table)
@@ -121,7 +121,8 @@ private[hive] trait HiveClient {
       db: String,
       table: String,
       specs: Seq[TablePartitionSpec],
-      ignoreIfNotExists: Boolean): Unit
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit
 
   /**
    * Rename one or many existing table partitions, assuming they exist.
@@ -406,8 +406,9 @@ private[hive] class HiveClientImpl(
   override def dropTable(
       dbName: String,
       tableName: String,
-      ignoreIfNotExists: Boolean): Unit = withHiveState {
-    client.dropTable(dbName, tableName, true, ignoreIfNotExists)
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit = withHiveState {
+    shim.dropTable(client, dbName, tableName, true, ignoreIfNotExists, purge)
   }
 
   override def alterTable(tableName: String, table: CatalogTable): Unit = withHiveState {
@@ -429,7 +430,8 @@ private[hive] class HiveClientImpl(
       db: String,
       table: String,
       specs: Seq[TablePartitionSpec],
-      ignoreIfNotExists: Boolean): Unit = withHiveState {
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit = withHiveState {
     // TODO: figure out how to drop multiple partitions in one call
     val hiveTable = client.getTable(db, table, true /* throw exception */)
     // do the check at first and collect all the matching partitions
@@ -450,7 +452,7 @@ private[hive] class HiveClientImpl(
     matchingParts.foreach { partition =>
       try {
         val deleteData = true
-        client.dropPartition(db, table, partition, deleteData)
+        shim.dropPartition(client, db, table, partition, deleteData, purge)
       } catch {
         case e: Exception =>
           val remainingParts = matchingParts.toBuffer -- droppedParts
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive.client
 
 import java.lang.{Boolean => JBoolean, Integer => JInteger, Long => JLong}
-import java.lang.reflect.{Method, Modifier}
+import java.lang.reflect.{InvocationTargetException, Method, Modifier}
 import java.net.URI
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
 import java.util.concurrent.TimeUnit
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.{IntegralType, StringType}
-
+import org.apache.spark.util.Utils
 
 /**
  * A shim that defines the interface between [[HiveClientImpl]] and the underlying Hive library used
@@ -129,6 +129,22 @@ private[client] sealed abstract class Shim {
 
   def dropIndex(hive: Hive, dbName: String, tableName: String, indexName: String): Unit
 
+  def dropTable(
+      hive: Hive,
+      dbName: String,
+      tableName: String,
+      deleteData: Boolean,
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit
+
+  def dropPartition(
+      hive: Hive,
+      dbName: String,
+      tableName: String,
+      part: JList[String],
+      deleteData: Boolean,
+      purge: Boolean): Unit
+
   protected def findStaticMethod(klass: Class[_], name: String, args: Class[_]*): Method = {
     val method = findMethod(klass, name, args: _*)
     require(Modifier.isStatic(method.getModifiers()),
@@ -343,6 +359,32 @@ private[client] class Shim_v0_12 extends Shim with Logging {
     dropIndexMethod.invoke(hive, dbName, tableName, indexName, true: JBoolean)
   }
 
+  override def dropTable(
+      hive: Hive,
+      dbName: String,
+      tableName: String,
+      deleteData: Boolean,
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit = {
+    if (purge) {
+      throw new UnsupportedOperationException("DROP TABLE ... PURGE")
+    }
+    hive.dropTable(dbName, tableName, deleteData, ignoreIfNotExists)
+  }
+
+  override def dropPartition(
+      hive: Hive,
+      dbName: String,
+      tableName: String,
+      part: JList[String],
+      deleteData: Boolean,
+      purge: Boolean): Unit = {
+    if (purge) {
+      throw new UnsupportedOperationException("ALTER TABLE ... DROP PARTITION ... PURGE")
+    }
+    hive.dropPartition(dbName, tableName, part, deleteData)
+  }
+
   override def createFunction(hive: Hive, db: String, func: CatalogFunction): Unit = {
     throw new AnalysisException("Hive 0.12 doesn't support creating permanent functions. " +
       "Please use Hive 0.13 or higher.")
@@ -599,6 +641,15 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       JBoolean.TYPE,
       JBoolean.TYPE,
       JBoolean.TYPE)
+  private lazy val dropTableMethod =
+    findMethod(
+      classOf[Hive],
+      "dropTable",
+      classOf[String],
+      classOf[String],
+      JBoolean.TYPE,
+      JBoolean.TYPE,
+      JBoolean.TYPE)
   private lazy val getTimeVarMethod =
     findMethod(
       classOf[HiveConf],
@@ -643,6 +694,21 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
       numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE)
   }
 
+  override def dropTable(
+      hive: Hive,
+      dbName: String,
+      tableName: String,
+      deleteData: Boolean,
+      ignoreIfNotExists: Boolean,
+      purge: Boolean): Unit = {
+    try {
+      dropTableMethod.invoke(hive, dbName, tableName, deleteData: JBoolean,
+        ignoreIfNotExists: JBoolean, purge: JBoolean)
+    } catch {
+      case e: InvocationTargetException => throw e.getCause()
+    }
+  }
+
   override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
     getTimeVarMethod.invoke(
       conf,
@@ -696,6 +762,19 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
       JBoolean.TYPE,
       JLong.TYPE)
 
+  private lazy val dropOptionsClass =
+    Utils.classForName("org.apache.hadoop.hive.metastore.PartitionDropOptions")
+  private lazy val dropOptionsDeleteData = dropOptionsClass.getField("deleteData")
+  private lazy val dropOptionsPurge = dropOptionsClass.getField("purgeData")
+  private lazy val dropPartitionMethod =
+    findMethod(
+      classOf[Hive],
+      "dropPartition",
+      classOf[String],
+      classOf[String],
+      classOf[JList[String]],
+      dropOptionsClass)
+
   override def loadDynamicPartitions(
       hive: Hive,
       loadPath: Path,
@@ -710,4 +789,21 @@ private[client] class Shim_v1_2 extends Shim_v1_1 {
       0L: JLong)
   }
 
+  override def dropPartition(
+      hive: Hive,
+      dbName: String,
+      tableName: String,
+      part: JList[String],
+      deleteData: Boolean,
+      purge: Boolean): Unit = {
+    val dropOptions = dropOptionsClass.newInstance().asInstanceOf[Object]
+    dropOptionsDeleteData.setBoolean(dropOptions, deleteData)
+    dropOptionsPurge.setBoolean(dropOptions, purge)
+    try {
+      dropPartitionMethod.invoke(hive, dbName, tableName, part, dropOptions)
+    } catch {
+      case e: InvocationTargetException => throw e.getCause()
+    }
+  }
+
 }
@@ -95,7 +95,8 @@ case class CreateHiveTableAsSelectCommand(
     } catch {
       case NonFatal(e) =>
         // drop the created table.
-        sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true)
+        sparkSession.sessionState.catalog.dropTable(tableIdentifier, ignoreIfNotExists = true,
+          purge = false)
         throw e
     }
   }
@@ -43,7 +43,7 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
   override def afterAll(): Unit = {
     try {
       sessionState.catalog.dropTable(
-        TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true)
+        TableIdentifier("ListTablesSuiteTable"), ignoreIfNotExists = true, purge = false)
       sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
       sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
       sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
@@ -165,7 +165,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils {
       sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
     }
     spark.sessionState.catalog.dropTable(
-      TableIdentifier("tempTable"), ignoreIfNotExists = true)
+      TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
   }
 
   test("estimates the size of a test MetastoreRelation") {
@@ -249,7 +249,19 @@ class VersionsSuite extends SparkFunSuite with Logging {
   }
 
   test(s"$version: dropTable") {
-    client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false)
+    val versionsWithoutPurge = versions.takeWhile(_ != "0.14")
+    // First try with the purge option set. This should fail if the version is < 0.14, in which
+    // case we check the version and try without it.
+    try {
+      client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
+        purge = true)
+      assert(!versionsWithoutPurge.contains(version))
+    } catch {
+      case _: UnsupportedOperationException =>
+        assert(versionsWithoutPurge.contains(version))
+        client.dropTable("default", tableName = "temporary", ignoreIfNotExists = false,
+          purge = false)
+    }
     assert(client.listTables("default") === Seq("src"))
   }
 
@@ -366,7 +378,20 @@ class VersionsSuite extends SparkFunSuite with Logging {
 
   test(s"$version: dropPartitions") {
     val spec = Map("key1" -> "1", "key2" -> "3")
-    client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true)
+    val versionsWithoutPurge = versions.takeWhile(_ != "1.2")
+    // Similar to dropTable; try with purge set, and if it fails, make sure we're running
+    // with a version that is older than the minimum (1.2 in this case).
+    try {
+      client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
+        purge = true)
+      assert(!versionsWithoutPurge.contains(version))
+    } catch {
+      case _: UnsupportedOperationException =>
+        assert(versionsWithoutPurge.contains(version))
+        client.dropPartitions("default", "src_part", Seq(spec), ignoreIfNotExists = true,
+          purge = false)
+    }
+
     assert(client.getPartitionOption("default", "src_part", spec).isEmpty)
   }
 
@@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
     }
-    sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
+    sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
   }
 
   test("overwriting") {
@@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(table("t"), data.map(Row.fromTuple))
     }
-    sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true)
+    sessionState.catalog.dropTable(TableIdentifier("tmp"), ignoreIfNotExists = true, purge = false)
  }
 
   test("self-join") {