Merge remote-tracking branch 'upstream/dev' into dev

Mosharaf Chowdhury committed 2012-08-23 09:51:38 -07:00
commit 995ad6ba36
30 changed files with 457 additions and 234 deletions

BlockStoreShuffleFetcher.scala

@@ -33,11 +33,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
     }
     try {
-      val blockOptions = blockManager.get(blocksByAddress)
-      logDebug("Fetching map output blocks for shuffle %d, reduce %d took %d ms".format(
-        shuffleId, reduceId, System.currentTimeMillis - startTime))
-      blockOptions.foreach(x => {
-        val (blockId, blockOption) = x
+      for ((blockId, blockOption) <- blockManager.getMultiple(blocksByAddress)) {
         blockOption match {
           case Some(block) => {
             val values = block
@@ -50,7 +46,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
             throw new BlockException(blockId, "Did not get block " + blockId)
           }
         }
-      })
+      }
     } catch {
       case be: BlockException => {
         val regex = "shuffledid_([0-9]*)_([0-9]*)_([0-9]]*)".r
@@ -65,5 +61,7 @@ class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
         }
       }
     }
+    logDebug("Fetching and merging outputs of shuffle %d, reduce %d took %d ms".format(
+      shuffleId, reduceId, System.currentTimeMillis - startTime))
   }
 }

PipedRDD.scala

@@ -57,7 +57,21 @@ class PipedRDD[T: ClassManifest](
     }.start()
     // Return an iterator that read lines from the process's stdout
-    Source.fromInputStream(proc.getInputStream).getLines
+    val lines = Source.fromInputStream(proc.getInputStream).getLines
+    return new Iterator[String] {
+      def next() = lines.next()
+      def hasNext = {
+        if (lines.hasNext) {
+          true
+        } else {
+          val exitStatus = proc.waitFor()
+          if (exitStatus != 0) {
+            throw new Exception("Subprocess exited with status " + exitStatus)
+          }
+          false
+        }
+      }
+    }
   }
 }
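The new PipedRDD iterator above defers the exit-status check until the subprocess output is drained. A minimal standalone sketch of the same pattern, assuming only the JDK and scala.io.Source (the helper name pipeLines is illustrative, not part of this change):

    import scala.io.Source

    // Stream a subprocess's stdout line by line; once the lines run out,
    // wait for the process and fail if it exited with a non-zero status.
    def pipeLines(command: Seq[String]): Iterator[String] = {
      val proc = new ProcessBuilder(command: _*).start()
      val lines = Source.fromInputStream(proc.getInputStream).getLines()
      new Iterator[String] {
        def next(): String = lines.next()
        def hasNext: Boolean = lines.hasNext || {
          val exitStatus = proc.waitFor()   // blocks until the process terminates
          if (exitStatus != 0) {
            throw new Exception("Subprocess exited with status " + exitStatus)
          }
          false
        }
      }
    }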

SizeEstimator.scala

@@ -7,6 +7,10 @@ import java.util.IdentityHashMap
 import java.util.concurrent.ConcurrentHashMap
 import java.util.Random
+import javax.management.MBeanServer
+import java.lang.management.ManagementFactory
+import com.sun.management.HotSpotDiagnosticMXBean
 import scala.collection.mutable.ArrayBuffer
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet
@@ -18,9 +22,7 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet
  * Based on the following JavaWorld article:
  * http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
  */
-object SizeEstimator {
-  private val OBJECT_SIZE = 8 // Minimum size of a java.lang.Object
-  private val POINTER_SIZE = 4 // Size of an object reference
+object SizeEstimator extends Logging {
   // Sizes of primitive types
   private val BYTE_SIZE = 1
@@ -32,9 +34,68 @@ object SizeEstimator {
   private val FLOAT_SIZE = 4
   private val DOUBLE_SIZE = 8
+  // Alignment boundary for objects
+  // TODO: Is this arch dependent ?
+  private val ALIGN_SIZE = 8
   // A cache of ClassInfo objects for each class
   private val classInfos = new ConcurrentHashMap[Class[_], ClassInfo]
-  classInfos.put(classOf[Object], new ClassInfo(OBJECT_SIZE, Nil))
+  // Object and pointer sizes are arch dependent
+  private var is64bit = false
+
+  // Size of an object reference
+  // Based on https://wikis.oracle.com/display/HotSpotInternals/CompressedOops
+  private var isCompressedOops = false
+  private var pointerSize = 4
+
+  // Minimum size of a java.lang.Object
+  private var objectSize = 8
+
+  initialize()
+
+  // Sets object size, pointer size based on architecture and CompressedOops settings
+  // from the JVM.
+  private def initialize() {
+    is64bit = System.getProperty("os.arch").contains("64")
+    isCompressedOops = getIsCompressedOops
+
+    objectSize = if (!is64bit) 8 else {
+      if(!isCompressedOops) {
+        16
+      } else {
+        12
+      }
+    }
+    pointerSize = if (is64bit && !isCompressedOops) 8 else 4
+    classInfos.clear()
+    classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
+  }
+
+  private def getIsCompressedOops : Boolean = {
+    if (System.getProperty("spark.test.useCompressedOops") != null) {
+      return System.getProperty("spark.test.useCompressedOops").toBoolean
+    }
+    try {
+      val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic";
+      val server = ManagementFactory.getPlatformMBeanServer();
+      val bean = ManagementFactory.newPlatformMXBeanProxy(server,
+        hotSpotMBeanName, classOf[HotSpotDiagnosticMXBean]);
+      return bean.getVMOption("UseCompressedOops").getValue.toBoolean
+    } catch {
+      case e: IllegalArgumentException => {
+        logWarning("Exception while trying to check if compressed oops is enabled", e)
+        // Fall back to checking if maxMemory < 32GB
+        return Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
+      }
+      case e: SecurityException => {
+        logWarning("No permission to create MBeanServer", e)
+        // Fall back to checking if maxMemory < 32GB
+        return Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
+      }
+    }
+  }
   /**
    * The state of an ongoing size estimation. Contains a stack of objects to visit as well as an
@@ -101,10 +162,17 @@ object SizeEstimator {
   private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
     val length = JArray.getLength(array)
     val elementClass = cls.getComponentType
+    // Arrays have object header and length field which is an integer
+    var arrSize: Long = alignSize(objectSize + INT_SIZE)
     if (elementClass.isPrimitive) {
-      state.size += length * primitiveSize(elementClass)
+      arrSize += alignSize(length * primitiveSize(elementClass))
+      state.size += arrSize
     } else {
-      state.size += length * POINTER_SIZE
+      arrSize += alignSize(length * pointerSize)
+      state.size += arrSize
       if (length <= ARRAY_SIZE_FOR_SAMPLING) {
        for (i <- 0 until length) {
          state.enqueue(JArray.get(array, i))
@@ -170,15 +238,22 @@ object SizeEstimator {
          shellSize += primitiveSize(fieldClass)
        } else {
          field.setAccessible(true) // Enable future get()'s on this field
-          shellSize += POINTER_SIZE
+          shellSize += pointerSize
          pointerFields = field :: pointerFields
        }
      }
    }
+    shellSize = alignSize(shellSize)
    // Create and cache a new ClassInfo
    val newInfo = new ClassInfo(shellSize, pointerFields)
    classInfos.put(cls, newInfo)
    return newInfo
  }
+  private def alignSize(size: Long): Long = {
+    val rem = size % ALIGN_SIZE
+    return if (rem == 0) size else (size + ALIGN_SIZE - rem)
+  }
 }
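For reference, a small sketch of the sizing arithmetic introduced above (same 8-byte alignment rule and the three header/pointer cases handled by initialize(); the object and helper names here are illustrative):

    // Sizes are rounded up to an 8-byte boundary; header and reference sizes depend on
    // whether the VM is 32-bit, 64-bit with compressed oops, or 64-bit without them.
    object LayoutSketch {
      private val ALIGN_SIZE = 8

      def alignSize(size: Long): Long = {
        val rem = size % ALIGN_SIZE
        if (rem == 0) size else size + ALIGN_SIZE - rem
      }

      // (objectSize, pointerSize) for the three cases in initialize()
      def headerAndPointer(is64bit: Boolean, compressedOops: Boolean): (Int, Int) =
        if (!is64bit) (8, 4)              // 32-bit VM
        else if (compressedOops) (12, 4)  // 64-bit VM, compressed oops
        else (16, 8)                      // 64-bit VM, plain oops

      // Example: an Array[Int](10) under compressed oops is
      // alignSize(12 + 4) + alignSize(10 * 4) = 16 + 40 = 56 bytes,
      // which matches the updated SizeEstimatorSuite expectation.
      def intArraySize(length: Int, objectSize: Int): Long =
        alignSize(objectSize + 4) + alignSize(length.toLong * 4)
    }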

JavaSparkContext.scala

@@ -20,6 +20,12 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def this(master: String, frameworkName: String) = this(new SparkContext(master, frameworkName))
+  def this(master: String, frameworkName: String, sparkHome: String, jarFile: String) =
+    this(new SparkContext(master, frameworkName, sparkHome, Seq(jarFile)))
+
+  def this(master: String, frameworkName: String, sparkHome: String, jars: Array[String]) =
+    this(new SparkContext(master, frameworkName, sparkHome, jars.toSeq))
   val env = sc.env
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {

DoubleFlatMapFunction.java

@@ -9,5 +9,9 @@ import java.io.Serializable;
 // overloaded for both FlatMapFunction and DoubleFlatMapFunction.
 public abstract class DoubleFlatMapFunction<T> extends AbstractFunction1<T, Iterable<Double>>
   implements Serializable {
-  public abstract Iterable<Double> apply(T t);
+  public abstract Iterable<Double> call(T t);
+
+  @Override
+  public final Iterable<Double> apply(T t) { return call(t); }
 }

DoubleFunction.java

@@ -7,7 +7,8 @@ import java.io.Serializable;
 // DoubleFunction does not extend Function because some UDF functions, like map,
 // are overloaded for both Function and DoubleFunction.
-public abstract class DoubleFunction<T> extends AbstractFunction1<T, Double>
+public abstract class DoubleFunction<T> extends WrappedFunction1<T, Double>
   implements Serializable {
-  public abstract Double apply(T t);
+  public abstract Double call(T t) throws Exception;
 }

FlatMapFunction.scala

@@ -1,7 +1,8 @@
 package spark.api.java.function
 abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
-  def apply(x: T) : java.lang.Iterable[R]
+  @throws(classOf[Exception])
+  def call(x: T) : java.lang.Iterable[R]
   def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]]
 }

Function.java

@@ -11,8 +11,8 @@ import java.io.Serializable;
  * Base class for functions whose return types do not have special RDDs; DoubleFunction is
  * handled separately, to allow DoubleRDDs to be constructed when mapping RDDs to doubles.
  */
-public abstract class Function<T, R> extends AbstractFunction1<T, R> implements Serializable {
-  public abstract R apply(T t);
+public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
+  public abstract R call(T t) throws Exception;
   public ClassManifest<R> returnType() {
     return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);

Function2.java

@@ -6,12 +6,13 @@ import scala.runtime.AbstractFunction2;
 import java.io.Serializable;
-public abstract class Function2<T1, T2, R> extends AbstractFunction2<T1, T2, R>
+public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
   implements Serializable {
+  public abstract R call(T1 t1, T2 t2) throws Exception;
   public ClassManifest<R> returnType() {
     return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
   }
-  public abstract R apply(T1 t1, T2 t2);
 }

PairFlatMapFunction.java

@@ -9,8 +9,11 @@ import java.io.Serializable;
 // PairFlatMapFunction does not extend FlatMapFunction because flatMap is
 // overloaded for both FlatMapFunction and PairFlatMapFunction.
-public abstract class PairFlatMapFunction<T, K, V> extends AbstractFunction1<T, Iterable<Tuple2<K,
-  V>>> implements Serializable {
+public abstract class PairFlatMapFunction<T, K, V>
+  extends WrappedFunction1<T, Iterable<Tuple2<K, V>>>
+  implements Serializable {
+  public abstract Iterable<Tuple2<K, V>> call(T t) throws Exception;
   public ClassManifest<K> keyType() {
     return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
@@ -19,7 +22,4 @@ public abstract class PairFlatMapFunction<T, K, V> extends AbstractFunction1<T,
   public ClassManifest<V> valueType() {
     return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
   }
-  public abstract Iterable<Tuple2<K, V>> apply(T t);
 }

PairFunction.java

@@ -9,8 +9,11 @@ import java.io.Serializable;
 // PairFunction does not extend Function because some UDF functions, like map,
 // are overloaded for both Function and PairFunction.
-public abstract class PairFunction<T, K, V> extends AbstractFunction1<T, Tuple2<K,
-  V>> implements Serializable {
+public abstract class PairFunction<T, K, V>
+  extends WrappedFunction1<T, Tuple2<K, V>>
+  implements Serializable {
+  public abstract Tuple2<K, V> call(T t) throws Exception;
   public ClassManifest<K> keyType() {
     return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
@@ -19,7 +22,4 @@ public abstract class PairFunction<T, K, V> extends AbstractFunction1<T, Tuple2<
   public ClassManifest<V> valueType() {
     return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
   }
-  public abstract Tuple2<K, V> apply(T t);
 }

VoidFunction.scala

@@ -2,11 +2,12 @@ package spark.api.java.function
 // This allows Java users to write void methods without having to return Unit.
 abstract class VoidFunction[T] extends Serializable {
-  def apply(t: T) : Unit
+  @throws(classOf[Exception])
+  def call(t: T) : Unit
 }
 // VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly
 // return Unit), so it is implicitly converted to a Function1[T, Unit]:
 object VoidFunction {
-  implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f(x))
+  implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f.call(x))
 }

WrappedFunction1.scala

@@ -0,0 +1,15 @@
+package spark.api.java.function
+
+import scala.runtime.AbstractFunction1
+
+/**
+ * Subclass of Function1 for ease of calling from Java. The main thing it does is re-expose the
+ * apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply
+ * isn't marked to allow that).
+ */
+abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
+  @throws(classOf[Exception])
+  def call(t: T): R
+
+  final def apply(t: T): R = call(t)
+}
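A brief usage sketch for the wrapper above (the toUpper value is illustrative): Java-facing subclasses implement only call(), while Scala code that expects a Function1 still invokes apply(), which forwards to call().

    // Anonymous subclass in Scala, standing in for a Java anonymous class.
    val toUpper = new WrappedFunction1[String, String] {
      def call(s: String): String = s.toUpperCase
    }

    // Any API that takes a scala.Function1 can use it directly; apply() delegates to call().
    val shouted = Seq("a", "b").map(toUpper)   // Seq("A", "B")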

WrappedFunction2.scala

@@ -0,0 +1,15 @@
+package spark.api.java.function
+
+import scala.runtime.AbstractFunction2
+
+/**
+ * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the
+ * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
+ * isn't marked to allow that).
+ */
+abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
+  @throws(classOf[Exception])
+  def call(t1: T1, t2: T2): R
+
+  final def apply(t1: T1, t2: T2): R = call(t1, t2)
+}

ConnectionManager.scala

@@ -2,20 +2,19 @@ package spark.network
 import spark._
-import scala.actors.Future
-import scala.actors.Futures.future
+import java.nio._
+import java.nio.channels._
+import java.nio.channels.spi._
+import java.net._
+import java.util.concurrent.Executors
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.SynchronizedMap
 import scala.collection.mutable.SynchronizedQueue
 import scala.collection.mutable.Queue
 import scala.collection.mutable.ArrayBuffer
-import java.io._
-import java.nio._
-import java.nio.channels._
-import java.nio.channels.spi._
-import java.net._
-import java.util.concurrent.Executors
+import akka.dispatch.{Promise, ExecutionContext, Future}
 case class ConnectionManagerId(host: String, port: Int) {
   def toSocketAddress() = new InetSocketAddress(host, port)
@@ -29,10 +28,16 @@ object ConnectionManagerId {
 class ConnectionManager(port: Int) extends Logging {
-  case class MessageStatus(message: Message, connectionManagerId: ConnectionManagerId) {
+  class MessageStatus(
+      val message: Message,
+      val connectionManagerId: ConnectionManagerId,
+      completionHandler: MessageStatus => Unit) {
     var ackMessage: Option[Message] = None
     var attempted = false
     var acked = false
+    def markDone() { completionHandler(this) }
   }
   val selector = SelectorProvider.provider.openSelector()
@@ -45,6 +50,9 @@ class ConnectionManager(port: Int) extends Logging {
   val keyInterestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
   val sendMessageRequests = new Queue[(Message, SendingConnection)]
+  implicit val futureExecContext = ExecutionContext.fromExecutor(
+    Executors.newCachedThreadPool(DaemonThreadFactory))
   var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message]= null
   serverChannel.configureBlocking(false)
@@ -173,7 +181,7 @@ class ConnectionManager(port: Int) extends Logging {
         status.synchronized {
           status.attempted = true
           status.acked = false
-          status.notifyAll()
+          status.markDone()
         }
       })
@@ -198,15 +206,14 @@ class ConnectionManager(port: Int) extends Logging {
     connectionsById -= sendingConnectionManagerId
     messageStatuses.synchronized {
-      messageStatuses
-        .values.filter(_.connectionManagerId == sendingConnectionManagerId).foreach(status => {
-          logInfo("Notifying " + status)
-          status.synchronized {
-            status.attempted = true
-            status.acked = false
-            status.notifyAll()
-          }
-        })
+      for (s <- messageStatuses.values if s.connectionManagerId == sendingConnectionManagerId) {
+        logInfo("Notifying " + s)
+        s.synchronized {
+          s.attempted = true
+          s.acked = false
+          s.markDone()
+        }
+      }
       messageStatuses.retain((i, status) => {
         status.connectionManagerId != sendingConnectionManagerId
@@ -260,7 +267,7 @@ class ConnectionManager(port: Int) extends Logging {
           sentMessageStatus.ackMessage = Some(message)
           sentMessageStatus.attempted = true
           sentMessageStatus.acked = true
-          sentMessageStatus.notifyAll()
+          sentMessageStatus.markDone()
         }
       } else {
         val ackMessage = if (onReceiveCallback != null) {
@@ -296,7 +303,7 @@ class ConnectionManager(port: Int) extends Logging {
       connectionRequests += newConnection
      newConnection
    }
-    val connection = connectionsById.getOrElse(connectionManagerId, startNewConnection)
+    val connection = connectionsById.getOrElse(connectionManagerId, startNewConnection())
    message.senderAddress = id.toSocketAddress()
    logInfo("Sending [" + message + "] to [" + connectionManagerId + "]")
    /*connection.send(message)*/
@@ -306,22 +313,15 @@ class ConnectionManager(port: Int) extends Logging {
     selector.wakeup()
   }
-  def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message): Future[Option[Message]] = {
-    val messageStatus = new MessageStatus(message, connectionManagerId)
+  def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message)
+      : Future[Option[Message]] = {
+    val promise = Promise[Option[Message]]
+    val status = new MessageStatus(message, connectionManagerId, s => promise.success(s.ackMessage))
     messageStatuses.synchronized {
-      messageStatuses += ((message.id, messageStatus))
+      messageStatuses += ((message.id, status))
     }
     sendMessage(connectionManagerId, message)
-    future {
-      messageStatus.synchronized {
-        if (!messageStatus.attempted) {
-          logTrace("Waiting, " + messageStatuses.size + " statuses" )
-          messageStatus.wait()
-          logTrace("Done waiting")
-        }
-      }
-      messageStatus.ackMessage
-    }
+    promise.future
   }
   def sendMessageReliablySync(connectionManagerId: ConnectionManagerId, message: Message): Option[Message] = {
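The rewrite above replaces wait()/notifyAll() with a completion handler that fulfils an Akka Promise, so sendMessageReliably can simply return promise.future. A minimal sketch of that pattern in isolation, using the same Akka 2.0-era imports as this file (the Status class and sendReliably method are illustrative):

    import java.util.concurrent.Executors
    import akka.dispatch.{Await, ExecutionContext, Future, Promise}
    import akka.util.Duration

    object PromiseSketch {
      // Execution context for the promise/future machinery, as in ConnectionManager.
      implicit val execContext = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())

      class Status(completionHandler: Status => Unit) {
        var ack: Option[String] = None
        def markDone() { completionHandler(this) }   // called when a reply (or failure) arrives
      }

      def sendReliably(): Future[Option[String]] = {
        val promise = Promise[Option[String]]()
        val status = new Status(s => promise.success(s.ack))
        // ... register `status` and write the message out; the I/O loop later calls markDone() ...
        status.ack = Some("ack"); status.markDone()  // stand-in for the asynchronous completion
        promise.future
      }

      def main(args: Array[String]) {
        // The synchronous variant just blocks on the same future.
        println(Await.result(sendReliably(), Duration.Inf))
      }
    }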

ShuffleMapTask.scala

@@ -5,6 +5,7 @@ import java.util.HashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
@@ -124,14 +125,10 @@ class ShuffleMapTask(
     val blockManager = SparkEnv.get.blockManager
     for (i <- 0 until numOutputSplits) {
       val blockId = "shuffleid_" + dep.shuffleId + "_" + partition + "_" + i
-      val arr = new ArrayBuffer[Any]
-      val iter = buckets(i).entrySet().iterator()
-      while (iter.hasNext()) {
-        val entry = iter.next()
-        arr += ((entry.getKey(), entry.getValue()))
-      }
+      // Get a scala iterator from java map
+      val iter: Iterator[(Any, Any)] = buckets(i).iterator
       // TODO: This should probably be DISK_ONLY
-      blockManager.put(blockId, arr.iterator, StorageLevel.MEMORY_ONLY, false)
+      blockManager.put(blockId, iter, StorageLevel.MEMORY_ONLY, false)
     }
     return SparkEnv.get.blockManager.blockManagerId
   }

BlockManager.scala

@@ -6,14 +6,10 @@ import java.nio.channels.FileChannel.MapMode
 import java.util.{HashMap => JHashMap}
 import java.util.LinkedHashMap
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.LinkedBlockingQueue
 import java.util.Collections
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.Future
-import scala.actors.Futures.future
-import scala.actors.remote._
-import scala.actors.remote.RemoteActor._
+import akka.dispatch.{Await, Future}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions._
@@ -29,6 +25,7 @@ import spark.SparkException
 import spark.Utils
 import spark.util.ByteBufferInputStream
 import spark.network._
+import akka.util.Duration
 class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
   def this() = this(null, 0)
@@ -80,6 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
   val connectionManager = new ConnectionManager(0)
+  implicit val futureExecContext = connectionManager.futureExecContext
   val connectionManagerId = connectionManager.id
   val blockManagerId = new BlockManagerId(connectionManagerId.host, connectionManagerId.port)
@@ -261,15 +259,20 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
   }
   /**
-   * Get many blocks from local and remote block manager using their BlockManagerIds.
+   * Get multiple blocks from local and remote block manager using their BlockManagerIds. Returns
+   * an Iterator of (block ID, value) pairs so that clients may handle blocks in a pipelined
+   * fashion as they're received.
    */
-  def get(blocksByAddress: Seq[(BlockManagerId, Seq[String])]): HashMap[String, Option[Iterator[Any]]] = {
+  def getMultiple(blocksByAddress: Seq[(BlockManagerId, Seq[String])])
+      : Iterator[(String, Option[Iterator[Any]])] = {
     if (blocksByAddress == null) {
       throw new IllegalArgumentException("BlocksByAddress is null")
     }
-    logDebug("Getting " + blocksByAddress.map(_._2.size).sum + " blocks")
+    val totalBlocks = blocksByAddress.map(_._2.size).sum
+    logDebug("Getting " + totalBlocks + " blocks")
     var startTime = System.currentTimeMillis
-    val blocks = new HashMap[String,Option[Iterator[Any]]]()
+    val results = new LinkedBlockingQueue[(String, Option[Iterator[Any]])]
     val localBlockIds = new ArrayBuffer[String]()
     val remoteBlockIds = new ArrayBuffer[String]()
     val remoteBlockIdsPerLocation = new HashMap[BlockManagerId, Seq[String]]()
@@ -285,12 +288,34 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     }
     // Start getting remote blocks
-    val remoteBlockFutures = remoteBlockIdsPerLocation.toSeq.map { case (bmId, bIds) =>
+    for ((bmId, bIds) <- remoteBlockIdsPerLocation) {
       val cmId = ConnectionManagerId(bmId.ip, bmId.port)
       val blockMessages = bIds.map(bId => BlockMessage.fromGetBlock(GetBlock(bId)))
       val blockMessageArray = new BlockMessageArray(blockMessages)
       val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
-      (cmId, future)
+      future.onSuccess {
+        case Some(message) => {
+          val bufferMessage = message.asInstanceOf[BufferMessage]
+          val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
+          blockMessageArray.foreach(blockMessage => {
+            if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
+              throw new SparkException(
+                "Unexpected message " + blockMessage.getType + " received from " + cmId)
+            }
+            val buffer = blockMessage.getData
+            val blockId = blockMessage.getId
+            val block = dataDeserialize(buffer)
+            results.put((blockId, Some(block)))
+            logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
+          })
+        }
+        case None => {
+          logError("Could not get blocks from " + cmId)
+          for (blockId <- bIds) {
+            results.put((blockId, None))
+          }
+        }
+      }
     }
     logDebug("Started remote gets for " + remoteBlockIds.size + " blocks in " +
       Utils.getUsedTimeMs(startTime) + " ms")
@@ -300,7 +325,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     localBlockIds.foreach(id => {
       get(id) match {
         case Some(block) => {
-          blocks.update(id, Some(block))
+          results.put((id, Some(block)))
           logDebug("Got local block " + id)
         }
         case None => {
@@ -310,36 +335,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     })
     logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
-    // wait for and gather all the remote blocks
-    for ((cmId, future) <- remoteBlockFutures) {
-      var count = 0
-      val oneBlockId = remoteBlockIdsPerLocation(new BlockManagerId(cmId.host, cmId.port)).head
-      future() match {
-        case Some(message) => {
-          val bufferMessage = message.asInstanceOf[BufferMessage]
-          val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
-          blockMessageArray.foreach(blockMessage => {
-            if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
-              throw new BlockException(oneBlockId, "Unexpected message received from " + cmId)
-            }
-            val buffer = blockMessage.getData
-            val blockId = blockMessage.getId
-            val block = dataDeserialize(buffer)
-            blocks.update(blockId, Some(block))
-            logDebug("Got remote block " + blockId + " in " + Utils.getUsedTimeMs(startTime))
-            count += 1
-          })
-        }
-        case None => {
-          throw new BlockException(oneBlockId, "Could not get blocks from " + cmId)
-        }
-      }
-      logDebug("Got remote " + count + " blocks from " + cmId.host + " in " +
-        Utils.getUsedTimeMs(startTime) + " ms")
-    }
-    logDebug("Got all blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
-    return blocks
+    // Return an iterator that will read fetched blocks off the queue as they arrive
+    return new Iterator[(String, Option[Iterator[Any]])] {
+      var resultsGotten = 0
+      def hasNext: Boolean = resultsGotten < totalBlocks
+      def next(): (String, Option[Iterator[Any]]) = {
+        resultsGotten += 1
+        results.take()
+      }
+    }
   }
   /**
@@ -435,7 +441,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
     // Initiate the replication before storing it locally. This is faster as
     // data is already serialized and ready for sending
     val replicationFuture = if (level.replication > 1) {
-      future {
+      Future {
         replicate(blockId, bytes, level)
       }
     } else {
@@ -471,7 +477,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
       if (replicationFuture == null) {
        throw new Exception("Unexpected")
      }
-      replicationFuture()
+      Await.ready(replicationFuture, Duration.Inf)
    }
    val finishTime = System.currentTimeMillis
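The getMultiple change above decouples producers and the consumer with a LinkedBlockingQueue: the network callbacks and the local-block loop put results on the queue, and the returned iterator hands them out in arrival order. A condensed sketch of just that mechanism (the pipelinedResults helper is illustrative):

    import java.util.concurrent.LinkedBlockingQueue

    // `produce` starts asynchronous work that eventually puts exactly `total`
    // results on the queue; the iterator blocks in next() until each result
    // arrives, so callers can process blocks as they are fetched.
    def pipelinedResults[A](total: Int, produce: LinkedBlockingQueue[A] => Unit): Iterator[A] = {
      val results = new LinkedBlockingQueue[A]
      produce(results)
      new Iterator[A] {
        var resultsGotten = 0
        def hasNext: Boolean = resultsGotten < total
        def next(): A = {
          resultsGotten += 1
          results.take()
        }
      }
    }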

BlockManagerWorker.scala

@@ -114,7 +114,7 @@ object BlockManagerWorker extends Logging {
     val blockMessage = BlockMessage.fromPutBlock(msg)
     val blockMessageArray = new BlockMessageArray(blockMessage)
     val resultMessage = connectionManager.sendMessageReliablySync(
-      toConnManagerId, blockMessageArray.toBufferMessage())
+      toConnManagerId, blockMessageArray.toBufferMessage)
     return (resultMessage != None)
   }
@@ -125,7 +125,7 @@ object BlockManagerWorker extends Logging {
     val blockMessage = BlockMessage.fromGetBlock(msg)
     val blockMessageArray = new BlockMessageArray(blockMessage)
     val responseMessage = connectionManager.sendMessageReliablySync(
-      toConnManagerId, blockMessageArray.toBufferMessage())
+      toConnManagerId, blockMessageArray.toBufferMessage)
     responseMessage match {
       case Some(message) => {
         val bufferMessage = message.asInstanceOf[BufferMessage]

BlockMessageArray.scala

@@ -36,7 +36,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
     println()
     println()
     */
-    while(buffer.remaining() > 0) {
+    while (buffer.remaining() > 0) {
       val size = buffer.getInt()
       logDebug("Creating block message of size " + size + " bytes")
       val newBuffer = buffer.slice()
@@ -53,7 +53,7 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
     this.blockMessages = newBlockMessages
   }
-  def toBufferMessage(): BufferMessage = {
+  def toBufferMessage: BufferMessage = {
     val buffers = new ArrayBuffer[ByteBuffer]()
     blockMessages.foreach(blockMessage => {

BoundedMemoryCacheSuite.scala

@@ -1,31 +1,50 @@
 package spark
 import org.scalatest.FunSuite
+import org.scalatest.PrivateMethodTester
-class BoundedMemoryCacheSuite extends FunSuite {
+class BoundedMemoryCacheSuite extends FunSuite with PrivateMethodTester {
   test("constructor test") {
-    val cache = new BoundedMemoryCache(40)
-    expect(40)(cache.getCapacity)
+    val cache = new BoundedMemoryCache(60)
+    expect(60)(cache.getCapacity)
   }
   test("caching") {
-    val cache = new BoundedMemoryCache(40) {
+    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+    val oldArch = System.setProperty("os.arch", "amd64")
+    val oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+    val initialize = PrivateMethod[Unit]('initialize)
+    SizeEstimator invokePrivate initialize()
+
+    val cache = new BoundedMemoryCache(60) {
       //TODO sorry about this, but there is not better way how to skip 'cacheTracker.dropEntry'
       override protected def reportEntryDropped(datasetId: Any, partition: Int, entry: Entry) {
         logInfo("Dropping key (%s, %d) of size %d to make space".format(datasetId, partition, entry.size))
       }
     }
     //should be OK
-    expect(CachePutSuccess(30))(cache.put("1", 0, "Meh"))
+    expect(CachePutSuccess(56))(cache.put("1", 0, "Meh"))
     //we cannot add this to cache (there is not enough space in cache) & we cannot evict the only value from
     //cache because it's from the same dataset
     expect(CachePutFailure())(cache.put("1", 1, "Meh"))
     //should be OK, dataset '1' can be evicted from cache
-    expect(CachePutSuccess(30))(cache.put("2", 0, "Meh"))
+    expect(CachePutSuccess(56))(cache.put("2", 0, "Meh"))
     //should fail, cache should obey it's capacity
     expect(CachePutFailure())(cache.put("3", 0, "Very_long_and_useless_string"))
+
+    if (oldArch != null) {
+      System.setProperty("os.arch", oldArch)
+    } else {
+      System.clearProperty("os.arch")
+    }
+
+    if (oldOops != null) {
+      System.setProperty("spark.test.useCompressedOops", oldOops)
+    } else {
+      System.clearProperty("spark.test.useCompressedOops")
+    }
   }
 }

JavaAPISuite.java

@@ -117,7 +117,7 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
     rdd.foreach(new VoidFunction<String>() {
       @Override
-      public void apply(String s) {
+      public void call(String s) {
         System.out.println(s);
       }
     });
@@ -128,7 +128,7 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
     Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() {
       @Override
-      public Boolean apply(Integer x) {
+      public Boolean call(Integer x) {
        return x % 2 == 0;
      }
    };
@@ -166,7 +166,7 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
     Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
       @Override
-      public Integer apply(Integer a, Integer b) {
+      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    };
@@ -191,7 +191,7 @@ public class JavaAPISuite implements Serializable {
     JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey(
       new Function2<Integer, Integer, Integer>() {
         @Override
-        public Integer apply(Integer a, Integer b) {
+        public Integer call(Integer a, Integer b) {
          return a + b;
        }
    });
@@ -207,7 +207,7 @@ public class JavaAPISuite implements Serializable {
     localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer,
       Integer>() {
       @Override
-      public Integer apply(Integer a, Integer b) {
+      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });
@@ -252,7 +252,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(5, distinct.count());
     JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
       @Override
-      public Boolean apply(Double x) {
+      public Boolean call(Double x) {
        return x > 2.0;
      }
    });
@@ -279,19 +279,19 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
       @Override
-      public Double apply(Integer x) {
+      public Double call(Integer x) {
        return 1.0 * x;
      }
    }).cache();
     JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
       @Override
-      public Tuple2<Integer, Integer> apply(Integer x) {
+      public Tuple2<Integer, Integer> call(Integer x) {
        return new Tuple2<Integer, Integer>(x, x);
      }
    }).cache();
     JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
       @Override
-      public String apply(Integer x) {
+      public String call(Integer x) {
        return x.toString();
      }
    }).cache();
@@ -303,7 +303,7 @@ public class JavaAPISuite implements Serializable {
       "The quick brown fox jumps over the lazy dog."));
     JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
       @Override
-      public Iterable<String> apply(String x) {
+      public Iterable<String> call(String x) {
        return Arrays.asList(x.split(" "));
      }
    });
@@ -314,7 +314,7 @@ public class JavaAPISuite implements Serializable {
       new PairFlatMapFunction<String, String, String>() {
         @Override
-        public Iterable<Tuple2<String, String>> apply(String s) {
+        public Iterable<Tuple2<String, String>> call(String s) {
          List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>();
          for (String word : s.split(" ")) pairs.add(new Tuple2<String, String>(word, word));
          return pairs;
@@ -326,7 +326,7 @@ public class JavaAPISuite implements Serializable {
     JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
       @Override
-      public Iterable<Double> apply(String s) {
+      public Iterable<Double> call(String s) {
        List<Double> lengths = new LinkedList<Double>();
        for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
        return lengths;
@@ -343,7 +343,7 @@ public class JavaAPISuite implements Serializable {
     JavaRDD<Integer> partitionSums = rdd.mapPartitions(
       new FlatMapFunction<Iterator<Integer>, Integer>() {
         @Override
-        public Iterable<Integer> apply(Iterator<Integer> iter) {
+        public Iterable<Integer> call(Iterator<Integer> iter) {
          int sum = 0;
          while (iter.hasNext()) {
            sum += iter.next();
@@ -417,7 +417,7 @@ public class JavaAPISuite implements Serializable {
     rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
-      public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
      }
    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
@@ -426,7 +426,7 @@ public class JavaAPISuite implements Serializable {
     JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
       Text.class).map(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
       @Override
-      public Tuple2<Integer, String> apply(Tuple2<IntWritable, Text> pair) {
+      public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
        return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
      }
    });
@@ -446,7 +446,7 @@ public class JavaAPISuite implements Serializable {
     rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
-      public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
      }
    }).saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
@@ -457,7 +457,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
       String>() {
       @Override
-      public String apply(Tuple2<IntWritable, Text> x) {
+      public String call(Tuple2<IntWritable, Text> x) {
        return x.toString();
      }
    }).collect().toString());
@@ -476,7 +476,7 @@ public class JavaAPISuite implements Serializable {
     rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
-      public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
      }
    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
@@ -487,7 +487,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
       String>() {
       @Override
-      public String apply(Tuple2<IntWritable, Text> x) {
+      public String call(Tuple2<IntWritable, Text> x) {
        return x.toString();
      }
    }).collect().toString());
@@ -534,7 +534,7 @@ public class JavaAPISuite implements Serializable {
     rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
       @Override
-      public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
+      public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
        return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
      }
    }).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
@@ -544,7 +544,7 @@ public class JavaAPISuite implements Serializable {
     Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
       String>() {
       @Override
-      public String apply(Tuple2<IntWritable, Text> x) {
+      public String call(Tuple2<IntWritable, Text> x) {
        return x.toString();
      }
    }).collect().toString());

PipedRDDSuite.scala

@@ -39,6 +39,15 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter {
     assert(c(1) === "LALALA")
   }
+  test("pipe with non-zero exit status") {
+    sc = new SparkContext("local", "test")
+    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val piped = nums.pipe("cat nonexistent_file")
+    intercept[SparkException] {
+      piped.collect()
+    }
+  }
 }

SizeEstimatorSuite.scala

@@ -1,6 +1,8 @@
 package spark
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.PrivateMethodTester
 class DummyClass1 {}
@@ -17,61 +19,114 @@ class DummyClass4(val d: DummyClass3) {
   val x: Int = 0
 }
-class SizeEstimatorSuite extends FunSuite {
+class SizeEstimatorSuite extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
+  var oldArch: String = _
+  var oldOops: String = _
+
+  override def beforeAll() {
+    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+    oldArch = System.setProperty("os.arch", "amd64")
+    oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+  }
+
+  override def afterAll() {
+    resetOrClear("os.arch", oldArch)
+    resetOrClear("spark.test.useCompressedOops", oldOops)
+  }
+
   test("simple classes") {
-    expect(8)(SizeEstimator.estimate(new DummyClass1))
-    expect(12)(SizeEstimator.estimate(new DummyClass2))
-    expect(20)(SizeEstimator.estimate(new DummyClass3))
-    expect(16)(SizeEstimator.estimate(new DummyClass4(null)))
-    expect(36)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
+    expect(16)(SizeEstimator.estimate(new DummyClass1))
+    expect(16)(SizeEstimator.estimate(new DummyClass2))
+    expect(24)(SizeEstimator.estimate(new DummyClass3))
+    expect(24)(SizeEstimator.estimate(new DummyClass4(null)))
+    expect(48)(SizeEstimator.estimate(new DummyClass4(new DummyClass3)))
   }
   test("strings") {
-    expect(24)(SizeEstimator.estimate(""))
-    expect(26)(SizeEstimator.estimate("a"))
-    expect(28)(SizeEstimator.estimate("ab"))
-    expect(40)(SizeEstimator.estimate("abcdefgh"))
+    expect(48)(SizeEstimator.estimate(""))
+    expect(56)(SizeEstimator.estimate("a"))
+    expect(56)(SizeEstimator.estimate("ab"))
+    expect(64)(SizeEstimator.estimate("abcdefgh"))
   }
   test("primitive arrays") {
-    expect(10)(SizeEstimator.estimate(new Array[Byte](10)))
-    expect(20)(SizeEstimator.estimate(new Array[Char](10)))
-    expect(20)(SizeEstimator.estimate(new Array[Short](10)))
-    expect(40)(SizeEstimator.estimate(new Array[Int](10)))
-    expect(80)(SizeEstimator.estimate(new Array[Long](10)))
-    expect(40)(SizeEstimator.estimate(new Array[Float](10)))
-    expect(80)(SizeEstimator.estimate(new Array[Double](10)))
-    expect(4000)(SizeEstimator.estimate(new Array[Int](1000)))
-    expect(8000)(SizeEstimator.estimate(new Array[Long](1000)))
+    expect(32)(SizeEstimator.estimate(new Array[Byte](10)))
+    expect(40)(SizeEstimator.estimate(new Array[Char](10)))
+    expect(40)(SizeEstimator.estimate(new Array[Short](10)))
+    expect(56)(SizeEstimator.estimate(new Array[Int](10)))
+    expect(96)(SizeEstimator.estimate(new Array[Long](10)))
+    expect(56)(SizeEstimator.estimate(new Array[Float](10)))
+    expect(96)(SizeEstimator.estimate(new Array[Double](10)))
+    expect(4016)(SizeEstimator.estimate(new Array[Int](1000)))
+    expect(8016)(SizeEstimator.estimate(new Array[Long](1000)))
   }
   test("object arrays") {
     // Arrays containing nulls should just have one pointer per element
-    expect(40)(SizeEstimator.estimate(new Array[String](10)))
-    expect(40)(SizeEstimator.estimate(new Array[AnyRef](10)))
+    expect(56)(SizeEstimator.estimate(new Array[String](10)))
+    expect(56)(SizeEstimator.estimate(new Array[AnyRef](10)))
     // For object arrays with non-null elements, each object should take one pointer plus
     // however many bytes that class takes. (Note that Array.fill calls the code in its
     // second parameter separately for each object, so we get distinct objects.)
-    expect(120)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
-    expect(160)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
-    expect(240)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
-    expect(12 + 16)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
+    expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)))
+    expect(216)(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)))
+    expect(296)(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)))
+    expect(56)(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)))
     // Past size 100, our samples 100 elements, but we should still get the right size.
-    expect(24000)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
+    expect(28016)(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)))
     // If an array contains the *same* element many times, we should only count it once.
     val d1 = new DummyClass1
-    expect(48)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
-    expect(408)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
+    expect(72)(SizeEstimator.estimate(Array.fill(10)(d1))) // 10 pointers plus 8-byte object
+    expect(432)(SizeEstimator.estimate(Array.fill(100)(d1))) // 100 pointers plus 8-byte object
     // Same thing with huge array containing the same element many times. Note that this won't
-    // return exactly 4008 because it can't tell that *all* the elements will equal the first
+    // return exactly 4032 because it can't tell that *all* the elements will equal the first
     // one it samples, but it should be close to that.
+    // TODO: If we sample 100 elements, this should always be 4176 ?
     val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1))
     assert(estimatedSize >= 4000, "Estimated size " + estimatedSize + " should be more than 4000")
-    assert(estimatedSize <= 4100, "Estimated size " + estimatedSize + " should be less than 4100")
+    assert(estimatedSize <= 4200, "Estimated size " + estimatedSize + " should be less than 4100")
+  }
+
+  test("32-bit arch") {
+    val arch = System.setProperty("os.arch", "x86")
+    val initialize = PrivateMethod[Unit]('initialize)
+    SizeEstimator invokePrivate initialize()
+    expect(40)(SizeEstimator.estimate(""))
+    expect(48)(SizeEstimator.estimate("a"))
+    expect(48)(SizeEstimator.estimate("ab"))
+    expect(56)(SizeEstimator.estimate("abcdefgh"))
+    resetOrClear("os.arch", arch)
+  }
+
+  test("64-bit arch with no compressed oops") {
+    val arch = System.setProperty("os.arch", "amd64")
+    val oops = System.setProperty("spark.test.useCompressedOops", "false")
+    val initialize = PrivateMethod[Unit]('initialize)
+    SizeEstimator invokePrivate initialize()
+    expect(64)(SizeEstimator.estimate(""))
+    expect(72)(SizeEstimator.estimate("a"))
+    expect(72)(SizeEstimator.estimate("ab"))
+    expect(80)(SizeEstimator.estimate("abcdefgh"))
+    resetOrClear("os.arch", arch)
+    resetOrClear("spark.test.useCompressedOops", oops)
+  }
+
+  def resetOrClear(prop: String, oldValue: String) {
+    if (oldValue != null) {
+      System.setProperty(prop, oldValue)
+    } else {
+      System.clearProperty(prop)
+    }
   }
 }

BlockManagerSuite.scala

@@ -6,17 +6,27 @@ import akka.actor._
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
+import org.scalatest.PrivateMethodTester
 import spark.KryoSerializer
+import spark.SizeEstimator
 import spark.util.ByteBufferInputStream
-class BlockManagerSuite extends FunSuite with BeforeAndAfter {
+class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
   var actorSystem: ActorSystem = null
   var master: BlockManagerMaster = null
+  var oldArch: String = _
+  var oldOops: String = _
   before {
     actorSystem = ActorSystem("test")
     master = new BlockManagerMaster(actorSystem, true, true)
+
+    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+    oldArch = System.setProperty("os.arch", "amd64")
+    oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+    val initialize = PrivateMethod[Unit]('initialize)
+    SizeEstimator invokePrivate initialize()
   }
   after {
@@ -24,6 +34,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
     actorSystem.awaitTermination()
     actorSystem = null
     master = null
+
+    if (oldArch != null) {
+      System.setProperty("os.arch", oldArch)
+    } else {
+      System.clearProperty("os.arch")
+    }
+
+    if (oldOops != null) {
+      System.setProperty("spark.test.useCompressedOops", oldOops)
+    } else {
+      System.clearProperty("spark.test.useCompressedOops")
+    }
   }
   test("manager-master interaction") {
@@ -57,7 +79,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
   test("in-memory LRU storage") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -78,7 +100,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
   test("in-memory LRU storage with serialization") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -99,7 +121,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
   test("on-disk storage") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -112,7 +134,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
   test("disk and memory storage") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -126,7 +148,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
   test("disk and memory storage with serialization") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -140,7 +162,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
   test("LRU with mixed storage levels") {
-    val store = new BlockManager(master, new KryoSerializer, 1000)
+    val store = new BlockManager(master, new KryoSerializer, 1200)
     val a1 = new Array[Byte](400)
     val a2 = new Array[Byte](400)
     val a3 = new Array[Byte](400)
@@ -166,7 +188,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
   }
   test("in-memory LRU with streams") {
val store = new BlockManager(master, new KryoSerializer, 1000) val store = new BlockManager(master, new KryoSerializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200))
@ -192,7 +214,7 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter {
} }
test("LRU with mixed storage levels and streams") { test("LRU with mixed storage levels and streams") {
val store = new BlockManager(master, new KryoSerializer, 1000) val store = new BlockManager(master, new KryoSerializer, 1200)
val list1 = List(new Array[Byte](200), new Array[Byte](200)) val list1 = List(new Array[Byte](200), new Array[Byte](200))
val list2 = List(new Array[Byte](200), new Array[Byte](200)) val list2 = List(new Array[Byte](200), new Array[Byte](200))
val list3 = List(new Array[Byte](200), new Array[Byte](200)) val list3 = List(new Array[Byte](200), new Array[Byte](200))

View file

@ -1,20 +0,0 @@
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
cd "`dirname $0`"
PYTHONPATH="./third_party/boto-2.4.1.zip/boto-2.4.1:$PYTHONPATH" python ./spark_ec2.py --user ec2-user --cluster-type standalone -a standalone $@

View file

@ -349,7 +349,7 @@ def setup_mesos_cluster(master, opts):
def setup_standalone_cluster(master, slave_nodes, opts): def setup_standalone_cluster(master, slave_nodes, opts):
slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes]) slave_ips = '\n'.join([i.public_dns_name for i in slave_nodes])
ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips)) ssh(master, opts, "echo \"%s\" > spark/conf/slaves" % (slave_ips))
ssh(master, opts, "/home/ec2-user/spark/bin/start-all.sh") ssh(master, opts, "/root/spark/bin/start-all.sh")
# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up # Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
@ -448,7 +448,7 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, zoo_nodes):
dest.close() dest.close()
# rsync the whole directory over to the master machine # rsync the whole directory over to the master machine
command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " + command = (("rsync -rv -e 'ssh -o StrictHostKeyChecking=no -i %s' " +
"'%s/' '%s@%s:~'") % (opts.identity_file, tmp_dir, opts.user, active_master)) "'%s/' '%s@%s:/'") % (opts.identity_file, tmp_dir, opts.user, active_master))
subprocess.check_call(command, shell=True) subprocess.check_call(command, shell=True)
# Remove the temp directory we created above # Remove the temp directory we created above
shutil.rmtree(tmp_dir) shutil.rmtree(tmp_dir)

View file

@ -26,8 +26,7 @@ public class JavaHdfsLR {
} }
static class ParsePoint extends Function<String, DataPoint> { static class ParsePoint extends Function<String, DataPoint> {
public DataPoint call(String line) {
public DataPoint apply(String line) {
StringTokenizer tok = new StringTokenizer(line, " "); StringTokenizer tok = new StringTokenizer(line, " ");
double y = Double.parseDouble(tok.nextToken()); double y = Double.parseDouble(tok.nextToken());
double[] x = new double[D]; double[] x = new double[D];
@ -41,8 +40,7 @@ public class JavaHdfsLR {
} }
static class VectorSum extends Function2<double[], double[], double[]> { static class VectorSum extends Function2<double[], double[], double[]> {
public double[] call(double[] a, double[] b) {
public double[] apply(double[] a, double[] b) {
double[] result = new double[D]; double[] result = new double[D];
for (int j = 0; j < D; j++) { for (int j = 0; j < D; j++) {
result[j] = a[j] + b[j]; result[j] = a[j] + b[j];
@ -52,14 +50,13 @@ public class JavaHdfsLR {
} }
static class ComputeGradient extends Function<DataPoint, double[]> { static class ComputeGradient extends Function<DataPoint, double[]> {
double[] weights; double[] weights;
public ComputeGradient(double[] weights) { public ComputeGradient(double[] weights) {
this.weights = weights; this.weights = weights;
} }
public double[] apply(DataPoint p) { public double[] call(DataPoint p) {
double[] gradient = new double[D]; double[] gradient = new double[D];
for (int i = 0; i < D; i++) { for (int i = 0; i < D; i++) {
double dot = dot(weights, p.x); double dot = dot(weights, p.x);

View file

@ -11,6 +11,9 @@ import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
/**
* Transitive closure on a graph, implemented in Java.
*/
public class JavaTC { public class JavaTC {
static int numEdges = 200; static int numEdges = 200;
@ -32,7 +35,7 @@ public class JavaTC {
Integer, Integer> { Integer, Integer> {
static ProjectFn INSTANCE = new ProjectFn(); static ProjectFn INSTANCE = new ProjectFn();
public Tuple2<Integer, Integer> apply(Tuple2<Integer, Tuple2<Integer, Integer>> triple) { public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1()); return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1());
} }
} }
@ -53,12 +56,11 @@ public class JavaTC {
// the graph to obtain the path (x, z). // the graph to obtain the path (x, z).
// Because join() joins on keys, the edges are stored in reversed order. // Because join() joins on keys, the edges are stored in reversed order.
JavaPairRDD<Integer, Integer> edges = tc.map(new PairFunction<Tuple2<Integer, Integer>, JavaPairRDD<Integer, Integer> edges = tc.map(
Integer, Integer>() { new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
@Override public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
public Tuple2<Integer, Integer> apply(Tuple2<Integer, Integer> e) { return new Tuple2<Integer, Integer>(e._2(), e._1());
return new Tuple2<Integer, Integer>(e._2(), e._1()); }
}
}); });
long oldCount = 0; long oldCount = 0;
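
The comment about reversed edges is the crux of the algorithm: a path (k, d) keyed by its source can only be joined with an edge that ends at k, so edges are kept as (dst, src). A minimal sketch of that single step in the Scala API (paths and reversedEdges are assumed RDD[(Int, Int)] handles, not names from this file):

import spark.RDD
import spark.SparkContext._

// Compose every edge (e -> k) with every known path (k -> d) into a path (e -> d),
// then fold the new pairs back in and deduplicate.
def extend(paths: RDD[(Int, Int)], reversedEdges: RDD[(Int, Int)]): RDD[(Int, Int)] =
  paths
    .join(reversedEdges)                  // (k, (d, e))
    .map { case (_, (d, e)) => (e, d) }   // new path e -> d
    .union(paths)
    .distinct()
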

View file

@ -12,9 +12,7 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
public class JavaWordCount { public class JavaWordCount {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
if (args.length < 2) { if (args.length < 2) {
System.err.println("Usage: JavaWordCount <master> <file>"); System.err.println("Usage: JavaWordCount <master> <file>");
System.exit(1); System.exit(1);
@ -23,16 +21,20 @@ public class JavaWordCount {
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount"); JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount");
JavaRDD<String> lines = ctx.textFile(args[1], 1); JavaRDD<String> lines = ctx.textFile(args[1], 1);
JavaPairRDD<String, Integer> counts = lines.flatMap(new FlatMapFunction<String, String>() { JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> apply(String s) { public Iterable<String> call(String s) {
return Arrays.asList(s.split(" ")); return Arrays.asList(s.split(" "));
} }
}).map(new PairFunction<String, String, Integer>() { });
public Tuple2<String, Integer> apply(String s) {
JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
return new Tuple2(s, 1); return new Tuple2(s, 1);
} }
}).reduceByKey(new Function2<Integer, Integer, Integer>() { });
public Integer apply(Integer i1, Integer i2) {
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer i1, Integer i2) {
return i1 + i2; return i1 + i2;
} }
}); });
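
Besides the apply-to-call rename, the refactor names each intermediate RDD (words, ones, counts) instead of chaining anonymous classes. For comparison, the same pipeline in the Scala API is a few one-liners (a sketch; the master, app name, and input path below are assumptions, not part of this file):

import spark.SparkContext
import spark.SparkContext._

// Word count in the Scala API, mirroring the Java pipeline above.
val sc     = new SparkContext("local", "WordCountSketch")  // assumed master and app name
val path   = "README.md"                                   // assumed input file
val lines  = sc.textFile(path)
val words  = lines.flatMap(_.split(" "))
val ones   = words.map(word => (word, 1))
val counts = ones.reduceByKey(_ + _)
counts.collect().foreach(println)
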

View file

@ -5,6 +5,9 @@ import SparkContext._
import scala.util.Random import scala.util.Random
import scala.collection.mutable import scala.collection.mutable
/**
* Transitive closure on a graph.
*/
object SparkTC { object SparkTC {
val numEdges = 200 val numEdges = 200
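
The new header comment is the only change shown here, but the program it describes is a fixed-point computation: keep composing the current path set with the edge set until the number of pairs stops growing. A sketch of that loop (illustrative names only; the file's actual body is not shown in this diff):

import spark.SparkContext._

// Iterate until no new (src, dst) pairs are discovered.
// `edges` is an assumed RDD[(Int, Int)] of the input graph.
var tc = edges.cache()
val reversed = edges.map { case (src, dst) => (dst, src) }
var oldCount = 0L
var nextCount = tc.count()
while (nextCount != oldCount) {
  oldCount = nextCount
  tc = tc.union(tc.join(reversed).map { case (_, (d, s)) => (s, d) }).distinct().cache()
  nextCount = tc.count()
}
println("TC has " + nextCount + " edges.")
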