[SPARK-10304] [SQL] Partition discovery should throw an exception if the dir structure is invalid

JIRA: https://issues.apache.org/jira/browse/SPARK-10304

This patch detects if the structure of partition directories is not valid.

The test cases are from #8547. Thanks zhzhan.

cc liancheng

Author: Liang-Chi Hsieh <viirya@appier.com>

Closes #8840 from viirya/detect_invalid_part_dir.
This commit is contained in:
Liang-Chi Hsieh 2015-11-03 07:41:50 -08:00 committed by Davies Liu
parent 57446eb69c
commit d6035d97c9
2 changed files with 59 additions and 13 deletions

View file

@@ -77,9 +77,11 @@ private[sql] object PartitioningUtils {
defaultPartitionName: String, defaultPartitionName: String,
typeInference: Boolean): PartitionSpec = { typeInference: Boolean): PartitionSpec = {
// First, we need to parse every partition's path and see if we can find partition values. // First, we need to parse every partition's path and see if we can find partition values.
val pathsWithPartitionValues = paths.flatMap { path => val (partitionValues, optBasePaths) = paths.map { path =>
parsePartition(path, defaultPartitionName, typeInference).map(path -> _) parsePartition(path, defaultPartitionName, typeInference)
} }.unzip
val pathsWithPartitionValues = paths.zip(partitionValues).flatMap(x => x._2.map(x._1 -> _))
if (pathsWithPartitionValues.isEmpty) { if (pathsWithPartitionValues.isEmpty) {
// This dataset is not partitioned. // This dataset is not partitioned.
@@ -87,6 +89,12 @@ private[sql] object PartitioningUtils {
} else { } else {
// This dataset is partitioned. We need to check whether all partitions have the same // This dataset is partitioned. We need to check whether all partitions have the same
// partition columns and resolve potential type conflicts. // partition columns and resolve potential type conflicts.
val basePaths = optBasePaths.flatMap(x => x)
assert(
basePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\n" +
basePaths.mkString("\n\t", "\n\t", "\n\n"))
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues) val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
// Creates the StructType which represents the partition columns. // Creates the StructType which represents the partition columns.
@@ -110,12 +118,12 @@ private[sql] object PartitioningUtils {
} }
/** /**
* Parses a single partition, returns column names and values of each partition column. For * Parses a single partition, returns column names and values of each partition column, also
* example, given: * the base path. For example, given:
* {{{ * {{{
* path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14 * path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
* }}} * }}}
* it returns: * it returns the partition:
* {{{ * {{{
* PartitionValues( * PartitionValues(
* Seq("a", "b", "c"), * Seq("a", "b", "c"),
@@ -124,34 +132,40 @@ private[sql] object PartitioningUtils {
* Literal.create("hello", StringType), * Literal.create("hello", StringType),
* Literal.create(3.14, FloatType))) * Literal.create(3.14, FloatType)))
* }}} * }}}
* and the base path:
* {{{
* /path/to/partition
* }}}
*/ */
private[sql] def parsePartition( private[sql] def parsePartition(
path: Path, path: Path,
defaultPartitionName: String, defaultPartitionName: String,
typeInference: Boolean): Option[PartitionValues] = { typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
val columns = ArrayBuffer.empty[(String, Literal)] val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot` // Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null var finished = path.getParent == null
var chopped = path var chopped = path
var basePath = path
while (!finished) { while (!finished) {
// Sometimes (e.g., when speculative task is enabled), temporary directories may be left // Sometimes (e.g., when speculative task is enabled), temporary directories may be left
// uncleaned. Here we simply ignore them. // uncleaned. Here we simply ignore them.
if (chopped.getName.toLowerCase == "_temporary") { if (chopped.getName.toLowerCase == "_temporary") {
return None return (None, None)
} }
val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference) val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
maybeColumn.foreach(columns += _) maybeColumn.foreach(columns += _)
basePath = chopped
chopped = chopped.getParent chopped = chopped.getParent
finished = maybeColumn.isEmpty || chopped.getParent == null finished = (maybeColumn.isEmpty && !columns.isEmpty) || chopped.getParent == null
} }
if (columns.isEmpty) { if (columns.isEmpty) {
None (None, Some(path))
} else { } else {
val (columnNames, values) = columns.reverse.unzip val (columnNames, values) = columns.reverse.unzip
Some(PartitionValues(columnNames, values)) (Some(PartitionValues(columnNames, values)), Some(basePath))
} }
} }

View file

@@ -58,14 +58,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
check(defaultPartitionName, Literal.create(null, NullType)) check(defaultPartitionName, Literal.create(null, NullType))
} }
test("parse invalid partitioned directories") {
// Invalid
var paths = Seq(
"hdfs://host:9000/invalidPath",
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/a=10.5/b=hello")
var exception = intercept[AssertionError] {
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
// Valid
paths = Seq(
"hdfs://host:9000/path/_temporary",
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/_temporary/path")
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
// Invalid
paths = Seq(
"hdfs://host:9000/path/_temporary",
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/path1")
exception = intercept[AssertionError] {
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
}
test("parse partition") { test("parse partition") {
def check(path: String, expected: Option[PartitionValues]): Unit = { def check(path: String, expected: Option[PartitionValues]): Unit = {
assert(expected === parsePartition(new Path(path), defaultPartitionName, true)) assert(expected === parsePartition(new Path(path), defaultPartitionName, true)._1)
} }
def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = { def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
val message = intercept[T] { val message = intercept[T] {
parsePartition(new Path(path), defaultPartitionName, true).get parsePartition(new Path(path), defaultPartitionName, true)
}.getMessage }.getMessage
assert(message.contains(expected)) assert(message.contains(expected))