diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 7d2854aaad..d96cf1bf07 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources
 import scala.collection.mutable
 import scala.util.Try
 
-import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
 
 import org.apache.spark.sql.SparkSession
@@ -83,8 +83,9 @@ class ListingFileCatalog(
       val statuses: Seq[FileStatus] = paths.flatMap { path =>
         val fs = path.getFileSystem(hadoopConf)
         logInfo(s"Listing $path on driver")
-        Try(HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)).
-          getOrElse(Array.empty)
+        Try {
+          HadoopFsRelation.listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
+        }.getOrElse(Array.empty[FileStatus])
       }
       mutable.LinkedHashSet(statuses: _*)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
index 406d2e8e81..811e96c99a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala
@@ -50,14 +50,14 @@ abstract class PartitioningAwareFileCatalog(
 
   override def listFiles(filters: Seq[Expression]): Seq[Partition] = {
     val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
-      Partition(InternalRow.empty, allFiles().filterNot(_.getPath.getName startsWith "_")) :: Nil
+      Partition(InternalRow.empty, allFiles().filter(f => isDataPath(f.getPath))) :: Nil
     } else {
       prunePartitions(filters, partitionSpec()).map {
         case PartitionDirectory(values, path) =>
           val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
             case Some(existingDir) =>
               // Directory has children files in it, return them
-              existingDir.filterNot(_.getPath.getName.startsWith("_"))
+              existingDir.filter(f => isDataPath(f.getPath))
 
             case None =>
               // Directory does not exist, or has no children files
@@ -96,7 +96,11 @@ abstract class PartitioningAwareFileCatalog(
 
   protected def inferPartitioning(): PartitionSpec = {
     // We use leaf dirs containing data files to discover the schema.
-    val leafDirs = leafDirToChildrenFiles.keys.toSeq
+    val leafDirs = leafDirToChildrenFiles.filter { case (_, files) =>
+      // SPARK-15895: Metadata files (e.g. Parquet summary files) and temporary files should not be
+      // counted as data files, so that they shouldn't participate in partition discovery.
+      files.exists(f => isDataPath(f.getPath))
+    }.keys.toSeq
     partitionSchema match {
       case Some(userProvidedSchema) if userProvidedSchema.nonEmpty =>
         val spec = PartitioningUtils.parsePartitions(
@@ -197,4 +201,9 @@ abstract class PartitioningAwareFileCatalog(
       if (leafFiles.contains(qualifiedPath)) qualifiedPath.getParent else qualifiedPath
     }.toSet
   }
+
+  private def isDataPath(path: Path): Boolean = {
+    val name = path.getName
+    !(name.startsWith("_") || name.startsWith("."))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index e19345529e..133ffedf12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -25,11 +25,13 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitionDirectory => Partition, PartitioningUtils, PartitionSpec}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -890,4 +892,46 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-15895 summary files in non-leaf partition directories") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+        spark.range(3).write.parquet(s"$path/p0=0/p1=0")
+      }
+
+      val p0 = new File(path, "p0=0")
+      val p1 = new File(p0, "p1=0")
+
+      // Builds the following directory layout by:
+      //
+      //  1. copying Parquet summary files we just wrote into `p0=0`, and
+      //  2. touching a dot-file `.dummy` under `p0=0`.
+      //
+      // <base>
+      // +- p0=0
+      //    |- _metadata
+      //    |- _common_metadata
+      //    |- .dummy
+      //    +- p1=0
+      //       |- _metadata
+      //       |- _common_metadata
+      //       |- part-00000.parquet
+      //       |- part-00001.parquet
+      //       +- ...
+      //
+      // The summary files and the dot-file under `p0=0` should not fail partition discovery.
+
+      Files.copy(new File(p1, "_metadata"), new File(p0, "_metadata"))
+      Files.copy(new File(p1, "_common_metadata"), new File(p0, "_common_metadata"))
+      Files.touch(new File(p0, ".dummy"))
+
+      checkAnswer(spark.read.parquet(s"$path"), Seq(
+        Row(0, 0, 0),
+        Row(1, 0, 0),
+        Row(2, 0, 0)
+      ))
+    }
+  }
 }
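
Note (not part of the patch): the heart of the change is the new private isDataPath predicate, which listFiles and inferPartitioning now use so that metadata/summary files (names starting with "_", e.g. _metadata and _common_metadata) and hidden or temporary files (names starting with ".") are neither returned as data files nor used for partition discovery. Below is a minimal standalone Scala sketch of that rule; the object name, helper name, and sample paths are illustrative only, and only org.apache.hadoop.fs.Path comes from a real API.

// Standalone sketch of the filtering rule added by this patch; illustrative names only.
import org.apache.hadoop.fs.Path

object IsDataPathSketch {
  // Mirrors the patched helper: a file counts as data only if its name starts with
  // neither "_" (Parquet summary/metadata files) nor "." (hidden/temporary files).
  def looksLikeDataPath(path: Path): Boolean = {
    val name = path.getName
    !(name.startsWith("_") || name.startsWith("."))
  }

  def main(args: Array[String]): Unit = {
    Seq(
      new Path("/table/p0=0/_metadata"),                // excluded: summary file
      new Path("/table/p0=0/_common_metadata"),         // excluded: summary file
      new Path("/table/p0=0/.dummy"),                   // excluded: dot-file
      new Path("/table/p0=0/p1=0/part-00000.parquet")   // kept: data file
    ).foreach { p =>
      println(s"$p -> data file? ${looksLikeDataPath(p)}")
    }
  }
}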