Merge branch 'master' into streaming

Conflicts:
	core/src/main/scala/spark/MapOutputTracker.scala
This commit is contained in:
Tathagata Das 2013-01-16 12:57:11 -08:00
commit f466ee44bc
5 changed files with 111 additions and 16 deletions

View file

@ -142,8 +142,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
case e: InterruptedException => case e: InterruptedException =>
} }
} }
return mapStatuses(shuffleId).map(status => return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, mapStatuses(shuffleId))
(status.address, MapOutputTracker.decompressSize(status.compressedSizes(reduceId))))
} else { } else {
fetching += shuffleId fetching += shuffleId
} }
@ -159,21 +158,15 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
fetchedStatuses = deserializeStatuses(fetchedBytes) fetchedStatuses = deserializeStatuses(fetchedBytes)
logInfo("Got the output locations") logInfo("Got the output locations")
mapStatuses.put(shuffleId, fetchedStatuses) mapStatuses.put(shuffleId, fetchedStatuses)
if (fetchedStatuses.contains(null)) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
}
} finally { } finally {
fetching.synchronized { fetching.synchronized {
fetching -= shuffleId fetching -= shuffleId
fetching.notifyAll() fetching.notifyAll()
} }
} }
return fetchedStatuses.map(s => return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
(s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
} else { } else {
return statuses.map(s => return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
(s.address, MapOutputTracker.decompressSize(s.compressedSizes(reduceId))))
} }
} }
@ -267,6 +260,28 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
private[spark] object MapOutputTracker { private[spark] object MapOutputTracker {
private val LOG_BASE = 1.1 private val LOG_BASE = 1.1
// Convert an array of MapStatuses to locations and sizes for a given reduce ID. If
// any of the statuses is null (indicating a missing location due to a failed mapper),
// throw a FetchFailedException.
def convertMapStatuses(
shuffleId: Int,
reduceId: Int,
statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
if (statuses == null) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing all output locations for shuffle " + shuffleId))
}
statuses.map {
status =>
if (status == null) {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
} else {
(status.address, decompressSize(status.compressedSizes(reduceId)))
}
}
}
/** /**
* Compress a size in bytes to 8 bits for efficient reporting of map output sizes. * Compress a size in bytes to 8 bits for efficient reporting of map output sizes.
* We do this by encoding the log base 1.1 of the size as an integer, which can support * We do this by encoding the log base 1.1 of the size as an integer, which can support

View file

@ -72,9 +72,11 @@ private[spark] class ResultTask[T, U](
override def run(attemptId: Long): U = { override def run(attemptId: Long): U = {
val context = new TaskContext(stageId, partition, attemptId) val context = new TaskContext(stageId, partition, attemptId)
val result = func(context, rdd.iterator(split, context)) try {
context.executeOnCompleteCallbacks() func(context, rdd.iterator(split, context))
result } finally {
context.executeOnCompleteCallbacks()
}
} }
override def preferredLocations: Seq[String] = locs override def preferredLocations: Seq[String] = locs

View file

@ -1,12 +1,18 @@
package spark package spark
import org.scalatest.FunSuite import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import akka.actor._ import akka.actor._
import spark.scheduler.MapStatus import spark.scheduler.MapStatus
import spark.storage.BlockManagerId import spark.storage.BlockManagerId
import spark.util.AkkaUtils
class MapOutputTrackerSuite extends FunSuite { class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
after {
System.clearProperty("spark.master.port")
}
test("compressSize") { test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0) assert(MapOutputTracker.compressSize(0L) === 0)
assert(MapOutputTracker.compressSize(1L) === 1) assert(MapOutputTracker.compressSize(1L) === 1)
@ -71,6 +77,36 @@ class MapOutputTrackerSuite extends FunSuite {
// The remaining reduce task might try to grab the output dispite the shuffle failure; // The remaining reduce task might try to grab the output dispite the shuffle failure;
// this should cause it to fail, and the scheduler will ignore the failure due to the // this should cause it to fail, and the scheduler will ignore the failure due to the
// stage already being aborted. // stage already being aborted.
intercept[Exception] { tracker.getServerStatuses(10, 1) } intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
}
test("remote fetch") {
System.clearProperty("spark.master.host")
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("test", "localhost", 0)
System.setProperty("spark.master.port", boundPort.toString)
val masterTracker = new MapOutputTracker(actorSystem, true)
val slaveTracker = new MapOutputTracker(actorSystem, false)
masterTracker.registerShuffle(10, 1)
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
masterTracker.registerMapOutput(10, 0, new MapStatus(
new BlockManagerId("hostA", 1000), Array(compressedSize1000)))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
Seq((new BlockManagerId("hostA", 1000), size1000)))
masterTracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
// failure should be cached
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
} }
} }

View file

@ -0,0 +1,42 @@
package spark.scheduler
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import spark.TaskContext
import spark.RDD
import spark.SparkContext
import spark.Split
class TaskContextSuite extends FunSuite with BeforeAndAfter {
var sc: SparkContext = _
after {
if (sc != null) {
sc.stop()
sc = null
}
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
}
test("Calls executeOnCompleteCallbacks after failure") {
var completed = false
sc = new SparkContext("local", "test")
val rdd = new RDD[String](sc, List()) {
override def getSplits = Array[Split](StubSplit(0))
override def compute(split: Split, context: TaskContext) = {
context.addOnCompleteCallback(() => completed = true)
sys.error("failed")
}
}
val func = (c: TaskContext, i: Iterator[String]) => i.next
val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
intercept[RuntimeException] {
task.run(0)
}
assert(completed === true)
}
case class StubSplit(val index: Int) extends Split
}

View file

@ -1,6 +1,6 @@
@echo off @echo off
set SCALA_VERSION=2.9.1 set SCALA_VERSION=2.9.2
rem Figure out where the Spark framework is installed rem Figure out where the Spark framework is installed
set FWDIR=%~dp0 set FWDIR=%~dp0