[SPARK-18219] Move commit protocol API (internal) from sql/core to core module
## What changes were proposed in this pull request?

This patch moves the new commit protocol API from sql/core to the core module, so we can use it in the future in the RDD API. As part of this patch, I also moved the specification of the random UUID for the write path out of the commit protocol, and instead pass in a job id.

## How was this patch tested?

N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #15731 from rxin/SPARK-18219.
commit 937af592e6 (parent 96cc1b5675)
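The core API change: `FileCommitProtocol.instantiate` now takes a caller-supplied job id instead of having the protocol generate a random UUID itself. A minimal sketch of a call under the new signature (the output path below is a placeholder, not from this patch):

```scala
import java.util.UUID

import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

// The caller now owns job-id generation; using a random UUID here mirrors what
// InsertIntoHadoopFsRelationCommand does later in this diff.
val committer = FileCommitProtocol.instantiate(
  className = classOf[HadoopMapReduceCommitProtocol].getName,
  jobId = UUID.randomUUID().toString,
  outputPath = "/tmp/example-output",  // placeholder path
  isAppend = false)
```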
org/apache/spark/internal/io/FileCommitProtocol.scala (new file, core module):

@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import org.apache.hadoop.mapreduce._
+
+import org.apache.spark.util.Utils
+
+
+/**
+ * An interface to define how a single Spark job commits its outputs. Three notes:
+ *
+ * 1. Implementations must be serializable, as the committer instance instantiated on the driver
+ *    will be used for tasks on executors.
+ * 2. Implementations should have a constructor with either 2 or 3 arguments:
+ *    (jobId: String, path: String) or (jobId: String, path: String, isAppend: Boolean).
+ * 3. A committer should not be reused across multiple Spark jobs.
+ *
+ * The proper call sequence is:
+ *
+ * 1. Driver calls setupJob.
+ * 2. As part of each task's execution, executor calls setupTask and then commitTask
+ *    (or abortTask if task failed).
+ * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job
+ *    failed to execute (e.g. too many failed tasks), the job should call abortJob.
+ */
+abstract class FileCommitProtocol {
+  import FileCommitProtocol._
+
+  /**
+   * Sets up a job. Must be called on the driver before any other methods can be invoked.
+   */
+  def setupJob(jobContext: JobContext): Unit
+
+  /**
+   * Commits a job after the writes succeed. Must be called on the driver.
+   */
+  def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit
+
+  /**
+   * Aborts a job after the writes fail. Must be called on the driver.
+   *
+   * Calling this function is a best-effort attempt, because it is possible that the driver
+   * just crashes (or is killed) before it can call abort.
+   */
+  def abortJob(jobContext: JobContext): Unit
+
+  /**
+   * Sets up a task within a job.
+   * Must be called before any other task-related methods can be invoked.
+   */
+  def setupTask(taskContext: TaskAttemptContext): Unit
+
+  /**
+   * Notifies the commit protocol to add a new file, and gets back the full path that should be
+   * used. Must be called on the executors when running tasks.
+   *
+   * Note that the returned temp file may have an arbitrary path. The commit protocol only
+   * promises that the file will be at the location specified by the arguments after job commit.
+   *
+   * A full file path consists of the following parts:
+   * 1. the base path
+   * 2. some sub-directory within the base path, used to specify partitioning
+   * 3. file prefix, usually some unique job id with the task id
+   * 4. bucket id
+   * 5. source-specific file extension, e.g. ".snappy.parquet"
+   *
+   * The "dir" parameter specifies 2, and the "ext" parameter specifies both 4 and 5; the rest
+   * are left to the commit protocol implementation to decide.
+   */
+  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
+
+  /**
+   * Commits a task after the writes succeed. Must be called on the executors when running tasks.
+   */
+  def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
+
+  /**
+   * Aborts a task after the writes have failed. Must be called on the executors when running
+   * tasks.
+   *
+   * Calling this function is a best-effort attempt, because it is possible that the executor
+   * just crashes (or is killed) before it can call abort.
+   */
+  def abortTask(taskContext: TaskAttemptContext): Unit
+}
+
+
+object FileCommitProtocol {
+  class TaskCommitMessage(val obj: Any) extends Serializable
+
+  object EmptyTaskCommitMessage extends TaskCommitMessage(null)
+
+  /**
+   * Instantiates a FileCommitProtocol using the given className.
+   */
+  def instantiate(className: String, jobId: String, outputPath: String, isAppend: Boolean)
+    : FileCommitProtocol = {
+    val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
+
+    // First try the constructor with arguments (jobId: String, outputPath: String,
+    // isAppend: Boolean). If that doesn't exist, try the one with
+    // (jobId: String, outputPath: String).
+    try {
+      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
+      ctor.newInstance(jobId, outputPath, isAppend.asInstanceOf[java.lang.Boolean])
+    } catch {
+      case _: NoSuchMethodException =>
+        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
+        ctor.newInstance(jobId, outputPath)
+    }
+  }
+}
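To make the prescribed call sequence concrete, here is a minimal sketch, assuming a concrete committer and pre-built Hadoop `JobContext`/`TaskAttemptContext` objects (all names below are placeholders, and in a real job the per-task calls run on executors, not in a local loop):

```scala
import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Hypothetical driver/executor flow; construction of jobCtx and taskCtxs is elided.
def runJob(committer: FileCommitProtocol, jobCtx: JobContext,
    taskCtxs: Seq[TaskAttemptContext]): Unit = {
  committer.setupJob(jobCtx)                          // 1. on the driver
  val commits: Seq[TaskCommitMessage] = taskCtxs.map { taskCtx =>
    committer.setupTask(taskCtx)                      // 2. on an executor, per task
    val path = committer.newTaskTempFile(taskCtx, dir = None, ext = ".txt")
    // ... the task writes its output to `path` ...
    committer.commitTask(taskCtx)                     // (or abortTask on failure)
  }
  committer.commitJob(jobCtx, commits)                // 3. on the driver (or abortJob on failure)
}
```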
org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala (new file, core module):

@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.io
+
+import java.util.Date
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce._
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
+
+import org.apache.spark.SparkHadoopWriter
+import org.apache.spark.internal.Logging
+import org.apache.spark.mapred.SparkHadoopMapRedUtil
+
+/**
+ * A [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter
+ * (from the newer mapreduce API, not the old mapred API).
+ *
+ * Unlike Hadoop's OutputCommitter, this implementation is serializable.
+ */
+class HadoopMapReduceCommitProtocol(jobId: String, path: String)
+  extends FileCommitProtocol with Serializable with Logging {
+
+  import FileCommitProtocol._
+
+  /** OutputCommitter from Hadoop is not serializable, so it is marked transient. */
+  @transient private var committer: OutputCommitter = _
+
+  protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
+    context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+  }
+
+  override def newTaskTempFile(
+      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
+    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
+    // the file name is fine and won't overflow.
+    val split = taskContext.getTaskAttemptID.getTaskID.getId
+    val filename = f"part-$split%05d-$jobId$ext"
+
+    val stagingDir: String = committer match {
+      // FileOutputCommitter has its own staging path, called the "work path".
+      case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path)
+      case _ => path
+    }
+
+    dir.map { d =>
+      new Path(new Path(stagingDir, d), filename).toString
+    }.getOrElse {
+      new Path(stagingDir, filename).toString
+    }
+  }
+
+  override def setupJob(jobContext: JobContext): Unit = {
+    // Set up IDs
+    val jobId = SparkHadoopWriter.createJobID(new Date, 0)
+    val taskId = new TaskID(jobId, TaskType.MAP, 0)
+    val taskAttemptId = new TaskAttemptID(taskId, 0)
+
+    // Set up the configuration object
+    jobContext.getConfiguration.set("mapred.job.id", jobId.toString)
+    jobContext.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
+    jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString)
+    jobContext.getConfiguration.setBoolean("mapred.task.is.map", true)
+    jobContext.getConfiguration.setInt("mapred.task.partition", 0)
+
+    val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
+    committer = setupCommitter(taskAttemptContext)
+    committer.setupJob(jobContext)
+  }
+
+  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
+    committer.commitJob(jobContext)
+  }
+
+  override def abortJob(jobContext: JobContext): Unit = {
+    committer.abortJob(jobContext, JobStatus.State.FAILED)
+  }
+
+  override def setupTask(taskContext: TaskAttemptContext): Unit = {
+    committer = setupCommitter(taskContext)
+    committer.setupTask(taskContext)
+  }
+
+  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
+    val attemptId = taskContext.getTaskAttemptID
+    SparkHadoopMapRedUtil.commitTask(
+      committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
+    EmptyTaskCommitMessage
+  }
+
+  override def abortTask(taskContext: TaskAttemptContext): Unit = {
+    committer.abortTask(taskContext)
+  }
+}
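Because `setupCommitter` is `protected`, subclasses can swap in a different Hadoop committer while inheriting the rest of the protocol; the SQL-specific `SQLHadoopMapReduceCommitProtocol` later in this diff does exactly that. A hypothetical minimal subclass, purely illustrative and not part of this patch:

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter

import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical: always use a plain FileOutputCommitter rooted at `path`,
// regardless of what the output format's default committer would be.
class DirectFileCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
    new FileOutputCommitter(new Path(path), context)
  }
}
```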
org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala (deleted from sql/core):

@@ -1,257 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.datasources
-
-import java.util.{Date, UUID}
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-
-import org.apache.spark.SparkHadoopWriter
-import org.apache.spark.internal.Logging
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.Utils
-
-
-object FileCommitProtocol {
-  class TaskCommitMessage(val obj: Any) extends Serializable
-
-  object EmptyTaskCommitMessage extends TaskCommitMessage(null)
-
-  /**
-   * Instantiates a FileCommitProtocol using the given className.
-   */
-  def instantiate(className: String, outputPath: String, isAppend: Boolean): FileCommitProtocol = {
-    try {
-      val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
-
-      // First try the one with argument (outputPath: String, isAppend: Boolean).
-      // If that doesn't exist, try the one with (outputPath: String).
-      try {
-        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[Boolean])
-        ctor.newInstance(outputPath, isAppend.asInstanceOf[java.lang.Boolean])
-      } catch {
-        case _: NoSuchMethodException =>
-          val ctor = clazz.getDeclaredConstructor(classOf[String])
-          ctor.newInstance(outputPath)
-      }
-    } catch {
-      case e: ClassNotFoundException =>
-        throw e
-    }
-  }
-}
-
-
-/**
- * An interface to define how a single Spark job commits its outputs. Two notes:
- *
- * 1. Implementations must be serializable, as the committer instance instantiated on the driver
- *    will be used for tasks on executors.
- * 2. A committer should not be reused across multiple Spark jobs.
- *
- * The proper call sequence is:
- *
- * 1. Driver calls setupJob.
- * 2. As part of each task's execution, executor calls setupTask and then commitTask
- *    (or abortTask if task failed).
- * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job
- *    failed to execute (e.g. too many failed tasks), the job should call abortJob.
- */
-abstract class FileCommitProtocol {
-  import FileCommitProtocol._
-
-  /**
-   * Setups up a job. Must be called on the driver before any other methods can be invoked.
-   */
-  def setupJob(jobContext: JobContext): Unit
-
-  /**
-   * Commits a job after the writes succeed. Must be called on the driver.
-   */
-  def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit
-
-  /**
-   * Aborts a job after the writes fail. Must be called on the driver.
-   *
-   * Calling this function is a best-effort attempt, because it is possible that the driver
-   * just crashes (or killed) before it can call abort.
-   */
-  def abortJob(jobContext: JobContext): Unit
-
-  /**
-   * Sets up a task within a job.
-   * Must be called before any other task related methods can be invoked.
-   */
-  def setupTask(taskContext: TaskAttemptContext): Unit
-
-  /**
-   * Notifies the commit protocol to add a new file, and gets back the full path that should be
-   * used. Must be called on the executors when running tasks.
-   *
-   * Note that the returned temp file may have an arbitrary path. The commit protocol only
-   * promises that the file will be at the location specified by the arguments after job commit.
-   *
-   * A full file path consists of the following parts:
-   * 1. the base path
-   * 2. some sub-directory within the base path, used to specify partitioning
-   * 3. file prefix, usually some unique job id with the task id
-   * 4. bucket id
-   * 5. source specific file extension, e.g. ".snappy.parquet"
-   *
-   * The "dir" parameter specifies 2, and "ext" parameter specifies both 4 and 5, and the rest
-   * are left to the commit protocol implementation to decide.
-   */
-  def newTaskTempFile(taskContext: TaskAttemptContext, dir: Option[String], ext: String): String
-
-  /**
-   * Commits a task after the writes succeed. Must be called on the executors when running tasks.
-   */
-  def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage
-
-  /**
-   * Aborts a task after the writes have failed. Must be called on the executors when running tasks.
-   *
-   * Calling this function is a best-effort attempt, because it is possible that the executor
-   * just crashes (or killed) before it can call abort.
-   */
-  def abortTask(taskContext: TaskAttemptContext): Unit
-}
-
-
-/**
- * An [[FileCommitProtocol]] implementation backed by an underlying Hadoop OutputCommitter
- * (from the newer mapreduce API, not the old mapred API).
- *
- * Unlike Hadoop's OutputCommitter, this implementation is serializable.
- */
-class HadoopCommitProtocolWrapper(path: String, isAppend: Boolean)
-  extends FileCommitProtocol with Serializable with Logging {
-
-  import FileCommitProtocol._
-
-  /** OutputCommitter from Hadoop is not serializable so marking it transient. */
-  @transient private var committer: OutputCommitter = _
-
-  /** UUID used to identify the job in file name. */
-  private val uuid: String = UUID.randomUUID().toString
-
-  private def setupCommitter(context: TaskAttemptContext): Unit = {
-    committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
-
-    if (!isAppend) {
-      // If we are appending data to an existing dir, we will only use the output committer
-      // associated with the file output format since it is not safe to use a custom
-      // committer for appending. For example, in S3, direct parquet output committer may
-      // leave partial data in the destination dir when the appending job fails.
-      // See SPARK-8578 for more details.
-      val configuration = context.getConfiguration
-      val clazz =
-        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
-
-      if (clazz != null) {
-        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
-
-        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
-        // has an associated output committer. To override this output committer,
-        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
-        // If a data source needs to override the output committer, it needs to set the
-        // output committer in prepareForWrite method.
-        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
-          // The specified output committer is a FileOutputCommitter.
-          // So, we will use the FileOutputCommitter-specified constructor.
-          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
-          committer = ctor.newInstance(new Path(path), context)
-        } else {
-          // The specified output committer is just an OutputCommitter.
-          // So, we will use the no-argument constructor.
-          val ctor = clazz.getDeclaredConstructor()
-          committer = ctor.newInstance()
-        }
-      }
-    }
-    logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
-  }
-
-  override def newTaskTempFile(
-      taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
-    // The file name looks like part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
-    // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
-    // the file name is fine and won't overflow.
-    val split = taskContext.getTaskAttemptID.getTaskID.getId
-    val filename = f"part-$split%05d-$uuid$ext"
-
-    val stagingDir: String = committer match {
-      // For FileOutputCommitter it has its own staging path called "work path".
-      case f: FileOutputCommitter => Option(f.getWorkPath.toString).getOrElse(path)
-      case _ => path
-    }
-
-    dir.map { d =>
-      new Path(new Path(stagingDir, d), filename).toString
-    }.getOrElse {
-      new Path(stagingDir, filename).toString
-    }
-  }
-
-  override def setupJob(jobContext: JobContext): Unit = {
-    // Setup IDs
-    val jobId = SparkHadoopWriter.createJobID(new Date, 0)
-    val taskId = new TaskID(jobId, TaskType.MAP, 0)
-    val taskAttemptId = new TaskAttemptID(taskId, 0)
-
-    // Set up the configuration object
-    jobContext.getConfiguration.set("mapred.job.id", jobId.toString)
-    jobContext.getConfiguration.set("mapred.tip.id", taskAttemptId.getTaskID.toString)
-    jobContext.getConfiguration.set("mapred.task.id", taskAttemptId.toString)
-    jobContext.getConfiguration.setBoolean("mapred.task.is.map", true)
-    jobContext.getConfiguration.setInt("mapred.task.partition", 0)
-
-    val taskAttemptContext = new TaskAttemptContextImpl(jobContext.getConfiguration, taskAttemptId)
-    setupCommitter(taskAttemptContext)
-
-    committer.setupJob(jobContext)
-  }
-
-  override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
-    committer.commitJob(jobContext)
-  }
-
-  override def abortJob(jobContext: JobContext): Unit = {
-    committer.abortJob(jobContext, JobStatus.State.FAILED)
-  }
-
-  override def setupTask(taskContext: TaskAttemptContext): Unit = {
-    setupCommitter(taskContext)
-    committer.setupTask(taskContext)
-  }
-
-  override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
-    val attemptId = taskContext.getTaskAttemptID
-    SparkHadoopMapRedUtil.commitTask(
-      committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    EmptyTaskCommitMessage
-  }
-
-  override def abortTask(taskContext: TaskAttemptContext): Unit = {
-    committer.abortTask(taskContext)
-  }
-}
org/apache/spark/sql/execution/datasources/FileFormatWriter.scala:

@@ -29,6 +29,8 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -37,7 +39,6 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter}
-import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala:

@@ -21,6 +21,7 @@ import java.io.IOException
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -86,8 +87,9 @@ case class InsertIntoHadoopFsRelationCommand(
     if (doInsertion) {
       val committer = FileCommitProtocol.instantiate(
         sparkSession.sessionState.conf.fileCommitProtocolClass,
-        outputPath.toString,
-        isAppend)
+        jobId = java.util.UUID.randomUUID().toString,
+        outputPath = outputPath.toString,
+        isAppend = isAppend)
 
       FileFormatWriter.write(
         sparkSession = sparkSession,
org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala (new file):

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapreduce.{OutputCommitter, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual
+ * Hadoop output committer using an option specified in SQLConf.
+ */
+class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+  extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
+
+  override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
+    var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+
+    if (!isAppend) {
+      // If we are appending data to an existing dir, we will only use the output committer
+      // associated with the file output format since it is not safe to use a custom
+      // committer for appending. For example, in S3, direct parquet output committer may
+      // leave partial data in the destination dir when the appending job fails.
+      // See SPARK-8578 for more details.
+      val configuration = context.getConfiguration
+      val clazz =
+        configuration.getClass(SQLConf.OUTPUT_COMMITTER_CLASS.key, null, classOf[OutputCommitter])
+
+      if (clazz != null) {
+        logInfo(s"Using user defined output committer class ${clazz.getCanonicalName}")
+
+        // Every output format based on org.apache.hadoop.mapreduce.lib.output.OutputFormat
+        // has an associated output committer. To override this output committer,
+        // we will first try to use the output committer set in SQLConf.OUTPUT_COMMITTER_CLASS.
+        // If a data source needs to override the output committer, it needs to set the
+        // output committer in prepareForWrite method.
+        if (classOf[FileOutputCommitter].isAssignableFrom(clazz)) {
+          // The specified output committer is a FileOutputCommitter.
+          // So, we will use the FileOutputCommitter-specified constructor.
+          val ctor = clazz.getDeclaredConstructor(classOf[Path], classOf[TaskAttemptContext])
+          committer = ctor.newInstance(new Path(path), context)
+        } else {
+          // The specified output committer is just an OutputCommitter.
+          // So, we will use the no-argument constructor.
+          val ctor = clazz.getDeclaredConstructor()
+          committer = ctor.newInstance()
+        }
+      }
+    }
+    logInfo(s"Using output committer class ${committer.getClass.getCanonicalName}")
+    committer
+  }
+}
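A hedged usage sketch for the SQLConf option read above. The literal key string is an assumption (it is what `SQLConf.OUTPUT_COMMITTER_CLASS` is expected to resolve to, but `SQLConf` is internal and the key should be verified against the source), and the committer class name is a placeholder:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("committer-demo").getOrCreate()

// Assumption: "spark.sql.sources.outputCommitterClass" is the key behind
// SQLConf.OUTPUT_COMMITTER_CLASS. com.example.MyOutputCommitter is a placeholder
// OutputCommitter subclass with either a (Path, TaskAttemptContext) or a no-arg
// constructor, matching the two reflective paths in setupCommitter above.
spark.conf.set("spark.sql.sources.outputCommitterClass", "com.example.MyOutputCommitter")
```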
org/apache/spark/sql/execution/streaming/FileStreamSink.scala:

@@ -20,9 +20,10 @@ package org.apache.spark.sql.execution.streaming
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.datasources.{FileCommitProtocol, FileFormat, FileFormatWriter}
+import org.apache.spark.sql.execution.datasources.{FileFormat, FileFormatWriter}
 
 object FileStreamSink {
   // The name of the subdirectory that is used to store metadata about which files are valid.
@@ -54,7 +55,11 @@ class FileStreamSink(
       logInfo(s"Skipping already committed batch $batchId")
     } else {
       val committer = FileCommitProtocol.instantiate(
-        sparkSession.sessionState.conf.streamingFileCommitProtocolClass, path, isAppend = false)
+        className = sparkSession.sessionState.conf.streamingFileCommitProtocolClass,
+        jobId = batchId.toString,
+        outputPath = path,
+        isAppend = false)
+
       committer match {
         case manifestCommitter: ManifestFileCommitProtocol =>
           manifestCommitter.setupManifestOptions(fileLog, batchId)
org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala:

@@ -25,8 +25,8 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.datasources.FileCommitProtocol
-import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitMessage
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
 
 /**
  * A [[FileCommitProtocol]] that tracks the list of valid files in a manifest file, used in
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.FileCommitProtocol.TaskCommitM
  *
  * @param path path to write the final output to.
  */
-class ManifestFileCommitProtocol(path: String)
+class ManifestFileCommitProtocol(jobId: String, path: String)
   extends FileCommitProtocol with Serializable with Logging {
 
   // Track the list of files added by a task, only used on the executors.
org/apache/spark/sql/internal/SQLConf.scala:

@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.sql.execution.datasources.HadoopCommitProtocolWrapper
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.execution.streaming.ManifestFileCommitProtocol
 import org.apache.spark.util.Utils
 
@@ -385,7 +385,7 @@ object SQLConf {
     SQLConfigBuilder("spark.sql.sources.commitProtocolClass")
       .internal()
       .stringConf
-      .createWithDefault(classOf[HadoopCommitProtocolWrapper].getName)
+      .createWithDefault(classOf[SQLHadoopMapReduceCommitProtocol].getName)
 
   val PARALLEL_PARTITION_DISCOVERY_THRESHOLD =
     SQLConfigBuilder("spark.sql.sources.parallelPartitionDiscovery.threshold")
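The `spark.sql.sources.commitProtocolClass` default above can be overridden to plug in any `FileCommitProtocol` with a compatible constructor. A hedged sketch (the protocol class is a placeholder, and the option is marked internal, so this is for illustration rather than a supported public knob):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("protocol-demo").getOrCreate()

// Hypothetical: com.example.MyCommitProtocol must extend FileCommitProtocol and expose a
// (jobId: String, path: String) or (jobId: String, path: String, isAppend: Boolean)
// constructor, since FileCommitProtocol.instantiate resolves it reflectively.
spark.conf.set("spark.sql.sources.commitProtocolClass", "com.example.MyCommitProtocol")
```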