diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index 854093851f..0746e43bab 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -42,7 +42,7 @@ import org.apache.spark.util.Utils
  * 3. When all necessary tasks completed successfully, the driver calls commitJob. If the job
  *    failed to execute (e.g. too many failed tasks), the job should call abortJob.
  */
-abstract class FileCommitProtocol {
+abstract class FileCommitProtocol extends Logging {
   import FileCommitProtocol._

   /**
@@ -129,7 +129,9 @@ abstract class FileCommitProtocol {
    * before the job has finished. These same task commit messages will be passed to commitJob()
    * if the entire job succeeds.
    */
-  def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {}
+  def onTaskCommit(taskCommit: TaskCommitMessage): Unit = {
+    logDebug(s"onTaskCommit($taskCommit)")
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 7477e03bfa..11ce608f52 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.internal.io

+import java.io.IOException
 import java.util.{Date, UUID}

 import scala.collection.mutable
@@ -136,7 +137,7 @@ class HadoopMapReduceCommitProtocol(
     tmpOutputPath
   }

-  private def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
+  protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
     // The file name looks like part-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003-c000.parquet
     // Note that %05d does not truncate the split number, so if we have more than 100000 tasks,
     // the file name is fine and won't overflow.
@@ -205,11 +206,28 @@ class HadoopMapReduceCommitProtocol(
     }
   }

+  /**
+   * Abort the job; log and ignore any IO exception thrown.
+   * This is invariably invoked in an exception handler; raising
+   * an exception here will lose the root cause of the failure.
+   *
+   * @param jobContext job context
+   */
   override def abortJob(jobContext: JobContext): Unit = {
-    committer.abortJob(jobContext, JobStatus.State.FAILED)
-    if (hasValidPath) {
-      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
-      fs.delete(stagingDir, true)
+    try {
+      committer.abortJob(jobContext, JobStatus.State.FAILED)
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
+    }
+    try {
+      if (hasValidPath) {
+        val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+        fs.delete(stagingDir, true)
+      }
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${jobContext.getJobID}", e)
     }
   }

@@ -222,17 +240,35 @@ class HadoopMapReduceCommitProtocol(

   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
+    logTrace(s"Commit task ${attemptId}")
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
     new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
   }

+  /**
+   * Abort the task; log and ignore any failure thrown.
+   * This is invariably invoked in an exception handler; raising
+   * an exception here will lose the root cause of the failure.
+   *
+   * @param taskContext context
+   */
   override def abortTask(taskContext: TaskAttemptContext): Unit = {
-    committer.abortTask(taskContext)
+    try {
+      committer.abortTask(taskContext)
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
+    }
     // best effort cleanup of other staged files
-    for ((src, _) <- addedAbsPathFiles) {
-      val tmp = new Path(src)
-      tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
+    try {
+      for ((src, _) <- addedAbsPathFiles) {
+        val tmp = new Path(src)
+        tmp.getFileSystem(taskContext.getConfiguration).delete(tmp, false)
+      }
+    } catch {
+      case e: IOException =>
+        logWarning(s"Exception while aborting ${taskContext.getTaskAttemptID}", e)
     }
   }
 }
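
An illustrative sketch, not taken from this patch: because `getFilename` is now `protected` rather than `private`, a subclass of `HadoopMapReduceCommitProtocol` can reuse the default part-file naming while adjusting it. The subclass name below is hypothetical, and the `(jobId, path)` constructor arguments are assumed from the existing class signature.

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext

import org.apache.spark.internal.io.HadoopMapReduceCommitProtocol

// Hypothetical subclass: keeps the default "part-..." naming from the base
// class, but prepends a marker to every file it writes.
class PrefixedCommitProtocol(jobId: String, path: String)
  extends HadoopMapReduceCommitProtocol(jobId, path) {

  override protected def getFilename(taskContext: TaskAttemptContext, ext: String): String = {
    // Delegate to the base implementation for the part-XXXXX name, then prefix it.
    "custom-" + super.getFilename(taskContext, ext)
  }
}
```
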
diff --git a/docs/cloud-integration.md b/docs/cloud-integration.md
index b64ffe55d8..a8d40fe745 100644
--- a/docs/cloud-integration.md
+++ b/docs/cloud-integration.md
@@ -125,7 +125,7 @@ consult the relevant documentation.
 ### Recommended settings for writing to object stores

 For object stores whose consistency model means that rename-based commits are safe
-use the `FileOutputCommitter` v2 algorithm for performance:
+use the `FileOutputCommitter` v2 algorithm for performance; use v1 for safety.
 ```
 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
 ```
@@ -143,8 +143,30 @@ job failure:
 spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored true
 ```

+The original v1 commit algorithm renames the output of successful tasks
+to a job attempt directory, and then renames all the files in that directory
+into the final destination during the job commit phase:
+
+```
+spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 1
+```
+
+Because renames on Amazon S3 are mimicked with copy-and-delete operations,
+this algorithm is very slow there. The recommended solution is to switch to
+an S3 "Zero Rename" committer (see below).
+
+For reference, here are the performance and safety characteristics of
+different stores and connectors when renaming directories:
+
+| Store                | Connector | Directory Rename Safety | Rename Performance |
+|----------------------|-----------|-------------------------|--------------------|
+| Amazon S3            | s3a       | Unsafe                  | O(data)            |
+| Azure Storage        | wasb      | Safe                    | O(files)           |
+| Azure Datalake Gen 2 | abfs      | Safe                    | O(1)               |
+| Google GCS           | gs        | Safe                    | O(1)               |
+
 As storing temporary files can run up charges; delete
-directories called `"_temporary"` on a regular basis to avoid this.
+directories called `"_temporary"` on a regular basis.

 ### Parquet I/O Settings
@@ -190,15 +212,49 @@ while they are still being written. Applications can write straight to the monitored
 atomic `rename()` operation. Otherwise the checkpointing may be slow and
 potentially unreliable.

+## Committing work into cloud storage safely and fast
+
+As covered earlier, commit-by-rename is dangerous on any object store which
+exhibits eventual consistency (for example, S3), and often slower than classic
+filesystem renames.
+
+Some object store connectors provide custom committers to commit tasks and
+jobs without using rename. In versions of Spark built with Hadoop 3.1 or later,
+the S3A connector for AWS S3 provides such committers.
+
+Instead of writing data to a temporary directory on the store for renaming,
+these committers write the files to the final destination, but do not issue
+the final POST command to make a large "multi-part" upload visible. Those
+operations are postponed until the job commit itself. As a result, task and
+job commit are much faster, and task failures do not affect the result.
+
+To switch to the S3A committers, use a version of Spark which was built with
+Hadoop 3.1 or later, and enable the committers through the following options.
+
+```
+spark.hadoop.fs.s3a.committer.name directory
+spark.sql.sources.commitProtocolClass org.apache.spark.internal.io.cloud.PathOutputCommitProtocol
+spark.sql.parquet.output.committer.class org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter
+```
+
+These committers have been tested with the most common formats supported by Spark.
+
+```python
+mydataframe.write.format("parquet").save("s3a://bucket/destination")
+```
+
+More details on these committers can be found in the latest Hadoop documentation.
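+
+The same options can also be set programmatically when the `SparkSession` is
+created; a sketch of what this might look like in Scala (the application name
+and bucket are placeholders, and the committer classes are only present in
+Spark builds created with Hadoop 3.1 or later):
+
+```scala
+import org.apache.spark.sql.SparkSession
+
+val spark = SparkSession.builder()
+  .appName("s3a-committer-example")  // placeholder application name
+  .config("spark.hadoop.fs.s3a.committer.name", "directory")
+  .config("spark.sql.sources.commitProtocolClass",
+    "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
+  .config("spark.sql.parquet.output.committer.class",
+    "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
+  .getOrCreate()
+
+// Writes then go through the S3A committer rather than a rename-based commit.
+spark.range(1000).write.format("parquet").save("s3a://bucket/destination")
+```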
+
 ## Further Reading

 Here is the documentation on the standard connectors both from Apache and the
 cloud providers.

-* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html). Hadoop 2.6+
-* [Azure Blob Storage](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). Since Hadoop 2.7
-* [Azure Data Lake](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html). Since Hadoop 2.8
-* [Amazon S3 via S3A and S3N](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html). Hadoop 2.6+
+* [OpenStack Swift](https://hadoop.apache.org/docs/current/hadoop-openstack/index.html).
+* [Azure Blob Storage and Azure Datalake Gen 2](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html).
+* [Azure Data Lake Gen 1](https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html).
+* [Hadoop-AWS module (Hadoop 3.x)](https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html).
+* [Amazon S3 via S3A and S3N (Hadoop 2.x)](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html).
 * [Amazon EMR File System (EMRFS)](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-fs.html). From Amazon
 * [Google Cloud Storage Connector for Spark and Hadoop](https://cloud.google.com/hadoop/google-cloud-storage-connector). From Google
-
+* [The Azure Blob Filesystem driver (ABFS)](https://docs.microsoft.com/en-us/azure/storage/blobs/data-lake-storage-abfs-driver)
diff --git a/hadoop-cloud/pom.xml b/hadoop-cloud/pom.xml
index dbf4b98d5f..31c729c501 100644
--- a/hadoop-cloud/pom.xml
+++ b/hadoop-cloud/pom.xml
@@ -198,6 +198,45 @@
     -->
     <profile>
       <id>hadoop-3.2</id>
+      <properties>
+        <extra.source.dir>src/hadoop-3/main/scala</extra.source.dir>
+        <extra.testsource.dir>src/hadoop-3/test/scala</extra.testsource.dir>
+      </properties>
+
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-scala-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${extra.source.dir}</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-scala-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${extra.testsource.dir}</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>