[SPARK-11678][SQL] Partition discovery should stop at the root path of the table.

https://issues.apache.org/jira/browse/SPARK-11678

This PR passes the root paths of the table to the partition discovery logic, so partition discovery stops at those root paths instead of walking all the way up to the root of the file system.

Author: Yin Huai <yhuai@databricks.com>

Closes #9651 from yhuai/SPARK-11678.
Authored by Yin Huai on 2015-11-13 18:36:56 +08:00; committed by Cheng Lian
parent ec80c0c2fc
commit 7b5d9051cf
10 changed files with 235 additions and 51 deletions
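For context, a minimal usage sketch of the behavior this patch enables. The "basePath" option and the reader API are the ones exercised by the new tests below; the paths themselves are made up for illustration, and this sketch is not part of the patch.

    // Illustrative sketch only (hypothetical paths). With this patch, partition
    // discovery stops at the user-declared base path instead of the file system root.
    val twoPartitions = sqlContext.read
      .option("basePath", "/data/table")             // declare the table root explicitly
      .parquet("/data/table/b=1", "/data/table/b=2")
    // `b` is recovered as a partition column because discovery stops at /data/table.

    val noPartitionColumn = sqlContext.read.parquet("/data/table/b=1")
    // Without "basePath", the provided path itself is treated as the root,
    // so `b` does not appear as a column in the result.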


@@ -75,10 +75,11 @@ private[sql] object PartitioningUtils {
private[sql] def parsePartitions(
paths: Seq[Path],
defaultPartitionName: String,
typeInference: Boolean): PartitionSpec = {
typeInference: Boolean,
basePaths: Set[Path]): PartitionSpec = {
// First, we need to parse every partition's path and see if we can find partition values.
val (partitionValues, optBasePaths) = paths.map { path =>
parsePartition(path, defaultPartitionName, typeInference)
val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
parsePartition(path, defaultPartitionName, typeInference, basePaths)
}.unzip
// We create pairs of (path -> path's partition value) here
@@ -101,11 +102,15 @@ private[sql] object PartitioningUtils {
// It will be recognised as conflicting directory structure:
// "hdfs://host:9000/invalidPath"
// "hdfs://host:9000/path"
val basePaths = optBasePaths.flatMap(x => x)
val discoveredBasePaths = optDiscoveredBasePaths.flatMap(x => x)
assert(
basePaths.distinct.size == 1,
discoveredBasePaths.distinct.size == 1,
"Conflicting directory structures detected. Suspicious paths:\n" +
basePaths.distinct.mkString("\n\t", "\n\t", "\n\n"))
discoveredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") +
"If provided paths are partition directories, please set " +
"\"basePath\" in the options of the data source to specify the " +
"root directory of the table. If there are multiple root directories, " +
"please load them separately and then union them.")
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
@@ -131,7 +136,7 @@ private[sql] object PartitioningUtils {
/**
* Parses a single partition, returns column names and values of each partition column, also
* the base path. For example, given:
* the path at which partition discovery stops. For example, given:
* {{{
* path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
* }}}
@@ -144,40 +149,63 @@ private[sql] object PartitioningUtils {
* Literal.create("hello", StringType),
* Literal.create(3.14, FloatType)))
* }}}
* and the base path:
* and the path at which the discovery stops:
* {{{
* /path/to/partition
* hdfs://<host>:<port>/path/to/partition
* }}}
*/
private[sql] def parsePartition(
path: Path,
defaultPartitionName: String,
typeInference: Boolean): (Option[PartitionValues], Option[Path]) = {
typeInference: Boolean,
basePaths: Set[Path]): (Option[PartitionValues], Option[Path]) = {
val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
var chopped = path
var basePath = path
// currentPath is the current path that we will use to parse partition column value.
var currentPath: Path = path
while (!finished) {
// Sometimes (e.g., when speculative task is enabled), temporary directories may be left
// uncleaned. Here we simply ignore them.
if (chopped.getName.toLowerCase == "_temporary") {
if (currentPath.getName.toLowerCase == "_temporary") {
return (None, None)
}
val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
if (basePaths.contains(currentPath)) {
// If currentPath is one of the base paths, we should stop.
finished = true
} else {
// Let's say currentPath is "/table/a=1/"; then currentPath.getName gives us "a=1".
// Once we get the string, we try to parse it and find the partition column and value.
val maybeColumn =
parsePartitionColumn(currentPath.getName, defaultPartitionName, typeInference)
maybeColumn.foreach(columns += _)
basePath = chopped
chopped = chopped.getParent
finished = (maybeColumn.isEmpty && !columns.isEmpty) || chopped.getParent == null
// Now, we determine if we should stop.
// When we hit any of the following cases, we will stop:
// - In this iteration, we could not parse a partition column and value from the directory
//   name, i.e. maybeColumn is None, and columns is not empty. We also require columns to be
//   non-empty (otherwise we keep going) to handle cases like /table/a=1/_temporary/something,
//   where we still need to find a=1.
// - currentPath is the top-level dir, i.e. currentPath.getParent == null. For the example
//   of "/table/a=1/", this happens once we have walked all the way up to the file
//   system root.
finished =
(maybeColumn.isEmpty && !columns.isEmpty) || currentPath.getParent == null
if (!finished) {
// For the above example, currentPath will be "/table/".
currentPath = currentPath.getParent
}
}
}
if (columns.isEmpty) {
(None, Some(path))
} else {
val (columnNames, values) = columns.reverse.unzip
(Some(PartitionValues(columnNames, values)), Some(basePath))
(Some(PartitionValues(columnNames, values)), Some(currentPath))
}
}
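To make the stopping rule concrete, here is an illustrative walkthrough of parsePartition in the spirit of the test suite below. It assumes it runs somewhere the private[sql] method is visible (e.g. inside the org.apache.spark.sql packages) and that the default partition name is Hive's; the paths are made up.

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.datasources.PartitioningUtils.parsePartition

    val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"  // assumed default name

    // Without base paths, discovery walks up past a=42 until "table" fails to parse:
    val (values1, stop1) = parsePartition(
      new Path("hdfs://host:9000/table/a=42/b=hello"),
      defaultPartitionName, typeInference = true, basePaths = Set.empty[Path])
    // stop1 == Some(new Path("hdfs://host:9000/table")); values1 covers columns a and b.

    // With /table/a=42 registered as a base path, discovery stops there,
    // so only b is treated as a partition column:
    val (values2, stop2) = parsePartition(
      new Path("hdfs://host:9000/table/a=42/b=hello"),
      defaultPartitionName, typeInference = true,
      basePaths = Set(new Path("hdfs://host:9000/table/a=42")))
    // stop2 == Some(new Path("hdfs://host:9000/table/a=42")); values2 covers only column b.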


@@ -56,13 +56,14 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
val primitivesAsString = parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false)
new JSONRelation(
None,
samplingRatio,
primitivesAsString,
dataSchema,
None,
partitionColumns,
paths)(sqlContext)
inputRDD = None,
samplingRatio = samplingRatio,
primitivesAsString = primitivesAsString,
maybeDataSchema = dataSchema,
maybePartitionSpec = None,
userDefinedPartitionColumns = partitionColumns,
paths = paths,
parameters = parameters)(sqlContext)
}
}
@@ -73,8 +74,10 @@ private[sql] class JSONRelation(
val maybeDataSchema: Option[StructType],
val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
override val paths: Array[String] = Array.empty[String])(@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec) {
override val paths: Array[String] = Array.empty[String],
parameters: Map[String, String] = Map.empty[String, String])
(@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec, parameters) {
/** Constraints to be imposed on schema to be stored. */
private def checkConstraints(schema: StructType): Unit = {


@@ -109,7 +109,7 @@ private[sql] class ParquetRelation(
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec)
extends HadoopFsRelation(maybePartitionSpec, parameters)
with Logging {
private[sql] def this(


@@ -71,9 +71,10 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
private[sql] class TextRelation(
val maybePartitionSpec: Option[PartitionSpec],
override val userDefinedPartitionColumns: Option[StructType],
override val paths: Array[String] = Array.empty[String])
override val paths: Array[String] = Array.empty[String],
parameters: Map[String, String] = Map.empty[String, String])
(@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec) {
extends HadoopFsRelation(maybePartitionSpec, parameters) {
/** Data schema is always a single column, named "text". */
override def dataSchema: StructType = new StructType().add("value", StringType)


@@ -416,12 +416,19 @@ abstract class OutputWriter {
* @since 1.4.0
*/
@Experimental
abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
abstract class HadoopFsRelation private[sql](
maybePartitionSpec: Option[PartitionSpec],
parameters: Map[String, String])
extends BaseRelation with FileRelation with Logging {
override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
def this() = this(None)
def this() = this(None, Map.empty[String, String])
def this(parameters: Map[String, String]) = this(None, parameters)
private[sql] def this(maybePartitionSpec: Option[PartitionSpec]) =
this(maybePartitionSpec, Map.empty[String, String])
private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
@@ -519,13 +526,37 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}
/**
* Base paths of this relation. For partitioned relations, it should be either root directories
* Paths of this relation. For partitioned relations, these should be the root directories
* of all partition directories.
*
* @since 1.4.0
*/
def paths: Array[String]
/**
* Contains a set of paths that are considered the base dirs of the input datasets.
* The partitioning discovery logic will make sure it stops when it reaches any
* base path. By default, the paths of the dataset provided by users will be base paths.
* For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path
* will be `/path/something=true/`, and the returned DataFrame will not contain a column
* `something`. If users want to override the base path, they can set `basePath` in the options
* to pass the new base path to the data source.
* For the above example, if the user-provided base path is `/path/`, the returned
* DataFrame will have the column `something`.
*/
private def basePaths: Set[Path] = {
val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath)))
userDefinedBasePath.getOrElse {
// If the user does not provide basePath, we will just use paths.
val pathSet = paths.toSet
pathSet.map(p => new Path(p))
}.map { hdfsPath =>
// Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel).
val fs = hdfsPath.getFileSystem(hadoopConf)
hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
}
override def inputFiles: Array[String] = cachedLeafStatuses().map(_.getPath.toString).toArray
override def sizeInBytes: Long = cachedLeafStatuses().map(_.getLen).sum
@@ -559,7 +590,10 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
userDefinedPartitionColumns match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference = false)
leafDirs,
PartitioningUtils.DEFAULT_PARTITION_NAME,
typeInference = false,
basePaths = basePaths)
// Without auto inference, all values in the `row` should be null or of StringType,
// so we need to cast them into the data types that the user specified.
@@ -577,8 +611,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
case _ =>
// user did not provide a partitioning schema
PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled())
PartitioningUtils.parsePartitions(
leafDirs,
PartitioningUtils.DEFAULT_PARTITION_NAME,
typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled(),
basePaths = basePaths)
}
}
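As a side note on the new basePaths member above, its resolution rule can be summarized by the following standalone sketch; the helper name and the explicit Configuration parameter are ours for illustration, while the real logic is the private method shown in the hunk.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Prefer the user-supplied "basePath" option, otherwise fall back to the input
    // paths, and qualify every path against its file system (consistent with
    // listLeafFiles and listLeafFilesInParallel).
    def resolveBasePaths(
        parameters: Map[String, String],
        paths: Seq[String],
        hadoopConf: Configuration): Set[Path] = {
      val userDefinedBasePath = parameters.get("basePath").map(p => Set(new Path(p)))
      userDefinedBasePath.getOrElse(paths.map(new Path(_)).toSet).map { hdfsPath =>
        val fs = hdfsPath.getFileSystem(hadoopConf)
        hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
      }
    }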


@@ -294,7 +294,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
checkAnswer(
sqlContext.read.parquet(path).filter("part = 1"),
sqlContext.read.parquet(dir.getCanonicalPath).filter("part = 1"),
(1 to 3).map(i => Row(i, i.toString, 1)))
}
}
@@ -311,7 +311,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
// If the "part = 1" filter gets pushed down, this query will throw an exception since
// "part" is not a valid column in the actual Parquet file
checkAnswer(
sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"),
sqlContext.read.parquet(dir.getCanonicalPath).filter("a > 0 and (part = 0 or a > 1)"),
(2 to 3).map(i => Row(i, i.toString, 1)))
}
}


@@ -66,7 +66,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/path/a=10.5/b=hello")
var exception = intercept[AssertionError] {
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true, Set.empty[Path])
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -76,7 +76,37 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/path/a=10/b=20",
"hdfs://host:9000/path/_temporary/path")
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
parsePartitions(
paths.map(new Path(_)),
defaultPartitionName,
true,
Set(new Path("hdfs://host:9000/path/")))
// Valid
paths = Seq(
"hdfs://host:9000/path/something=true/table/",
"hdfs://host:9000/path/something=true/table/_temporary",
"hdfs://host:9000/path/something=true/table/a=10/b=20",
"hdfs://host:9000/path/something=true/table/_temporary/path")
parsePartitions(
paths.map(new Path(_)),
defaultPartitionName,
true,
Set(new Path("hdfs://host:9000/path/something=true/table")))
// Valid
paths = Seq(
"hdfs://host:9000/path/table=true/",
"hdfs://host:9000/path/table=true/_temporary",
"hdfs://host:9000/path/table=true/a=10/b=20",
"hdfs://host:9000/path/table=true/_temporary/path")
parsePartitions(
paths.map(new Path(_)),
defaultPartitionName,
true,
Set(new Path("hdfs://host:9000/path/table=true")))
// Invalid
paths = Seq(
@@ -85,7 +115,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/path/path1")
exception = intercept[AssertionError] {
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
parsePartitions(
paths.map(new Path(_)),
defaultPartitionName,
true,
Set(new Path("hdfs://host:9000/path/")))
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
@@ -101,19 +135,24 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
"hdfs://host:9000/tmp/tables/nonPartitionedTable2")
exception = intercept[AssertionError] {
parsePartitions(paths.map(new Path(_)), defaultPartitionName, true)
parsePartitions(
paths.map(new Path(_)),
defaultPartitionName,
true,
Set(new Path("hdfs://host:9000/tmp/tables/")))
}
assert(exception.getMessage().contains("Conflicting directory structures detected"))
}
test("parse partition") {
def check(path: String, expected: Option[PartitionValues]): Unit = {
assert(expected === parsePartition(new Path(path), defaultPartitionName, true)._1)
val actual = parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])._1
assert(expected === actual)
}
def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
val message = intercept[T] {
parsePartition(new Path(path), defaultPartitionName, true)
parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])
}.getMessage
assert(message.contains(expected))
@@ -152,8 +191,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
test("parse partitions") {
def check(paths: Seq[String], spec: PartitionSpec): Unit = {
assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec)
def check(
paths: Seq[String],
spec: PartitionSpec,
rootPaths: Set[Path] = Set.empty[Path]): Unit = {
val actualSpec =
parsePartitions(
paths.map(new Path(_)),
defaultPartitionName,
true,
rootPaths)
assert(actualSpec === spec)
}
check(Seq(
@@ -232,7 +280,9 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
test("parse partitions with type inference disabled") {
def check(paths: Seq[String], spec: PartitionSpec): Unit = {
assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, false) === spec)
val actualSpec =
parsePartitions(paths.map(new Path(_)), defaultPartitionName, false, Set.empty[Path])
assert(actualSpec === spec)
}
check(Seq(
@@ -590,6 +640,70 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
test("SPARK-11678: Partition discovery stops at the root path of the dataset") {
withTempPath { dir =>
val tablePath = new File(dir, "key=value")
val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
df.write
.format("parquet")
.partitionBy("b", "c", "d")
.save(tablePath.getCanonicalPath)
Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS"))
Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df)
}
withTempPath { dir =>
val path = new File(dir, "key=value")
val tablePath = new File(path, "table")
val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
df.write
.format("parquet")
.partitionBy("b", "c", "d")
.save(tablePath.getCanonicalPath)
Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS"))
Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar"))
checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df)
}
}
test("use basePath to specify the root dir of a partitioned table.") {
withTempPath { dir =>
val tablePath = new File(dir, "table")
val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d")
df.write
.format("parquet")
.partitionBy("b", "c", "d")
.save(tablePath.getCanonicalPath)
val twoPartitionsDF =
sqlContext
.read
.option("basePath", tablePath.getCanonicalPath)
.parquet(
s"${tablePath.getCanonicalPath}/b=1",
s"${tablePath.getCanonicalPath}/b=2")
checkAnswer(twoPartitionsDF, df.filter("b != 3"))
intercept[AssertionError] {
sqlContext
.read
.parquet(
s"${tablePath.getCanonicalPath}/b=1",
s"${tablePath.getCanonicalPath}/b=2")
}
}
}
test("listConflictingPartitionColumns") {
def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = {
val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) =>


@@ -157,7 +157,7 @@ private[sql] class OrcRelation(
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
extends HadoopFsRelation(maybePartitionSpec)
extends HadoopFsRelation(maybePartitionSpec, parameters)
with Logging {
private[sql] def this(


@@ -89,7 +89,7 @@ class SimpleTextRelation(
override val userDefinedPartitionColumns: Option[StructType],
parameters: Map[String, String])(
@transient val sqlContext: SQLContext)
extends HadoopFsRelation {
extends HadoopFsRelation(parameters) {
import sqlContext.sparkContext


@@ -486,6 +486,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
val df = sqlContext.read
.format(dataSourceName)
.option("dataSchema", dataSchema.json)
.option("basePath", file.getCanonicalPath)
.load(s"${file.getCanonicalPath}/p1=*/p2=???")
val expectedPaths = Set(