[SPARK-23977][SQL] Support High Performance S3A committers [test-hadoop3.2]

This patch adds the binding classes to enable Spark to switch dataframe output to using the S3A zero-rename committers shipping in Hadoop 3.1+. It adds a source tree into the hadoop-cloud-storage module which only compiles with the hadoop-3.2 profile, and contains a binding for normal output and a specific bridge class for Parquet (as the Parquet output format requires a subclass of `ParquetOutputCommitter`).
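
As a sketch of how a job opts in (the session setup below is illustrative; the option keys and values are the ones documented in the cloud integration docs updated by this patch):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session wiring only: the committer options come from this
// patch's documentation; the application name is a placeholder.
val spark = SparkSession.builder()
  .appName("s3a-committer-example")
  .config("spark.hadoop.fs.s3a.committer.name", "directory")
  .config("spark.sql.sources.commitProtocolClass",
    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
  .config("spark.sql.parquet.output.committer.class",
    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
  .getOrCreate()
```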

Commit algorithms are a critical topic. There's no formal proof of correctness, but the algorithms are documented and analysed in [A Zero Rename Committer](https://github.com/steveloughran/zero-rename-committer/releases). It also reviews the classic v1 and v2 algorithms, IBM's Swift committer, and the one from EMRFS, which its authors admit was based on the concepts implemented here.

Test-wise:

* There's a public set of Scala test suites [on GitHub](https://github.com/hortonworks-spark/cloud-integration)
* We have run integration tests against Spark on YARN clusters.
* This code has been shipping for ~12 months in HDP-3.x.

Closes #24970 from steveloughran/cloud/SPARK-23977-s3a-committer.

Authored-by: Steve Loughran <stevel@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Steve Loughran 2019-08-15 09:38:31 -07:00 committed by Marcelo Vanzin
parent ba5ee27706
commit 2ac6163a5d
9 changed files with 741 additions and 18 deletions

View file

@ -42,7 +42,7 @@ import org.apache.spark.util.Utils
* 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job
* failed to execute (e.g. too many failed tasks), the job should call abortJob.
*/
abstract class FileCommitProtocol {
abstract class FileCommitProtocol extends Logging {
import FileCommitProtocol._
/**
@ -129,7 +129,9 @@ abstract class FileCommitProtocol {
* before the job has finished. These same task commit messages will be passed to commitJob()
* if the entire job succeeds.
*/
def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {}
def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
logDebug(s"onTaskCommit($taskCommit)")
}
}

View file

@ -17,6 +17,7 @@
package org.apache.spark.internal.io
import java.io.IOException
import java.util.{Date, UUID}
import scala.collection.mutable
@ -136,7 +137,7 @@ class HadoopMapReduceCommitProtocol(
tmpOutputPath
}
private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
// The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
// Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
// the file name is fine and won't overflow.
@ -205,12 +206,29 @@ class HadoopMapReduceCommitProtocol(
}
}
/**
* Abort the job; log and ignore any IO exception thrown.
* This is invariably invoked in an exception handler; raising
* an exception here will lose the root cause of the failure.
*
* @param jobContext job context
*/
override def abortJob(jobContext: JobContext): Unit = {
try {
committer.abortJob(jobContext, JobStatus.State.FAILED)
} catch {
case e: IOException =>
logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
}
try {
if (hasValidPath) {
val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
fs.delete(stagingDir, true)
}
} catch {
case e: IOException =>
logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
}
}
override def setupTask(taskContext: TaskAttemptContext): Unit = {
@ -222,17 +240,35 @@ class HadoopMapReduceCommitProtocol(
override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
val attemptId = taskContext.getTaskAttemptID
logTrace(s"Commit task ${attemptId}")
SparkHadoopMapRedUtil.commitTask(
committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
}
/**
* Abort the task; log and ignore any failure thrown.
* This is invariably invoked in an exception handler; raising
* an exception here will lose the root cause of the failure.
*
* @param taskContext context
*/
override def abortTask(taskContext: TaskAttemptContext): Unit = {
try {
committer.abortTask(taskContext)
} catch {
case e: IOException =>
logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
}
// best effort cleanup of other staged files
try {
for ((src, _) <- addedAbsPathFiles) {
val tmp = new Path(src)
tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
}
} catch {
case e: IOException =>
logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
}
}
}

View file

@ -125,7 +125,7 @@ consult the relevant documentation.
### Recommended settings for writing to object stores
For object stores whose consistency model means that rename-based commits are safe
use the `FileOutputCommitter` v2 algorithm for performance:
use the `FileOutputCommitter` v2 algorithm for performance; v1 for safety.
```
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
@ -143,8 +143,30 @@ job failure:
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true
```
The original v1 commit algorithm renames the output of successful tasks
to a job attempt directory, and then renames all the files in that directory
into the final destination during the job commit phase:
```
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1
```
The slow performance of mimicked renames makes this algorithm very slow on
Amazon S3. The recommended solution is to switch to an S3 "Zero Rename"
committer (see below).
For reference, here are the performance and safety characteristics of
different stores and connectors when renaming directories:
| Store | Connector | Directory Rename Safety | Rename Performance |
|---------------|-----------|-------------------------|--------------------|
| Amazon S3 | s3a | Unsafe | O(data) |
| Azure Storage | wasb | Safe | O(files) |
| Azure Datalake Gen 2 | abfs | Safe | O(1) |
| Google GCS | gs | Safe | O(1) |
As storing temporary files can run up charges, delete
directories called `"_temporary"` on a regular basis to avoid this.
directories called `"_temporary"` on a regular basis.
### Parquet I/O Settings
@ -190,15 +212,49 @@ while they are still being written. Applications can write straight to the monit
atomic `rename()` operation.
Otherwise the checkpointing may be slow and potentially unreliable.
## Committing work into cloud storage safely and fast.
As covered earlier, commit-by-rename is dangerous on any object store which
exhibits eventual consistency (example: S3), and often slower than classic
filesystem renames.
Some object store connectors provide custom committers to commit tasks and
jobs without using rename. In versions of Spark built with Hadoop 3.1 or later,
the S3A connector for AWS S3 ships with such committers.
Instead of writing data to a temporary directory on the store for renaming,
these committers write the files to the final destination, but do not issue
the final POST command to make a large "multi-part" upload visible. Those
operations are postponed until the job commit itself. As a result, task and
job commit are much faster, and task failures do not affect the result.
To switch to the S3A committers, use a version of Spark that was built with Hadoop
3.1 or later, and switch the committers through the following options.
```
spark.hadoop.fs.s3a.committer.name directory
spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
```
These committers have been tested with the most common formats supported by Spark.
```python
mydataframe.write.format("parquet").save("s3a://bucket/destination")
```
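
The write path is the same from Scala; a minimal sketch, with the dataframe and bucket name as placeholders:

```scala
import org.apache.spark.sql.DataFrame

// Placeholder names throughout; any format that writes through the commit
// protocol (parquet, orc, json, csv, ...) follows the same route.
def saveToS3(mydataframe: DataFrame): Unit = {
  mydataframe.write.format("parquet").save("s3a://bucket/destination")
}
```
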
More details on these committers can be found in the latest Hadoop documentation.
## Further Reading
Here is the documentation on the standard connectors both from Apache and the cloud providers.
* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html). Hadoop 2.6+
* [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-azure/index.html). Since Hadoop 2.7
* [Azure Data Lake](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html). Since Hadoop 2.8
* [Amazon S3 via S3A and S3N](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). Hadoop 2.6+
* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
* [Azure Blob Storage and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-azure/index.html).
* [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
* [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html).
* [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon
* [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector). From Google
* [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver)

View file

@ -198,6 +198,45 @@
-->
<profile>
<id>hadoop-3.2</id>
<properties>
<extra.source.dir>src/hadoop-3/main/scala</extra.source.dir>
<extra.testsource.dir>src/hadoop-3/test/scala</extra.testsource.dir>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-scala-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${extra.source.dir}</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-scala-test-sources</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${extra.testsource.dir}</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<!--
There's now a hadoop-cloud-storage which transitively pulls in the store JARs,

View file

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.io.cloud
import java.io.IOException
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{BindingPathOutputCommitter, PathOutputCommitter}
import org.apache.parquet.hadoop.ParquetOutputCommitter
import org.apache.spark.internal.Logging
/**
* This Parquet Committer subclass dynamically binds to the factory-configured
* output committer, and is intended to allow callers to use any 'PathOutputCommitter',
* even if not a subclass of 'ParquetOutputCommitter'.
*
* The Parquet `parquet.enable.summary-metadata` option will only be supported
* if the instantiated committer itself supports it.
*/
class BindingParquetOutputCommitter(
path: Path,
context: TaskAttemptContext)
extends ParquetOutputCommitter(path, context) with Logging {
logTrace(s"${this.getClass.getName} binding to configured PathOutputCommitter and dest $path")
private val committer = new BindingPathOutputCommitter(path, context)
/**
* This is the committer ultimately bound to.
* @return the committer instantiated by the factory.
*/
private[cloud] def boundCommitter(): PathOutputCommitter = {
committer.getCommitter
}
override def getWorkPath(): Path = {
committer.getWorkPath()
}
override def setupTask(taskAttemptContext: TaskAttemptContext): Unit = {
committer.setupTask(taskAttemptContext)
}
override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = {
committer.commitTask(taskAttemptContext)
}
override def abortTask(taskAttemptContext: TaskAttemptContext): Unit = {
committer.abortTask(taskAttemptContext)
}
override def setupJob(jobContext: JobContext): Unit = {
committer.setupJob(jobContext)
}
override def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean = {
committer.needsTaskCommit(taskAttemptContext)
}
override def cleanupJob(jobContext: JobContext): Unit = {
committer.cleanupJob(jobContext)
}
override def isCommitJobRepeatable(jobContext: JobContext): Boolean = {
committer.isCommitJobRepeatable(jobContext)
}
override def commitJob(jobContext: JobContext): Unit = {
committer.commitJob(jobContext)
}
override def recoverTask(taskAttemptContext: TaskAttemptContext): Unit = {
committer.recoverTask(taskAttemptContext)
}
/**
* Abort the job; log and ignore any IO exception thrown.
* This is invariably invoked in an exception handler; raising
* an exception here will lose the root cause of the failure.
*
* @param jobContext job context
* @param state final state of the job
*/
override def abortJob(jobContext: JobContext, state: JobStatus.State): Unit = {
try {
committer.abortJob(jobContext, state)
} catch {
case e: IOException =>
// swallow exception to avoid problems when called within exception
// handlers
logWarning("Abort job failed", e)
}
}
override def isRecoverySupported: Boolean = {
committer.isRecoverySupported()
}
override def isRecoverySupported(jobContext: JobContext): Boolean = {
committer.isRecoverySupported(jobContext)
}
override def toString: String = s"BindingParquetOutputCommitter($committer)"
}
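
A condensed sketch of the intended delegation, borrowing the stub factory from the test sources later in this patch (see `CommitterBindingSuite` there for the full version; the attempt ID and destination are the illustrative values it uses):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{MRJobConfig, TaskAttemptID}
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

// Register a PathOutputCommitterFactory for the destination scheme, then every
// lifecycle call on the Parquet bridge lands on the committer that factory built.
val attempt = "attempt_2007071202143_0101_m_000000_0"
val conf = new Configuration()
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt)
StubPathOutputCommitterFactory.bind(conf, "http")

val context = new TaskAttemptContextImpl(conf, TaskAttemptID.forName(attempt))
val parquet = new BindingParquetOutputCommitter(new Path("http://example/data"), context)
parquet.setupJob(context) // forwarded to the factory-created StubPathOutputCommitter
```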

View file

@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.io.cloud
import java.io.IOException
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter, PathOutputCommitter, PathOutputCommitterFactory}
import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol
/**
* Spark Commit protocol for Path Output Committers.
* This committer will work with the `FileOutputCommitter` and subclasses.
* All implementations *must* be serializable.
*
* Rather than ask the `FileOutputFormat` for a committer, it uses the
* `org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory` factory
* API to create the committer.
*
* In `setupCommitter` the factory is identified and instantiated;
* this factory then creates the actual committer implementation.
*
* @constructor Instantiate. Dynamic partition overwrite is not supported,
* so that committers for stores which do not support rename
* will not get confused.
* @param jobId job
* @param dest destination
* @param dynamicPartitionOverwrite does the caller want support for dynamic
* partition overwrite. If so, it will be
* refused.
*/
class PathOutputCommitProtocol(
jobId: String,
dest: String,
dynamicPartitionOverwrite: Boolean = false)
extends HadoopMapReduceCommitProtocol(jobId, dest, false) with Serializable {
if (dynamicPartitionOverwrite) {
// Until there are explicit extensions to the PathOutputCommitProtocol
// to support the Spark mechanism, it is left to the individual committer
// to choose how to handle partitioning.
throw new IOException(PathOutputCommitProtocol.UNSUPPORTED)
}
/** The committer created. */
@transient private var committer: PathOutputCommitter = _
require(dest != null, "Null destination specified")
private[cloud] val destination: String = dest
/** The destination path. This is serializable in Hadoop 3. */
private[cloud] val destPath: Path = new Path(destination)
logTrace(s"Instantiated committer with job ID=$jobId;" +
s" destination=$destPath;" +
s" dynamicPartitionOverwrite=$dynamicPartitionOverwrite")
import PathOutputCommitProtocol._
/**
* Set up the committer.
* This creates it by talking directly to the Hadoop factories, instead
* of the V1 `mapred.FileOutputFormat` methods.
* @param context task attempt
* @return the committer to use. This will always be a subclass of
* `PathOutputCommitter`.
*/
override protected def setupCommitter(context: TaskAttemptContext): PathOutputCommitter = {
logTrace(s"Setting up committer for path $destination")
committer = PathOutputCommitterFactory.createCommitter(destPath, context)
// Special feature to force out the FileOutputCommitter, so as to guarantee
// that the binding is working properly.
val rejectFileOutput = context.getConfiguration
.getBoolean(REJECT_FILE_OUTPUT, REJECT_FILE_OUTPUT_DEFVAL)
if (rejectFileOutput && committer.isInstanceOf[FileOutputCommitter]) {
// the output format returned a file output format committer, which
// is exactly what we do not want. So switch back to the factory.
val factory = PathOutputCommitterFactory.getCommitterFactory(
destPath,
context.getConfiguration)
logTrace(s"Using committer factory $factory")
committer = factory.createOutputCommitter(destPath, context)
}
logTrace(s"Using committer ${committer.getClass}")
logTrace(s"Committer details: $committer")
if (committer.isInstanceOf[FileOutputCommitter]) {
require(!rejectFileOutput,
s"Committer created is the FileOutputCommitter $committer")
if (committer.isCommitJobRepeatable(context)) {
// If FileOutputCommitter says its job commit is repeatable, it means
// it is using the v2 algorithm, which is not safe for task commit
// failures. Warn
logTrace(s"Committer $committer may not be tolerant of task commit failures")
}
}
committer
}
/**
* Create a temporary file for a task.
*
* @param taskContext task context
* @param dir optional subdirectory
* @param ext file extension
* @return a path as a string
*/
override def newTaskTempFile(
taskContext: TaskAttemptContext,
dir: Option[String],
ext: String): String = {
val workDir = committer.getWorkPath
val parent = dir.map {
d => new Path(workDir, d)
}.getOrElse(workDir)
val file = new Path(parent, getFilename(taskContext, ext))
logTrace(s"Creating task file $file for dir $dir and ext $ext")
file.toString
}
}
object PathOutputCommitProtocol {
/**
* Hadoop configuration option.
* Fail fast if the committer resolved at setup time is the classic `FileOutputCommitter`.
* This option can be used to catch configuration issues early.
*
* It's mostly relevant for testing/diagnostics, as it can be used to
* enforce that schema-specific options are triggering a switch
* to a new committer.
*/
val REJECT_FILE_OUTPUT = "pathoutputcommit.reject.fileoutput"
/**
* Default behavior: accept the file output.
*/
val REJECT_FILE_OUTPUT_DEFVAL = false
/** Error string for tests. */
private[cloud] val UNSUPPORTED: String = "PathOutputCommitProtocol does not support" +
" dynamicPartitionOverwrite"
}
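
A usage sketch for the fail-fast option above (the configuration object here is illustrative; in a Spark job the same key can be passed with the `spark.hadoop.` prefix):

```scala
import org.apache.hadoop.conf.Configuration

// With this set, a factory that still hands back a FileOutputCommitter fails
// at committer setup instead of silently falling back to rename-based commits.
val hadoopConf = new Configuration()
hadoopConf.setBoolean(PathOutputCommitProtocol.REJECT_FILE_OUTPUT, true)
```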

View file

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.io.cloud
import java.io.{File, FileInputStream, FileOutputStream, IOException, ObjectInputStream, ObjectOutputStream}
import java.lang.reflect.InvocationTargetException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.IOUtils
import org.apache.hadoop.mapreduce.{Job, JobStatus, MRJobConfig, TaskAttemptID}
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.io.FileCommitProtocol
class CommitterBindingSuite extends SparkFunSuite {
private val jobId = "2007071202143_0101"
private val taskAttempt0 = "attempt_" + jobId + "_m_000000_0"
private val taskAttemptId0 = TaskAttemptID.forName(taskAttempt0)
/**
* The classname to use when referring to the path output committer.
*/
private val pathCommitProtocolClassname: String = classOf[PathOutputCommitProtocol].getName
/** hadoop-mapreduce option to enable the _SUCCESS marker. */
private val successMarker = "mapreduce.fileoutputcommitter.marksuccessfuljobs"
/**
* Does the
* [[BindingParquetOutputCommitter]] committer bind to the schema-specific
committer declared for the destination path, and are lifecycle events
correctly propagated?
*/
test("BindingParquetOutputCommitter binds to the inner committer") {
val path = new Path("http://example/data")
val job = newJob(path)
val conf = job.getConfiguration
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1)
StubPathOutputCommitterFactory.bind(conf, "http")
val tContext = new TaskAttemptContextImpl(conf, taskAttemptId0)
val parquet = new BindingParquetOutputCommitter(path, tContext)
val inner = parquet.boundCommitter.asInstanceOf[StubPathOutputCommitter]
parquet.setupJob(tContext)
assert(inner.jobSetup, s"$inner job not setup")
parquet.setupTask(tContext)
assert(inner.taskSetup, s"$inner task not setup")
assert(parquet.needsTaskCommit(tContext), "needsTaskCommit false")
inner.needsTaskCommit = false
assert(!parquet.needsTaskCommit(tContext), "needsTaskCommit true")
parquet.commitTask(tContext)
assert(inner.taskCommitted, s"$inner task not committed")
parquet.abortTask(tContext)
assert(inner.taskAborted, s"$inner task not aborted")
parquet.commitJob(tContext)
assert(inner.jobCommitted, s"$inner job not committed")
parquet.abortJob(tContext, JobStatus.State.RUNNING)
assert(inner.jobAborted, s"$inner job not aborted")
}
/**
* Create a new job. Sets the task attempt ID.
*
* @return the new job
*/
def newJob(outDir: Path): Job = {
val job = Job.getInstance(new Configuration())
val conf = job.getConfiguration
conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttempt0)
conf.setBoolean(successMarker, true)
FileOutputFormat.setOutputPath(job, outDir)
job
}
test("committer protocol can be serialized and deserialized") {
val tempDir = File.createTempFile("ser", ".bin")
tempDir.delete()
val committer = new PathOutputCommitProtocol(jobId, tempDir.toURI.toString, false)
val serData = File.createTempFile("ser", ".bin")
var out: ObjectOutputStream = null
var in: ObjectInputStream = null
try {
out = new ObjectOutputStream(new FileOutputStream(serData))
out.writeObject(committer)
out.close
in = new ObjectInputStream(new FileInputStream(serData))
val result = in.readObject()
val committer2 = result.asInstanceOf[PathOutputCommitProtocol]
assert(committer.destination === committer2.destination,
"destination mismatch on round trip")
assert(committer.destPath === committer2.destPath,
"destPath mismatch on round trip")
} finally {
IOUtils.closeStreams(out, in)
serData.delete()
}
}
test("local filesystem instantiation") {
val instance = FileCommitProtocol.instantiate(
pathCommitProtocolClassname,
jobId, "file:///tmp", false)
val protocol = instance.asInstanceOf[PathOutputCommitProtocol]
assert("file:///tmp" === protocol.destination)
}
test("reject dynamic partitioning") {
val cause = intercept[InvocationTargetException] {
FileCommitProtocol.instantiate(
pathCommitProtocolClassname,
jobId, "file:///tmp", true)
}.getCause
if (cause == null || !cause.isInstanceOf[IOException]
|| !cause.getMessage.contains(PathOutputCommitProtocol.UNSUPPORTED)) {
throw cause
}
}
}

View file

@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.io.cloud
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.{JobContext, JobStatus, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{PathOutputCommitter, PathOutputCommitterFactory}
/**
* A local path output committer which tracks its state, for use in tests.
* @param outputPath final destination.
* @param workPath work path
* @param context task/job attempt.
*/
class StubPathOutputCommitter(
outputPath: Path,
workPath: Path,
context: TaskAttemptContext) extends PathOutputCommitter(workPath, context) {
var jobSetup: Boolean = false
var jobCommitted: Boolean = false
var jobAborted: Boolean = false
var taskSetup: Boolean = false
var taskCommitted: Boolean = false
var taskAborted: Boolean = false
var needsTaskCommit: Boolean = true
override def getOutputPath: Path = outputPath
override def getWorkPath: Path = {
workPath
}
override def setupTask(taskAttemptContext: TaskAttemptContext): Unit = {
taskSetup = true
}
override def abortTask(taskAttemptContext: TaskAttemptContext): Unit = {
taskAborted = true
}
override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = {
taskCommitted = true
}
override def setupJob(jobContext: JobContext): Unit = {
jobSetup = true
}
override def commitJob(jobContext: JobContext): Unit = {
jobCommitted = true
}
override def abortJob(
jobContext: JobContext,
state: JobStatus.State): Unit = {
jobAborted = true
}
override def needsTaskCommit(taskAttemptContext: TaskAttemptContext): Boolean = {
needsTaskCommit
}
override def toString(): String = s"StubPathOutputCommitter(setup=$jobSetup," +
s" committed=$jobCommitted, aborted=$jobAborted)"
}
class StubPathOutputCommitterFactory extends PathOutputCommitterFactory {
override def createOutputCommitter(
outputPath: Path,
context: TaskAttemptContext): PathOutputCommitter = {
new StubPathOutputCommitter(outputPath, workPath(outputPath), context)
}
private def workPath(out: Path): Path = new Path(out,
StubPathOutputCommitterFactory.TEMP_DIR_NAME)
}
object StubPathOutputCommitterFactory {
/**
* This is the "Pending" directory of the FileOutputCommitter;
* data written here is, in that algorithm, renamed into place.
*/
val TEMP_DIR_NAME = "_temporary"
/**
* Scheme prefix for per-filesystem scheme committers.
*/
val OUTPUTCOMMITTER_FACTORY_SCHEME = "mapreduce.outputcommitter.factory.scheme"
/**
* Given a hadoop configuration, set up the factory binding for the scheme.
* @param conf config to patch
* @param scheme filesystem scheme.
*/
def bind(conf: Configuration, scheme: String): Unit = {
val key = OUTPUTCOMMITTER_FACTORY_SCHEME + "." + scheme
conf.set(key, classOf[StubPathOutputCommitterFactory].getName())
}
}
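
As a sketch of the test wiring (scheme and path are the illustrative values used above): once `bind` has registered the factory for a scheme, the same factory lookup that `PathOutputCommitProtocol` performs resolves to the stub.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory

// Bind the stub for "http" URIs, then look the factory up the same way the
// commit protocol does; committers it creates record setup/commit/abort calls
// in boolean flags that tests can assert on.
val conf = new Configuration()
StubPathOutputCommitterFactory.bind(conf, "http")
val factory = PathOutputCommitterFactory.getCommitterFactory(
  new Path("http://example/data"), conf)
```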

View file

@ -0,0 +1,36 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Set everything to be logged to the file target/unit-tests.log
test.appender=file
log4j.rootCategory=INFO, ${test.appender}
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
# Tests that launch java subprocesses can set the "test.appender" system property to
# "console" to avoid having the child process's logs overwrite the unit test's
# log file.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%t: %m%n
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.spark_project.jetty=WARN