Replace the code to check for Option != None with Option.isDefined call in Scala code.
This hopefully will make the code cleaner.
This commit is contained in:
parent
749f842827
commit
90ea9d5a8f
|
@ -49,7 +49,7 @@ object Partitioner {
|
||||||
*/
|
*/
|
||||||
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
|
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
|
||||||
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
|
val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
|
||||||
for (r <- bySize if r.partitioner != None) {
|
for (r <- bySize if r.partitioner.isDefined) {
|
||||||
return r.partitioner.get
|
return r.partitioner.get
|
||||||
}
|
}
|
||||||
if (rdd.context.conf.contains("spark.default.parallelism")) {
|
if (rdd.context.conf.contains("spark.default.parallelism")) {
|
||||||
|
|
|
@ -67,7 +67,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
|
||||||
<li><strong>User:</strong> {app.desc.user}</li>
|
<li><strong>User:</strong> {app.desc.user}</li>
|
||||||
<li><strong>Cores:</strong>
|
<li><strong>Cores:</strong>
|
||||||
{
|
{
|
||||||
if (app.desc.maxCores == None) {
|
if (app.desc.maxCores.isEmpty) {
|
||||||
"Unlimited (%s granted)".format(app.coresGranted)
|
"Unlimited (%s granted)".format(app.coresGranted)
|
||||||
} else {
|
} else {
|
||||||
"%s (%s granted, %s left)".format(
|
"%s (%s granted, %s left)".format(
|
||||||
|
|
|
@ -80,7 +80,7 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
|
||||||
val subProperties = new mutable.HashMap[String, Properties]
|
val subProperties = new mutable.HashMap[String, Properties]
|
||||||
import scala.collection.JavaConversions._
|
import scala.collection.JavaConversions._
|
||||||
prop.foreach { kv =>
|
prop.foreach { kv =>
|
||||||
if (regex.findPrefixOf(kv._1) != None) {
|
if (regex.findPrefixOf(kv._1).isDefined) {
|
||||||
val regex(prefix, suffix) = kv._1
|
val regex(prefix, suffix) = kv._1
|
||||||
subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
|
subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,7 +71,7 @@ private[spark] class ApproximateActionListener[T, U, R](
|
||||||
val finishTime = startTime + timeout
|
val finishTime = startTime + timeout
|
||||||
while (true) {
|
while (true) {
|
||||||
val time = System.currentTimeMillis()
|
val time = System.currentTimeMillis()
|
||||||
if (failure != None) {
|
if (failure.isDefined) {
|
||||||
throw failure.get
|
throw failure.get
|
||||||
} else if (finishedTasks == totalTasks) {
|
} else if (finishedTasks == totalTasks) {
|
||||||
return new PartialResult(evaluator.currentResult(), true)
|
return new PartialResult(evaluator.currentResult(), true)
|
||||||
|
|
|
@ -31,10 +31,10 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
|
||||||
* Blocking method to wait for and return the final value.
|
* Blocking method to wait for and return the final value.
|
||||||
*/
|
*/
|
||||||
def getFinalValue(): R = synchronized {
|
def getFinalValue(): R = synchronized {
|
||||||
while (finalValue == None && failure == None) {
|
while (finalValue.isEmpty && failure.isEmpty) {
|
||||||
this.wait()
|
this.wait()
|
||||||
}
|
}
|
||||||
if (finalValue != None) {
|
if (finalValue.isDefined) {
|
||||||
return finalValue.get
|
return finalValue.get
|
||||||
} else {
|
} else {
|
||||||
throw failure.get
|
throw failure.get
|
||||||
|
@ -46,11 +46,11 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
|
||||||
* is supported per PartialResult.
|
* is supported per PartialResult.
|
||||||
*/
|
*/
|
||||||
def onComplete(handler: R => Unit): PartialResult[R] = synchronized {
|
def onComplete(handler: R => Unit): PartialResult[R] = synchronized {
|
||||||
if (completionHandler != None) {
|
if (completionHandler.isDefined) {
|
||||||
throw new UnsupportedOperationException("onComplete cannot be called twice")
|
throw new UnsupportedOperationException("onComplete cannot be called twice")
|
||||||
}
|
}
|
||||||
completionHandler = Some(handler)
|
completionHandler = Some(handler)
|
||||||
if (finalValue != None) {
|
if (finalValue.isDefined) {
|
||||||
// We already have a final value, so let's call the handler
|
// We already have a final value, so let's call the handler
|
||||||
handler(finalValue.get)
|
handler(finalValue.get)
|
||||||
}
|
}
|
||||||
|
@ -63,11 +63,11 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
|
||||||
*/
|
*/
|
||||||
def onFail(handler: Exception => Unit) {
|
def onFail(handler: Exception => Unit) {
|
||||||
synchronized {
|
synchronized {
|
||||||
if (failureHandler != None) {
|
if (failureHandler.isDefined) {
|
||||||
throw new UnsupportedOperationException("onFail cannot be called twice")
|
throw new UnsupportedOperationException("onFail cannot be called twice")
|
||||||
}
|
}
|
||||||
failureHandler = Some(handler)
|
failureHandler = Some(handler)
|
||||||
if (failure != None) {
|
if (failure.isDefined) {
|
||||||
// We already have a failure, so let's call the handler
|
// We already have a failure, so let's call the handler
|
||||||
handler(failure.get)
|
handler(failure.get)
|
||||||
}
|
}
|
||||||
|
@ -102,7 +102,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
|
||||||
|
|
||||||
private[spark] def setFinalValue(value: R) {
|
private[spark] def setFinalValue(value: R) {
|
||||||
synchronized {
|
synchronized {
|
||||||
if (finalValue != None) {
|
if (finalValue.isDefined) {
|
||||||
throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
|
throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
|
||||||
}
|
}
|
||||||
finalValue = Some(value)
|
finalValue = Some(value)
|
||||||
|
@ -117,7 +117,7 @@ class PartialResult[R](initialVal: R, isFinal: Boolean) {
|
||||||
|
|
||||||
private[spark] def setFailure(exception: Exception) {
|
private[spark] def setFailure(exception: Exception) {
|
||||||
synchronized {
|
synchronized {
|
||||||
if (failure != None) {
|
if (failure.isDefined) {
|
||||||
throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
|
throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
|
||||||
}
|
}
|
||||||
failure = Some(exception)
|
failure = Some(exception)
|
||||||
|
|
|
@ -666,7 +666,7 @@ abstract class RDD[T: ClassTag](
|
||||||
}
|
}
|
||||||
var jobResult: Option[T] = None
|
var jobResult: Option[T] = None
|
||||||
val mergeResult = (index: Int, taskResult: Option[T]) => {
|
val mergeResult = (index: Int, taskResult: Option[T]) => {
|
||||||
if (taskResult != None) {
|
if (taskResult.isDefined) {
|
||||||
jobResult = jobResult match {
|
jobResult = jobResult match {
|
||||||
case Some(value) => Some(f(value, taskResult.get))
|
case Some(value) => Some(f(value, taskResult.get))
|
||||||
case None => taskResult
|
case None => taskResult
|
||||||
|
|
|
@ -877,7 +877,7 @@ class DAGScheduler(
|
||||||
logInfo("running: " + running)
|
logInfo("running: " + running)
|
||||||
logInfo("waiting: " + waiting)
|
logInfo("waiting: " + waiting)
|
||||||
logInfo("failed: " + failed)
|
logInfo("failed: " + failed)
|
||||||
if (stage.shuffleDep != None) {
|
if (stage.shuffleDep.isDefined) {
|
||||||
// We supply true to increment the epoch number here in case this is a
|
// We supply true to increment the epoch number here in case this is a
|
||||||
// recomputation of the map outputs. In that case, some nodes may have cached
|
// recomputation of the map outputs. In that case, some nodes may have cached
|
||||||
// locations with holes (from when we detected the error) and will need the
|
// locations with holes (from when we detected the error) and will need the
|
||||||
|
|
|
@ -46,7 +46,7 @@ private[spark] class Stage(
|
||||||
callSite: Option[String])
|
callSite: Option[String])
|
||||||
extends Logging {
|
extends Logging {
|
||||||
|
|
||||||
val isShuffleMap = shuffleDep != None
|
val isShuffleMap = shuffleDep.isDefined
|
||||||
val numPartitions = rdd.partitions.size
|
val numPartitions = rdd.partitions.size
|
||||||
val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
|
val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
|
||||||
var numAvailableOutputs = 0
|
var numAvailableOutputs = 0
|
||||||
|
|
|
@ -293,7 +293,7 @@ private[spark] class TaskSchedulerImpl(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Update the DAGScheduler without holding a lock on this, since that can deadlock
|
// Update the DAGScheduler without holding a lock on this, since that can deadlock
|
||||||
if (failedExecutor != None) {
|
if (failedExecutor.isDefined) {
|
||||||
dagScheduler.executorLost(failedExecutor.get)
|
dagScheduler.executorLost(failedExecutor.get)
|
||||||
backend.reviveOffers()
|
backend.reviveOffers()
|
||||||
}
|
}
|
||||||
|
@ -387,7 +387,7 @@ private[spark] class TaskSchedulerImpl(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
|
// Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
|
||||||
if (failedExecutor != None) {
|
if (failedExecutor.isDefined) {
|
||||||
dagScheduler.executorLost(failedExecutor.get)
|
dagScheduler.executorLost(failedExecutor.get)
|
||||||
backend.reviveOffers()
|
backend.reviveOffers()
|
||||||
}
|
}
|
||||||
|
|
|
@ -111,7 +111,7 @@ private[spark] object BlockManagerWorker extends Logging {
|
||||||
val blockMessageArray = new BlockMessageArray(blockMessage)
|
val blockMessageArray = new BlockMessageArray(blockMessage)
|
||||||
val resultMessage = connectionManager.sendMessageReliablySync(
|
val resultMessage = connectionManager.sendMessageReliablySync(
|
||||||
toConnManagerId, blockMessageArray.toBufferMessage)
|
toConnManagerId, blockMessageArray.toBufferMessage)
|
||||||
resultMessage != None
|
resultMessage.isDefined
|
||||||
}
|
}
|
||||||
|
|
||||||
def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
|
def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {
|
||||||
|
|
|
@ -214,7 +214,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
|
||||||
while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
|
while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
|
||||||
val pair = iterator.next()
|
val pair = iterator.next()
|
||||||
val blockId = pair.getKey
|
val blockId = pair.getKey
|
||||||
if (rddToAdd != None && rddToAdd == getRddId(blockId)) {
|
if (rddToAdd.isDefined && rddToAdd == getRddId(blockId)) {
|
||||||
logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
|
logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
|
||||||
"block from the same RDD")
|
"block from the same RDD")
|
||||||
return false
|
return false
|
||||||
|
|
|
@ -673,7 +673,7 @@ private[spark] object Utils extends Logging {
|
||||||
|
|
||||||
for (el <- trace) {
|
for (el <- trace) {
|
||||||
if (!finished) {
|
if (!finished) {
|
||||||
if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName) != None) {
|
if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
|
||||||
lastSparkMethod = if (el.getMethodName == "<init>") {
|
lastSparkMethod = if (el.getMethodName == "<init>") {
|
||||||
// Spark method is a constructor; get its class name
|
// Spark method is a constructor; get its class name
|
||||||
el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
|
el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
|
||||||
|
|
|
@ -287,7 +287,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
|
||||||
// after the last failure.
|
// after the last failure.
|
||||||
(1 to manager.maxTaskFailures).foreach { index =>
|
(1 to manager.maxTaskFailures).foreach { index =>
|
||||||
val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
|
val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
|
||||||
assert(offerResult != None,
|
assert(offerResult.isDefined,
|
||||||
"Expect resource offer on iteration %s to return a task".format(index))
|
"Expect resource offer on iteration %s to return a task".format(index))
|
||||||
assert(offerResult.get.index === 0)
|
assert(offerResult.get.index === 0)
|
||||||
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
|
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, Some(TaskResultLost))
|
||||||
|
|
|
@ -137,9 +137,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
|
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
|
||||||
|
|
||||||
// Checking whether blocks are in memory
|
// Checking whether blocks are in memory
|
||||||
assert(store.getSingle("a1") != None, "a1 was not in store")
|
assert(store.getSingle("a1").isDefined, "a1 was not in store")
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2").isDefined, "a2 was not in store")
|
||||||
assert(store.getSingle("a3") != None, "a3 was not in store")
|
assert(store.getSingle("a3").isDefined, "a3 was not in store")
|
||||||
|
|
||||||
// Checking whether master knows about the blocks or not
|
// Checking whether master knows about the blocks or not
|
||||||
assert(master.getLocations("a1").size > 0, "master was not told about a1")
|
assert(master.getLocations("a1").size > 0, "master was not told about a1")
|
||||||
|
@ -186,9 +186,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
val memStatus = master.getMemoryStatus.head._2
|
val memStatus = master.getMemoryStatus.head._2
|
||||||
assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
|
assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
|
||||||
assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
|
assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should <= 1200")
|
||||||
assert(store.getSingle("a1-to-remove") != None, "a1 was not in store")
|
assert(store.getSingle("a1-to-remove").isDefined, "a1 was not in store")
|
||||||
assert(store.getSingle("a2-to-remove") != None, "a2 was not in store")
|
assert(store.getSingle("a2-to-remove").isDefined, "a2 was not in store")
|
||||||
assert(store.getSingle("a3-to-remove") != None, "a3 was not in store")
|
assert(store.getSingle("a3-to-remove").isDefined, "a3 was not in store")
|
||||||
|
|
||||||
// Checking whether master knows about the blocks or not
|
// Checking whether master knows about the blocks or not
|
||||||
assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
|
assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
|
||||||
|
@ -259,7 +259,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
|
|
||||||
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
|
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
|
||||||
|
|
||||||
assert(store.getSingle("a1") != None, "a1 was not in store")
|
assert(store.getSingle("a1").isDefined, "a1 was not in store")
|
||||||
assert(master.getLocations("a1").size > 0, "master was not told about a1")
|
assert(master.getLocations("a1").size > 0, "master was not told about a1")
|
||||||
|
|
||||||
master.removeExecutor(store.blockManagerId.executorId)
|
master.removeExecutor(store.blockManagerId.executorId)
|
||||||
|
@ -333,14 +333,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
|
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
|
||||||
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
|
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
|
||||||
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
|
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2").isDefined, "a2 was not in store")
|
||||||
assert(store.getSingle("a3") != None, "a3 was not in store")
|
assert(store.getSingle("a3").isDefined, "a3 was not in store")
|
||||||
assert(store.getSingle("a1") === None, "a1 was in store")
|
assert(store.getSingle("a1") === None, "a1 was in store")
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2").isDefined, "a2 was not in store")
|
||||||
// At this point a2 was gotten last, so LRU will getSingle rid of a3
|
// At this point a2 was gotten last, so LRU will getSingle rid of a3
|
||||||
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
|
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
|
||||||
assert(store.getSingle("a1") != None, "a1 was not in store")
|
assert(store.getSingle("a1").isDefined, "a1 was not in store")
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2").isDefined, "a2 was not in store")
|
||||||
assert(store.getSingle("a3") === None, "a3 was in store")
|
assert(store.getSingle("a3") === None, "a3 was in store")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -352,14 +352,14 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
|
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
|
||||||
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
|
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
|
||||||
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
|
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2").isDefined, "a2 was not in store")
|
||||||
assert(store.getSingle("a3") != None, "a3 was not in store")
|
assert(store.getSingle("a3").isDefined, "a3 was not in store")
|
||||||
assert(store.getSingle("a1") === None, "a1 was in store")
|
assert(store.getSingle("a1") === None, "a1 was in store")
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2").isDefined, "a2 was not in store")
|
||||||
// At this point a2 was gotten last, so LRU will getSingle rid of a3
|
// At this point a2 was gotten last, so LRU will getSingle rid of a3
|
||||||
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
|
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
|
||||||
assert(store.getSingle("a1") != None, "a1 was not in store")
|
assert(store.getSingle("a1").isDefined, "a1 was not in store")
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2").isDefined, "a2 was not in store")
|
||||||
assert(store.getSingle("a3") === None, "a3 was in store")
|
assert(store.getSingle("a3") === None, "a3 was in store")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -374,8 +374,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
// Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
|
// Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
|
||||||
// from the same RDD
|
// from the same RDD
|
||||||
assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
|
assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
|
||||||
assert(store.getSingle(rdd(0, 2)) != None, "rdd_0_2 was not in store")
|
assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
|
||||||
assert(store.getSingle(rdd(0, 1)) != None, "rdd_0_1 was not in store")
|
assert(store.getSingle(rdd(0, 1)).isDefined, "rdd_0_1 was not in store")
|
||||||
// Check that rdd_0_3 doesn't replace them even after further accesses
|
// Check that rdd_0_3 doesn't replace them even after further accesses
|
||||||
assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
|
assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
|
||||||
assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
|
assert(store.getSingle(rdd(0, 3)) === None, "rdd_0_3 was in store")
|
||||||
|
@ -392,7 +392,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
|
assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
|
||||||
assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
|
assert(store.memoryStore.contains(rdd(0, 2)), "rdd_0_2 was not in store")
|
||||||
// Do a get() on rdd_0_2 so that it is the most recently used item
|
// Do a get() on rdd_0_2 so that it is the most recently used item
|
||||||
assert(store.getSingle(rdd(0, 2)) != None, "rdd_0_2 was not in store")
|
assert(store.getSingle(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
|
||||||
// Put in more partitions from RDD 0; they should replace rdd_1_1
|
// Put in more partitions from RDD 0; they should replace rdd_1_1
|
||||||
store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
|
store.putSingle(rdd(0, 3), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
|
||||||
store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
|
store.putSingle(rdd(0, 4), new Array[Byte](400), StorageLevel.MEMORY_ONLY)
|
||||||
|
@ -413,9 +413,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
|
store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
|
||||||
store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
|
store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
|
||||||
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
|
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
|
||||||
assert(store.getSingle("a2") != None, "a2 was in store")
|
assert(store.getSingle("a2").isDefined, "a2 was in store")
|
||||||
assert(store.getSingle("a3") != None, "a3 was in store")
|
assert(store.getSingle("a3").isDefined, "a3 was in store")
|
||||||
assert(store.getSingle("a1") != None, "a1 was in store")
|
assert(store.getSingle("a1").isDefined, "a1 was in store")
|
||||||
}
|
}
|
||||||
|
|
||||||
test("disk and memory storage") {
|
test("disk and memory storage") {
|
||||||
|
@ -426,11 +426,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
|
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
|
||||||
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
|
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
|
||||||
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
|
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2").isDefined, "a2 was not in store")
|
||||||
assert(store.getSingle("a3") != None, "a3 was not in store")
|
assert(store.getSingle("a3").isDefined, "a3 was not in store")
|
||||||
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
|
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
|
||||||
assert(store.getSingle("a1") != None, "a1 was not in store")
|
assert(store.getSingle("a1").isDefined, "a1 was not in store")
|
||||||
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
|
assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
|
||||||
}
|
}
|
||||||
|
|
||||||
test("disk and memory storage with getLocalBytes") {
|
test("disk and memory storage with getLocalBytes") {
|
||||||
|
@ -441,11 +441,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
|
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
|
||||||
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
|
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
|
||||||
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
|
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
|
||||||
assert(store.getLocalBytes("a2") != None, "a2 was not in store")
|
assert(store.getLocalBytes("a2").isDefined, "a2 was not in store")
|
||||||
assert(store.getLocalBytes("a3") != None, "a3 was not in store")
|
assert(store.getLocalBytes("a3").isDefined, "a3 was not in store")
|
||||||
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
|
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
|
||||||
assert(store.getLocalBytes("a1") != None, "a1 was not in store")
|
assert(store.getLocalBytes("a1").isDefined, "a1 was not in store")
|
||||||
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
|
assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
|
||||||
}
|
}
|
||||||
|
|
||||||
test("disk and memory storage with serialization") {
|
test("disk and memory storage with serialization") {
|
||||||
|
@ -456,11 +456,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
|
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
|
||||||
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
|
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
|
||||||
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
|
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2").isDefined, "a2 was not in store")
|
||||||
assert(store.getSingle("a3") != None, "a3 was not in store")
|
assert(store.getSingle("a3").isDefined, "a3 was not in store")
|
||||||
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
|
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
|
||||||
assert(store.getSingle("a1") != None, "a1 was not in store")
|
assert(store.getSingle("a1").isDefined, "a1 was not in store")
|
||||||
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
|
assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
|
||||||
}
|
}
|
||||||
|
|
||||||
test("disk and memory storage with serialization and getLocalBytes") {
|
test("disk and memory storage with serialization and getLocalBytes") {
|
||||||
|
@ -471,11 +471,11 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
|
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
|
||||||
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
|
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
|
||||||
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
|
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
|
||||||
assert(store.getLocalBytes("a2") != None, "a2 was not in store")
|
assert(store.getLocalBytes("a2").isDefined, "a2 was not in store")
|
||||||
assert(store.getLocalBytes("a3") != None, "a3 was not in store")
|
assert(store.getLocalBytes("a3").isDefined, "a3 was not in store")
|
||||||
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
|
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
|
||||||
assert(store.getLocalBytes("a1") != None, "a1 was not in store")
|
assert(store.getLocalBytes("a1").isDefined, "a1 was not in store")
|
||||||
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
|
assert(store.memoryStore.getValues("a1").isDefined, "a1 was not in memory store")
|
||||||
}
|
}
|
||||||
|
|
||||||
test("LRU with mixed storage levels") {
|
test("LRU with mixed storage levels") {
|
||||||
|
@ -489,18 +489,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
|
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
|
||||||
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
|
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
|
||||||
// At this point LRU should not kick in because a3 is only on disk
|
// At this point LRU should not kick in because a3 is only on disk
|
||||||
assert(store.getSingle("a1") != None, "a2 was not in store")
|
assert(store.getSingle("a1").isDefined, "a2 was not in store")
|
||||||
assert(store.getSingle("a2") != None, "a3 was not in store")
|
assert(store.getSingle("a2").isDefined, "a3 was not in store")
|
||||||
assert(store.getSingle("a3") != None, "a1 was not in store")
|
assert(store.getSingle("a3").isDefined, "a1 was not in store")
|
||||||
assert(store.getSingle("a1") != None, "a2 was not in store")
|
assert(store.getSingle("a1").isDefined, "a2 was not in store")
|
||||||
assert(store.getSingle("a2") != None, "a3 was not in store")
|
assert(store.getSingle("a2").isDefined, "a3 was not in store")
|
||||||
assert(store.getSingle("a3") != None, "a1 was not in store")
|
assert(store.getSingle("a3").isDefined, "a1 was not in store")
|
||||||
// Now let's add in a4, which uses both disk and memory; a1 should drop out
|
// Now let's add in a4, which uses both disk and memory; a1 should drop out
|
||||||
store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
|
store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
|
||||||
assert(store.getSingle("a1") == None, "a1 was in store")
|
assert(store.getSingle("a1") == None, "a1 was in store")
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2").isDefined, "a2 was not in store")
|
||||||
assert(store.getSingle("a3") != None, "a3 was not in store")
|
assert(store.getSingle("a3").isDefined, "a3 was not in store")
|
||||||
assert(store.getSingle("a4") != None, "a4 was not in store")
|
assert(store.getSingle("a4").isDefined, "a4 was not in store")
|
||||||
}
|
}
|
||||||
|
|
||||||
test("in-memory LRU with streams") {
|
test("in-memory LRU with streams") {
|
||||||
|
@ -511,18 +511,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
|
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
|
||||||
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
|
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
|
||||||
store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
|
store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
|
||||||
assert(store.get("list2") != None, "list2 was not in store")
|
assert(store.get("list2").isDefined, "list2 was not in store")
|
||||||
assert(store.get("list2").get.size == 2)
|
assert(store.get("list2").get.size == 2)
|
||||||
assert(store.get("list3") != None, "list3 was not in store")
|
assert(store.get("list3").isDefined, "list3 was not in store")
|
||||||
assert(store.get("list3").get.size == 2)
|
assert(store.get("list3").get.size == 2)
|
||||||
assert(store.get("list1") === None, "list1 was in store")
|
assert(store.get("list1") === None, "list1 was in store")
|
||||||
assert(store.get("list2") != None, "list2 was not in store")
|
assert(store.get("list2").isDefined, "list2 was not in store")
|
||||||
assert(store.get("list2").get.size == 2)
|
assert(store.get("list2").get.size == 2)
|
||||||
// At this point list2 was gotten last, so LRU will getSingle rid of list3
|
// At this point list2 was gotten last, so LRU will getSingle rid of list3
|
||||||
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
|
store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
|
||||||
assert(store.get("list1") != None, "list1 was not in store")
|
assert(store.get("list1").isDefined, "list1 was not in store")
|
||||||
assert(store.get("list1").get.size == 2)
|
assert(store.get("list1").get.size == 2)
|
||||||
assert(store.get("list2") != None, "list2 was not in store")
|
assert(store.get("list2").isDefined, "list2 was not in store")
|
||||||
assert(store.get("list2").get.size == 2)
|
assert(store.get("list2").get.size == 2)
|
||||||
assert(store.get("list3") === None, "list1 was in store")
|
assert(store.get("list3") === None, "list1 was in store")
|
||||||
}
|
}
|
||||||
|
@ -538,26 +538,26 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
|
store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
|
||||||
store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
|
store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
|
||||||
// At this point LRU should not kick in because list3 is only on disk
|
// At this point LRU should not kick in because list3 is only on disk
|
||||||
assert(store.get("list1") != None, "list2 was not in store")
|
assert(store.get("list1").isDefined, "list2 was not in store")
|
||||||
assert(store.get("list1").get.size === 2)
|
assert(store.get("list1").get.size === 2)
|
||||||
assert(store.get("list2") != None, "list3 was not in store")
|
assert(store.get("list2").isDefined, "list3 was not in store")
|
||||||
assert(store.get("list2").get.size === 2)
|
assert(store.get("list2").get.size === 2)
|
||||||
assert(store.get("list3") != None, "list1 was not in store")
|
assert(store.get("list3").isDefined, "list1 was not in store")
|
||||||
assert(store.get("list3").get.size === 2)
|
assert(store.get("list3").get.size === 2)
|
||||||
assert(store.get("list1") != None, "list2 was not in store")
|
assert(store.get("list1").isDefined, "list2 was not in store")
|
||||||
assert(store.get("list1").get.size === 2)
|
assert(store.get("list1").get.size === 2)
|
||||||
assert(store.get("list2") != None, "list3 was not in store")
|
assert(store.get("list2").isDefined, "list3 was not in store")
|
||||||
assert(store.get("list2").get.size === 2)
|
assert(store.get("list2").get.size === 2)
|
||||||
assert(store.get("list3") != None, "list1 was not in store")
|
assert(store.get("list3").isDefined, "list1 was not in store")
|
||||||
assert(store.get("list3").get.size === 2)
|
assert(store.get("list3").get.size === 2)
|
||||||
// Now let's add in list4, which uses both disk and memory; list1 should drop out
|
// Now let's add in list4, which uses both disk and memory; list1 should drop out
|
||||||
store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
|
store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
|
||||||
assert(store.get("list1") === None, "list1 was in store")
|
assert(store.get("list1") === None, "list1 was in store")
|
||||||
assert(store.get("list2") != None, "list3 was not in store")
|
assert(store.get("list2").isDefined, "list3 was not in store")
|
||||||
assert(store.get("list2").get.size === 2)
|
assert(store.get("list2").get.size === 2)
|
||||||
assert(store.get("list3") != None, "list1 was not in store")
|
assert(store.get("list3").isDefined, "list1 was not in store")
|
||||||
assert(store.get("list3").get.size === 2)
|
assert(store.get("list3").get.size === 2)
|
||||||
assert(store.get("list4") != None, "list4 was not in store")
|
assert(store.get("list4").isDefined, "list4 was not in store")
|
||||||
assert(store.get("list4").get.size === 2)
|
assert(store.get("list4").get.size === 2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -579,7 +579,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
assert(store.getSingle("a1") === None, "a1 was in store")
|
assert(store.getSingle("a1") === None, "a1 was in store")
|
||||||
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
|
store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
|
||||||
assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
|
assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2").isDefined, "a2 was not in store")
|
||||||
}
|
}
|
||||||
|
|
||||||
test("block compression") {
|
test("block compression") {
|
||||||
|
|
Loading…
Reference in a new issue