diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index f97b166206..48f6a6e508 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -397,6 +397,7 @@ Below is a list of all the keywords in Spark SQL.
 |STRUCT|non-reserved|non-reserved|non-reserved|
 |SUBSTR|non-reserved|non-reserved|non-reserved|
 |SUBSTRING|non-reserved|non-reserved|non-reserved|
+|SYNC|non-reserved|non-reserved|non-reserved|
 |TABLE|reserved|non-reserved|reserved|
 |TABLES|non-reserved|non-reserved|non-reserved|
 |TABLESAMPLE|non-reserved|non-reserved|reserved|
diff --git a/docs/sql-ref-syntax-ddl-repair-table.md b/docs/sql-ref-syntax-ddl-repair-table.md
index 36145126d2..41499c3314 100644
--- a/docs/sql-ref-syntax-ddl-repair-table.md
+++ b/docs/sql-ref-syntax-ddl-repair-table.md
@@ -28,7 +28,7 @@ If the table is cached, the command clears cached data of the table and all its
 ### Syntax
 
 ```sql
-MSCK REPAIR TABLE table_identifier
+MSCK REPAIR TABLE table_identifier [{ADD|DROP|SYNC} PARTITIONS]
 ```
 
 ### Parameters
@@ -39,6 +39,13 @@ MSCK REPAIR TABLE table_identifier
 
     **Syntax:** `[ database_name. ] table_name`
 
+* **`{ADD|DROP|SYNC} PARTITIONS`**
+
+    * If not specified, `MSCK REPAIR TABLE` only adds new partitions to the session catalog, i.e. it behaves like `ADD PARTITIONS`.
+    * **ADD**, the command adds new partitions to the session catalog for all sub-folders in the base table folder that do not belong to any table partition.
+    * **DROP**, the command drops all partitions from the session catalog whose locations do not exist in the file system.
+    * **SYNC** is the combination of **DROP** and **ADD**.
+
 ### Examples
 
 ```sql
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index ab4b783350..50ef3764f3 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -229,7 +229,8 @@ statement
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         multipartIdentifier partitionSpec?                             #loadData
     | TRUNCATE TABLE multipartIdentifier partitionSpec?                #truncateTable
-    | MSCK REPAIR TABLE multipartIdentifier                            #repairTable
+    | MSCK REPAIR TABLE multipartIdentifier
+        (option=(ADD|DROP|SYNC) PARTITIONS)?                           #repairTable
     | op=(ADD | LIST) identifier (STRING | .*?)                        #manageResource
     | SET ROLE .*?                                                     #failNativeCommand
     | SET TIME ZONE interval                                           #setTimeZone
@@ -1173,6 +1174,7 @@ ansiNonReserved
     | STRUCT
     | SUBSTR
     | SUBSTRING
+    | SYNC
     | TABLES
     | TABLESAMPLE
     | TBLPROPERTIES
@@ -1429,6 +1431,7 @@ nonReserved
     | STRUCT
     | SUBSTR
     | SUBSTRING
+    | SYNC
     | TABLE
     | TABLES
     | TABLESAMPLE
@@ -1687,6 +1690,7 @@ STRATIFY: 'STRATIFY';
 STRUCT: 'STRUCT';
 SUBSTR: 'SUBSTR';
 SUBSTRING: 'SUBSTRING';
+SYNC: 'SYNC';
 TABLE: 'TABLE';
 TABLES: 'TABLES';
 TABLESAMPLE: 'TABLESAMPLE';
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 595a3a5ba5..23f9c8398b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3659,11 +3659,24 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    *
    * For example:
    * {{{
-   *   MSCK REPAIR TABLE multi_part_name
+   *   MSCK REPAIR TABLE multi_part_name [{ADD|DROP|SYNC} PARTITIONS]
    * }}}
    */
   override def visitRepairTable(ctx: RepairTableContext): LogicalPlan = withOrigin(ctx) {
-    RepairTable(createUnresolvedTable(ctx.multipartIdentifier, "MSCK REPAIR TABLE"))
+    val (enableAddPartitions, enableDropPartitions, option) =
+      if (ctx.SYNC() != null) {
+        (true, true, " ... SYNC PARTITIONS")
+      } else if (ctx.DROP() != null) {
+        (false, true, " ... DROP PARTITIONS")
+      } else if (ctx.ADD() != null) {
+        (true, false, " ... ADD PARTITIONS")
+      } else {
+        (true, false, "")
+      }
+    RepairTable(
+      createUnresolvedTable(ctx.multipartIdentifier, s"MSCK REPAIR TABLE$option"),
+      enableAddPartitions,
+      enableDropPartitions)
   }
 
   /**
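For reference, the option handling added to `visitRepairTable` above collapses the parsed keyword into two boolean flags. A minimal standalone sketch of that mapping (the `RepairOption` ADT is a hypothetical stand-in for the ANTLR context, not part of this patch):

```scala
sealed trait RepairOption
case object Add extends RepairOption
case object Drop extends RepairOption
case object Sync extends RepairOption

// Mirrors the if/else chain in visitRepairTable: SYNC enables both flags,
// DROP disables adding, and a bare MSCK REPAIR TABLE defaults to ADD.
def repairFlags(option: Option[RepairOption]): (Boolean, Boolean) = option match {
  case Some(Sync)       => (true, true)  // (enableAddPartitions, enableDropPartitions)
  case Some(Drop)       => (false, true)
  case Some(Add) | None => (true, false)
}
```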
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 8797b107f9..12f13e73ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -802,7 +802,10 @@ case class DropView(
 
 /**
  * The logical plan of the MSCK REPAIR TABLE command.
  */
-case class RepairTable(child: LogicalPlan) extends Command {
+case class RepairTable(
+    child: LogicalPlan,
+    enableAddPartitions: Boolean,
+    enableDropPartitions: Boolean) extends Command {
   override def children: Seq[LogicalPlan] = child :: Nil
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index cb9dda8260..870ff388ed 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1913,12 +1913,6 @@ class DDLParserSuite extends AnalysisTest {
       "missing 'COLUMNS' at ''")
   }
 
-  test("MSCK REPAIR TABLE") {
-    comparePlans(
-      parsePlan("MSCK REPAIR TABLE a.b.c"),
-      RepairTable(UnresolvedTable(Seq("a", "b", "c"), "MSCK REPAIR TABLE", None)))
-  }
-
   test("LOAD DATA INTO table") {
     comparePlans(
       parsePlan("LOAD DATA INPATH 'filepath' INTO TABLE a.b.c"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 7a8f4dd390..55e8c5fba0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -376,8 +376,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
       AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)
 
-    case RepairTable(ResolvedV1TableIdentifier(ident)) =>
-      AlterTableRecoverPartitionsCommand(ident.asTableIdentifier, "MSCK REPAIR TABLE")
+    case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
+      AlterTableRecoverPartitionsCommand(
+        ident.asTableIdentifier,
+        addPartitions,
+        dropPartitions,
+        "MSCK REPAIR TABLE")
 
     case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
       LoadDataCommand(
@@ -420,6 +424,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
       AlterTableRecoverPartitionsCommand(
         ident.asTableIdentifier,
+        enableAddPartitions = true,
+        enableDropPartitions = false,
         "ALTER TABLE RECOVER PARTITIONS")
 
     case AddPartitions(ResolvedV1TableIdentifier(ident), partSpecsAndLocs, ifNotExists) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index be7fa7b1b4..b3e48e37c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -189,7 +189,10 @@ case class CreateDataSourceTableAsSelectCommand(
       case fs: HadoopFsRelation if table.partitionColumnNames.nonEmpty &&
           sparkSession.sqlContext.conf.manageFilesourcePartitions =>
         // Need to recover partitions into the metastore so our saved data is visible.
-        sessionState.executePlan(AlterTableRecoverPartitionsCommand(table.identifier)).toRdd
+        sessionState.executePlan(AlterTableRecoverPartitionsCommand(
+          table.identifier,
+          enableAddPartitions = true,
+          enableDropPartitions = false)).toRdd
       case _ =>
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 7b4feb4af3..f0219efbf9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -597,11 +597,13 @@ case class PartitionStatistics(numFiles: Int, totalSize: Long)
  * The syntax of this command is:
  * {{{
  *   ALTER TABLE table RECOVER PARTITIONS;
- *   MSCK REPAIR TABLE table;
+ *   MSCK REPAIR TABLE table [{ADD|DROP|SYNC} PARTITIONS];
  * }}}
  */
 case class AlterTableRecoverPartitionsCommand(
     tableName: TableIdentifier,
+    enableAddPartitions: Boolean,
+    enableDropPartitions: Boolean,
     cmd: String = "ALTER TABLE RECOVER PARTITIONS") extends RunnableCommand {
 
   // These are list of statistics that can be collected quickly without requiring a scan of the data
@@ -645,34 +647,40 @@ case class AlterTableRecoverPartitionsCommand(
     val hadoopConf = spark.sessionState.newHadoopConf()
     val fs = root.getFileSystem(hadoopConf)
 
-    val threshold = spark.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
-    val pathFilter = getPathFilter(hadoopConf)
+    val droppedAmount = if (enableDropPartitions) {
+      dropPartitions(catalog, fs)
+    } else 0
+    val addedAmount = if (enableAddPartitions) {
+      val threshold = spark.sparkContext.conf.get(RDD_PARALLEL_LISTING_THRESHOLD)
+      val pathFilter = getPathFilter(hadoopConf)
 
-    val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
-    val partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)] =
-      try {
-        scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
-          spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
-      } finally {
-        evalPool.shutdown()
+      val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
+      val partitionSpecsAndLocs: GenSeq[(TablePartitionSpec, Path)] =
+        try {
+          scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
+            spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
+        } finally {
+          evalPool.shutdown()
+        }
+      val total = partitionSpecsAndLocs.length
+      logInfo(s"Found $total partitions in $root")
+
+      val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+        gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
+      } else {
+        GenMap.empty[String, PartitionStatistics]
       }
-    val total = partitionSpecsAndLocs.length
-    logInfo(s"Found $total partitions in $root")
+      logInfo(s"Finished gathering the fast stats for all $total partitions.")
 
-    val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
-      gatherPartitionStats(spark, partitionSpecsAndLocs, fs, pathFilter, threshold)
-    } else {
-      GenMap.empty[String, PartitionStatistics]
-    }
-    logInfo(s"Finished to gather the fast stats for all $total partitions.")
-
-    addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+      addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+      total
+    } else 0
 
     // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
     // This is always the case for Hive format tables, but is not true for Datasource tables created
     // before Spark 2.1 unless they are converted via `msck repair table`.
     spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
     spark.catalog.refreshTable(tableIdentWithDB)
-    logInfo(s"Recovered all partitions ($total).")
+    logInfo(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).")
     Seq.empty[Row]
   }
@@ -791,8 +799,28 @@ case class AlterTableRecoverPartitionsCommand(
         logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
       }
     }
-}
 
+  // Drops the partitions that do not exist in the file system
+  private def dropPartitions(catalog: SessionCatalog, fs: FileSystem): Int = {
+    val dropPartSpecs = ThreadUtils.parmap(
+      catalog.listPartitions(tableName),
+      "AlterTableRecoverPartitionsCommand: non-existing partitions",
+      maxThreads = 8) { partition =>
+      partition.storage.locationUri.flatMap { uri =>
+        if (fs.exists(new Path(uri))) None else Some(partition.spec)
+      }
+    }.flatten
+    catalog.dropPartitions(
+      tableName,
+      dropPartSpecs,
+      ignoreIfNotExists = true,
+      purge = false,
+      // Since we have already checked that partition directories do not exist, we can avoid
+      // additional calls to the file system at the catalog side by setting this flag.
+      retainData = true)
+    dropPartSpecs.length
+  }
+}
 
 /**
  * A command that sets the location of a table or a partition.
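To make the new `dropPartitions` step above concrete: it collects the spec of every catalog partition whose location is missing from the file system and drops those specs in a single catalog call. A simplified sequential sketch of the detection (the real command parallelizes the `fs.exists` checks with `ThreadUtils.parmap`; `PartitionInfo` is a hypothetical stand-in for the catalog's partition metadata):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

case class PartitionInfo(spec: Map[String, String], location: Option[String])

// Returns the specs of all partitions whose directories no longer exist,
// i.e. the candidates passed to catalog.dropPartitions above.
def staleSpecs(partitions: Seq[PartitionInfo], fs: FileSystem): Seq[Map[String, String]] =
  partitions.flatMap { p =>
    p.location.flatMap { loc =>
      if (fs.exists(new Path(loc))) None else Some(p.spec)
    }
  }
```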
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 3eed7160b6..16a6b2ef2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -409,7 +409,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         table,
         pattern.map(_.asInstanceOf[ResolvedPartitionSpec])) :: Nil
 
-    case RepairTable(_: ResolvedTable) =>
+    case RepairTable(_: ResolvedTable, _, _) =>
       throw new AnalysisException("MSCK REPAIR TABLE is not supported for v2 tables.")
 
     case r: CacheTable =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala
index 547fef6aca..f9e26f8277 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandTestUtils.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution.command
 
+import java.io.File
+
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.scalactic.source.Position
 import org.scalatest.Tag
@@ -144,4 +147,26 @@ trait DDLCommandTestUtils extends SQLTestUtils {
     val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
     f(fs, root)
   }
+
+  def getPartitionLocation(tableName: String, part: String): String = {
+    val idents = tableName.split('.')
+    val table = idents.last
+    val catalogAndNs = idents.init
+    val in = if (catalogAndNs.isEmpty) "" else s"IN ${catalogAndNs.mkString(".")}"
+    val information = sql(s"SHOW TABLE EXTENDED $in LIKE '$table' PARTITION ($part)")
+      .select("information")
+      .first().getString(0)
+    information
+      .split("\\r?\\n")
+      .filter(_.startsWith("Location:"))
+      .head
+      .replace("Location: file:", "")
+  }
+
+  def copyPartition(tableName: String, from: String, to: String): String = {
+    val part0Loc = getPartitionLocation(tableName, from)
+    val part1Loc = part0Loc.replace(from, to)
+    FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
+    part1Loc
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/MsckRepairTableParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/MsckRepairTableParserSuite.scala
new file mode 100644
index 0000000000..458b3a4fc3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/MsckRepairTableParserSuite.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedTable}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
+import org.apache.spark.sql.catalyst.plans.logical.RepairTable
+
+class MsckRepairTableParserSuite extends AnalysisTest {
+  test("repair a table") {
+    comparePlans(
+      parsePlan("MSCK REPAIR TABLE a.b.c"),
+      RepairTable(
+        UnresolvedTable(Seq("a", "b", "c"), "MSCK REPAIR TABLE", None),
+        enableAddPartitions = true,
+        enableDropPartitions = false))
+  }
+
+  test("add partitions") {
+    comparePlans(
+      parsePlan("msck repair table ns.tbl add partitions"),
+      RepairTable(
+        UnresolvedTable(
+          Seq("ns", "tbl"),
+          "MSCK REPAIR TABLE ... ADD PARTITIONS",
+          None),
+        enableAddPartitions = true,
+        enableDropPartitions = false))
+  }
+
+  test("drop partitions") {
+    comparePlans(
+      parsePlan("MSCK repair table TBL Drop Partitions"),
+      RepairTable(
+        UnresolvedTable(
+          Seq("TBL"),
+          "MSCK REPAIR TABLE ... DROP PARTITIONS",
+          None),
+        enableAddPartitions = false,
+        enableDropPartitions = true))
+  }
+
+  test("sync partitions") {
+    comparePlans(
+      parsePlan("MSCK REPAIR TABLE spark_catalog.ns.tbl SYNC PARTITIONS"),
+      RepairTable(
+        UnresolvedTable(
+          Seq("spark_catalog", "ns", "tbl"),
+          "MSCK REPAIR TABLE ... SYNC PARTITIONS",
+          None),
+        enableAddPartitions = true,
+        enableDropPartitions = true))
+  }
+}
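The parser tests above also imply a negative case: the grammar accepts at most one of `ADD`, `DROP`, or `SYNC` before `PARTITIONS`, so stacking options should fail to parse. A quick sketch of that check (the table name `t` is arbitrary, and the exact error message may vary):

```scala
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

// Valid: a single, case-insensitive option keyword.
CatalystSqlParser.parsePlan("msck repair table t sync partitions")

// Invalid: two option keywords; the grammar rejects the statement.
try CatalystSqlParser.parsePlan("MSCK REPAIR TABLE t ADD DROP PARTITIONS")
catch { case e: ParseException => println(e.getMessage) }
```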
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/MsckRepairTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/MsckRepairTableSuiteBase.scala
new file mode 100644
index 0000000000..b8b0d003a3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/MsckRepairTableSuiteBase.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.QueryTest
+
+/**
+ * This base suite contains unified tests for the `MSCK REPAIR TABLE` command that
+ * check V1 and V2 table catalogs. The tests that cannot run for all supported catalogs are
+ * located in more specific test suites:
+ *
+ *   - V2 table catalog tests:
+ *     `org.apache.spark.sql.execution.command.v2.MsckRepairTableSuite`
+ *   - V1 table catalog tests:
+ *     `org.apache.spark.sql.execution.command.v1.MsckRepairTableSuiteBase`
+ *   - V1 In-Memory catalog:
+ *     `org.apache.spark.sql.execution.command.v1.MsckRepairTableSuite`
+ *   - V1 Hive External catalog:
+ *     `org.apache.spark.sql.hive.execution.command.MsckRepairTableSuite`
+ */
+trait MsckRepairTableSuiteBase extends QueryTest with DDLCommandTestUtils {
+  override val command = "MSCK REPAIR TABLE"
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
index 4013f623e0..b2e626be1b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
@@ -17,10 +17,6 @@
 
 package org.apache.spark.sql.execution.command.v1
 
-import java.io.File
-
-import org.apache.commons.io.FileUtils
-
 import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.execution.command
 import org.apache.spark.sql.internal.SQLConf
@@ -47,24 +43,6 @@ trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuit
     }
   }
 
-  private def copyPartition(tableName: String, from: String, to: String): String = {
-    val idents = tableName.split('.')
-    val table = idents.last
-    val catalogAndNs = idents.init
-    val in = if (catalogAndNs.isEmpty) "" else s"IN ${catalogAndNs.mkString(".")}"
-    val information = sql(s"SHOW TABLE EXTENDED $in LIKE '$table' PARTITION ($from)")
-      .select("information")
-      .first().getString(0)
-    val part0Loc = information
-      .split("\\r?\\n")
-      .filter(_.startsWith("Location:"))
-      .head
-      .replace("Location: file:", "")
-    val part1Loc = part0Loc.replace(from, to)
-    FileUtils.copyDirectory(new File(part0Loc), new File(part1Loc))
-    part1Loc
-  }
-
   test("SPARK-34055: refresh cache in partition adding") {
     withTable("t") {
       sql(s"CREATE TABLE t (id int, part int) $defaultUsing PARTITIONED BY (part)")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/MsckRepairTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/MsckRepairTableSuite.scala
new file mode 100644
index 0000000000..45dc9e0e00
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/MsckRepairTableSuite.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v1
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.execution.command
+
+/**
+ * This base suite contains unified tests for the `MSCK REPAIR TABLE` command that
+ * check V1 table catalogs. The tests that cannot run for all V1 catalogs are located in more
+ * specific test suites:
+ *
+ *   - V1 In-Memory catalog:
+ *     `org.apache.spark.sql.execution.command.v1.MsckRepairTableSuite`
+ *   - V1 Hive External catalog:
+ *     `org.apache.spark.sql.hive.execution.command.MsckRepairTableSuite`
+ */
+trait MsckRepairTableSuiteBase extends command.MsckRepairTableSuiteBase {
+  def deletePartitionDir(tableName: String, part: String): Unit = {
+    val partLoc = getPartitionLocation(tableName, part)
+    FileUtils.deleteDirectory(new File(partLoc))
+  }
+
+  test("drop partitions") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (col INT, part INT) $defaultUsing PARTITIONED BY (part)")
+      sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
+      sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
+
+      checkAnswer(spark.table(t), Seq(Row(0, 0), Row(1, 1)))
+      deletePartitionDir(t, "part=1")
+      sql(s"MSCK REPAIR TABLE $t DROP PARTITIONS")
+      checkPartitions(t, Map("part" -> "0"))
+      checkAnswer(spark.table(t), Seq(Row(0, 0)))
+    }
+  }
+
+  test("sync partitions") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (col INT, part INT) $defaultUsing PARTITIONED BY (part)")
+      sql(s"INSERT INTO $t PARTITION (part=0) SELECT 0")
+      sql(s"INSERT INTO $t PARTITION (part=1) SELECT 1")
+
+      checkAnswer(sql(s"SELECT col, part FROM $t"), Seq(Row(0, 0), Row(1, 1)))
+      copyPartition(t, "part=0", "part=2")
+      deletePartitionDir(t, "part=0")
+      sql(s"MSCK REPAIR TABLE $t SYNC PARTITIONS")
+      checkPartitions(t, Map("part" -> "1"), Map("part" -> "2"))
+      checkAnswer(sql(s"SELECT col, part FROM $t"), Seq(Row(1, 1), Row(0, 2)))
+    }
+  }
+}
+
+/**
+ * The class contains tests for the `MSCK REPAIR TABLE` command to check
+ * V1 In-Memory table catalog.
+ */
+class MsckRepairTableSuite extends MsckRepairTableSuiteBase with CommandSuiteBase
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/MsckRepairTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/MsckRepairTableSuite.scala
new file mode 100644
index 0000000000..d4b23e5078
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/MsckRepairTableSuite.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.command
+
+/**
+ * The class contains tests for the `MSCK REPAIR TABLE` command
+ * to check V2 table catalogs.
+ */
+class MsckRepairTableSuite
+  extends command.MsckRepairTableSuiteBase
+  with CommandSuiteBase {
+
+  // TODO(SPARK-34397): Support v2 `MSCK REPAIR TABLE`
+  test("repairing of v2 tables is not supported") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+      val errMsg = intercept[AnalysisException] {
+        sql(s"MSCK REPAIR TABLE $t")
+      }.getMessage
+      assert(errMsg.contains("MSCK REPAIR TABLE is not supported for v2 tables"))
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/MsckRepairTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/MsckRepairTableSuite.scala
new file mode 100644
index 0000000000..fc40aa2b82
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/MsckRepairTableSuite.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.command
+
+import org.apache.spark.sql.execution.command.v1
+
+/**
+ * The class contains tests for the `MSCK REPAIR TABLE` command to check
+ * V1 Hive external table catalog.
+ */
+class MsckRepairTableSuite extends v1.MsckRepairTableSuiteBase with CommandSuiteBase
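Finally, an end-to-end sketch of how the three variants behave, runnable in `spark-shell` against a local warehouse. The table name is illustrative, and the partition directory is assumed to disappear behind the catalog's back (for example via a direct delete on HDFS), which is exactly the situation the suites above simulate with `deletePartitionDir`:

```scala
spark.sql("CREATE TABLE t (col INT, part INT) USING parquet PARTITIONED BY (part)")
spark.sql("INSERT INTO t PARTITION (part = 0) SELECT 0")
spark.sql("INSERT INTO t PARTITION (part = 1) SELECT 1")

// Suppose the directory of `part=1` is now deleted directly on the file system.
// The default form only scans for new directories, so the catalog still lists part=1:
spark.sql("MSCK REPAIR TABLE t")

// DROP de-registers partitions whose directories are gone ...
spark.sql("MSCK REPAIR TABLE t DROP PARTITIONS")

// ... and SYNC both drops stale partitions and registers newly appeared directories.
spark.sql("MSCK REPAIR TABLE t SYNC PARTITIONS")
spark.sql("SHOW PARTITIONS t").show()
```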