diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala index 06d2d09fce..3431ad2258 100644 --- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala @@ -33,11 +33,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { } try { - val blockOptions = blockManager.get(blocksByAddress) - logDebug("Fetching map output blocks for shuffle %d, reduce %d took %d ms".format( - shuffleId, reduceId, System.currentTimeMillis - startTime)) - blockOptions.foreach(x => { - val (blockId, blockOption) = x + for ((blockId, blockOption) <- blockManager.getMultiple(blocksByAddress)) { blockOption match { case Some(block) => { val values = block @@ -50,7 +46,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { throw new BlockException(blockId, "Did not get block " + blockId) } } - }) + } } catch { case be: BlockException => { val regex = "shuffledid_([0-9]*)_([0-9]*)_([0-9]]*)".r @@ -65,5 +61,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging { } } } + logDebug("Fetching and merging outputs of shuffle %d, reduce %d took %d ms".format( + shuffleId, reduceId, System.currentTimeMillis - startTime)) } } diff --git a/core/src/main/scala/spark/PipedRDD.scala b/core/src/main/scala/spark/PipedRDD.scala index 4fcfd869cf..3103d7889b 100644 --- a/core/src/main/scala/spark/PipedRDD.scala +++ b/core/src/main/scala/spark/PipedRDD.scala @@ -57,7 +57,21 @@ class PipedRDD[T: ClassManifest]( }.start() // Return an iterator that read lines from the process's stdout - Source.fromInputStream(proc.getInputStream).getLines + val lines = Source.fromInputStream(proc.getInputStream).getLines + return new Iterator[String] { + def next() = lines.next() + def hasNext = { + if (lines.hasNext) { + true + } else { + val exitStatus = proc.waitFor() + if (exitStatus != 0) { + throw new Exception("Subprocess exited with status " + exitStatus) + } + false + } + } + } } } diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala index b3bd4daa73..e5ad8b52dc 100644 --- a/core/src/main/scala/spark/SizeEstimator.scala +++ b/core/src/main/scala/spark/SizeEstimator.scala @@ -7,6 +7,10 @@ import java.util.IdentityHashMap import java.util.concurrent.ConcurrentHashMap import java.util.Random +import javax.management.MBeanServer +import java.lang.management.ManagementFactory +import com.sun.management.HotSpotDiagnosticMXBean + import scala.collection.mutable.ArrayBuffer import it.unimi.dsi.fastutil.ints.IntOpenHashSet @@ -18,9 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet * Based on the following JavaWorld article: * http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html */ -object SizeEstimator { - private val OBJECT_SIZE = 8 // Minimum size of a java.lang.Object - private val POINTER_SIZE = 4 // Size of an object reference +object SizeEstimator extends Logging { // Sizes of primitive types private val BYTE_SIZE = 1 @@ -32,9 +34,68 @@ object SizeEstimator { private val FLOAT_SIZE = 4 private val DOUBLE_SIZE = 8 + // Alignment boundary for objects + // TODO: Is this arch dependent ? 
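// Aside (illustrative only, not part of the patch): a minimal standalone sketch of the layout
// rules the estimator below assumes -- 8-byte object alignment, an 8-byte object header on 32-bit
// VMs, a 12-byte header on 64-bit VMs with compressed oops, and 16 bytes otherwise. All names in
// this sketch are made up for the example.
object LayoutSketch {
  private val AlignBoundary = 8L

  // Round a size up to the next multiple of the alignment boundary.
  def align(size: Long): Long = {
    val rem = size % AlignBoundary
    if (rem == 0) size else size + AlignBoundary - rem
  }

  def headerSize(is64bit: Boolean, compressedOops: Boolean): Long =
    if (!is64bit) 8 else if (compressedOops) 12 else 16

  // e.g. new Array[Int](10) on a 64-bit VM with compressed oops:
  // header (12) + length field (4) = 16, plus 10 * 4 = 40 bytes of data, for 56 bytes total
  // (both parts already 8-byte aligned), which matches the updated SizeEstimatorSuite expectation.
  def primitiveArraySize(length: Int, elemSize: Int, is64bit: Boolean, oops: Boolean): Long =
    align(headerSize(is64bit, oops) + 4) + align(length.toLong * elemSize)
}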
+ private val ALIGN_SIZE = 8 + // A cache of ClassInfo objects for each class private val classInfos = new ConcurrentHashMap[Class[_], ClassInfo] - classInfos.put(classOf[Object], new ClassInfo(OBJECT_SIZE, Nil)) + + // Object and pointer sizes are arch dependent + private var is64bit = false + + // Size of an object reference + // Based on https://wikis.oracle.com/display/HotSpotInternals/CompressedOops + private var isCompressedOops = false + private var pointerSize = 4 + + // Minimum size of a java.lang.Object + private var objectSize = 8 + + initialize() + + // Sets object size, pointer size based on architecture and CompressedOops settings + // from the JVM. + private def initialize() { + is64bit = System.getProperty("os.arch").contains("64") + isCompressedOops = getIsCompressedOops + + objectSize = if (!is64bit) 8 else { + if(!isCompressedOops) { + 16 + } else { + 12 + } + } + pointerSize = if (is64bit && !isCompressedOops) 8 else 4 + classInfos.clear() + classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil)) + } + + private def getIsCompressedOops : Boolean = { + if (System.getProperty("spark.test.useCompressedOops") != null) { + return System.getProperty("spark.test.useCompressedOops").toBoolean + } + try { + val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"; + val server = ManagementFactory.getPlatformMBeanServer(); + val bean = ManagementFactory.newPlatformMXBeanProxy(server, + hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean]); + return bean.getVMOption("UseCompressedOops").getValue.toBoolean + } catch { + case e: IllegalArgumentException => { + logWarning("Exception while trying to check if compressed oops is enabled", e) + // Fall back to checking if maxMemory < 32GB + return Runtime.getRuntime.maxMemory < (32L*1024*1024*1024) + } + + case e: SecurityException => { + logWarning("No permission to create MBeanServer", e) + // Fall back to checking if maxMemory < 32GB + return Runtime.getRuntime.maxMemory < (32L*1024*1024*1024) + } + } + } /** * The state of an ongoing size estimation. 
Contains a stack of objects to visit as well as an @@ -101,10 +162,17 @@ object SizeEstimator { private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) { val length = JArray.getLength(array) val elementClass = cls.getComponentType + + // Arrays have object header and length field which is an integer + var arrSize: Long = alignSize(objectSize + INT_SIZE) + if (elementClass.isPrimitive) { - state.size += length * primitiveSize(elementClass) + arrSize += alignSize(length * primitiveSize(elementClass)) + state.size += arrSize } else { - state.size += length * POINTER_SIZE + arrSize += alignSize(length * pointerSize) + state.size += arrSize + if (length <= ARRAY_SIZE_FOR_SAMPLING) { for (i <- 0 until length) { state.enqueue(JArray.get(array, i)) @@ -170,15 +238,22 @@ object SizeEstimator { shellSize += primitiveSize(fieldClass) } else { field.setAccessible(true) // Enable future get()'s on this field - shellSize += POINTER_SIZE + shellSize += pointerSize pointerFields = field :: pointerFields } } } + shellSize = alignSize(shellSize) + // Create and cache a new ClassInfo val newInfo = new ClassInfo(shellSize, pointerFields) classInfos.put(cls, newInfo) return newInfo } + + private def alignSize(size: Long): Long = { + val rem = size % ALIGN_SIZE + return if (rem == 0) size else (size + ALIGN_SIZE - rem) + } } diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala index 08c92b145e..4a7d945a8d 100644 --- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala @@ -20,6 +20,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork def this(master: String, frameworkName: String) = this(new SparkContext(master, frameworkName)) + def this(master: String, frameworkName: String, sparkHome: String, jarFile: String) = + this(new SparkContext(master, frameworkName, sparkHome, Seq(jarFile))) + + def this(master: String, frameworkName: String, sparkHome: String, jars: Array[String]) = + this(new SparkContext(master, frameworkName, sparkHome, jars.toSeq)) + val env = sc.env def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { diff --git a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java index 240747390c..7b6478c2cd 100644 --- a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java +++ b/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java @@ -9,5 +9,9 @@ import java.io.Serializable; // overloaded for both FlatMapFunction and DoubleFlatMapFunction. public abstract class DoubleFlatMapFunction extends AbstractFunction1> implements Serializable { - public abstract Iterable apply(T t); + + public abstract Iterable call(T t); + + @Override + public final Iterable apply(T t) { return call(t); } } diff --git a/core/src/main/scala/spark/api/java/function/DoubleFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFunction.java index 378ffd427d..a03a72c835 100644 --- a/core/src/main/scala/spark/api/java/function/DoubleFunction.java +++ b/core/src/main/scala/spark/api/java/function/DoubleFunction.java @@ -7,7 +7,8 @@ import java.io.Serializable; // DoubleFunction does not extend Function because some UDF functions, like map, // are overloaded for both Function and DoubleFunction. 
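// Aside (illustrative only, not part of the patch): the Java API function classes below switch
// from overriding apply() to overriding call(), with a Scala-side bridge (WrappedFunction1/2,
// added later in this patch) forwarding apply() to call() so that call() can declare checked
// exceptions while Scala code still sees an ordinary Function1. A minimal sketch of that bridge,
// with made-up names:
abstract class BridgedFunction1[T, R] extends (T => R) {
  @throws(classOf[Exception])
  def call(t: T): R

  final def apply(t: T): R = call(t)  // Scala callers keep using apply()
}

object BridgeDemo {
  // Behaves like any Function1[String, Int] from Scala's point of view.
  val length = new BridgedFunction1[String, Int] {
    def call(s: String): Int = s.length
  }

  def main(args: Array[String]) {
    println(Seq("a", "bb", "ccc").map(length))  // prints List(1, 2, 3)
  }
}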
-public abstract class DoubleFunction extends AbstractFunction1 +public abstract class DoubleFunction extends WrappedFunction1 implements Serializable { - public abstract Double apply(T t); + + public abstract Double call(T t) throws Exception; } diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala index 1045e006a0..bcba38c569 100644 --- a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala +++ b/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala @@ -1,7 +1,8 @@ package spark.api.java.function abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] { - def apply(x: T) : java.lang.Iterable[R] + @throws(classOf[Exception]) + def call(x: T) : java.lang.Iterable[R] def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]] } diff --git a/core/src/main/scala/spark/api/java/function/Function.java b/core/src/main/scala/spark/api/java/function/Function.java index ad38b89f0f..f6f2e5fd76 100644 --- a/core/src/main/scala/spark/api/java/function/Function.java +++ b/core/src/main/scala/spark/api/java/function/Function.java @@ -11,8 +11,8 @@ import java.io.Serializable; * Base class for functions whose return types do not have special RDDs; DoubleFunction is * handled separately, to allow DoubleRDDs to be constructed when mapping RDDs to doubles. */ -public abstract class Function extends AbstractFunction1 implements Serializable { - public abstract R apply(T t); +public abstract class Function extends WrappedFunction1 implements Serializable { + public abstract R call(T t) throws Exception; public ClassManifest returnType() { return (ClassManifest) ClassManifest$.MODULE$.fromClass(Object.class); diff --git a/core/src/main/scala/spark/api/java/function/Function2.java b/core/src/main/scala/spark/api/java/function/Function2.java index 883804dfe4..be48b173b8 100644 --- a/core/src/main/scala/spark/api/java/function/Function2.java +++ b/core/src/main/scala/spark/api/java/function/Function2.java @@ -6,12 +6,13 @@ import scala.runtime.AbstractFunction2; import java.io.Serializable; -public abstract class Function2 extends AbstractFunction2 +public abstract class Function2 extends WrappedFunction2 implements Serializable { + + public abstract R call(T1 t1, T2 t2) throws Exception; + public ClassManifest returnType() { return (ClassManifest) ClassManifest$.MODULE$.fromClass(Object.class); } - - public abstract R apply(T1 t1, T2 t2); } diff --git a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java index aead6c4e03..c074b9c717 100644 --- a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java +++ b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java @@ -9,8 +9,11 @@ import java.io.Serializable; // PairFlatMapFunction does not extend FlatMapFunction because flatMap is // overloaded for both FlatMapFunction and PairFlatMapFunction. 
-public abstract class PairFlatMapFunction extends AbstractFunction1>> implements Serializable { +public abstract class PairFlatMapFunction + extends WrappedFunction1>> + implements Serializable { + + public abstract Iterable> call(T t) throws Exception; public ClassManifest keyType() { return (ClassManifest) ClassManifest$.MODULE$.fromClass(Object.class); @@ -19,7 +22,4 @@ public abstract class PairFlatMapFunction extends AbstractFunction1 valueType() { return (ClassManifest) ClassManifest$.MODULE$.fromClass(Object.class); } - - public abstract Iterable> apply(T t); - } diff --git a/core/src/main/scala/spark/api/java/function/PairFunction.java b/core/src/main/scala/spark/api/java/function/PairFunction.java index 3284bfb11e..7f5bb7de13 100644 --- a/core/src/main/scala/spark/api/java/function/PairFunction.java +++ b/core/src/main/scala/spark/api/java/function/PairFunction.java @@ -9,8 +9,11 @@ import java.io.Serializable; // PairFunction does not extend Function because some UDF functions, like map, // are overloaded for both Function and PairFunction. -public abstract class PairFunction extends AbstractFunction1> implements Serializable { +public abstract class PairFunction + extends WrappedFunction1> + implements Serializable { + + public abstract Tuple2 call(T t) throws Exception; public ClassManifest keyType() { return (ClassManifest) ClassManifest$.MODULE$.fromClass(Object.class); @@ -19,7 +22,4 @@ public abstract class PairFunction extends AbstractFunction1 valueType() { return (ClassManifest) ClassManifest$.MODULE$.fromClass(Object.class); } - - public abstract Tuple2 apply(T t); - } diff --git a/core/src/main/scala/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/spark/api/java/function/VoidFunction.scala index be4cbaff39..0eefe337e8 100644 --- a/core/src/main/scala/spark/api/java/function/VoidFunction.scala +++ b/core/src/main/scala/spark/api/java/function/VoidFunction.scala @@ -2,11 +2,12 @@ package spark.api.java.function // This allows Java users to write void methods without having to return Unit. abstract class VoidFunction[T] extends Serializable { - def apply(t: T) : Unit + @throws(classOf[Exception]) + def call(t: T) : Unit } // VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly // return Unit), so it is implicitly converted to a Function1[T, Unit]: object VoidFunction { - implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f(x)) + implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f.call(x)) } \ No newline at end of file diff --git a/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala b/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala new file mode 100644 index 0000000000..d08e1e9fbf --- /dev/null +++ b/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala @@ -0,0 +1,15 @@ +package spark.api.java.function + +import scala.runtime.AbstractFunction1 + +/** + * Subclass of Function1 for ease of calling from Java. The main thing it does is re-expose the + * apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply + * isn't marked to allow that). 
+ */ +abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] { + @throws(classOf[Exception]) + def call(t: T): R + + final def apply(t: T): R = call(t) +} diff --git a/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala b/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala new file mode 100644 index 0000000000..c9d67d9771 --- /dev/null +++ b/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala @@ -0,0 +1,15 @@ +package spark.api.java.function + +import scala.runtime.AbstractFunction2 + +/** + * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the + * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply + * isn't marked to allow that). + */ +abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] { + @throws(classOf[Exception]) + def call(t1: T1, t2: T2): R + + final def apply(t1: T1, t2: T2): R = call(t1, t2) +} diff --git a/core/src/main/scala/spark/network/ConnectionManager.scala b/core/src/main/scala/spark/network/ConnectionManager.scala index f680a6419b..1a22d06cc8 100644 --- a/core/src/main/scala/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/spark/network/ConnectionManager.scala @@ -2,20 +2,19 @@ package spark.network import spark._ -import scala.actors.Future -import scala.actors.Futures.future +import java.nio._ +import java.nio.channels._ +import java.nio.channels.spi._ +import java.net._ +import java.util.concurrent.Executors + import scala.collection.mutable.HashMap import scala.collection.mutable.SynchronizedMap import scala.collection.mutable.SynchronizedQueue import scala.collection.mutable.Queue import scala.collection.mutable.ArrayBuffer -import java.io._ -import java.nio._ -import java.nio.channels._ -import java.nio.channels.spi._ -import java.net._ -import java.util.concurrent.Executors +import akka.dispatch.{Promise, ExecutionContext, Future} case class ConnectionManagerId(host: String, port: Int) { def toSocketAddress() = new InetSocketAddress(host, port) @@ -29,10 +28,16 @@ object ConnectionManagerId { class ConnectionManager(port: Int) extends Logging { - case class MessageStatus(message: Message, connectionManagerId: ConnectionManagerId) { + class MessageStatus( + val message: Message, + val connectionManagerId: ConnectionManagerId, + completionHandler: MessageStatus => Unit) { + var ackMessage: Option[Message] = None var attempted = false var acked = false + + def markDone() { completionHandler(this) } } val selector = SelectorProvider.provider.openSelector() @@ -44,6 +49,9 @@ class ConnectionManager(port: Int) extends Logging { val connectionRequests = new SynchronizedQueue[SendingConnection] val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)] val sendMessageRequests = new Queue[(Message, SendingConnection)] + + implicit val futureExecContext = ExecutionContext.fromExecutor( + Executors.newCachedThreadPool(DaemonThreadFactory)) var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null @@ -173,7 +181,7 @@ class ConnectionManager(port: Int) extends Logging { status.synchronized { status.attempted = true status.acked = false - status.notifyAll() + status.markDone() } }) @@ -198,15 +206,14 @@ class ConnectionManager(port: Int) extends Logging { connectionsById -= sendingConnectionManagerId messageStatuses.synchronized { - messageStatuses - .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => { - 
logInfo("Notifying " + status) - status.synchronized { - status.attempted = true - status.acked = false - status.notifyAll() - } - }) + for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) { + logInfo("Notifying " + s) + s.synchronized { + s.attempted = true + s.acked = false + s.markDone() + } + } messageStatuses.retain((i, status) => { status.connectionManagerId != sendingConnectionManagerId @@ -260,7 +267,7 @@ class ConnectionManager(port: Int) extends Logging { sentMessageStatus.ackMessage = Some(message) sentMessageStatus.attempted = true sentMessageStatus.acked = true - sentMessageStatus.notifyAll() + sentMessageStatus.markDone() } } else { val ackMessage = if (onReceiveCallback != null) { @@ -296,7 +303,7 @@ class ConnectionManager(port: Int) extends Logging { connectionRequests += newConnection newConnection } - val connection = connectionsById.getOrElse(connectionManagerId, startNewConnection) + val connection = connectionsById.getOrElse(connectionManagerId, startNewConnection()) message.senderAddress = id.toSocketAddress() logInfo("Sending [" + message + "] to [" + connectionManagerId + "]") /*connection.send(message)*/ @@ -306,22 +313,15 @@ class ConnectionManager(port: Int) extends Logging { selector.wakeup() } - def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message): Future[Option[Message]] = { - val messageStatus = new MessageStatus(message, connectionManagerId) + def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message) + : Future[Option[Message]] = { + val promise = Promise[Option[Message]] + val status = new MessageStatus(message, connectionManagerId, s => promise.success(s.ackMessage)) messageStatuses.synchronized { - messageStatuses += ((message.id, messageStatus)) + messageStatuses += ((message.id, status)) } sendMessage(connectionManagerId, message) - future { - messageStatus.synchronized { - if (!messageStatus.attempted) { - logTrace("Waiting, " + messageStatuses.size + " statuses" ) - messageStatus.wait() - logTrace("Done waiting") - } - } - messageStatus.ackMessage - } + promise.future } def sendMessageReliablySync(connectionManagerId: ConnectionManagerId, message: Message): Option[Message] = { diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala index db89db903e..f78e0e5fb2 100644 --- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala @@ -5,6 +5,7 @@ import java.util.HashMap import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConversions._ import it.unimi.dsi.fastutil.io.FastBufferedOutputStream @@ -124,14 +125,10 @@ class ShuffleMapTask( val blockManager = SparkEnv.get.blockManager for (i <- 0 until numOutputSplits) { val blockId = "shuffleid_" + dep.shuffleId + "_" + partition + "_" + i - val arr = new ArrayBuffer[Any] - val iter = buckets(i).entrySet().iterator() - while (iter.hasNext()) { - val entry = iter.next() - arr += ((entry.getKey(), entry.getValue())) - } + // Get a scala iterator from java map + val iter: Iterator[(Any, Any)] = buckets(i).iterator // TODO: This should probably be DISK_ONLY - blockManager.put(blockId, arr.iterator, StorageLevel.MEMORY_ONLY, false) + blockManager.put(blockId, iter, StorageLevel.MEMORY_ONLY, false) } return SparkEnv.get.blockManager.blockManagerId } diff --git 
a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala index cde74e5805..ff9914ae25 100644 --- a/core/src/main/scala/spark/storage/BlockManager.scala +++ b/core/src/main/scala/spark/storage/BlockManager.scala @@ -6,14 +6,10 @@ import java.nio.channels.FileChannel.MapMode import java.util.{HashMap => JHashMap} import java.util.LinkedHashMap import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.LinkedBlockingQueue import java.util.Collections -import scala.actors._ -import scala.actors.Actor._ -import scala.actors.Future -import scala.actors.Futures.future -import scala.actors.remote._ -import scala.actors.remote.RemoteActor._ +import akka.dispatch.{Await, Future} import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ @@ -29,6 +25,7 @@ import spark.SparkException import spark.Utils import spark.util.ByteBufferInputStream import spark.network._ +import akka.util.Duration class BlockManagerId(var ip: String, var port: Int) extends Externalizable { def this() = this(null, 0) @@ -80,6 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) val connectionManager = new ConnectionManager(0) + implicit val futureExecContext = connectionManager.futureExecContext val connectionManagerId = connectionManager.id val blockManagerId = new BlockManagerId(connectionManagerId.host, connectionManagerId.port) @@ -261,15 +259,20 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } /** - * Get many blocks from local and remote block manager using their BlockManagerIds. + * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns + * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined + * fashion as they're received. 
*/ - def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): HashMap[String, Option[Iterator[Any]]] = { + def getMultiple(blocksByAddress: Seq[(BlockManagerId, Seq[String])]) + : Iterator[(String, Option[Iterator[Any]])] = { + if (blocksByAddress == null) { throw new IllegalArgumentException("BlocksByAddress is null") } - logDebug("Getting " + blocksByAddress.map(_._2.size).sum + " blocks") + val totalBlocks = blocksByAddress.map(_._2.size).sum + logDebug("Getting " + totalBlocks + " blocks") var startTime = System.currentTimeMillis - val blocks = new HashMap[String,Option[Iterator[Any]]]() + val results = new LinkedBlockingQueue[(String, Option[Iterator[Any]])] val localBlockIds = new ArrayBuffer[String]() val remoteBlockIds = new ArrayBuffer[String]() val remoteBlockIdsPerLocation = new HashMap[BlockManagerId, Seq[String]]() @@ -285,12 +288,34 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m } // Start getting remote blocks - val remoteBlockFutures = remoteBlockIdsPerLocation.toSeq.map { case (bmId, bIds) => + for ((bmId, bIds) <- remoteBlockIdsPerLocation) { val cmId = ConnectionManagerId(bmId.ip, bmId.port) val blockMessages = bIds.map(bId => BlockMessage.fromGetBlock(GetBlock(bId))) val blockMessageArray = new BlockMessageArray(blockMessages) val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage) - (cmId, future) + future.onSuccess { + case Some(message) => { + val bufferMessage = message.asInstanceOf[BufferMessage] + val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) + blockMessageArray.foreach(blockMessage => { + if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) { + throw new SparkException( + "Unexpected message " + blockMessage.getType + " received from " + cmId) + } + val buffer = blockMessage.getData + val blockId = blockMessage.getId + val block = dataDeserialize(buffer) + results.put((blockId, Some(block))) + logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) + }) + } + case None => { + logError("Could not get blocks from " + cmId) + for (blockId <- bIds) { + results.put((blockId, None)) + } + } + } } logDebug("Started remote gets for " + remoteBlockIds.size + " blocks in " + Utils.getUsedTimeMs(startTime) + " ms") @@ -300,7 +325,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m localBlockIds.foreach(id => { get(id) match { case Some(block) => { - blocks.update(id, Some(block)) + results.put((id, Some(block))) logDebug("Got local block " + id) } case None => { @@ -310,36 +335,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m }) logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") - // wait for and gather all the remote blocks - for ((cmId, future) <- remoteBlockFutures) { - var count = 0 - val oneBlockId = remoteBlockIdsPerLocation(new BlockManagerId(cmId.host, cmId.port)).head - future() match { - case Some(message) => { - val bufferMessage = message.asInstanceOf[BufferMessage] - val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage) - blockMessageArray.foreach(blockMessage => { - if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) { - throw new BlockException(oneBlockId, "Unexpected message received from " + cmId) - } - val buffer = blockMessage.getData - val blockId = blockMessage.getId - val block = dataDeserialize(buffer) - blocks.update(blockId, Some(block)) - logDebug("Got remote block " + blockId + " in " + 
Utils.getUsedTimeMs(startTime)) - count += 1 - }) - } - case None => { - throw new BlockException(oneBlockId, "Could not get blocks from " + cmId) - } - } - logDebug("Got remote " + count + " blocks from " + cmId.host + " in " + - Utils.getUsedTimeMs(startTime) + " ms") - } + // Return an iterator that will read fetched blocks off the queue as they arrive + return new Iterator[(String, Option[Iterator[Any]])] { + var resultsGotten = 0 - logDebug("Got all blocks in " + Utils.getUsedTimeMs(startTime) + " ms") - return blocks + def hasNext: Boolean = resultsGotten < totalBlocks + + def next(): (String, Option[Iterator[Any]]) = { + resultsGotten += 1 + results.take() + } + } } /** @@ -435,7 +441,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m // Initiate the replication before storing it locally. This is faster as // data is already serialized and ready for sending val replicationFuture = if (level.replication > 1) { - future { + Future { replicate(blockId, bytes, level) } } else { @@ -471,7 +477,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m if (replicationFuture == null) { throw new Exception("Unexpected") } - replicationFuture() + Await.ready(replicationFuture, Duration.Inf) } val finishTime = System.currentTimeMillis diff --git a/core/src/main/scala/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/spark/storage/BlockManagerWorker.scala index c61e280252..d74cdb38a8 100644 --- a/core/src/main/scala/spark/storage/BlockManagerWorker.scala +++ b/core/src/main/scala/spark/storage/BlockManagerWorker.scala @@ -114,7 +114,7 @@ object BlockManagerWorker extends Logging { val blockMessage = BlockMessage.fromPutBlock(msg) val blockMessageArray = new BlockMessageArray(blockMessage) val resultMessage = connectionManager.sendMessageReliablySync( - toConnManagerId, blockMessageArray.toBufferMessage()) + toConnManagerId, blockMessageArray.toBufferMessage) return (resultMessage != None) } @@ -125,7 +125,7 @@ object BlockManagerWorker extends Logging { val blockMessage = BlockMessage.fromGetBlock(msg) val blockMessageArray = new BlockMessageArray(blockMessage) val responseMessage = connectionManager.sendMessageReliablySync( - toConnManagerId, blockMessageArray.toBufferMessage()) + toConnManagerId, blockMessageArray.toBufferMessage) responseMessage match { case Some(message) => { val bufferMessage = message.asInstanceOf[BufferMessage] diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala index a108ab653e..497a19856e 100644 --- a/core/src/main/scala/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala @@ -36,7 +36,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM println() println() */ - while(buffer.remaining() > 0) { + while (buffer.remaining() > 0) { val size = buffer.getInt() logDebug("Creating block message of size " + size + " bytes") val newBuffer = buffer.slice() @@ -53,7 +53,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM this.blockMessages = newBlockMessages } - def toBufferMessage(): BufferMessage = { + def toBufferMessage: BufferMessage = { val buffers = new ArrayBuffer[ByteBuffer]() blockMessages.foreach(blockMessage => { diff --git a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala index 024ce0b8d1..dff2970566 100644 --- 
a/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala +++ b/core/src/test/scala/spark/BoundedMemoryCacheSuite.scala @@ -1,31 +1,50 @@ package spark import org.scalatest.FunSuite +import org.scalatest.PrivateMethodTester -class BoundedMemoryCacheSuite extends FunSuite { +class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester { test("constructor test") { - val cache = new BoundedMemoryCache(40) - expect(40)(cache.getCapacity) + val cache = new BoundedMemoryCache(60) + expect(60)(cache.getCapacity) } test("caching") { - val cache = new BoundedMemoryCache(40) { + // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case + val oldArch = System.setProperty("os.arch", "amd64") + val oldOops = System.setProperty("spark.test.useCompressedOops", "true") + val initialize = PrivateMethod[Unit]('initialize) + SizeEstimator invokePrivate initialize() + + val cache = new BoundedMemoryCache(60) { //TODO sorry about this, but there is not better way how to skip 'cacheTracker.dropEntry' override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) { logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size)) } } //should be OK - expect(CachePutSuccess(30))(cache.put("1", 0, "Meh")) + expect(CachePutSuccess(56))(cache.put("1", 0, "Meh")) //we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from //cache because it's from the same dataset expect(CachePutFailure())(cache.put("1", 1, "Meh")) //should be OK, dataset '1' can be evicted from cache - expect(CachePutSuccess(30))(cache.put("2", 0, "Meh")) + expect(CachePutSuccess(56))(cache.put("2", 0, "Meh")) //should fail, cache should obey it's capacity expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string")) + + if (oldArch != null) { + System.setProperty("os.arch", oldArch) + } else { + System.clearProperty("os.arch") + } + + if (oldOops != null) { + System.setProperty("spark.test.useCompressedOops", oldOops) + } else { + System.clearProperty("spark.test.useCompressedOops") + } } } diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 5f0293e55b..24bf021710 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -117,7 +117,7 @@ public class JavaAPISuite implements Serializable { JavaRDD rdd = sc.parallelize(Arrays.asList("Hello", "World")); rdd.foreach(new VoidFunction() { @Override - public void apply(String s) { + public void call(String s) { System.out.println(s); } }); @@ -128,7 +128,7 @@ public class JavaAPISuite implements Serializable { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); Function isOdd = new Function() { @Override - public Boolean apply(Integer x) { + public Boolean call(Integer x) { return x % 2 == 0; } }; @@ -166,7 +166,7 @@ public class JavaAPISuite implements Serializable { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); Function2 add = new Function2() { @Override - public Integer apply(Integer a, Integer b) { + public Integer call(Integer a, Integer b) { return a + b; } }; @@ -191,7 +191,7 @@ public class JavaAPISuite implements Serializable { JavaPairRDD counts = rdd.reduceByKey( new Function2() { @Override - public Integer apply(Integer a, Integer b) { + public Integer call(Integer a, Integer b) { return a + b; } }); @@ -207,7 +207,7 @@ public class JavaAPISuite implements Serializable { localCounts = 
rdd.reduceByKeyLocally(new Function2() { @Override - public Integer apply(Integer a, Integer b) { + public Integer call(Integer a, Integer b) { return a + b; } }); @@ -252,7 +252,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(5, distinct.count()); JavaDoubleRDD filter = rdd.filter(new Function() { @Override - public Boolean apply(Double x) { + public Boolean call(Double x) { return x > 2.0; } }); @@ -279,19 +279,19 @@ public class JavaAPISuite implements Serializable { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5)); JavaDoubleRDD doubles = rdd.map(new DoubleFunction() { @Override - public Double apply(Integer x) { + public Double call(Integer x) { return 1.0 * x; } }).cache(); JavaPairRDD pairs = rdd.map(new PairFunction() { @Override - public Tuple2 apply(Integer x) { + public Tuple2 call(Integer x) { return new Tuple2(x, x); } }).cache(); JavaRDD strings = rdd.map(new Function() { @Override - public String apply(Integer x) { + public String call(Integer x) { return x.toString(); } }).cache(); @@ -303,7 +303,7 @@ public class JavaAPISuite implements Serializable { "The quick brown fox jumps over the lazy dog.")); JavaRDD words = rdd.flatMap(new FlatMapFunction() { @Override - public Iterable apply(String x) { + public Iterable call(String x) { return Arrays.asList(x.split(" ")); } }); @@ -314,7 +314,7 @@ public class JavaAPISuite implements Serializable { new PairFlatMapFunction() { @Override - public Iterable> apply(String s) { + public Iterable> call(String s) { List> pairs = new LinkedList>(); for (String word : s.split(" ")) pairs.add(new Tuple2(word, word)); return pairs; @@ -326,7 +326,7 @@ public class JavaAPISuite implements Serializable { JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction() { @Override - public Iterable apply(String s) { + public Iterable call(String s) { List lengths = new LinkedList(); for (String word : s.split(" ")) lengths.add(word.length() * 1.0); return lengths; @@ -343,7 +343,7 @@ public class JavaAPISuite implements Serializable { JavaRDD partitionSums = rdd.mapPartitions( new FlatMapFunction, Integer>() { @Override - public Iterable apply(Iterator iter) { + public Iterable call(Iterator iter) { int sum = 0; while (iter.hasNext()) { sum += iter.next(); @@ -417,7 +417,7 @@ public class JavaAPISuite implements Serializable { rdd.map(new PairFunction, IntWritable, Text>() { @Override - public Tuple2 apply(Tuple2 pair) { + public Tuple2 call(Tuple2 pair) { return new Tuple2(new IntWritable(pair._1()), new Text(pair._2())); } }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); @@ -426,7 +426,7 @@ public class JavaAPISuite implements Serializable { JavaPairRDD readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class).map(new PairFunction, Integer, String>() { @Override - public Tuple2 apply(Tuple2 pair) { + public Tuple2 call(Tuple2 pair) { return new Tuple2(pair._1().get(), pair._2().toString()); } }); @@ -446,7 +446,7 @@ public class JavaAPISuite implements Serializable { rdd.map(new PairFunction, IntWritable, Text>() { @Override - public Tuple2 apply(Tuple2 pair) { + public Tuple2 call(Tuple2 pair) { return new Tuple2(new IntWritable(pair._1()), new Text(pair._2())); } }).saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class, @@ -457,7 +457,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(pairs.toString(), output.map(new Function, String>() { @Override - public String apply(Tuple2 x) { + public String call(Tuple2 x) 
{ return x.toString(); } }).collect().toString()); @@ -476,7 +476,7 @@ public class JavaAPISuite implements Serializable { rdd.map(new PairFunction, IntWritable, Text>() { @Override - public Tuple2 apply(Tuple2 pair) { + public Tuple2 call(Tuple2 pair) { return new Tuple2(new IntWritable(pair._1()), new Text(pair._2())); } }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); @@ -487,7 +487,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(pairs.toString(), output.map(new Function, String>() { @Override - public String apply(Tuple2 x) { + public String call(Tuple2 x) { return x.toString(); } }).collect().toString()); @@ -534,7 +534,7 @@ public class JavaAPISuite implements Serializable { rdd.map(new PairFunction, IntWritable, Text>() { @Override - public Tuple2 apply(Tuple2 pair) { + public Tuple2 call(Tuple2 pair) { return new Tuple2(new IntWritable(pair._1()), new Text(pair._2())); } }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class); @@ -544,7 +544,7 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(pairs.toString(), output.map(new Function, String>() { @Override - public String apply(Tuple2 x) { + public String call(Tuple2 x) { return x.toString(); } }).collect().toString()); diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala index db1b9835a0..d010a9be7a 100644 --- a/core/src/test/scala/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/spark/PipedRDDSuite.scala @@ -39,6 +39,15 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter { assert(c(1) === "LALALA") } + test("pipe with non-zero exit status") { + sc = new SparkContext("local", "test") + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val piped = nums.pipe("cat nonexistent_file") + intercept[SparkException] { + piped.collect() + } + } + } diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala index 63bc951858..a2015644ee 100644 --- a/core/src/test/scala/spark/SizeEstimatorSuite.scala +++ b/core/src/test/scala/spark/SizeEstimatorSuite.scala @@ -1,6 +1,8 @@ package spark import org.scalatest.FunSuite +import org.scalatest.BeforeAndAfterAll +import org.scalatest.PrivateMethodTester class DummyClass1 {} @@ -17,61 +19,114 @@ class DummyClass4(val d: DummyClass3) { val x: Int = 0 } -class SizeEstimatorSuite extends FunSuite { +class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMethodTester { + var oldArch: String = _ + var oldOops: String = _ + + override def beforeAll() { + // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case + oldArch = System.setProperty("os.arch", "amd64") + oldOops = System.setProperty("spark.test.useCompressedOops", "true") + } + + override def afterAll() { + resetOrClear("os.arch", oldArch) + resetOrClear("spark.test.useCompressedOops", oldOops) + } + test("simple classes") { - expect(8)(SizeEstimator.estimate(new DummyClass1)) - expect(12)(SizeEstimator.estimate(new DummyClass2)) - expect(20)(SizeEstimator.estimate(new DummyClass3)) - expect(16)(SizeEstimator.estimate(new DummyClass4(null))) - expect(36)(SizeEstimator.estimate(new DummyClass4(new DummyClass3))) + expect(16)(SizeEstimator.estimate(new DummyClass1)) + expect(16)(SizeEstimator.estimate(new DummyClass2)) + expect(24)(SizeEstimator.estimate(new DummyClass3)) + expect(24)(SizeEstimator.estimate(new DummyClass4(null))) + 
expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3))) } test("strings") { - expect(24)(SizeEstimator.estimate("")) - expect(26)(SizeEstimator.estimate("a")) - expect(28)(SizeEstimator.estimate("ab")) - expect(40)(SizeEstimator.estimate("abcdefgh")) + expect(48)(SizeEstimator.estimate("")) + expect(56)(SizeEstimator.estimate("a")) + expect(56)(SizeEstimator.estimate("ab")) + expect(64)(SizeEstimator.estimate("abcdefgh")) } test("primitive arrays") { - expect(10)(SizeEstimator.estimate(new Array[Byte](10))) - expect(20)(SizeEstimator.estimate(new Array[Char](10))) - expect(20)(SizeEstimator.estimate(new Array[Short](10))) - expect(40)(SizeEstimator.estimate(new Array[Int](10))) - expect(80)(SizeEstimator.estimate(new Array[Long](10))) - expect(40)(SizeEstimator.estimate(new Array[Float](10))) - expect(80)(SizeEstimator.estimate(new Array[Double](10))) - expect(4000)(SizeEstimator.estimate(new Array[Int](1000))) - expect(8000)(SizeEstimator.estimate(new Array[Long](1000))) + expect(32)(SizeEstimator.estimate(new Array[Byte](10))) + expect(40)(SizeEstimator.estimate(new Array[Char](10))) + expect(40)(SizeEstimator.estimate(new Array[Short](10))) + expect(56)(SizeEstimator.estimate(new Array[Int](10))) + expect(96)(SizeEstimator.estimate(new Array[Long](10))) + expect(56)(SizeEstimator.estimate(new Array[Float](10))) + expect(96)(SizeEstimator.estimate(new Array[Double](10))) + expect(4016)(SizeEstimator.estimate(new Array[Int](1000))) + expect(8016)(SizeEstimator.estimate(new Array[Long](1000))) } test("object arrays") { // Arrays containing nulls should just have one pointer per element - expect(40)(SizeEstimator.estimate(new Array[String](10))) - expect(40)(SizeEstimator.estimate(new Array[AnyRef](10))) + expect(56)(SizeEstimator.estimate(new Array[String](10))) + expect(56)(SizeEstimator.estimate(new Array[AnyRef](10))) // For object arrays with non-null elements, each object should take one pointer plus // however many bytes that class takes. (Note that Array.fill calls the code in its // second parameter separately for each object, so we get distinct objects.) - expect(120)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1))) - expect(160)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2))) - expect(240)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3))) - expect(12 + 16)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2))) + expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1))) + expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2))) + expect(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3))) + expect(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2))) // Past size 100, our samples 100 elements, but we should still get the right size. - expect(24000)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3))) + expect(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3))) // If an array contains the *same* element many times, we should only count it once. val d1 = new DummyClass1 - expect(48)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object - expect(408)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object + expect(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object + expect(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object // Same thing with huge array containing the same element many times. 
Note that this won't - // return exactly 4008 because it can't tell that *all* the elements will equal the first + // return exactly 4032 because it can't tell that *all* the elements will equal the first // one it samples, but it should be close to that. + + // TODO: If we sample 100 elements, this should always be 4176 ? val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1)) assert(estimatedSize >= 4000, "Estimated size " + estimatedSize + " should be more than 4000") - assert(estimatedSize <= 4100, "Estimated size " + estimatedSize + " should be less than 4100") + assert(estimatedSize <= 4200, "Estimated size " + estimatedSize + " should be less than 4100") + } + + test("32-bit arch") { + val arch = System.setProperty("os.arch", "x86") + + val initialize = PrivateMethod[Unit]('initialize) + SizeEstimator invokePrivate initialize() + + expect(40)(SizeEstimator.estimate("")) + expect(48)(SizeEstimator.estimate("a")) + expect(48)(SizeEstimator.estimate("ab")) + expect(56)(SizeEstimator.estimate("abcdefgh")) + + resetOrClear("os.arch", arch) + } + + test("64-bit arch with no compressed oops") { + val arch = System.setProperty("os.arch", "amd64") + val oops = System.setProperty("spark.test.useCompressedOops", "false") + + val initialize = PrivateMethod[Unit]('initialize) + SizeEstimator invokePrivate initialize() + + expect(64)(SizeEstimator.estimate("")) + expect(72)(SizeEstimator.estimate("a")) + expect(72)(SizeEstimator.estimate("ab")) + expect(80)(SizeEstimator.estimate("abcdefgh")) + + resetOrClear("os.arch", arch) + resetOrClear("spark.test.useCompressedOops", oops) + } + + def resetOrClear(prop: String, oldValue: String) { + if (oldValue != null) { + System.setProperty(prop, oldValue) + } else { + System.clearProperty(prop) + } } } - diff --git a/core/src/test/scala/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/spark/storage/BlockManagerSuite.scala index 61decd81e6..f3f891e471 100644 --- a/core/src/test/scala/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/spark/storage/BlockManagerSuite.scala @@ -6,17 +6,27 @@ import akka.actor._ import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter +import org.scalatest.PrivateMethodTester import spark.KryoSerializer +import spark.SizeEstimator import spark.util.ByteBufferInputStream -class BlockManagerSuite extends FunSuite with BeforeAndAfter { +class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester { var actorSystem: ActorSystem = null var master: BlockManagerMaster = null + var oldArch: String = _ + var oldOops: String = _ before { actorSystem = ActorSystem("test") master = new BlockManagerMaster(actorSystem, true, true) + + // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case + oldArch = System.setProperty("os.arch", "amd64") + oldOops = System.setProperty("spark.test.useCompressedOops", "true") + val initialize = PrivateMethod[Unit]('initialize) + SizeEstimator invokePrivate initialize() } after { @@ -24,6 +34,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter { actorSystem.awaitTermination() actorSystem = null master = null + + if (oldArch != null) { + System.setProperty("os.arch", oldArch) + } else { + System.clearProperty("os.arch") + } + + if (oldOops != null) { + System.setProperty("spark.test.useCompressedOops", oldOops) + } else { + System.clearProperty("spark.test.useCompressedOops") + } } test("manager-master interaction") { @@ -57,7 +79,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter { 
} test("in-memory LRU storage") { - val store = new BlockManager(master, new KryoSerializer, 1000) + val store = new BlockManager(master, new KryoSerializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -78,7 +100,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter { } test("in-memory LRU storage with serialization") { - val store = new BlockManager(master, new KryoSerializer, 1000) + val store = new BlockManager(master, new KryoSerializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -99,7 +121,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter { } test("on-disk storage") { - val store = new BlockManager(master, new KryoSerializer, 1000) + val store = new BlockManager(master, new KryoSerializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -112,7 +134,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter { } test("disk and memory storage") { - val store = new BlockManager(master, new KryoSerializer, 1000) + val store = new BlockManager(master, new KryoSerializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -126,7 +148,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter { } test("disk and memory storage with serialization") { - val store = new BlockManager(master, new KryoSerializer, 1000) + val store = new BlockManager(master, new KryoSerializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -140,7 +162,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter { } test("LRU with mixed storage levels") { - val store = new BlockManager(master, new KryoSerializer, 1000) + val store = new BlockManager(master, new KryoSerializer, 1200) val a1 = new Array[Byte](400) val a2 = new Array[Byte](400) val a3 = new Array[Byte](400) @@ -166,7 +188,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter { } test("in-memory LRU with streams") { - val store = new BlockManager(master, new KryoSerializer, 1000) + val store = new BlockManager(master, new KryoSerializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) @@ -192,7 +214,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter { } test("LRU with mixed storage levels and streams") { - val store = new BlockManager(master, new KryoSerializer, 1000) + val store = new BlockManager(master, new KryoSerializer, 1200) val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200)) diff --git a/ec2/spark-ec2-standalone b/ec2/spark-ec2-standalone deleted file mode 100755 index dccd478437..0000000000 --- a/ec2/spark-ec2-standalone +++ /dev/null @@ -1,20 +0,0 @@ -#!/bin/sh - -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -cd "`dirname $0`" -PYTHONPATH="./third_party/boto-2.4.1.zip/boto-2.4.1:$PYTHONPATH" python ./spark_ec2.py --user ec2-user --cluster-type standalone -a standalone $@ diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py index 971b0c6ad7..931e4068de 100755 --- a/ec2/spark_ec2.py +++ b/ec2/spark_ec2.py @@ -349,7 +349,7 @@ def setup_mesos_cluster(master, opts): def setup_standalone_cluster(master, slave_nodes, opts): slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes]) ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips)) - ssh(master, opts, "/home/ec2-user/spark/bin/start-all.sh") + ssh(master, opts, "/root/spark/bin/start-all.sh") # Wait for a whole cluster (masters, slaves and ZooKeeper) to start up @@ -448,7 +448,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes): dest.close() # rsync the whole directory over to the master machine command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " + - "'%s/' '%s@%s:~'") % (opts.identity_file, tmp_dir, opts.user, active_master)) + "'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master)) subprocess.check_call(command, shell=True) # Remove the temp directory we created above shutil.rmtree(tmp_dir) diff --git a/examples/src/main/java/spark/examples/JavaHdfsLR.java b/examples/src/main/java/spark/examples/JavaHdfsLR.java index c7a6b4405a..71fc13fbce 100644 --- a/examples/src/main/java/spark/examples/JavaHdfsLR.java +++ b/examples/src/main/java/spark/examples/JavaHdfsLR.java @@ -26,8 +26,7 @@ public class JavaHdfsLR { } static class ParsePoint extends Function { - - public DataPoint apply(String line) { + public DataPoint call(String line) { StringTokenizer tok = new StringTokenizer(line, " "); double y = Double.parseDouble(tok.nextToken()); double[] x = new double[D]; @@ -41,8 +40,7 @@ public class JavaHdfsLR { } static class VectorSum extends Function2 { - - public double[] apply(double[] a, double[] b) { + public double[] call(double[] a, double[] b) { double[] result = new double[D]; for (int j = 0; j < D; j++) { result[j] = a[j] + b[j]; @@ -52,14 +50,13 @@ public class JavaHdfsLR { } static class ComputeGradient extends Function { - double[] weights; public ComputeGradient(double[] weights) { this.weights = weights; } - public double[] apply(DataPoint p) { + public double[] call(DataPoint p) { double[] gradient = new double[D]; for (int i = 0; i < D; i++) { double dot = dot(weights, p.x); diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java index e6ca69ff97..25a465ec8e 100644 --- a/examples/src/main/java/spark/examples/JavaTC.java +++ b/examples/src/main/java/spark/examples/JavaTC.java @@ -11,6 +11,9 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +/** + * Transitive closure on a graph, implemented in Java. 
+ */ public class JavaTC { static int numEdges = 200; @@ -32,7 +35,7 @@ public class JavaTC { Integer, Integer> { static ProjectFn INSTANCE = new ProjectFn(); - public Tuple2 apply(Tuple2> triple) { + public Tuple2 call(Tuple2> triple) { return new Tuple2(triple._2()._2(), triple._2()._1()); } } @@ -53,12 +56,11 @@ public class JavaTC { // the graph to obtain the path (x, z). // Because join() joins on keys, the edges are stored in reversed order. - JavaPairRDD edges = tc.map(new PairFunction, - Integer, Integer>() { - @Override - public Tuple2 apply(Tuple2 e) { - return new Tuple2(e._2(), e._1()); - } + JavaPairRDD edges = tc.map( + new PairFunction, Integer, Integer>() { + public Tuple2 call(Tuple2 e) { + return new Tuple2(e._2(), e._1()); + } }); long oldCount = 0; diff --git a/examples/src/main/java/spark/examples/JavaWordCount.java b/examples/src/main/java/spark/examples/JavaWordCount.java index fb2feec09d..a44cf8a120 100644 --- a/examples/src/main/java/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/spark/examples/JavaWordCount.java @@ -12,9 +12,7 @@ import java.util.Arrays; import java.util.List; public class JavaWordCount { - public static void main(String[] args) throws Exception { - if (args.length < 2) { System.err.println("Usage: JavaWordCount "); System.exit(1); @@ -23,16 +21,20 @@ public class JavaWordCount { JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount"); JavaRDD lines = ctx.textFile(args[1], 1); - JavaPairRDD counts = lines.flatMap(new FlatMapFunction() { - public Iterable apply(String s) { + JavaRDD words = lines.flatMap(new FlatMapFunction() { + public Iterable call(String s) { return Arrays.asList(s.split(" ")); } - }).map(new PairFunction() { - public Tuple2 apply(String s) { + }); + + JavaPairRDD ones = words.map(new PairFunction() { + public Tuple2 call(String s) { return new Tuple2(s, 1); } - }).reduceByKey(new Function2() { - public Integer apply(Integer i1, Integer i2) { + }); + + JavaPairRDD counts = ones.reduceByKey(new Function2() { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala index a095476a23..a6e4de4671 100644 --- a/examples/src/main/scala/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/spark/examples/SparkTC.scala @@ -5,6 +5,9 @@ import SparkContext._ import scala.util.Random import scala.collection.mutable +/** + * Transitive closure on a graph. + */ object SparkTC { val numEdges = 200
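// Aside (illustrative only, not part of the patch): BlockManager.getMultiple above returns an
// iterator backed by a LinkedBlockingQueue that remote-fetch callbacks feed as responses arrive,
// so callers such as BlockStoreShuffleFetcher can start consuming blocks before every fetch has
// finished. A minimal sketch of that producer/consumer shape, with made-up names and a plain
// thread standing in for the ConnectionManager's Future callbacks:
import java.util.concurrent.LinkedBlockingQueue

object PipelinedFetchSketch {
  def fetchAll(ids: Seq[String], fetchOne: String => Option[String])
    : Iterator[(String, Option[String])] = {
    val results = new LinkedBlockingQueue[(String, Option[String])]
    val total = ids.size

    // Producer: in the real code this role is played by the per-location Future.onSuccess
    // callbacks that deserialize each block message and put it on the queue.
    new Thread(new Runnable {
      def run() {
        for (id <- ids) results.put((id, fetchOne(id)))
      }
    }).start()

    // Consumer: next() blocks in take() only when the corresponding result has not arrived yet.
    new Iterator[(String, Option[String])] {
      var gotten = 0
      def hasNext = gotten < total
      def next() = { gotten += 1; results.take() }
    }
  }

  def main(args: Array[String]) {
    val it = fetchAll(Seq("a", "b", "c"), id => Some(id.toUpperCase))
    it.foreach(println)  // prints each (id, Some(ID)) pair as it becomes available
  }
}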