[SPARK-10063][SQL] Remove DirectParquetOutputCommitter

## What changes were proposed in this pull request?
This patch removes DirectParquetOutputCommitter. This was initially created by Databricks as a faster way to write Parquet data to S3. However, given how the underlying S3 Hadoop implementation works, this committer only works when there are no failures. If there are multiple attempts of the same task (e.g. speculation or task failures or node failures), the output data can be corrupted. I don't think this performance optimization outweighs the correctness issue.

## How was this patch tested?
Removed the related tests also.

Author: Reynold Xin <rxin@databricks.com>

Closes #12229 from rxin/SPARK-10063.
This commit is contained in:
Reynold Xin 2016-04-07 00:51:45 -07:00
parent e11aa9ec5c
commit 9ca0760d67
6 changed files with 5 additions and 224 deletions

View file

@ -1466,37 +1466,6 @@ Configuration of Parquet can be done using the `setConf` method on `SQLContext`
support.
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.output.committer.class</code></td>
<td><code>org.apache.parquet.hadoop.<br />ParquetOutputCommitter</code></td>
<td>
<p>
The output committer class used by Parquet. The specified class needs to be a subclass of
<code>org.apache.hadoop.<br />mapreduce.OutputCommitter</code>. Typically, it's also a
subclass of <code>org.apache.parquet.hadoop.ParquetOutputCommitter</code>.
</p>
<p>
<b>Note:</b>
<ul>
<li>
This option is automatically ignored if <code>spark.speculation</code> is turned on.
</li>
<li>
This option must be set via Hadoop <code>Configuration</code> rather than Spark
<code>SQLConf</code>.
</li>
<li>
This option overrides <code>spark.sql.sources.<br />outputCommitterClass</code>.
</li>
</ul>
</p>
<p>
Spark SQL comes with a builtin
<code>org.apache.spark.sql.<br />parquet.DirectParquetOutputCommitter</code>, which can be more
efficient then the default Parquet output committer when writing data to S3.
</p>
</td>
</tr>
<tr>
<td><code>spark.sql.parquet.mergeSchema</code></td>
<td><code>false</code></td>
@ -2165,8 +2134,6 @@ options.
- In the `sql` dialect, floating point numbers are now parsed as decimal. HiveQL parsing remains
unchanged.
- The canonical name of SQL/DataFrame functions are now lower case (e.g. sum vs SUM).
- It has been determined that using the DirectOutputCommitter when speculation is enabled is unsafe
and thus this output committer will not be used when speculation is on, independent of configuration.
- JSON data source will not automatically load new files that are created by other applications
(i.e. files that are not inserted to the dataset through Spark SQL).
For a JSON persistent table (i.e. the metadata of the table is stored in Hive Metastore),

View file

@ -129,16 +129,17 @@ private[sql] abstract class BaseWriterContainer(
outputWriterFactory.newInstance(path, bucketId, dataSchema, taskAttemptContext)
} catch {
case e: org.apache.hadoop.fs.FileAlreadyExistsException =>
if (outputCommitter.isInstanceOf[parquet.DirectParquetOutputCommitter]) {
// Spark-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
if (outputCommitter.getClass.getName.contains("Direct")) {
// SPARK-11382: DirectParquetOutputCommitter is not idempotent, meaning on retry
// attempts, the task will fail because the output file is created from a prior attempt.
// This often means the most visible error to the user is misleading. Augment the error
// to tell the user to look for the actual error.
throw new SparkException("The output file already exists but this could be due to a " +
"failure from an earlier attempt. Look through the earlier logs or stage page for " +
"the first error.\n File exists error: " + e)
"the first error.\n File exists error: " + e.getLocalizedMessage, e)
} else {
throw e
}
throw e
}
}
@ -156,15 +157,6 @@ private[sql] abstract class BaseWriterContainer(
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
"for appending.")
defaultOutputCommitter
} else if (speculationEnabled) {
// When speculation is enabled, it's not safe to use customized output committer classes,
// especially direct output committers (e.g. `DirectParquetOutputCommitter`).
//
// See SPARK-9899 for more details.
logInfo(
s"Using default output committer ${defaultOutputCommitter.getClass.getCanonicalName} " +
"because spark.speculation is configured to be true.")
defaultOutputCommitter
} else {
val configuration = context.getConfiguration
val committerClass = configuration.getClass(

View file

@ -1,88 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.parquet.Log
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter, ParquetOutputFormat}
import org.apache.parquet.hadoop.util.ContextUtil
/**
* An output committer for writing Parquet files. In stead of writing to the `_temporary` folder
* like what [[ParquetOutputCommitter]] does, this output committer writes data directly to the
* destination folder. This can be useful for data stored in S3, where directory operations are
* relatively expensive.
*
* To enable this output committer, users may set the "spark.sql.parquet.output.committer.class"
* property via Hadoop [[Configuration]]. Not that this property overrides
* "spark.sql.sources.outputCommitterClass".
*
* *NOTE*
*
* NEVER use [[DirectParquetOutputCommitter]] when appending data, because currently there's
* no safe way undo a failed appending job (that's why both `abortTask()` and `abortJob()` are
* left empty).
*/
private[datasources] class DirectParquetOutputCommitter(
outputPath: Path, context: TaskAttemptContext)
extends ParquetOutputCommitter(outputPath, context) {
val LOG = Log.getLog(classOf[ParquetOutputCommitter])
override def getWorkPath: Path = outputPath
override def abortTask(taskContext: TaskAttemptContext): Unit = {}
override def commitTask(taskContext: TaskAttemptContext): Unit = {}
override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
override def setupJob(jobContext: JobContext): Unit = {}
override def setupTask(taskContext: TaskAttemptContext): Unit = {}
override def commitJob(jobContext: JobContext) {
val configuration = ContextUtil.getConfiguration(jobContext)
val fileSystem = outputPath.getFileSystem(configuration)
if (configuration.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)) {
try {
val outputStatus = fileSystem.getFileStatus(outputPath)
val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
try {
ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
} catch { case e: Exception =>
LOG.warn("could not write summary file for " + outputPath, e)
val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
if (fileSystem.exists(metadataPath)) {
fileSystem.delete(metadataPath, true)
}
}
} catch {
case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
}
}
if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
try {
val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
fileSystem.create(successPath).close()
} catch {
case e: Exception => LOG.warn("could not write success file for " + outputPath, e)
}
}
}
}

