Merge branch 'master' of github.com:mesos/spark

Reynold Xin 2012-05-30 18:41:07 -07:00
commit 1dd7d3dfff
9 changed files with 243 additions and 64 deletions


@@ -9,16 +9,16 @@ import java.util.LinkedHashMap
* some cache entries have pointers to a shared object. Nonetheless, this Cache should work well
* when most of the space is used by arrays of primitives or of simple classes.
*/
class BoundedMemoryCache extends Cache with Logging {
private val maxBytes: Long = getMaxBytes()
class BoundedMemoryCache(maxBytes: Long) extends Cache with Logging {
logInfo("BoundedMemoryCache.maxBytes = " + maxBytes)
def this() {
this(BoundedMemoryCache.getMaxBytes)
}
private var currentBytes = 0L
private val map = new LinkedHashMap[(Any, Int), Entry](32, 0.75f, true)
// An entry in our map; stores a cached object and its size in bytes
class Entry(val value: Any, val size: Long) {}
override def get(datasetId: Any, partition: Int): Any = {
synchronized {
val entry = map.get((datasetId, partition))
@@ -33,13 +33,11 @@ class BoundedMemoryCache extends Cache with Logging {
override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = {
val key = (datasetId, partition)
logInfo("Asked to add key " + key)
val startTime = System.currentTimeMillis
val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
val timeTaken = System.currentTimeMillis - startTime
logInfo("Estimated size for key %s is %d".format(key, size))
logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
val size = estimateValueSize(key, value)
synchronized {
if (ensureFreeSpace(datasetId, size)) {
if (size > getCapacity) {
return CachePutFailure()
} else if (ensureFreeSpace(datasetId, size)) {
logInfo("Adding key " + key)
map.put(key, new Entry(value, size))
currentBytes += size
@@ -54,10 +52,16 @@ class BoundedMemoryCache extends Cache with Logging {
override def getCapacity: Long = maxBytes
private def getMaxBytes(): Long = {
val memoryFractionToUse = System.getProperty(
"spark.boundedMemoryCache.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFractionToUse).toLong
/**
* Estimate the size of 'value', in bytes
*/
private def estimateValueSize(key: (Any, Int), value: Any) = {
val startTime = System.currentTimeMillis
val size = SizeEstimator.estimate(value.asInstanceOf[AnyRef])
val timeTaken = System.currentTimeMillis - startTime
logInfo("Estimated size for key %s is %d".format(key, size))
logInfo("Size estimation for key %s took %d ms".format(key, timeTaken))
size
}
/**
@@ -85,8 +89,21 @@ class BoundedMemoryCache extends Cache with Logging {
}
protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
logInfo("Dropping key (%s, %d) of size %d to make space".format(
datasetId, partition, entry.size))
logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
SparkEnv.get.cacheTracker.dropEntry(datasetId, partition)
}
}
// An entry in our map; stores a cached object and its size in bytes
case class Entry(value: Any, size: Long)
object BoundedMemoryCache {
/**
* Get maximum cache capacity from system configuration
*/
def getMaxBytes: Long = {
val memoryFractionToUse = System.getProperty("spark.boundedMemoryCache.memoryFraction", "0.66").toDouble
(Runtime.getRuntime.maxMemory * memoryFractionToUse).toLong
}
}
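
As a quick illustration of the reworked constructor (not part of the commit; the 1 MB capacity below is an arbitrary example value):

  val fixedCache   = new BoundedMemoryCache(1L << 20)  // explicit 1 MB capacity, handy for tests
  val defaultCache = new BoundedMemoryCache()          // falls back to BoundedMemoryCache.getMaxBytes,
                                                       // i.e. spark.boundedMemoryCache.memoryFraction * maxMemory
  assert(fixedCache.getCapacity == (1L << 20))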


@@ -4,7 +4,7 @@ import java.util.concurrent.atomic.AtomicInteger
sealed trait CachePutResponse
case class CachePutSuccess(size: Long) extends CachePutResponse
case class CachePutFailure extends CachePutResponse
case class CachePutFailure() extends CachePutResponse
/**
* An interface for caches in Spark, to allow for multiple implementations. Caches are used to store
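
For context, a hypothetical caller handling the now-parenthesized CachePutFailure() (a sketch only; the name bytesStored is illustrative, not part of this change):

  def bytesStored(response: CachePutResponse): Long = response match {
    case CachePutSuccess(size) => size  // entry was stored; size is the estimated byte count
    case CachePutFailure()     => 0L    // entry was rejected (too large, or eviction not possible)
  }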


@@ -56,7 +56,7 @@ class CacheTrackerActor extends DaemonActor with Logging {
case AddedToCache(rddId, partition, host, size) =>
if (size > 0) {
slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) + size)
slaveUsage.put(host, getCacheUsage(host) + size)
logInfo("Cache entry added: (%s, %s) on %s (size added: %s, available: %s)".format(
rddId, partition, host, Utils.memoryBytesToString(size),
Utils.memoryBytesToString(getCacheAvailable(host))))
@@ -71,10 +71,10 @@ class CacheTrackerActor extends DaemonActor with Logging {
logInfo("Cache entry removed: (%s, %s) on %s (size dropped: %s, available: %s)".format(
rddId, partition, host, Utils.memoryBytesToString(size),
Utils.memoryBytesToString(getCacheAvailable(host))))
slaveUsage.put(host, slaveUsage.getOrElse(host, 0L) - size)
slaveUsage.put(host, getCacheUsage(host) - size)
// Do a sanity check to make sure usage is not negative.
val usage = slaveUsage.getOrElse(host, 0L)
val usage = getCacheUsage(host)
if (usage < 0) {
logError("Cache usage on %s is negative (%d)".format(host, usage))
}
@@ -82,22 +82,19 @@ class CacheTrackerActor extends DaemonActor with Logging {
logInfo("Cache entry removed: (%s, %s) on %s".format(rddId, partition, host))
}
locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host)
reply('OK)
case MemoryCacheLost(host) =>
logInfo("Memory cache lost on " + host)
// TODO: Drop host from the memory locations list of all RDDs
case GetCacheLocations =>
logInfo("Asked for current cache locations")
val locsCopy = new HashMap[Int, Array[List[String]]]
for ((rddId, array) <- locs) {
locsCopy(rddId) = array.clone()
}
reply(locsCopy)
reply(locs.map { case (rddId, array) => (rddId -> array.clone()) })
case GetCacheStatus =>
val status: Seq[Tuple3[String, Long, Long]] = slaveCapacity.keys.map { key =>
(key, slaveCapacity.getOrElse(key, 0L), slaveUsage.getOrElse(key, 0L))
val status = slaveCapacity.map { case (host, capacity) =>
(host, capacity, getCacheUsage(host))
}.toSeq
reply(status)
@@ -130,9 +127,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
}
// Report the cache being started.
trackerActor !? SlaveCacheStarted(
System.getProperty("spark.hostname", Utils.localHostName),
cache.getCapacity)
trackerActor !? SlaveCacheStarted(Utils.getHost, cache.getCapacity)
// Remembers which splits are currently being loaded (on worker nodes)
val loading = new HashSet[(Int, Int)]
@@ -151,20 +146,17 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
// Get a snapshot of the currently known locations
def getLocationsSnapshot(): HashMap[Int, Array[List[String]]] = {
(trackerActor !? GetCacheLocations) match {
case h: HashMap[_, _] =>
h.asInstanceOf[HashMap[Int, Array[List[String]]]]
case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]]
case _ =>
throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap")
case _ => throw new SparkException("Internal error: CacheTrackerActor did not reply with a HashMap")
}
}
// Get the usage status of slave caches. Each tuple in the returned sequence
// is in the form of (host name, capacity, usage).
def getCacheStatus(): Seq[Tuple3[String, Long, Long]] = {
def getCacheStatus(): Seq[(String, Long, Long)] = {
(trackerActor !? GetCacheStatus) match {
case h: Seq[Tuple3[String, Long, Long]] =>
h.asInstanceOf[Seq[Tuple3[String, Long, Long]]]
case h: Seq[(String, Long, Long)] => h.asInstanceOf[Seq[(String, Long, Long)]]
case _ =>
throw new SparkException(
@@ -202,7 +194,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
}
// If we got here, we have to load the split
// Tell the master that we're doing so
val host = System.getProperty("spark.hostname", Utils.localHostName)
// TODO: fetch any remote copy of the split that may be available
logInfo("Computing partition " + split)
var array: Array[T] = null
@@ -223,7 +215,7 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
case CachePutSuccess(size) => {
// Tell the master that we added the entry. Don't return until it
// replies so it can properly schedule future tasks that use this RDD.
trackerActor !? AddedToCache(rdd.id, split.index, host, size)
trackerActor !? AddedToCache(rdd.id, split.index, Utils.getHost, size)
}
case _ => null
}
@@ -234,9 +226,8 @@ class CacheTracker(isMaster: Boolean, theCache: Cache) extends Logging {
// Called by the Cache to report that an entry has been dropped from it
def dropEntry(datasetId: Any, partition: Int) {
datasetId match {
case (cache.keySpaceId, rddId: Int) =>
val host = System.getProperty("spark.hostname", Utils.localHostName)
trackerActor !! DroppedFromCache(rddId, partition, host)
// TODO: do we really want to use '!!' when nobody checks the returned future? '!' seems to be enough here.
case (cache.keySpaceId, rddId: Int) => trackerActor !! DroppedFromCache(rddId, partition, Utils.getHost)
}
}
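
A hypothetical use of the new getCacheStatus() signature (cacheTracker here stands for any CacheTracker instance), formatting each (host, capacity, usage) triple with the same Utils helper the actor uses in its log messages:

  for ((host, capacity, usage) <- cacheTracker.getCacheStatus()) {
    println("%s: %s used of %s".format(
      host, Utils.memoryBytesToString(usage), Utils.memoryBytesToString(capacity)))
  }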


@@ -42,7 +42,7 @@ private class MesosScheduler(
// Memory used by each executor (in megabytes)
val EXECUTOR_MEMORY = {
if (System.getenv("SPARK_MEM") != null) {
memoryStringToMb(System.getenv("SPARK_MEM"))
MesosScheduler.memoryStringToMb(System.getenv("SPARK_MEM"))
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
} else {
512
@@ -78,9 +78,7 @@ private class MesosScheduler(
// Sorts jobs in reverse order of run ID for use in our priority queue (so lower IDs run first)
private val jobOrdering = new Ordering[Job] {
override def compare(j1: Job, j2: Job): Int = {
return j2.runId - j1.runId
}
override def compare(j1: Job, j2: Job): Int = j2.runId - j1.runId
}
def newJobId(): Int = this.synchronized {
@@ -156,7 +154,7 @@ private class MesosScheduler(
activeJobs(jobId) = myJob
activeJobsQueue += myJob
logInfo("Adding job with ID " + jobId)
jobTasks(jobId) = new HashSet()
jobTasks(jobId) = HashSet.empty[String]
}
driver.reviveOffers();
}
@@ -376,24 +374,27 @@ private class MesosScheduler(
}
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
}
object MesosScheduler {
/**
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
* This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
* Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
* This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
* environment variable.
*/
def memoryStringToMb(str: String): Int = {
val lower = str.toLowerCase
if (lower.endsWith("k")) {
(lower.substring(0, lower.length-1).toLong / 1024).toInt
(lower.substring(0, lower.length - 1).toLong / 1024).toInt
} else if (lower.endsWith("m")) {
lower.substring(0, lower.length-1).toInt
lower.substring(0, lower.length - 1).toInt
} else if (lower.endsWith("g")) {
lower.substring(0, lower.length-1).toInt * 1024
lower.substring(0, lower.length - 1).toInt * 1024
} else if (lower.endsWith("t")) {
lower.substring(0, lower.length-1).toInt * 1024 * 1024
} else {// no suffix, so it's just a number in bytes
lower.substring(0, lower.length - 1).toInt * 1024 * 1024
} else {
// no suffix, so it's just a number in bytes
(lower.toLong / 1024 / 1024).toInt
}
}
}
}
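
Sample conversions implied by the implementation above (illustrative only; the committed assertions live in the new MesosSchedulerSuite further down):

  MesosScheduler.memoryStringToMb("300m")     // 300
  MesosScheduler.memoryStringToMb("1g")       // 1024
  MesosScheduler.memoryStringToMb("1024k")    // 1
  MesosScheduler.memoryStringToMb("1048576")  // 1 -- no suffix, so the value is taken as bytes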


@@ -2,11 +2,11 @@ package spark
import java.io._
import java.net.InetAddress
import java.util.UUID
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import java.util.{Locale, UUID}
/**
* Various utility methods used by Spark.
@@ -157,9 +157,12 @@ object Utils {
/**
* Get the local machine's hostname.
*/
def localHostName(): String = {
return InetAddress.getLocalHost().getHostName
}
def localHostName(): String = InetAddress.getLocalHost.getHostName
/**
* Get the current host name, preferring the 'spark.hostname' property when it is set
*/
def getHost = System.getProperty("spark.hostname", localHostName())
/**
* Delete a file or directory and its contents recursively.
@@ -184,7 +187,7 @@ object Utils {
val GB = 1L << 30
val MB = 1L << 20
val KB = 1L << 10
val B = 1L
val (value, unit) = {
if (size >= 2*GB) {
(size.asInstanceOf[Double] / GB, "GB")
@@ -196,6 +199,6 @@ object Utils {
(size.asInstanceOf[Double], "B")
}
}
"%.1f%s".format(value, unit)
"%.1f%s".formatLocal(Locale.US, value, unit)
}
}
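
Illustrative behaviour of the two helpers touched above; "host001" is just an example value (it also appears in the new tests), and the memoryBytesToString expectation matches the existing UtilsSuite case:

  System.setProperty("spark.hostname", "host001")
  assert(Utils.getHost == "host001")                        // prefers spark.hostname, else localHostName()
  assert(Utils.memoryBytesToString(5368709120L) == "5.0GB") // Locale.US keeps '.' as the decimal separator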


@@ -0,0 +1,31 @@
package spark
import org.scalatest.FunSuite
class BoundedMemoryCacheTest extends FunSuite {
test("constructor test") {
val cache = new BoundedMemoryCache(40)
expect(40)(cache.getCapacity)
}
test("caching") {
val cache = new BoundedMemoryCache(40) {
// TODO: sorry about this, but there is no better way to skip 'cacheTracker.dropEntry'
override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
}
}
// should be OK
expect(CachePutSuccess(30))(cache.put("1", 0, "Meh"))
// we cannot add this to the cache (there is not enough space) and we cannot evict the only value
// already cached because it belongs to the same dataset
expect(CachePutFailure())(cache.put("1", 1, "Meh"))
// should be OK, dataset '1' can be evicted from the cache
expect(CachePutSuccess(30))(cache.put("2", 0, "Meh"))
// should fail, the cache should obey its capacity
expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
}
}


@@ -0,0 +1,97 @@
package spark
import org.scalatest.FunSuite
import scala.collection.mutable.HashMap
class CacheTrackerSuite extends FunSuite {
test("CacheTrackerActor slave initialization & cache status") {
System.setProperty("spark.master.port", "1345")
val initialSize = 2L << 20
val tracker = new CacheTrackerActor
tracker.start()
tracker !? SlaveCacheStarted("host001", initialSize)
assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 0L)))
tracker !? StopCacheTracker
}
test("RegisterRDD") {
System.setProperty("spark.master.port", "1345")
val initialSize = 2L << 20
val tracker = new CacheTrackerActor
tracker.start()
tracker !? SlaveCacheStarted("host001", initialSize)
tracker !? RegisterRDD(1, 3)
tracker !? RegisterRDD(2, 1)
assert(getCacheLocations(tracker) == Map(1 -> List(List(), List(), List()), 2 -> List(List())))
tracker !? StopCacheTracker
}
test("AddedToCache") {
System.setProperty("spark.master.port", "1345")
val initialSize = 2L << 20
val tracker = new CacheTrackerActor
tracker.start()
tracker !? SlaveCacheStarted("host001", initialSize)
tracker !? RegisterRDD(1, 2)
tracker !? RegisterRDD(2, 1)
tracker !? AddedToCache(1, 0, "host001", 2L << 15)
tracker !? AddedToCache(1, 1, "host001", 2L << 11)
tracker !? AddedToCache(2, 0, "host001", 3L << 10)
assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 72704L)))
assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"), List("host001")), 2 -> List(List("host001"))))
tracker !? StopCacheTracker
}
test("DroppedFromCache") {
System.setProperty("spark.master.port", "1345")
val initialSize = 2L << 20
val tracker = new CacheTrackerActor
tracker.start()
tracker !? SlaveCacheStarted("host001", initialSize)
tracker !? RegisterRDD(1, 2)
tracker !? RegisterRDD(2, 1)
tracker !? AddedToCache(1, 0, "host001", 2L << 15)
tracker !? AddedToCache(1, 1, "host001", 2L << 11)
tracker !? AddedToCache(2, 0, "host001", 3L << 10)
assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 72704L)))
assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"), List("host001")), 2 -> List(List("host001"))))
tracker !? DroppedFromCache(1, 1, "host001", 2L << 11)
assert(tracker !? GetCacheStatus == Seq(("host001", 2097152L, 68608L)))
assert(getCacheLocations(tracker) == Map(1 -> List(List("host001"),List()), 2 -> List(List("host001"))))
tracker !? StopCacheTracker
}
/**
* Helper function to get cacheLocations from CacheTracker
*/
def getCacheLocations(tracker: CacheTrackerActor) = tracker !? GetCacheLocations match {
case h: HashMap[_, _] => h.asInstanceOf[HashMap[Int, Array[List[String]]]].map {
case (i, arr) => (i -> arr.toList)
}
}
}


@@ -0,0 +1,28 @@
package spark
import org.scalatest.FunSuite
class MesosSchedulerSuite extends FunSuite {
test("memoryStringToMb"){
assert(MesosScheduler.memoryStringToMb("1") == 0)
assert(MesosScheduler.memoryStringToMb("1048575") == 0)
assert(MesosScheduler.memoryStringToMb("3145728") == 3)
assert(MesosScheduler.memoryStringToMb("1024k") == 1)
assert(MesosScheduler.memoryStringToMb("5000k") == 4)
assert(MesosScheduler.memoryStringToMb("4024k") == MesosScheduler.memoryStringToMb("4024K"))
assert(MesosScheduler.memoryStringToMb("1024m") == 1024)
assert(MesosScheduler.memoryStringToMb("5000m") == 5000)
assert(MesosScheduler.memoryStringToMb("4024m") == MesosScheduler.memoryStringToMb("4024M"))
assert(MesosScheduler.memoryStringToMb("2g") == 2048)
assert(MesosScheduler.memoryStringToMb("3g") == MesosScheduler.memoryStringToMb("3G"))
assert(MesosScheduler.memoryStringToMb("2t") == 2097152)
assert(MesosScheduler.memoryStringToMb("3t") == MesosScheduler.memoryStringToMb("3T"))
}
}


@@ -1,7 +1,8 @@ package spark
package spark
import org.scalatest.FunSuite
import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
import scala.util.Random
class UtilsSuite extends FunSuite {
@@ -14,5 +15,15 @@ class UtilsSuite extends FunSuite {
assert(Utils.memoryBytesToString(5368709120L) === "5.0GB")
}
test("copyStream") {
// input array initialization
val bytes = Array.ofDim[Byte](9000)
Random.nextBytes(bytes)
val os = new ByteArrayOutputStream()
Utils.copyStream(new ByteArrayInputStream(bytes), os)
assert(os.toByteArray.toList.equals(bytes.toList))
}
}