[SPARK-6352] [SQL] Add DirectParquetOutputCommitter
Add a DirectParquetOutputCommitter class that skips the _temporary directory when saving to S3. Add a new config value "spark.sql.parquet.useDirectParquetOutputCommitter" (default false) to choose whether to use it instead of the default output committer.

Author: Pei-Lun Lee <pllee@appier.com>

Closes #5042 from ypcat/spark-6352 and squashes the following commits:

e17bf47 [Pei-Lun Lee] Merge branch 'master' of https://github.com/apache/spark into spark-6352
9ae7545 [Pei-Lun Lee] [SPARL-6352] [SQL] Change to allow custom parquet output committer.
0d540b9 [Pei-Lun Lee] [SPARK-6352] [SQL] add license
c42468c [Pei-Lun Lee] [SPARK-6352] [SQL] add test case
0fc03ca [Pei-Lun Lee] [SPARK-6532] [SQL] hide class DirectParquetOutputCommitter
769bd67 [Pei-Lun Lee] DirectParquetOutputCommitter
f75e261 [Pei-Lun Lee] DirectParquetOutputCommitter
This commit is contained in:
parent 202ebf06e0
commit b29663eeea
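A minimal usage sketch (not part of the diff): opting in from an application on the Spark 1.3-era API. Note that the merged version of the patch selects the committer through the Hadoop configuration key "spark.sql.parquet.output.committer.class" (see getOutputCommitter below) rather than a boolean flag; the app name and the S3 output path here are placeholders.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object DirectCommitterDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("direct-committer-demo"))
    val sqlContext = new SQLContext(sc)

    // Route Parquet writes through the committer added by this patch, so no
    // _temporary staging directory is created (on S3, renames are full copies).
    sc.hadoopConfiguration.set(
      "spark.sql.parquet.output.committer.class",
      "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

    // Spark 1.3-era save API, as exercised by the test case below.
    sqlContext.sql("select 1 as id").saveAsParquetFile("s3n://bucket/output.parquet")
  }
}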
@@ -0,0 +1,66 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.parquet

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import parquet.Log
import parquet.hadoop.util.ContextUtil
import parquet.hadoop.{ParquetFileReader, ParquetFileWriter, ParquetOutputCommitter}

private[parquet] class DirectParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {
  val LOG = Log.getLog(classOf[ParquetOutputCommitter])

  // Tasks write straight to the final output path, so there is no staging
  // directory to set up, commit, or abort: the task-level hooks are no-ops.
  override def getWorkPath(): Path = outputPath
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = true
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}

  // Job commit only writes the Parquet summary (_metadata) and _SUCCESS
  // markers; a failure here is logged instead of failing the whole job.
  override def commitJob(jobContext: JobContext) {
    try {
      val configuration = ContextUtil.getConfiguration(jobContext)
      val fileSystem = outputPath.getFileSystem(configuration)
      val outputStatus = fileSystem.getFileStatus(outputPath)
      val footers = ParquetFileReader.readAllFootersInParallel(configuration, outputStatus)
      try {
        ParquetFileWriter.writeMetadataFile(configuration, outputPath, footers)
        if (configuration.getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true)) {
          val successPath = new Path(outputPath, FileOutputCommitter.SUCCEEDED_FILE_NAME)
          fileSystem.create(successPath).close()
        }
      } catch {
        case e: Exception => {
          // Roll back a partially written summary file so readers don't see it.
          LOG.warn("could not write summary file for " + outputPath, e)
          val metadataPath = new Path(outputPath, ParquetFileWriter.PARQUET_METADATA_FILE)
          if (fileSystem.exists(metadataPath)) {
            fileSystem.delete(metadataPath, true)
          }
        }
      }
    } catch {
      case e: Exception => LOG.warn("could not write summary file for " + outputPath, e)
    }
  }

}
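Why the task-level hooks can be no-ops: getWorkPath returns the final output directory itself, so each task writes its part-files directly to the destination and commitTask has nothing to move into place. A small sketch of that property, assuming Hadoop 2's TaskAttemptContextImpl; because the class is private[parquet], the sketch would have to live in the org.apache.spark.sql.parquet package:

package org.apache.spark.sql.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptID
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

object WorkPathDemo {
  def main(args: Array[String]): Unit = {
    val out = new Path("s3n://bucket/output.parquet")  // placeholder path
    val ctx = new TaskAttemptContextImpl(new Configuration(), new TaskAttemptID())
    val committer = new DirectParquetOutputCommitter(out, ctx)
    // The work path IS the output path: task output never touches _temporary.
    assert(committer.getWorkPath == out)
  }
}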
@@ -379,6 +379,8 @@ private[sql] case class InsertIntoParquetTable(
 */
private[parquet] class AppendingParquetOutputFormat(offset: Int)
  extends parquet.hadoop.ParquetOutputFormat[Row] {
  var committer: OutputCommitter = null

  // override to accept existing directories as valid output directory
  override def checkOutputSpecs(job: JobContext): Unit = {}

@@ -403,6 +405,26 @@ private[parquet] class AppendingParquetOutputFormat(offset: Int)
  private def getTaskAttemptID(context: TaskAttemptContext): TaskAttemptID = {
    context.getClass.getMethod("getTaskAttemptID").invoke(context).asInstanceOf[TaskAttemptID]
  }

  // override to create output committer from configuration; any
  // ParquetOutputCommitter subclass with a (Path, TaskAttemptContext)
  // constructor can be named in "spark.sql.parquet.output.committer.class"
  override def getOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
    if (committer == null) {
      val output = getOutputPath(context)
      val cls = context.getConfiguration.getClass("spark.sql.parquet.output.committer.class",
        classOf[ParquetOutputCommitter], classOf[ParquetOutputCommitter])
      val ctor = cls.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
      committer = ctor.newInstance(output, context).asInstanceOf[ParquetOutputCommitter]
    }
    committer
  }

  // FileOutputFormat.getOutputPath takes JobConf in hadoop-1 but JobContext in hadoop-2
  private def getOutputPath(context: TaskAttemptContext): Path = {
    context.getConfiguration().get("mapred.output.dir") match {
      case null => null
      case name => new Path(name)
    }
  }
}

/**
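Because getOutputCommitter instantiates whatever class is named in "spark.sql.parquet.output.committer.class" through its (Path, TaskAttemptContext) constructor, a user can plug in any ParquetOutputCommitter subclass, not just the direct one. A hypothetical example; the package and class name are illustrative and not part of the patch:

package com.example.parquet

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
import parquet.hadoop.ParquetOutputCommitter

// Keeps the default Parquet commit behavior but logs when a job lands.
class LoggingParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)
  extends ParquetOutputCommitter(outputPath, context) {

  override def commitJob(jobContext: JobContext): Unit = {
    super.commitJob(jobContext)  // default commit, including the summary _metadata file
    println(s"Parquet job committed to $outputPath")
  }
}

Selecting it is then one configuration line, mirroring the test below: configuration.set("spark.sql.parquet.output.committer.class", "com.example.parquet.LoggingParquetOutputCommitter").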
@@ -381,6 +381,27 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
      }
    }
  }

  test("SPARK-6352 DirectParquetOutputCommitter") {
    // Write to a parquet file and let it fail.
    // _temporary should be missing if direct output committer works.
    try {
      configuration.set("spark.sql.parquet.output.committer.class",
        "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
      sqlContext.udf.register("div0", (x: Int) => x / 0)
      withTempPath { dir =>
        intercept[org.apache.spark.SparkException] {
          sqlContext.sql("select div0(1)").saveAsParquetFile(dir.getCanonicalPath)
        }
        val path = new Path(dir.getCanonicalPath, "_temporary")
        val fs = path.getFileSystem(configuration)
        assert(!fs.exists(path))
      }
    } finally {
      configuration.unset("spark.sql.parquet.output.committer.class")
    }
  }
}

class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterAll {