[SPARK-7567] [SQL] [follow-up] Use a new flag to set output committer based on mapreduce apis
cc liancheng marmbrus
Author: Yin Huai <yhuai@databricks.com>
Closes #6130 from yhuai/directOutput and squashes the following commits:
312b07d [Yin Huai] A data source can use spark.sql.sources.outputCommitterClass to override the output committer.
(cherry picked from commit 530397ba2f)
Signed-off-by: Michael Armbrust <michael@databricks.com>
parent d6f5f37911
commit a385f4b8dd
@@ -71,6 +71,10 @@ private[spark] object SQLConf {
   // Whether to perform partition discovery when loading external data sources. Default to true.
   val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
 
+  // The output committer class used by FSBasedRelation. The specified class needs to be a
+  // subclass of org.apache.hadoop.mapreduce.OutputCommitter.
+  val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
+
   // Whether to perform eager analysis when constructing a dataframe.
   // Set to false when debugging requires the ability to look at invalid query plans.
   val DATAFRAME_EAGER_ANALYSIS = "spark.sql.eagerAnalysis"
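
As a hedged illustration (not part of this commit): the new key holds an org.apache.hadoop.mapreduce.OutputCommitter subclass. Assuming the write job's configuration is derived from the SparkContext's Hadoop configuration, a user could register a custom committer roughly like this; `sc` and `com.example.MyCommitter` are placeholders.

import org.apache.hadoop.mapreduce.OutputCommitter
import org.apache.spark.SparkContext

def setOutputCommitter(sc: SparkContext): Unit = {
  // Same key as SQLConf.OUTPUT_COMMITTER_CLASS; the class must be on the executor classpath.
  sc.hadoopConfiguration.setClass(
    "spark.sql.sources.outputCommitterClass",
    Class.forName("com.example.MyCommitter"),
    classOf[OutputCommitter])
}
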
@@ -197,7 +197,7 @@ private[sql] class ParquetRelation2(
         classOf[ParquetOutputCommitter])
 
     conf.setClass(
-      "mapred.output.committer.class",
+      SQLConf.OUTPUT_COMMITTER_CLASS,
       committerClass,
       classOf[ParquetOutputCommitter])
 
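
For context, a sketch of the surrounding job-preparation logic in ParquetRelation2 (my reconstruction, not a verbatim quote; the parquet-specific key below is assumed to be read the same way as before this change):

import org.apache.hadoop.mapreduce.Job
import parquet.hadoop.ParquetOutputCommitter
import parquet.hadoop.util.ContextUtil

def wireParquetCommitter(job: Job): Unit = {
  val conf = ContextUtil.getConfiguration(job)
  // Resolve the committer the Parquet relation wants to use (defaults to ParquetOutputCommitter).
  val committerClass = conf.getClass(
    "spark.sql.parquet.output.committer.class",
    classOf[ParquetOutputCommitter],
    classOf[ParquetOutputCommitter])
  // After this change, the resolved class is published under the generic data source key
  // instead of mapred.output.committer.class, so BaseWriterContainer can pick it up.
  conf.setClass("spark.sql.sources.outputCommitterClass", committerClass, classOf[ParquetOutputCommitter])
}
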
@@ -23,7 +23,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, FileOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
 import org.apache.hadoop.util.Shell
 import parquet.hadoop.util.ContextUtil
 
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
+import org.apache.spark.sql.{SQLConf, DataFrame, SQLContext, SaveMode}
 
 private[sql] case class InsertIntoDataSource(
     logicalRelation: LogicalRelation,
@@ -287,24 +287,39 @@ private[sql] abstract class BaseWriterContainer(
   protected def getWorkPath: String = {
     outputCommitter match {
       // FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
-      case f: FileOutputCommitter => f.getWorkPath.toString
+      case f: MapReduceFileOutputCommitter => f.getWorkPath.toString
       case _ => outputPath
     }
   }
 
   private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
     val committerClass = context.getConfiguration.getClass(
-      "mapred.output.committer.class", null, classOf[OutputCommitter])
+      SQLConf.OUTPUT_COMMITTER_CLASS, null, classOf[OutputCommitter])
 
     Option(committerClass).map { clazz =>
-      val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-      ctor.newInstance(new Path(outputPath), context)
+      // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+      // has an associated output committer. To override this output committer,
+      // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+      // If a data source needs to override the output committer, it needs to set the
+      // output committer in prepareForWrite method.
+      if (classOf[MapReduceFileOutputCommitter].isAssignableFrom(clazz)) {
+        // The specified output committer is a FileOutputCommitter.
+        // So, we will use the FileOutputCommitter-specified constructor.
+        val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+        ctor.newInstance(new Path(outputPath), context)
+      } else {
+        // The specified output committer is just a OutputCommitter.
+        // So, we will use the no-argument constructor.
+        val ctor = clazz.getDeclaredConstructor()
+        ctor.newInstance()
+      }
     }.getOrElse {
+      // If output committer class is not set, we will use the one associated with the
+      // file output format.
       outputFormatClass.newInstance().getOutputCommitter(context)
     }
   }
 
 
   private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
     this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
     this.taskId = new TaskID(this.jobId, true, splitId)
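
A minimal sketch (not from this commit) of a committer that the no-argument branch above could instantiate. It skips the temporary-directory staging entirely, so it only makes sense for writers that emit files directly to the final output location; the class name is hypothetical.

import org.apache.hadoop.mapreduce.{JobContext, OutputCommitter, TaskAttemptContext}

class NoOpOutputCommitter extends OutputCommitter {
  // No staging directories are created or moved; commit/abort are no-ops.
  override def setupJob(jobContext: JobContext): Unit = {}
  override def setupTask(taskContext: TaskAttemptContext): Unit = {}
  override def needsTaskCommit(taskContext: TaskAttemptContext): Boolean = false
  override def commitTask(taskContext: TaskAttemptContext): Unit = {}
  override def abortTask(taskContext: TaskAttemptContext): Unit = {}
}

Because this class is not a FileOutputCommitter, newOutputCommitter falls through to the no-argument constructor branch, and getWorkPath returns the final outputPath rather than a temporary work path.
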
@@ -527,7 +527,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
 
   /**
    * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
-   * be put here. For example, user defined output committer can be configured here.
+   * be put here. For example, user defined output committer can be configured here
+   * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    *
    * Note that the only side effect expected here is mutating `job` via its setters. Especially,
    * Spark SQL caches [[BaseRelation]] instances for performance, mutating relation internal states
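
A hedged sketch of how a data source's write-preparation hook (the method documented above) could take advantage of this: it only needs to mutate the job's configuration. The helper name and committer class below are placeholders, not Spark APIs.

import org.apache.hadoop.mapreduce.{Job, OutputCommitter}

def useCustomCommitter(job: Job, committerClass: Class[_ <: OutputCommitter]): Unit = {
  job.getConfiguration.setClass(
    "spark.sql.sources.outputCommitterClass",  // SQLConf.OUTPUT_COMMITTER_CLASS
    committerClass,
    classOf[OutputCommitter])
}
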