[SPARK-3280] Made sort-based shuffle the default implementation
Sort-based shuffle has lower memory usage and seems to outperform hash-based in almost all of our testing. Author: Reynold Xin <rxin@apache.org> Closes #2178 from rxin/sort-shuffle and squashes the following commits: 713d341 [Reynold Xin] Fixed test failures by setting spark.shuffle.compress to the same value as spark.shuffle.spill.compress. 85165e6 [Reynold Xin] Fixed a comment typo. aa0d372 [Reynold Xin] [SPARK-3280] Made sort-based shuffle the default implementation
This commit is contained in:
parent
4ba2673569
commit
f25bbbdb3a
|
@@ -217,7 +217,7 @@ object SparkEnv extends Logging {
|
|||
val shortShuffleMgrNames = Map(
|
||||
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
|
||||
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
|
||||
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
|
||||
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
|
||||
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
|
||||
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
|
||||
|
||||
|
|
33
core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
Normal file
33
core/src/test/scala/org/apache/spark/HashShuffleSuite.scala
Normal file
|
@@ -0,0 +1,33 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark
|
||||
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
|
||||
class HashShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
|
||||
|
||||
// This test suite should run all tests in ShuffleSuite with hash-based shuffle.
|
||||
|
||||
override def beforeAll() {
|
||||
System.setProperty("spark.shuffle.manager", "hash")
|
||||
}
|
||||
|
||||
override def afterAll() {
|
||||
System.clearProperty("spark.shuffle.manager")
|
||||
}
|
||||
}
|
|
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.{CoGroupedRDD, OrderedRDDFunctions, RDD, ShuffledRDD
|
|||
import org.apache.spark.serializer.KryoSerializer
|
||||
import org.apache.spark.util.MutablePair
|
||||
|
||||
class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
|
||||
abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
|
||||
|
||||
val conf = new SparkConf(loadDefaults = false)
|
||||
|
||||
|
|
|
@@ -24,8 +24,7 @@ class SortShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
|
|||
// This test suite should run all tests in ShuffleSuite with sort-based shuffle.
|
||||
|
||||
override def beforeAll() {
|
||||
System.setProperty("spark.shuffle.manager",
|
||||
"org.apache.spark.shuffle.sort.SortShuffleManager")
|
||||
System.setProperty("spark.shuffle.manager", "sort")
|
||||
}
|
||||
|
||||
override def afterAll() {
|
||||
|
|
|
@@ -42,6 +42,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
|
|||
conf.set("spark.serializer.objectStreamReset", "1")
|
||||
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
|
||||
conf.set("spark.shuffle.spill.compress", codec.isDefined.toString)
|
||||
conf.set("spark.shuffle.compress", codec.isDefined.toString)
|
||||
codec.foreach { c => conf.set("spark.io.compression.codec", c) }
|
||||
// Ensure that we actually have multiple batches per spill file
|
||||
conf.set("spark.shuffle.spill.batchSize", "10")
|
||||
|
|
|
@@ -293,12 +293,11 @@ Apart from these, the following properties are also available, and may be useful
|
|||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.shuffle.manager</code></td>
|
||||
<td>HASH</td>
|
||||
<td>sort</td>
|
||||
<td>
|
||||
Implementation to use for shuffling data. A hash-based shuffle manager is the default, but
|
||||
starting in Spark 1.1 there is an experimental sort-based shuffle manager that is more
|
||||
memory-efficient in environments with small executors, such as YARN. To use that, change
|
||||
this value to <code>SORT</code>.
|
||||
Implementation to use for shuffling data. There are two implementations available:
|
||||
<code>sort</code> and <code>hash</code>. Sort-based shuffle is more memory-efficient and is
|
||||
the default option starting in 1.2.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
|
|
Loading…
Reference in a new issue