[SPARK-31891][SQL] Support MSCK REPAIR TABLE .. [{ADD|DROP|SYNC} PARTITIONS]
### What changes were proposed in this pull request?

In the PR, I propose to extend the `MSCK REPAIR TABLE` command and support the new options `{ADD|DROP|SYNC} PARTITIONS`. In particular:

1. Extend the logical node `RepairTable`, and add two new flags `enableAddPartitions` and `enableDropPartitions`.
2. Add similar flags to the v1 execution node `AlterTableRecoverPartitionsCommand`.
3. Add a new method `dropPartitions()` to `AlterTableRecoverPartitionsCommand` which drops partitions from the catalog if their locations in the file system don't exist.
4. Update the public docs about the `MSCK REPAIR TABLE` command:

<img width="1037" alt="Screenshot 2021-02-16 at 13 46 39" src="https://user-images.githubusercontent.com/1580697/108052607-7446d280-705d-11eb-8e25-7398254787a4.png">

Closes #31097

### Why are the changes needed?

- The changes allow recovering tables with removed partitions. The example below illustrates the problem:

  ```sql
  spark-sql> create table tbl2 (col int, part int) partitioned by (part);
  spark-sql> insert into tbl2 partition (part=1) select 1;
  spark-sql> insert into tbl2 partition (part=0) select 0;
  spark-sql> show table extended like 'tbl2' partition (part = 0);
  default tbl2 false Partition Values: [part=0]
  Location: file:/Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0
  ...
  ```

  Remove the partition (part = 0) from the file system:

  ```
  $ rm -rf /Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0
  ```

  Even after recovering, we cannot query the table:

  ```sql
  spark-sql> msck repair table tbl2;
  spark-sql> select * from tbl2;
  21/01/08 22:49:13 ERROR SparkSQLDriver: Failed in [select * from tbl2]
  org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/Users/maximgekk/proj/apache-spark/spark-warehouse/tbl2/part=0
  ```

- To have feature parity with Hive: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RecoverPartitions(MSCKREPAIRTABLE)

### Does this PR introduce _any_ user-facing change?

Yes. After the changes, we can query the recovered table:

```sql
spark-sql> msck repair table tbl2 sync partitions;
spark-sql> select * from tbl2;
1 1
spark-sql> show partitions tbl2;
part=1
```

### How was this patch tested?

- By running the modified test suites:

  ```
  $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableParserSuite"
  $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *PlanResolutionSuite"
  $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsSuite"
  $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableRecoverPartitionsParallelSuite"
  ```

- Added unified v1 and v2 tests for `MSCK REPAIR TABLE`:

  ```
  $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *MsckRepairTableSuite"
  ```

Closes #31499 from MaxGekk/repair-table-drop-partitions.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
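For quick reference before the diff: the option-to-flag mapping implemented by the `AstBuilder` change below can be summarized as follows. This table-as-code is an editorial sketch, not code from the PR; `"<none>"` is just a label for the option-less form.

```scala
// (enableAddPartitions, enableDropPartitions) chosen for each parsed option.
val repairFlags = Map(
  ("<none>", (true, false)),          // plain MSCK REPAIR TABLE: add-only, as before
  ("ADD PARTITIONS", (true, false)),
  ("DROP PARTITIONS", (false, true)),
  ("SYNC PARTITIONS", (true, true)))  // SYNC = DROP followed by ADD
```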
```diff
@@ -397,6 +397,7 @@ Below is a list of all the keywords in Spark SQL.
 |STRUCT|non-reserved|non-reserved|non-reserved|
 |SUBSTR|non-reserved|non-reserved|non-reserved|
 |SUBSTRING|non-reserved|non-reserved|non-reserved|
+|SYNC|non-reserved|non-reserved|non-reserved|
 |TABLE|reserved|non-reserved|reserved|
 |TABLES|non-reserved|non-reserved|non-reserved|
 |TABLESAMPLE|non-reserved|non-reserved|reserved|
```
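Since `SYNC` is registered as non-reserved in every dialect column above, it should remain usable as an ordinary identifier. A minimal sketch, assuming a local `SparkSession` named `spark` (the table name is illustrative):

```scala
// SYNC is non-reserved, so it can still name an alias or a table.
spark.sql("SELECT 1 AS sync").show()
spark.sql("CREATE TABLE sync (id INT) USING parquet") // hypothetical table named `sync`
```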
````diff
@@ -28,7 +28,7 @@ If the table is cached, the command clears cached data of the table and all its
 
 ### Syntax
 
 ```sql
-MSCK REPAIR TABLE table_identifier
+MSCK REPAIR TABLE table_identifier [{ADD|DROP|SYNC} PARTITIONS]
 ```
 
 ### Parameters
@@ -39,6 +39,13 @@ MSCK REPAIR TABLE table_identifier
 
     **Syntax:** `[ database_name. ] table_name`
 
+* **`{ADD|DROP|SYNC} PARTITIONS`**
+
+    * If not specified, `MSCK REPAIR TABLE` only adds partitions to the session catalog.
+    * **ADD**, the command adds new partitions to the session catalog for all sub-folders in the base table folder that don't belong to any table partitions.
+    * **DROP**, the command drops all partitions from the session catalog that have non-existing locations in the file system.
+    * **SYNC** is the combination of **DROP** and **ADD**.
+
 ### Examples
 
 ```sql
````
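Omitting the option preserves the old behavior. A sketch of the equivalences implied by the parser and resolver changes below, assuming a `SparkSession` named `spark` and an existing partitioned table `tbl2`:

```scala
spark.sql("MSCK REPAIR TABLE tbl2")                // defaults to add-only,
spark.sql("MSCK REPAIR TABLE tbl2 ADD PARTITIONS") // same effect as the line above
spark.sql("ALTER TABLE tbl2 RECOVER PARTITIONS")   // also resolved as add-only
```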
```diff
@@ -229,7 +229,8 @@ statement
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         multipartIdentifier partitionSpec?                             #loadData
     | TRUNCATE TABLE multipartIdentifier partitionSpec?                #truncateTable
-    | MSCK REPAIR TABLE multipartIdentifier                            #repairTable
+    | MSCK REPAIR TABLE multipartIdentifier
+        (option=(ADD|DROP|SYNC) PARTITIONS)?                           #repairTable
     | op=(ADD | LIST) identifier (STRING | .*?)                        #manageResource
     | SET ROLE .*?                                                     #failNativeCommand
     | SET TIME ZONE interval                                           #setTimeZone
@@ -1173,6 +1174,7 @@ ansiNonReserved
     | STRUCT
     | SUBSTR
     | SUBSTRING
+    | SYNC
     | TABLES
     | TABLESAMPLE
     | TBLPROPERTIES
@@ -1429,6 +1431,7 @@ nonReserved
     | STRUCT
     | SUBSTR
     | SUBSTRING
+    | SYNC
     | TABLE
     | TABLES
     | TABLESAMPLE
@@ -1687,6 +1690,7 @@ STRATIFY: 'STRATIFY';
 STRUCT: 'STRUCT';
 SUBSTR: 'SUBSTR';
 SUBSTRING: 'SUBSTRING';
+SYNC: 'SYNC';
 TABLE: 'TABLE';
 TABLES: 'TABLES';
 TABLESAMPLE: 'TABLESAMPLE';
```
```diff
@@ -3659,11 +3659,24 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    *
    * For example:
    * {{{
-   *   MSCK REPAIR TABLE multi_part_name
+   *   MSCK REPAIR TABLE multi_part_name [{ADD|DROP|SYNC} PARTITIONS]
    * }}}
    */
   override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
-    RepairTable(createUnresolvedTable(ctx.multipartIdentifier, "MSCK REPAIR TABLE"))
+    val (enableAddPartitions, enableDropPartitions, option) =
+      if (ctx.SYNC() != null) {
+        (true, true, " ... SYNC PARTITIONS")
+      } else if (ctx.DROP() != null) {
+        (false, true, " ... DROP PARTITIONS")
+      } else if (ctx.ADD() != null) {
+        (true, false, " ... ADD PARTITIONS")
+      } else {
+        (true, false, "")
+      }
+    RepairTable(
+      createUnresolvedTable(ctx.multipartIdentifier, s"MSCK REPAIR TABLE$option"),
+      enableAddPartitions,
+      enableDropPartitions)
   }
 
   /**
```
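The resulting plan can be inspected directly through the parser. A minimal sketch, with the expected output paraphrased from the parser suite added later in this diff:

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan

val plan = parsePlan("MSCK REPAIR TABLE t SYNC PARTITIONS")
// == RepairTable(UnresolvedTable(Seq("t"), "MSCK REPAIR TABLE ... SYNC PARTITIONS", None),
//      enableAddPartitions = true, enableDropPartitions = true)
```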
```diff
@@ -802,7 +802,10 @@ case class DropView(
 /**
  * The logical plan of the MSCK REPAIR TABLE command.
  */
-case class RepairTable(child: LogicalPlan) extends Command {
+case class RepairTable(
+    child: LogicalPlan,
+    enableAddPartitions: Boolean,
+    enableDropPartitions: Boolean) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
```
```diff
@@ -1913,12 +1913,6 @@ class DDLParserSuite extends AnalysisTest {
       "missing 'COLUMNS' at '<EOF>'")
   }
 
-  test("MSCK REPAIR TABLE") {
-    comparePlans(
-      parsePlan("MSCK REPAIR TABLE a.b.c"),
-      RepairTable(UnresolvedTable(Seq("a", "b", "c"), "MSCK REPAIR TABLE", None)))
-  }
-
   test("LOAD DATA INTO table") {
     comparePlans(
       parsePlan("LOAD DATA INPATH 'filepath' INTO TABLE a.b.c"),
```
```diff
@@ -376,8 +376,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
       AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)
 
-    case RepairTable(ResolvedV1TableIdentifier(ident)) =>
-      AlterTableRecoverPartitionsCommand(ident.asTableIdentifier, "MSCK REPAIR TABLE")
+    case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
+      AlterTableRecoverPartitionsCommand(
+        ident.asTableIdentifier,
+        addPartitions,
+        dropPartitions,
+        "MSCK REPAIR TABLE")
 
     case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
       LoadDataCommand(
@@ -420,6 +424,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
       AlterTableRecoverPartitionsCommand(
         ident.asTableIdentifier,
+        enableAddPartitions = true,
+        enableDropPartitions = false,
         "ALTER TABLE RECOVER PARTITIONS")
 
     case AddPartitions(ResolvedV1TableIdentifier(ident), partSpecsAndLocs, ifNotExists) =>
```
```diff
@@ -189,7 +189,10 @@ case class CreateDataSourceTableAsSelectCommand(
       case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
           sparkSession.sqlContext.conf.manageFilesourcePartitions =>
         // Need to recover partitions into the metastore so our saved data is visible.
-        sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+        sessionState.executePlan(AlterTableRecoverPartitionsCommand(
+          table.identifier,
+          enableAddPartitions = true,
+          enableDropPartitions = false)).toRdd
       case _ =>
     }
   }
```
```diff
@@ -597,11 +597,13 @@ case class PartitionStatistics(numFiles: Int, totalSize: Long)
  * The syntax of this command is:
  * {{{
  *   ALTER TABLE table RECOVER PARTITIONS;
- *   MSCK REPAIR TABLE table;
+ *   MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS];
  * }}}
  */
 case class AlterTableRecoverPartitionsCommand(
     tableName: TableIdentifier,
+    enableAddPartitions: Boolean,
+    enableDropPartitions: Boolean,
     cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
 
   // These are list of statistics that can be collected quickly without requiring a scan of the data
```
```diff
@@ -645,6 +647,10 @@ case class AlterTableRecoverPartitionsCommand(
     val hadoopConf = spark.sessionState.newHadoopConf()
     val fs = root.getFileSystem(hadoopConf)
 
+    val droppedAmount = if (enableDropPartitions) {
+      dropPartitions(catalog, fs)
+    } else 0
+    val addedAmount = if (enableAddPartitions) {
     val threshold = spark.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
     val pathFilter = getPathFilter(hadoopConf)
 
```
|
@ -667,12 +673,14 @@ case class AlterTableRecoverPartitionsCommand(
|
|||
logInfo(s"Finished to gather the fast stats for all $total partitions.")
|
||||
|
||||
addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
|
||||
total
|
||||
} else 0
|
||||
// Updates the table to indicate that its partition metadata is stored in the Hive metastore.
|
||||
// This is always the case for Hive format tables, but is not true for Datasource tables created
|
||||
// before Spark 2.1 unless they are converted via `msck repair table`.
|
||||
spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
|
||||
spark.catalog.refreshTable(tableIdentWithDB)
|
||||
logInfo(s"Recovered all partitions ($total).")
|
||||
logInfo(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).")
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
||||
|
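Pieced together from the two hunks above, the new shape of `run()` is roughly as follows. This is an editorial outline only; the real file-system scan and catalog calls are replaced by stubs so the snippet stands alone:

```scala
// Outline of the drop-then-add control flow (names mirror the diff; stubs are hypothetical).
def repair(enableAddPartitions: Boolean, enableDropPartitions: Boolean): (Int, Int) = {
  def dropDanglingPartitions(): Int = 0 // stands in for dropPartitions(catalog, fs)
  def scanAndAddPartitions(): Int = 0   // stands in for the file-system scan returning `total`
  val droppedAmount = if (enableDropPartitions) dropDanglingPartitions() else 0
  val addedAmount = if (enableAddPartitions) scanAndAddPartitions() else 0
  println(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).")
  (addedAmount, droppedAmount)
}
```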
```diff
@@ -791,8 +799,28 @@ case class AlterTableRecoverPartitionsCommand(
         logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
       }
     }
   }
 
+  // Drops the partitions that do not exist in the file system
+  private def dropPartitions(catalog: SessionCatalog, fs: FileSystem): Int = {
+    val dropPartSpecs = ThreadUtils.parmap(
+      catalog.listPartitions(tableName),
+      "AlterTableRecoverPartitionsCommand: non-existing partitions",
+      maxThreads = 8) { partition =>
+      partition.storage.locationUri.flatMap { uri =>
+        if (fs.exists(new Path(uri))) None else Some(partition.spec)
+      }
+    }.flatten
+    catalog.dropPartitions(
+      tableName,
+      dropPartSpecs,
+      ignoreIfNotExists = true,
+      purge = false,
+      // Since we have already checked that partition directories do not exist, we can avoid
+      // additional calls to the file system at the catalog side by setting this flag.
+      retainData = true)
+    dropPartSpecs.length
+  }
 }
 
 /**
  * A command that sets the location of a table or a partition.
```
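`ThreadUtils.parmap` runs the per-partition existence checks on a bounded thread pool, which matters for tables with many partitions on a slow file system. `ThreadUtils` is Spark-internal, so here is a stand-alone sketch of the same bounded-pool pattern using only the standard library (paths are illustrative):

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

val pool = Executors.newFixedThreadPool(8) // mirrors maxThreads = 8 above
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
val paths = Seq("/tmp/a", "/tmp/b")
// Collect the paths whose directories are missing, checking in parallel.
val missing = Await.result(
  Future.traverse(paths)(p => Future(if (new java.io.File(p).exists()) None else Some(p))),
  Duration.Inf).flatten
pool.shutdown()
```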
```diff
@@ -409,7 +409,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         table,
         pattern.map(_.asInstanceOf[ResolvedPartitionSpec])) :: Nil
 
-    case RepairTable(_: ResolvedTable) =>
+    case RepairTable(_: ResolvedTable, _, _) =>
       throw new AnalysisException("MSCK REPAIR TABLE is not supported for v2 tables.")
 
     case r: CacheTable =>
```
```diff
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.io.File
+
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.scalactic.source.Position
 import org.scalatest.Tag
```
```diff
@@ -144,4 +147,26 @@ trait DDLCommandTestUtils extends SQLTestUtils {
     val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
     f(fs, root)
   }
+
+  def getPartitionLocation(tableName: String, part: String): String = {
+    val idents = tableName.split('.')
+    val table = idents.last
+    val catalogAndNs = idents.init
+    val in = if (catalogAndNs.isEmpty) "" else s"IN ${catalogAndNs.mkString(".")}"
+    val information = sql(s"SHOW TABLE EXTENDED $in LIKE '$table' PARTITION ($part)")
+      .select("information")
+      .first().getString(0)
+    information
+      .split("\\r?\\n")
+      .filter(_.startsWith("Location:"))
+      .head
+      .replace("Location: file:", "")
+  }
+
+  def copyPartition(tableName: String, from: String, to: String): String = {
+    val part0Loc = getPartitionLocation(tableName, from)
+    val part1Loc = part0Loc.replace(from, to)
+    FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
+    part1Loc
+  }
 }
```
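`getPartitionLocation` extracts the `Location: file:...` line from the `information` column of `SHOW TABLE EXTENDED`, and `copyPartition` clones a partition directory on the local file system. A usage sketch, valid only inside a suite mixing in `DDLCommandTestUtils` (table and partition names are illustrative):

```scala
val loc = getPartitionLocation("ns.tbl", "part=0")       // local FS path of partition part=0
val copied = copyPartition("ns.tbl", "part=0", "part=2") // duplicates the directory as part=2
```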
```diff
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedTable}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
+import org.apache.spark.sql.catalyst.plans.logical.RepairTable
+
+class MsckRepairTableParserSuite extends AnalysisTest {
+  test("repair a table") {
+    comparePlans(
+      parsePlan("MSCK REPAIR TABLE a.b.c"),
+      RepairTable(
+        UnresolvedTable(Seq("a", "b", "c"), "MSCK REPAIR TABLE", None),
+        enableAddPartitions = true,
+        enableDropPartitions = false))
+  }
+
+  test("add partitions") {
+    comparePlans(
+      parsePlan("msck repair table ns.tbl add partitions"),
+      RepairTable(
+        UnresolvedTable(
+          Seq("ns", "tbl"),
+          "MSCK REPAIR TABLE ... ADD PARTITIONS",
+          None),
+        enableAddPartitions = true,
+        enableDropPartitions = false))
+  }
+
+  test("drop partitions") {
+    comparePlans(
+      parsePlan("MSCK repair table TBL Drop Partitions"),
+      RepairTable(
+        UnresolvedTable(
+          Seq("TBL"),
+          "MSCK REPAIR TABLE ... DROP PARTITIONS",
+          None),
+        enableAddPartitions = false,
+        enableDropPartitions = true))
+  }
+
+  test("sync partitions") {
+    comparePlans(
+      parsePlan("MSCK REPAIR TABLE spark_catalog.ns.tbl SYNC PARTITIONS"),
+      RepairTable(
+        UnresolvedTable(
+          Seq("spark_catalog", "ns", "tbl"),
+          "MSCK REPAIR TABLE ... SYNC PARTITIONS",
+          None),
+        enableAddPartitions = true,
+        enableDropPartitions = true))
+  }
+}
```
```diff
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.QueryTest
+
+/**
+ * This base suite contains unified tests for the `MSCK REPAIR TABLE` command that
+ * check V1 and V2 table catalogs. The tests that cannot run for all supported catalogs are
+ * located in more specific test suites:
+ *
+ *   - V2 table catalog tests:
+ *     `org.apache.spark.sql.execution.command.v2.MsckRepairTableSuite`
+ *   - V1 table catalog tests:
+ *     `org.apache.spark.sql.execution.command.v1.MsckRepairTableSuiteBase`
+ *     - V1 In-Memory catalog:
+ *       `org.apache.spark.sql.execution.command.v1.MsckRepairTableSuite`
+ *     - V1 Hive External catalog:
+ *       `org.apache.spark.sql.hive.execution.command.MsckRepairTableSuite`
+ */
+trait MsckRepairTableSuiteBase extends QueryTest with DDLCommandTestUtils {
+  override val command = "MSCK REPAIR TABLE"
+}
```
```diff
@@ -17,10 +17,6 @@
 
 package org.apache.spark.sql.execution.command.v1
 
-import java.io.File
-
-import org.apache.commons.io.FileUtils
-
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.execution.command
 import org.apache.spark.sql.internal.SQLConf
@@ -47,24 +43,6 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit
     }
   }
 
-  private def copyPartition(tableName: String, from: String, to: String): String = {
-    val idents = tableName.split('.')
-    val table = idents.last
-    val catalogAndNs = idents.init
-    val in = if (catalogAndNs.isEmpty) "" else s"IN ${catalogAndNs.mkString(".")}"
-    val information = sql(s"SHOW TABLE EXTENDED $in LIKE '$table' PARTITION ($from)")
-      .select("information")
-      .first().getString(0)
-    val part0Loc = information
-      .split("\\r?\\n")
-      .filter(_.startsWith("Location:"))
-      .head
-      .replace("Location: file:", "")
-    val part1Loc = part0Loc.replace(from, to)
-    FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
-    part1Loc
-  }
-
   test("SPARK-34055: refresh cache in partition adding") {
     withTable("t") {
       sql(s"CREATE TABLE t (id int, part int) $defaultUsing PARTITIONED BY (part)")
```
```diff
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v1
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.command
+
+/**
+ * This base suite contains unified tests for the `MSCK REPAIR TABLE` command that
+ * check V1 table catalogs. The tests that cannot run for all V1 catalogs are located in more
+ * specific test suites:
+ *
+ *   - V1 In-Memory catalog:
+ *     `org.apache.spark.sql.execution.command.v1.MsckRepairTableSuite`
+ *   - V1 Hive External catalog:
+ *     `org.apache.spark.sql.hive.execution.command.MsckRepairTableSuite`
+ */
+trait MsckRepairTableSuiteBase extends command.MsckRepairTableSuiteBase {
+  def deletePartitionDir(tableName: String, part: String): Unit = {
+    val partLoc = getPartitionLocation(tableName, part)
+    FileUtils.deleteDirectory(new File(partLoc))
+  }
+
+  test("drop partitions") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (col INT, part INT) $defaultUsing PARTITIONED BY (part)")
+      sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
+      sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
+
+      checkAnswer(spark.table(t), Seq(Row(0, 0), Row(1, 1)))
+      deletePartitionDir(t, "part=1")
+      sql(s"MSCK REPAIR TABLE $t DROP PARTITIONS")
+      checkPartitions(t, Map("part" -> "0"))
+      checkAnswer(spark.table(t), Seq(Row(0, 0)))
+    }
+  }
+
+  test("sync partitions") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (col INT, part INT) $defaultUsing PARTITIONED BY (part)")
+      sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
+      sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
+
+      checkAnswer(sql(s"SELECT col, part FROM $t"), Seq(Row(0, 0), Row(1, 1)))
+      copyPartition(t, "part=0", "part=2")
+      deletePartitionDir(t, "part=0")
+      sql(s"MSCK REPAIR TABLE $t SYNC PARTITIONS")
+      checkPartitions(t, Map("part" -> "1"), Map("part" -> "2"))
+      checkAnswer(sql(s"SELECT col, part FROM $t"), Seq(Row(1, 1), Row(0, 2)))
+    }
+  }
+}
+
+/**
+ * The class contains tests for the `MSCK REPAIR TABLE` command to check
+ * V1 In-Memory table catalog.
+ */
+class MsckRepairTableSuite extends MsckRepairTableSuiteBase with CommandSuiteBase
```
```diff
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.command
+
+/**
+ * The class contains tests for the `MSCK REPAIR TABLE` command
+ * to check V2 table catalogs.
+ */
+class MsckRepairTableSuite
+  extends command.MsckRepairTableSuiteBase
+  with CommandSuiteBase {
+
+  // TODO(SPARK-34397): Support v2 `MSCK REPAIR TABLE`
+  test("repairing of v2 tables is not supported") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+      val errMsg = intercept[AnalysisException] {
+        sql(s"MSCK REPAIR TABLE $t")
+      }.getMessage
+      assert(errMsg.contains("MSCK REPAIR TABLE is not supported for v2 tables"))
+    }
+  }
+}
```
```diff
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.command
+
+import org.apache.spark.sql.execution.command.v1
+
+/**
+ * The class contains tests for the `MSCK REPAIR TABLE` command to check
+ * V1 Hive external table catalog.
+ */
+class MsckRepairTableSuite extends v1.MsckRepairTableSuiteBase with CommandSuiteBase
```