[SPARK-7599] [SQL] Don't restrict customized output committers to be subclasses of FileOutputCommitter
Author: Cheng Lian <lian@databricks.com>
Closes #6118 from liancheng/spark-7599 and squashes the following commits:
31e1bd6 [Cheng Lian] Don't restrict customized output committers to be subclasses of FileOutputCommitter
(cherry picked from commit 10c546e9d4)
Signed-off-by: Yin Huai <yhuai@databricks.com>
This commit is contained in:
parent
bfdecace5d
commit
cb1fe81339
|
@@ -244,7 +244,7 @@ private[sql] abstract class BaseWriterContainer(
|
|||
@transient private val jobContext: JobContext = job
|
||||
|
||||
// The following fields are initialized and used on both driver and executor side.
|
||||
@transient protected var outputCommitter: FileOutputCommitter = _
|
||||
@transient protected var outputCommitter: OutputCommitter = _
|
||||
@transient private var jobId: JobID = _
|
||||
@transient private var taskId: TaskID = _
|
||||
@transient private var taskAttemptId: TaskAttemptID = _
|
||||
|
@@ -282,14 +282,18 @@ private[sql] abstract class BaseWriterContainer(
|
|||
initWriters()
|
||||
}
|
||||
|
||||
private def newOutputCommitter(context: TaskAttemptContext): FileOutputCommitter = {
|
||||
outputFormatClass.newInstance().getOutputCommitter(context) match {
|
||||
case f: FileOutputCommitter => f
|
||||
case f => sys.error(
|
||||
s"FileOutputCommitter or its subclass is expected, but got a ${f.getClass.getName}.")
|
||||
protected def getWorkPath: String = {
|
||||
outputCommitter match {
|
||||
// FileOutputCommitter writes to a temporary location returned by `getWorkPath`.
|
||||
case f: FileOutputCommitter => f.getWorkPath.toString
|
||||
case _ => outputPath
|
||||
}
|
||||
}
|
||||
|
||||
private def newOutputCommitter(context: TaskAttemptContext): OutputCommitter = {
|
||||
outputFormatClass.newInstance().getOutputCommitter(context)
|
||||
}
|
||||
|
||||
private def setupIDs(jobId: Int, splitId: Int, attemptId: Int): Unit = {
|
||||
this.jobId = SparkHadoopWriter.createJobID(new Date, jobId)
|
||||
this.taskId = new TaskID(this.jobId, true, splitId)
|
||||
|
@@ -339,7 +343,7 @@ private[sql] class DefaultWriterContainer(
|
|||
|
||||
override protected def initWriters(): Unit = {
|
||||
writer = outputWriterClass.newInstance()
|
||||
writer.init(outputCommitter.getWorkPath.toString, dataSchema, taskAttemptContext)
|
||||
writer.init(getWorkPath, dataSchema, taskAttemptContext)
|
||||
}
|
||||
|
||||
override def outputWriterForRow(row: Row): OutputWriter = writer
|
||||
|
@@ -381,7 +385,7 @@ private[sql] class DynamicPartitionWriterContainer(
|
|||
}.mkString
|
||||
|
||||
outputWriters.getOrElseUpdate(partitionPath, {
|
||||
val path = new Path(outputCommitter.getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
|
||||
val path = new Path(getWorkPath, partitionPath.stripPrefix(Path.SEPARATOR))
|
||||
val writer = outputWriterClass.newInstance()
|
||||
writer.init(path.toString, dataSchema, taskAttemptContext)
|
||||
writer
|
||||
|
|
Loading…
Reference in a new issue