View file

@ -76,13 +76,6 @@ private[sql] class DefaultSource
val conf = ContextUtil.getConfiguration(job)
// SPARK-9849 DirectParquetOutputCommitter qualified name should be backward compatible
val committerClassName = conf.get(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key)
if (committerClassName == "org.apache.spark.sql.parquet.DirectParquetOutputCommitter") {
conf.set(SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,
classOf[DirectParquetOutputCommitter].getCanonicalName)
}
val committerClass =
conf.getClass(
SQLConf.PARQUET_OUTPUT_COMMITTER_CLASS.key,

View file

@ -445,55 +445,6 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
}
}
testQuietly("SPARK-6352 DirectParquetOutputCommitter") {
val clonedConf = new Configuration(hadoopConfiguration)
// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
classOf[DirectParquetOutputCommitter].getCanonicalName)
sqlContext.udf.register("div0", (x: Int) => x / 0)
withTempPath { dir =>
intercept[org.apache.spark.SparkException] {
sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
}
val path = new Path(dir.getCanonicalPath, "_temporary")
val fs = path.getFileSystem(hadoopConfiguration)
assert(!fs.exists(path))
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}
testQuietly("SPARK-9849 DirectParquetOutputCommitter qualified name backwards compatibility") {
val clonedConf = new Configuration(hadoopConfiguration)
// Write to a parquet file and let it fail.
// _temporary should be missing if direct output committer works.
try {
hadoopConfiguration.set("spark.sql.parquet.output.committer.class",
"org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
sqlContext.udf.register("div0", (x: Int) => x / 0)
withTempPath { dir =>
intercept[org.apache.spark.SparkException] {
sqlContext.sql("select div0(1) as div0").write.parquet(dir.getCanonicalPath)
}
val path = new Path(dir.getCanonicalPath, "_temporary")
val fs = path.getFileSystem(hadoopConfiguration)
assert(!fs.exists(path))
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
}
}
test("SPARK-8121: spark.sql.parquet.output.committer.class shouldn't be overridden") {
withTempPath { dir =>
val clonedConf = new Configuration(hadoopConfiguration)

View file

@ -668,40 +668,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
}
}
test("SPARK-9899 Disable customized output committer when speculation is on") {
val clonedConf = new Configuration(hadoopConfiguration)
val speculationEnabled =
sqlContext.sparkContext.conf.getBoolean("spark.speculation", defaultValue = false)
try {
withTempPath { dir =>
// Enables task speculation
sqlContext.sparkContext.conf.set("spark.speculation", "true")
// Uses a customized output committer which always fails
hadoopConfiguration.set(
SQLConf.OUTPUT_COMMITTER_CLASS.key,
classOf[AlwaysFailOutputCommitter].getName)
// Code below shouldn't throw since customized output committer should be disabled.
val df = sqlContext.range(10).toDF().coalesce(1)
df.write.format(dataSourceName).save(dir.getCanonicalPath)
checkAnswer(
sqlContext
.read
.format(dataSourceName)
.option("dataSchema", df.schema.json)
.load(dir.getCanonicalPath),
df)
}
} finally {
// Hadoop 1 doesn't have `Configuration.unset`
hadoopConfiguration.clear()
clonedConf.asScala.foreach(entry => hadoopConfiguration.set(entry.getKey, entry.getValue))
sqlContext.sparkContext.conf.set("spark.speculation", speculationEnabled.toString)
}
}
}
// This class is used to test SPARK-8578. We should not use any custom output committer when