[SPARK-25240][SQL] Fix for a deadlock in RECOVER PARTITIONS

## What changes were proposed in this pull request?

In the PR, I propose not to perform recursive parallel listing of files in the `scanPartitions` method because it can cause a deadlock. Instead, I propose to do `scanPartitions` in parallel for top-level partitions only.
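
To illustrate the failure mode, here is a minimal, self-contained sketch (not the Spark code itself; the pool size and fan-out are made up): every level of a recursive listing schedules its children on the same small fixed-size pool and then blocks waiting for them, so once all worker threads are blocked the queued child tasks can never run.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object NestedBlockingSketch {
  // A deliberately tiny pool to make the hang easy to reproduce.
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  def list(depth: Int): Seq[String] = {
    if (depth == 0) return Seq("leaf")
    // Each "directory" level schedules its children on the same bounded pool...
    val children = (1 to 4).map(_ => Future(list(depth - 1)))
    // ...and then blocks a pool thread until they finish. Once every worker is
    // blocked here, the queued child tasks can never run: a pool-exhaustion deadlock.
    children.flatMap(f => Await.result(f, Duration.Inf))
  }

  def main(args: Array[String]): Unit = println(list(depth = 3)) // likely hangs
}
```

Doing the parallel scan only for the top-level partitions keeps worker threads from blocking on nested listing levels.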

## How was this patch tested?

I extended an existing test to trigger the deadlock.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #22233 from MaxGekk/fix-recover-partitions.
Authored by Maxim Gekk on 2018-08-28 11:29:05 -07:00, committed by Xiao Li
parent 4e3f3cebe4
commit aff8f15c15
3 changed files with 61 additions and 47 deletions

@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import java.util.Locale
 
 import scala.collection.{GenMap, GenSeq}
-import scala.concurrent.ExecutionContext
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
 import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
-import org.apache.spark.util.ThreadUtils.parmap
 
 // Note: The definition of these commands are based on the ones described in
 // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
@@ -622,9 +621,8 @@ case class AlterTableRecoverPartitionsCommand(
     val evalPool = ThreadUtils.newForkJoinPool("AlterTableRecoverPartitionsCommand", 8)
     val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
       try {
-        implicit val ec = ExecutionContext.fromExecutor(evalPool)
         scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
-          spark.sessionState.conf.resolver)
+          spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
       } finally {
         evalPool.shutdown()
       }
@@ -656,13 +654,23 @@ case class AlterTableRecoverPartitionsCommand(
       spec: TablePartitionSpec,
       partitionNames: Seq[String],
       threshold: Int,
-      resolver: Resolver)(implicit ec: ExecutionContext): Seq[(TablePartitionSpec, Path)] = {
+      resolver: Resolver,
+      evalTaskSupport: ForkJoinTaskSupport): GenSeq[(TablePartitionSpec, Path)] = {
     if (partitionNames.isEmpty) {
       return Seq(spec -> path)
     }
 
-    val statuses = fs.listStatus(path, filter).toSeq
-    def handleStatus(st: FileStatus): Seq[(TablePartitionSpec, Path)] = {
+    val statuses = fs.listStatus(path, filter)
+    val statusPar: GenSeq[FileStatus] =
+      if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
+        // parallelize the list of partitions here, then we can have better parallelism later.
+        val parArray = statuses.par
+        parArray.tasksupport = evalTaskSupport
+        parArray
+      } else {
+        statuses
+      }
+    statusPar.flatMap { st =>
       val name = st.getPath.getName
       if (st.isDirectory && name.contains("=")) {
         val ps = name.split("=", 2)
@@ -671,7 +679,7 @@ case class AlterTableRecoverPartitionsCommand(
         val value = ExternalCatalogUtils.unescapePathName(ps(1))
         if (resolver(columnName, partitionNames.head)) {
           scanPartitions(spark, fs, filter, st.getPath, spec ++ Map(partitionNames.head -> value),
-            partitionNames.drop(1), threshold, resolver)
+            partitionNames.drop(1), threshold, resolver, evalTaskSupport)
         } else {
           logWarning(
             s"expected partition column ${partitionNames.head}, but got ${ps(0)}, ignoring it")
@@ -682,14 +690,6 @@ case class AlterTableRecoverPartitionsCommand(
         Seq.empty
       }
     }
-
-    val result = if (partitionNames.length > 1 &&
-      statuses.length > threshold || partitionNames.length > 2) {
-      parmap(statuses)(handleStatus _)
-    } else {
-      statuses.map(handleStatus)
-    }
-    result.flatten
   }
 
   private def gatherPartitionStats(
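
For reference, the mechanism the patch switches to is a Scala parallel collection driven by a dedicated ForkJoin pool via `ForkJoinTaskSupport`. A hedged, standalone sketch (assuming Scala 2.12, where parallel collections are built in; directory names are illustrative, not the real `FileStatus` handling):

```scala
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport

object ParListingSketch {
  def main(args: Array[String]): Unit = {
    val pool = new ForkJoinPool(8)
    try {
      // Illustrative top-level partition directory names only.
      val topLevelDirs = Seq("a=1", "a=2", "a=3").par
      // Route the parallel operations through the dedicated pool.
      topLevelDirs.tasksupport = new ForkJoinTaskSupport(pool)
      // Parallel-collection combinators run as ForkJoinTasks; joining a task from
      // inside the pool lets a worker help execute queued tasks instead of parking,
      // which is what keeps a bounded pool from wedging.
      val scanned = topLevelDirs.map(dir => s"scanned $dir").seq
      println(scanned)
    } finally {
      pool.shutdown()
    }
  }
}
```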

@@ -52,23 +52,24 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with Befo
   protected override def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean = true): CatalogTable = {
+      isDataSource: Boolean = true,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
     val storage =
       CatalogStorageFormat.empty.copy(locationUri = Some(catalog.defaultTablePath(name)))
     val metadata = new MetadataBuilder()
       .putString("key", "value")
       .build()
+    val schema = new StructType()
+      .add("col1", "int", nullable = true, metadata = metadata)
+      .add("col2", "string")
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
-      schema = new StructType()
-        .add("col1", "int", nullable = true, metadata = metadata)
-        .add("col2", "string")
-        .add("a", "int")
-        .add("b", "int"),
+      schema = schema.copy(
+        fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
       provider = Some("parquet"),
-      partitionColumnNames = Seq("a", "b"),
+      partitionColumnNames = partitionCols,
       createTime = 0L,
       createVersion = org.apache.spark.SPARK_VERSION,
       tracksPartitionsInCatalog = true)
@@ -176,7 +177,8 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   protected def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean = true): CatalogTable
+      isDataSource: Boolean = true,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable
 
   private val escapedIdentifier = "`(.+)`".r
 
@@ -228,8 +230,10 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   private def createTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean = true): Unit = {
-    catalog.createTable(generateTable(catalog, name, isDataSource), ignoreIfExists = false)
+      isDataSource: Boolean = true,
+      partitionCols: Seq[String] = Seq("a", "b")): Unit = {
+    catalog.createTable(
+      generateTable(catalog, name, isDataSource, partitionCols), ignoreIfExists = false)
   }
 
   private def createTablePartition(
@@ -1131,7 +1135,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
   }
 
   test("alter table: recover partition (parallel)") {
-    withSQLConf("spark.rdd.parallelListingThreshold" -> "1") {
+    withSQLConf("spark.rdd.parallelListingThreshold" -> "0") {
       testRecoverPartitions()
     }
   }
@@ -1144,23 +1148,32 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
 
     val tableIdent = TableIdentifier("tab1")
-    createTable(catalog, tableIdent)
-    val part1 = Map("a" -> "1", "b" -> "5")
+    createTable(catalog, tableIdent, partitionCols = Seq("a", "b", "c"))
+    val part1 = Map("a" -> "1", "b" -> "5", "c" -> "19")
     createTablePartition(catalog, part1, tableIdent)
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
 
-    val part2 = Map("a" -> "2", "b" -> "6")
+    val part2 = Map("a" -> "2", "b" -> "6", "c" -> "31")
     val root = new Path(catalog.getTableMetadata(tableIdent).location)
     val fs = root.getFileSystem(spark.sessionState.newHadoopConf())
     // valid
-    fs.mkdirs(new Path(new Path(root, "a=1"), "b=5"))
-    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "a.csv")) // file
-    fs.createNewFile(new Path(new Path(root, "a=1/b=5"), "_SUCCESS")) // file
-    fs.mkdirs(new Path(new Path(root, "A=2"), "B=6"))
-    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "b.csv")) // file
-    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), "c.csv")) // file
-    fs.createNewFile(new Path(new Path(root, "A=2/B=6"), ".hiddenFile")) // file
-    fs.mkdirs(new Path(new Path(root, "A=2/B=6"), "_temporary"))
+    fs.mkdirs(new Path(new Path(new Path(root, "a=1"), "b=5"), "c=19"))
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "a.csv")) // file
+    fs.createNewFile(new Path(new Path(root, "a=1/b=5/c=19"), "_SUCCESS")) // file
+
+    fs.mkdirs(new Path(new Path(new Path(root, "A=2"), "B=6"), "C=31"))
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "b.csv")) // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), "c.csv")) // file
+    fs.createNewFile(new Path(new Path(root, "A=2/B=6/C=31"), ".hiddenFile")) // file
+    fs.mkdirs(new Path(new Path(root, "A=2/B=6/C=31"), "_temporary"))
+
+    val parts = (10 to 100).map { a =>
+      val part = Map("a" -> a.toString, "b" -> "5", "c" -> "42")
+      fs.mkdirs(new Path(new Path(new Path(root, s"a=$a"), "b=5"), "c=42"))
+      fs.createNewFile(new Path(new Path(root, s"a=$a/b=5/c=42"), "a.csv")) // file
+      createTablePartition(catalog, part, tableIdent)
+      part
+    }
 
     // invalid
     fs.mkdirs(new Path(new Path(root, "a"), "b")) // bad name
@@ -1174,7 +1187,7 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     try {
       sql("ALTER TABLE tab1 RECOVER PARTITIONS")
       assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-        Set(part1, part2))
+        Set(part1, part2) ++ parts)
       if (!isUsingHiveMetastore) {
        assert(catalog.getPartition(tableIdent, part1).parameters("numFiles") == "1")
        assert(catalog.getPartition(tableIdent, part2).parameters("numFiles") == "2")
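
As a usage note, the scenario the extended test builds can also be reproduced with plain DDL. A hedged sketch (illustrative table and layout, assuming a `spark` session; not lifted verbatim from the test):

```scala
// Illustrative only: a datasource table partitioned by three columns, matching
// the shape of the layout the test writes on disk.
spark.sql("""
  CREATE TABLE tab1 (col1 INT, col2 STRING, a INT, b INT, c INT)
  USING parquet
  PARTITIONED BY (a, b, c)
""")
// After partition directories such as a=1/b=5/c=19 exist under the table location,
// RECOVER PARTITIONS lists them and registers them in the catalog. The test lowers
// spark.rdd.parallelListingThreshold to 0 so the listing takes the parallel path.
spark.sql("ALTER TABLE tab1 RECOVER PARTITIONS")
spark.sql("SHOW PARTITIONS tab1").show()
```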

@@ -60,7 +60,8 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
   protected override def generateTable(
       catalog: SessionCatalog,
       name: TableIdentifier,
-      isDataSource: Boolean): CatalogTable = {
+      isDataSource: Boolean,
+      partitionCols: Seq[String] = Seq("a", "b")): CatalogTable = {
     val storage =
       if (isDataSource) {
         val serde = HiveSerDe.sourceToSerDe("parquet")
@@ -84,17 +85,17 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
     val metadata = new MetadataBuilder()
       .putString("key", "value")
       .build()
+    val schema = new StructType()
+      .add("col1", "int", nullable = true, metadata = metadata)
+      .add("col2", "string")
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
-      schema = new StructType()
-        .add("col1", "int", nullable = true, metadata = metadata)
-        .add("col2", "string")
-        .add("a", "int")
-        .add("b", "int"),
+      schema = schema.copy(
+        fields = schema.fields ++ partitionCols.map(StructField(_, IntegerType))),
       provider = if (isDataSource) Some("parquet") else Some("hive"),
-      partitionColumnNames = Seq("a", "b"),
+      partitionColumnNames = partitionCols,
       createTime = 0L,
       createVersion = org.apache.spark.SPARK_VERSION,
       tracksPartitionsInCatalog = true)