[SPARK-11078] Ensure spilling tests actually spill

#9084 uncovered that many tests that are meant to exercise spilling don't actually spill. This follow-up patch fixes that, so our unit tests actually catch potential bugs in the spilling code. The size of this patch is inflated by the refactoring of `ExternalSorterSuite`, which had a lot of duplicate code and logic.

Author: Andrew Or <andrew@databricks.com>

Closes #9124 from andrewor14/spilling-tests.
Andrew Or 2015-10-15 14:50:01 -07:00
parent 2d000124b7
commit 3b364ff0a4
8 changed files with 550 additions and 597 deletions

@@ -24,10 +24,14 @@ import java.util.Arrays
import java.util.jar.{JarEntry, JarOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import com.google.common.io.{ByteStreams, Files}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.util.Utils
/**
@@ -154,4 +158,51 @@ private[spark] object TestUtils {
" @Override public String toString() { return \"" + toStringValue + "\"; }}")
createCompiledClass(className, destDir, sourceFile, classpathUrls)
}
/**
* Run some code involving jobs submitted to the given context and assert that the jobs spilled.
*/
def assertSpilled[T](sc: SparkContext, identifier: String)(body: => T): Unit = {
val spillListener = new SpillListener
sc.addSparkListener(spillListener)
body
assert(spillListener.numSpilledStages > 0, s"expected $identifier to spill, but did not")
}
/**
* Run some code involving jobs submitted to the given context and assert that the jobs
* did not spill.
*/
def assertNotSpilled[T](sc: SparkContext, identifier: String)(body: => T): Unit = {
val spillListener = new SpillListener
sc.addSparkListener(spillListener)
body
assert(spillListener.numSpilledStages == 0, s"expected $identifier to not spill, but did")
}
}
/**
* A [[SparkListener]] that detects whether spills have occurred in Spark jobs.
*/
private class SpillListener extends SparkListener {
private val stageIdToTaskMetrics = new mutable.HashMap[Int, ArrayBuffer[TaskMetrics]]
private val spilledStageIds = new mutable.HashSet[Int]
def numSpilledStages: Int = spilledStageIds.size
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
stageIdToTaskMetrics.getOrElseUpdate(
taskEnd.stageId, new ArrayBuffer[TaskMetrics]) += taskEnd.taskMetrics
}
override def onStageCompleted(stageComplete: SparkListenerStageCompleted): Unit = {
val stageId = stageComplete.stageInfo.stageId
val metrics = stageIdToTaskMetrics.remove(stageId).toSeq.flatten
val spilled = metrics.map(_.memoryBytesSpilled).sum > 0
if (spilled) {
spilledStageIds += stageId
}
}
}
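For reference, a minimal usage sketch of the new helpers (not part of this patch); it assumes an active SparkContext `sc` whose configuration makes the first job spill and lets the second stay in memory.

import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}

// The listener registered by assertSpilled must observe memoryBytesSpilled > 0
// in at least one stage that completes while the body runs.
assertSpilled(sc, "groupByKey") {
  sc.parallelize(0 until 1000).map { i => (i / 2, i) }.groupByKey().count()
}

// Conversely, assertNotSpilled requires that no stage spilled while the body ran.
assertNotSpilled(sc, "small aggregation") {
  sc.parallelize(1 to 10).map { i => (i, i) }.reduceByKey(_ + _).count()
}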

@@ -139,8 +139,10 @@ class ShuffleMemoryManager protected (
throw new SparkException(
s"Internal error: release called on $numBytes bytes but task only has $curMem")
}
taskMemory(taskAttemptId) -= numBytes
memoryManager.releaseExecutionMemory(numBytes)
if (taskMemory.contains(taskAttemptId)) {
taskMemory(taskAttemptId) -= numBytes
memoryManager.releaseExecutionMemory(numBytes)
}
memoryManager.notifyAll() // Notify waiters in tryToAcquire that memory has been freed
}

@@ -95,6 +95,12 @@ class ExternalAppendOnlyMap[K, V, C](
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()
/**
* Number of files this map has spilled so far.
* Exposed for testing.
*/
private[collection] def numSpills: Int = spilledMaps.size
/**
* Insert the given key and value into the map.
*/

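A minimal collection-level sketch of how a test can use the new `numSpills` hook, mirroring the suite changes further below; it assumes an active SparkContext (so `SparkEnv` is set) and a conf with a low `spark.shuffle.spill.numElementsForceSpillThreshold`:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.util.collection.ExternalAppendOnlyMap

val map = new ExternalAppendOnlyMap[Int, Int, ArrayBuffer[Int]](
  i => ArrayBuffer(i),   // createCombiner
  (buf, i) => buf += i,  // mergeValue
  (b1, b2) => b1 ++= b2) // mergeCombiners
map.insertAll((1 to 1000).iterator.map(i => (i, i)))
// With the force-spill threshold set below 1000, at least one spill must have occurred.
assert(map.numSpills > 0, "map did not spill")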
@@ -43,10 +43,15 @@ private[spark] trait Spillable[C] extends Logging {
private[this] val shuffleMemoryManager = SparkEnv.get.shuffleMemoryManager
// Initial threshold for the size of a collection before we start tracking its memory usage
// Exposed for testing
// For testing only
private[this] val initialMemoryThreshold: Long =
SparkEnv.get.conf.getLong("spark.shuffle.spill.initialMemoryThreshold", 5 * 1024 * 1024)
// Force this collection to spill when there are this many elements in memory
// For testing only
private[this] val numElementsForceSpillThreshold: Long =
SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", Long.MaxValue)
// Threshold for this collection's size in bytes before we start tracking its memory usage
// To avoid a large number of small spills, initialize this to a value orders of magnitude > 0
private[this] var myMemoryThreshold = initialMemoryThreshold
@@ -69,27 +74,27 @@ private[spark] trait Spillable[C] extends Logging {
* @return true if `collection` was spilled to disk; false otherwise
*/
protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
var shouldSpill = false
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
val granted = shuffleMemoryManager.tryToAcquire(amountToRequest)
myMemoryThreshold += granted
if (myMemoryThreshold <= currentMemory) {
// We were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold); spill the current collection
_spillCount += 1
logSpillage(currentMemory)
spill(collection)
_elementsRead = 0
// Keep track of spills, and release memory
_memoryBytesSpilled += currentMemory
releaseMemoryForThisThread()
return true
}
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
shouldSpill = currentMemory >= myMemoryThreshold
}
false
shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
// Actually spill
if (shouldSpill) {
_spillCount += 1
logSpillage(currentMemory)
spill(collection)
_elementsRead = 0
_memoryBytesSpilled += currentMemory
releaseMemoryForThisThread()
}
shouldSpill
}
/**

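The new `spark.shuffle.spill.numElementsForceSpillThreshold` setting is what lets the suites below spill deterministically with tiny inputs instead of gigabytes of data. A minimal sketch of the intended pattern, mirroring `ExternalAppendOnlyMapSuite` (threshold and sizes here are illustrative):

// Inside a suite that mixes in LocalSparkContext and imports org.apache.spark._
val conf = new SparkConf()
  .set("spark.shuffle.manager", "hash") // aggregate in ExternalAppendOnlyMap, not ExternalSorter
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", "250") // force a spill roughly every 250 records
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
TestUtils.assertSpilled(sc, "reduceByKey") {
  sc.parallelize(0 until 1000).map { i => (i / 2, i) }.reduceByKey(math.max).collect()
}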
@@ -203,22 +203,35 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
}
test("compute without caching when no partitions fit in memory") {
sc = new SparkContext(clusterUrl, "test")
// data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
// to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
val size = 10000
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
.set("spark.testing.memory", (size / 2).toString)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size, 2).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
assert(data.count() === size)
assert(data.count() === size)
// ensure only a subset of partitions were cached
val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD, askSlaves = true)
assert(rddBlocks.size === 0, s"expected no RDD blocks, found ${rddBlocks.size}")
}
test("compute when only some partitions fit in memory") {
sc = new SparkContext(clusterUrl, "test", new SparkConf)
// TODO: verify that only a subset of partitions fit in memory (SPARK-11078)
val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
val size = 10000
val numPartitions = 10
val conf = new SparkConf()
.set("spark.storage.unrollMemoryThreshold", "1024")
.set("spark.testing.memory", (size * numPartitions).toString)
sc = new SparkContext(clusterUrl, "test", conf)
val data = sc.parallelize(1 to size, numPartitions).persist(StorageLevel.MEMORY_ONLY)
assert(data.count() === size)
assert(data.count() === size)
assert(data.count() === size)
// ensure only a subset of partitions were cached
val rddBlocks = sc.env.blockManager.master.getMatchingBlockIds(_.isRDD, askSlaves = true)
assert(rddBlocks.size > 0, "no RDD blocks found")
assert(rddBlocks.size < numPartitions, s"too many RDD blocks found, expected <$numPartitions")
}
test("passing environment variables to cluster") {

@@ -22,9 +22,10 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.io.CompressionCodec
// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)
class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
import TestUtils.{assertNotSpilled, assertSpilled}
private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
private def createCombiner[T](i: T) = ArrayBuffer[T](i)
private def mergeValue[T](buffer: ArrayBuffer[T], i: T): ArrayBuffer[T] = buffer += i
@@ -244,54 +245,53 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
* If a compression codec is provided, use it. Otherwise, do not compress spills.
*/
private def testSimpleSpilling(codec: Option[String] = None): Unit = {
val size = 1000
val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home
conf.set("spark.shuffle.manager", "hash") // avoid using external sorter
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 4).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
// reduceByKey - should spill ~8 times
val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
val resultA = rddA.reduceByKey(math.max).collect()
assert(resultA.length === 50000)
resultA.foreach { case (k, v) =>
assert(v === k * 2 + 1, s"Value for $k was wrong: expected ${k * 2 + 1}, got $v")
}
// groupByKey - should spill ~17 times
val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
val resultB = rddB.groupByKey().collect()
assert(resultB.length === 25000)
resultB.foreach { case (i, seq) =>
val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
assert(seq.toSet === expected,
s"Value for $i was wrong: expected $expected, got ${seq.toSet}")
}
// cogroup - should spill ~7 times
val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
val resultC = rddC1.cogroup(rddC2).collect()
assert(resultC.length === 10000)
resultC.foreach { case (i, (seq1, seq2)) =>
i match {
case 0 =>
assert(seq1.toSet === Set[Int](0))
assert(seq2.toSet === Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
case 1 =>
assert(seq1.toSet === Set[Int](1))
assert(seq2.toSet === Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
case 5000 =>
assert(seq1.toSet === Set[Int](5000))
assert(seq2.toSet === Set[Int]())
case 9999 =>
assert(seq1.toSet === Set[Int](9999))
assert(seq2.toSet === Set[Int]())
case _ =>
assertSpilled(sc, "reduceByKey") {
val result = sc.parallelize(0 until size)
.map { i => (i / 2, i) }.reduceByKey(math.max).collect()
assert(result.length === size / 2)
result.foreach { case (k, v) =>
val expected = k * 2 + 1
assert(v === expected, s"Value for $k was wrong: expected $expected, got $v")
}
}
assertSpilled(sc, "groupByKey") {
val result = sc.parallelize(0 until size).map { i => (i / 2, i) }.groupByKey().collect()
assert(result.length == size / 2)
result.foreach { case (i, seq) =>
val actual = seq.toSet
val expected = Set(i * 2, i * 2 + 1)
assert(actual === expected, s"Value for $i was wrong: expected $expected, got $actual")
}
}
assertSpilled(sc, "cogroup") {
val rdd1 = sc.parallelize(0 until size).map { i => (i / 2, i) }
val rdd2 = sc.parallelize(0 until size).map { i => (i / 2, i) }
val result = rdd1.cogroup(rdd2).collect()
assert(result.length === size / 2)
result.foreach { case (i, (seq1, seq2)) =>
val actual1 = seq1.toSet
val actual2 = seq2.toSet
val expected = Set(i * 2, i * 2 + 1)
assert(actual1 === expected, s"Value 1 for $i was wrong: expected $expected, got $actual1")
assert(actual2 === expected, s"Value 2 for $i was wrong: expected $expected, got $actual2")
}
}
sc.stop()
}
test("spilling with hash collisions") {
val size = 1000
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[String]
@@ -315,11 +315,12 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
assert(w1.hashCode === w2.hashCode)
}
map.insertAll((1 to 100000).iterator.map(_.toString).map(i => (i, i)))
map.insertAll((1 to size).iterator.map(_.toString).map(i => (i, i)))
collisionPairs.foreach { case (w1, w2) =>
map.insert(w1, w2)
map.insert(w2, w1)
}
assert(map.numSpills > 0, "map did not spill")
// A map of collision pairs in both directions
val collisionPairsMap = (collisionPairs ++ collisionPairs.map(_.swap)).toMap
@@ -334,22 +335,25 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
assert(kv._2.equals(expectedValue))
count += 1
}
assert(count === 100000 + collisionPairs.size * 2)
assert(count === size + collisionPairs.size * 2)
sc.stop()
}
test("spilling with many hash collisions") {
val size = 1000
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
// Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes
// problems if the map fails to group together the objects with the same code (SPARK-2043).
for (i <- 1 to 10) {
for (j <- 1 to 10000) {
for (j <- 1 to size) {
map.insert(FixedHashObject(j, j % 2), 1)
}
}
assert(map.numSpills > 0, "map did not spill")
val it = map.iterator
var count = 0
@@ -358,17 +362,20 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
assert(kv._2 === 10)
count += 1
}
assert(count === 10000)
assert(count === size)
sc.stop()
}
test("spilling with hash collisions using the Int.MaxValue key") {
val size = 1000
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]
(1 to 100000).foreach { i => map.insert(i, i) }
(1 to size).foreach { i => map.insert(i, i) }
map.insert(Int.MaxValue, Int.MaxValue)
assert(map.numSpills > 0, "map did not spill")
val it = map.iterator
while (it.hasNext) {
@@ -379,14 +386,17 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
}
test("spilling with null keys and values") {
val size = 1000
val conf = createSparkConf(loadDefaults = true)
conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]
map.insertAll((1 to 100000).iterator.map(i => (i, i)))
map.insertAll((1 to size).iterator.map(i => (i, i)))
map.insert(null.asInstanceOf[Int], 1)
map.insert(1, null.asInstanceOf[Int])
map.insert(null.asInstanceOf[Int], null.asInstanceOf[Int])
assert(map.numSpills > 0, "map did not spill")
val it = map.iterator
while (it.hasNext) {
@@ -397,17 +407,22 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
}
test("external aggregation updates peak execution memory") {
val spillThreshold = 1000
val conf = createSparkConf(loadDefaults = false)
.set("spark.shuffle.manager", "hash") // make sure we're not also using ExternalSorter
.set("spark.testing.memory", (10 * 1024 * 1024).toString)
.set("spark.shuffle.spill.numElementsForceSpillThreshold", spillThreshold.toString)
sc = new SparkContext("local", "test", conf)
// No spilling
AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map without spilling") {
sc.parallelize(1 to 10, 2).map { i => (i, i) }.reduceByKey(_ + _).count()
assertNotSpilled(sc, "verify peak memory") {
sc.parallelize(1 to spillThreshold / 2, 2).map { i => (i, i) }.reduceByKey(_ + _).count()
}
}
// With spilling
AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map with spilling") {
sc.parallelize(1 to 1000 * 1000, 2).map { i => (i, i) }.reduceByKey(_ + _).count()
assertSpilled(sc, "verify peak memory") {
sc.parallelize(1 to spillThreshold * 3, 2).map { i => (i, i) }.reduceByKey(_ + _).count()
}
}
}

@@ -68,6 +68,8 @@ private class GrantEverythingMemoryManager extends MemoryManager {
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
override def releaseExecutionMemory(numBytes: Long): Unit = { }
override def releaseStorageMemory(numBytes: Long): Unit = { }
override def maxExecutionMemory: Long = Long.MaxValue
override def maxStorageMemory: Long = Long.MaxValue
}