[SPARK-6016][SQL] Cannot read the parquet table after overwriting the existing table when spark.sql.parquet.cacheMetadata=true

Please see JIRA (https://issues.apache.org/jira/browse/SPARK-6016) for details of the bug.
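A quick way to see the bug (a sketch only, mirroring the regression test added below; it assumes a Spark 1.3-style shell with a HiveContext bound to sqlContext, the SparkContext bound to sc, and spark.sql.parquet.cacheMetadata left at its default of true; the table name is arbitrary):

import org.apache.spark.sql.SaveMode

// Create a table backed by two Parquet part-files and read it once,
// which populates the footer cache.
val df1 = sqlContext.jsonRDD(sc.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
df1.saveAsTable("spark_6016_repro", "parquet", SaveMode.Overwrite)
sqlContext.sql("select * from spark_6016_repro").collect()

// Overwrite the same table with four Parquet part-files.
val df2 = sqlContext.jsonRDD(sc.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
df2.saveAsTable("spark_6016_repro", "parquet", SaveMode.Overwrite)

// Without this patch, two of the four footers are served stale from the cache,
// and merging their metadata with the two freshly read footers fails this query.
sqlContext.sql("select * from spark_6016_repro").collect()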

Author: Yin Huai <yhuai@databricks.com>

Closes #4775 from yhuai/parquetFooterCache and squashes the following commits:

78787b1 [Yin Huai] Remove footerCache in FilteringParquetRowInputFormat.
dff6fba [Yin Huai] Failed unit test.
Yin Huai 2015-02-27 01:01:32 +08:00 committed by Cheng Lian
parent f02394d064
commit 192e42a293
3 changed files with 42 additions and 42 deletions


@@ -374,8 +374,6 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
 private[parquet] class FilteringParquetRowInputFormat
   extends parquet.hadoop.ParquetInputFormat[Row] with Logging {
 
-  private var footers: JList[Footer] = _
-
   private var fileStatuses = Map.empty[Path, FileStatus]
 
   override def createRecordReader(
@@ -396,46 +394,15 @@ private[parquet] class FilteringParquetRowInputFormat
     }
   }
 
-  override def getFooters(jobContext: JobContext): JList[Footer] = {
-    import org.apache.spark.sql.parquet.FilteringParquetRowInputFormat.footerCache
-
-    if (footers eq null) {
-      val conf = ContextUtil.getConfiguration(jobContext)
-      val cacheMetadata = conf.getBoolean(SQLConf.PARQUET_CACHE_METADATA, true)
-      val statuses = listStatus(jobContext)
-      fileStatuses = statuses.map(file => file.getPath -> file).toMap
-      if (statuses.isEmpty) {
-        footers = Collections.emptyList[Footer]
-      } else if (!cacheMetadata) {
-        // Read the footers from HDFS
-        footers = getFooters(conf, statuses)
-      } else {
-        // Read only the footers that are not in the footerCache
-        val foundFooters = footerCache.getAllPresent(statuses)
-        val toFetch = new ArrayList[FileStatus]
-        for (s <- statuses) {
-          if (!foundFooters.containsKey(s)) {
-            toFetch.add(s)
-          }
-        }
-        val newFooters = new mutable.HashMap[FileStatus, Footer]
-        if (toFetch.size > 0) {
-          val startFetch = System.currentTimeMillis
-          val fetched = getFooters(conf, toFetch)
-          logInfo(s"Fetched $toFetch footers in ${System.currentTimeMillis - startFetch} ms")
-          for ((status, i) <- toFetch.zipWithIndex) {
-            newFooters(status) = fetched.get(i)
-          }
-          footerCache.putAll(newFooters)
-        }
-        footers = new ArrayList[Footer](statuses.size)
-        for (status <- statuses) {
-          footers.add(newFooters.getOrElse(status, foundFooters.get(status)))
-        }
-      }
-    }
-
-    footers
+  // This is only a temporary solution since we need to use fileStatuses in
+  // both getClientSideSplits and getTaskSideSplits. It can be removed once we get rid of these
+  // two methods.
+  override def getSplits(jobContext: JobContext): JList[InputSplit] = {
+    // First set fileStatuses.
+    val statuses = listStatus(jobContext)
+    fileStatuses = statuses.map(file => file.getPath -> file).toMap
+
+    super.getSplits(jobContext)
   }
 
   // TODO Remove this method and related code once PARQUET-16 is fixed
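The getFooters override deleted above consulted a JVM-wide footerCache (a static field on the FilteringParquetRowInputFormat companion object) keyed by FileStatus, so footers cached for a table's part-files outlived an overwrite of that table and could be mixed with freshly read ones. A rough, self-contained model of that failure mode (plain Scala with hypothetical names, not Spark code), assuming the overwrite reuses some part-file paths as the regression test's comment below describes:

import scala.collection.mutable

object StaleFooterCacheModel {
  // Stand-in for the removed process-wide cache: path -> "footer" (here just a schema string).
  private val footerCache = mutable.Map.empty[String, String]

  // Return the cached footer if present, otherwise "read" it from the current files.
  def readFooter(path: String, filesOnDisk: Map[String, String]): String =
    footerCache.getOrElseUpdate(path, filesOnDisk(path))

  def main(args: Array[String]): Unit = {
    // First write: two part-files whose footers describe schema {a: int}.
    var files = Map("part-1" -> "a INT", "part-2" -> "a INT")
    files.keys.foreach(p => readFooter(p, files))                  // first read warms the cache

    // Overwrite: four part-files with schema {b: int}; the first two paths are reused.
    files = Map("part-1" -> "b INT", "part-2" -> "b INT",
                "part-3" -> "b INT", "part-4" -> "b INT")
    val footers = files.keys.toList.sorted.map(p => readFooter(p, files))
    println(footers)  // List(a INT, a INT, b INT, b INT): stale and fresh footers mixed
  }
}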


@@ -200,7 +200,7 @@ private[sql] case class ParquetRelation2(
     private var commonMetadataStatuses: Array[FileStatus] = _
 
     // Parquet footer cache.
-    private var footers: Map[FileStatus, Footer] = _
+    var footers: Map[FileStatus, Footer] = _
 
     // `FileStatus` objects of all data files (Parquet part-files).
     var dataStatuses: Array[FileStatus] = _
@@ -400,6 +400,7 @@ private[sql] case class ParquetRelation2(
     } else {
       metadataCache.dataStatuses.toSeq
     }
+    val selectedFooters = selectedFiles.map(metadataCache.footers)
 
     // FileInputFormat cannot handle empty lists.
     if (selectedFiles.nonEmpty) {
@@ -447,11 +448,16 @@ private[sql] case class ParquetRelation2(
       @transient
       val cachedStatus = selectedFiles
 
+      @transient
+      val cachedFooters = selectedFooters
+
       // Overridden so we can inject our own cached files statuses.
       override def getPartitions: Array[SparkPartition] = {
         val inputFormat = if (cacheMetadata) {
           new FilteringParquetRowInputFormat {
             override def listStatus(jobContext: JobContext): JList[FileStatus] = cachedStatus
+
+            override def getFooters(jobContext: JobContext): JList[Footer] = cachedFooters
           }
         } else {
           new FilteringParquetRowInputFormat
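With the shared cache gone, a table's footers now live in ParquetRelation2's metadataCache (the footers field made public above), and this hunk hands the pre-resolved cachedFooters to each job's input format by overriding getFooters on an anonymous subclass, just as listStatus was already overridden for cachedStatus. A stripped-down sketch of that injection pattern (hypothetical classes, for illustration only, not Spark's API):

import java.util.{ArrayList, List => JList}

// A reader that, by default, would discover its own metadata.
class MetadataReader {
  def getFooters(): JList[String] = new ArrayList[String]()  // e.g. read footers from storage
}

// A relation resolves its metadata once and injects it into every reader it builds,
// so readers never fall back to a longer-lived, shared cache.
class Relation(resolvedFooters: Seq[String]) {
  def buildReader(): MetadataReader = {
    val cachedFooters = resolvedFooters                      // captured per-relation state
    new MetadataReader {
      override def getFooters(): JList[String] = {
        val out = new ArrayList[String](cachedFooters.size)
        cachedFooters.foreach(out.add)
        out
      }
    }
  }
}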


@@ -29,6 +29,7 @@ import org.apache.spark.sql.hive.execution.{InsertIntoHiveTable, HiveTableScan}
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
+import org.apache.spark.sql.SaveMode
 
 // The data where the partitioning key exists only in the directory structure.
 case class ParquetData(intField: Int, stringField: String)
@@ -409,6 +410,32 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
       )
     """)
   }
+
+  test("SPARK-6016 make sure to use the latest footers") {
+    sql("drop table if exists spark_6016_fix")
+
+    // Create a DataFrame with two partitions. So, the created table will have two parquet files.
+    val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
+    df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    // Create a DataFrame with four partitions. So, the created table will have four parquet files.
+    val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
+    df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
+    // For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
+    // since the new table has four parquet files, we are trying to read new footers from two files
+    // and then merge metadata in footers of these four (two outdated ones and two latest ones),
+    // which will cause an error.
+    checkAnswer(
+      sql("select * from spark_6016_fix"),
+      (1 to 10).map(i => Row(i))
+    )
+
+    sql("drop table spark_6016_fix")
+  }
 }
 
 class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {