[SPARK-33019][CORE] Use spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1 by default

### What changes were proposed in this pull request?

Apache Spark 3.1's default Hadoop profile is `hadoop-3.2`. Instead of merely documenting a warning, this PR aims to use a consistent and safer version of the Apache Hadoop file output committer algorithm, `v1`, by default. This prevents a silent correctness regression when migrating from Apache Spark 2.4/3.0 to Apache Spark 3.1.0. Of course, a user-provided configuration such as `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2` will still be honored.
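The precedence rule above can be sketched as follows. This is a minimal Python stand-in for the proposed logic, not the actual Spark code (the real implementation operates on `SparkConf` and a Hadoop `Configuration`; the dicts and the function name here are hypothetical):

```python
# Hypothetical sketch of the conf-copying logic: spark.hadoop.* entries are
# copied into the Hadoop conf with the prefix stripped, and the committer
# algorithm defaults to "1" only when the user did not set it explicitly.
ALGO_KEY = "mapreduce.fileoutputcommitter.algorithm.version"

def append_spark_hadoop_configs(spark_conf: dict) -> dict:
    hadoop_conf = {}
    # Copy every spark.hadoop.* entry, stripping the "spark.hadoop." prefix.
    for key, value in spark_conf.items():
        if key.startswith("spark.hadoop."):
            hadoop_conf[key[len("spark.hadoop."):]] = value
    # Default to v1 only when the key was not user-provided.
    if "spark.hadoop." + ALGO_KEY not in spark_conf:
        hadoop_conf[ALGO_KEY] = "1"
    return hadoop_conf

# A user-provided value wins:
print(append_spark_hadoop_configs({"spark.hadoop." + ALGO_KEY: "2"})[ALGO_KEY])  # 2
# Otherwise the safer v1 is used:
print(append_spark_hadoop_configs({})[ALGO_KEY])  # 1
```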

### Why are the changes needed?

Apache Spark provides multiple distributions, built with Hadoop 2.7 and Hadoop 3.2, and the default of `spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version` depends on the Hadoop version: Apache Hadoop 3.0 switched the default algorithm from `v1` to `v2`, and there is now a discussion to remove `v2` entirely. We should provide a consistent default behavior of `v1` across the various Spark distributions.

- [MAPREDUCE-7282](https://issues.apache.org/jira/browse/MAPREDUCE-7282) MR v2 commit algorithm should be deprecated and not the default

### Does this PR introduce _any_ user-facing change?

Yes. This changes the default behavior; users can still override this configuration.
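For example, a user who wants to keep the Hadoop 3 behavior can restore `v2` explicitly; the user-provided conf overrides the new default. This is a hypothetical invocation (`your_app.py` is a placeholder for any Spark application):

```shell
# Restore the v2 committer explicitly; a user-provided conf still wins.
spark-submit \
  --conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2 \
  your_app.py
```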

### How was this patch tested?

Manual.

**BEFORE (spark-3.0.1-bin-hadoop3.2)**
```scala
scala> sc.version
res0: String = 3.0.1

scala> sc.hadoopConfiguration.get("mapreduce.fileoutputcommitter.algorithm.version")
res1: String = 2
```

**AFTER**
```scala
scala> sc.hadoopConfiguration.get("mapreduce.fileoutputcommitter.algorithm.version")
res0: String = 1
```

Closes #29895 from dongjoon-hyun/SPARK-DEFAUT-COMMITTER.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Commit: cc06266ade (parent 711d8dd28a)
Date: 2020-09-29 12:02:45 -07:00
2 changed files with 5 additions and 8 deletions

**core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala**

```diff
@@ -462,6 +462,9 @@ private[spark] object SparkHadoopUtil {
     for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {
       hadoopConf.set(key.substring("spark.hadoop.".length), value)
     }
+    if (conf.getOption("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version").isEmpty) {
+      hadoopConf.set("mapreduce.fileoutputcommitter.algorithm.version", "1")
+    }
   }

   private def appendSparkHiveConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
```

**docs/configuration.md**

```diff
@@ -1761,16 +1761,10 @@ Apart from these, the following properties are also available, and may be useful
 </tr>
 <tr>
   <td><code>spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version</code></td>
-  <td>Dependent on environment</td>
+  <td>1</td>
   <td>
     The file output committer algorithm version, valid algorithm version number: 1 or 2.
-    Version 2 may have better performance, but version 1 may handle failures better in certain situations,
-    as per <a href="https://issues.apache.org/jira/browse/MAPREDUCE-4815">MAPREDUCE-4815</a>.
-    The default value depends on the Hadoop version used in an environment:
-    1 for Hadoop versions lower than 3.0
-    2 for Hadoop versions 3.0 and higher
-    It's important to note that this can change back to 1 again in the future once <a href="https://issues.apache.org/jira/browse/MAPREDUCE-7282">MAPREDUCE-7282</a>
-    is fixed and merged.
+    Note that 2 may cause a correctness issue like MAPREDUCE-7282.
   </td>
   <td>2.2.0</td>
 </tr>
```