[SPARK-15895][SQL] Filters out metadata files while doing partition discovery

## What changes were proposed in this pull request?

Take the following directory layout as an example:

```
dir/
+- p0=0/
   |-_metadata
   +- p1=0/
      |-part-00001.parquet
      |-part-00002.parquet
      |-...
```

The `_metadata` file under `p0=0` shouldn't fail partition discovery.

This PR filters output all metadata files whose names start with `_` while doing partition discovery.

## How was this patch tested?

New unit test added in `ParquetPartitionDiscoverySuite`.

Author: Cheng Lian <lian@databricks.com>

Closes #13623 from liancheng/spark-15895-partition-disco-no-metafiles.
This commit is contained in:
Cheng Lian 2016-06-14 12:13:12 -07:00 committed by Yin Huai
parent df4ea6614d
commit bd39ffe35c
3 changed files with 60 additions and 6 deletions

View file

@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
import scala.collection.mutable
import scala.util.Try
import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
import org.apache.spark.sql.SparkSession
@ -83,8 +83,9 @@ class ListingFileCatalog(
val statuses: Seq[FileStatus] = paths.flatMap { path =>
val fs = path.getFileSystem(hadoopConf)
logInfo(s"Listing $path on driver")
Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
getOrElse(Array.empty)
Try {
HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
}.getOrElse(Array.empty[FileStatus])
}
mutable.LinkedHashSet(statuses: _*)
}

View file

@ -50,14 +50,14 @@ abstract class PartitioningAwareFileCatalog(
override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
Partition(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
} else {
prunePartitions(filters, partitionSpec()).map {
case PartitionDirectory(values, path) =>
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
existingDir.filterNot(_.getPath.getName.startsWith("_"))
existingDir.filter(f => isDataPath(f.getPath))
case None =>
// Directory does not exist, or has no children files
@ -96,7 +96,11 @@ abstract class PartitioningAwareFileCatalog(
protected def inferPartitioning(): PartitionSpec = {
// We use leaf dirs containing data files to discover the schema.
val leafDirs = leafDirToChildrenFiles.keys.toSeq
val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
// SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
// counted as data files, so that they shouldn't participate partition discovery.
files.exists(f => isDataPath(f.getPath))
}.keys.toSeq
partitionSchema match {
case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
val spec = PartitioningUtils.parsePartitions(
@ -197,4 +201,9 @@ abstract class PartitioningAwareFileCatalog(
if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath }.toSet
}
}
private def isDataPath(path: Path): Boolean = {
val name = path.getName
!(name.startsWith("_") || name.startsWith("."))
}
}

View file

@ -25,11 +25,13 @@ import scala.collection.mutable.ArrayBuffer
import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetOutputFormat
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
@ -890,4 +892,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
}
}
}
test("SPARK-15895 summary files in non-leaf partition directories") {
withTempPath { dir =>
val path = dir.getCanonicalPath
withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
spark.range(3).write.parquet(s"$path/p0=0/p1=0")
}
val p0 = new File(path, "p0=0")
val p1 = new File(p0, "p1=0")
// Builds the following directory layout by:
//
// 1. copying Parquet summary files we just wrote into `p0=0`, and
// 2. touching a dot-file `.dummy` under `p0=0`.
//
// <base>
// +- p0=0
// |- _metadata
// |- _common_metadata
// |- .dummy
// +- p1=0
// |- _metadata
// |- _common_metadata
// |- part-00000.parquet
// |- part-00001.parquet
// +- ...
//
// The summary files and the dot-file under `p0=0` should not fail partition discovery.
Files.copy(new File(p1, "_metadata"), new File(p0, "_metadata"))
Files.copy(new File(p1, "_common_metadata"), new File(p0, "_common_metadata"))
Files.touch(new File(p0, ".dummy"))
checkAnswer(spark.read.parquet(s"$path"), Seq(
Row(0, 0, 0),
Row(1, 0, 0),
Row(2, 0, 0)
))
}
}
}