Merge remote branch 'origin/custom-serialization' into new-rdds

This commit is contained in:
Matei Zaharia 2011-03-07 19:20:28 -08:00
commit 8b6f3db415
31 changed files with 269 additions and 291 deletions

Binary file not shown.

View file

@ -1,3 +0,0 @@
It is highly recommended to use only the necessary ASM jars for your
application instead of using the asm-all jar, unless you really need
all ASM packages.

View file

@ -1,15 +0,0 @@
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>asm</groupId>
<artifactId>asm-parent</artifactId>
<version>3.2</version>
</parent>
<name>ASM All</name>
<groupId>asm</groupId>
<artifactId>asm-all</artifactId>
<packaging>jar</packaging>
</project>

View file

@ -1,15 +0,0 @@
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>asm</groupId>
<artifactId>asm-parent</artifactId>
<version>3.2</version>
</parent>
<name>ASM All</name>
<groupId>asm</groupId>
<artifactId>asm-all</artifactId>
<packaging>jar</packaging>
</project>

Binary file not shown.

View file

@ -1,14 +0,0 @@
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>asm-parent</artifactId>
<groupId>asm</groupId>
<version>3.2</version>
</parent>
<name>ASM Core</name>
<artifactId>asm</artifactId>
<packaging>jar</packaging>
</project>

View file

@ -1,21 +0,0 @@
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>asm-parent</artifactId>
<groupId>asm</groupId>
<version>3.2</version>
</parent>
<name>ASM Analysis</name>
<artifactId>asm-analysis</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<artifactId>asm-tree</artifactId>
<groupId>asm</groupId>
</dependency>
</dependencies>
</project>

View file

@ -1,21 +0,0 @@
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>asm-parent</artifactId>
<groupId>asm</groupId>
<version>3.2</version>
</parent>
<name>ASM Commons</name>
<artifactId>asm-commons</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<artifactId>asm-tree</artifactId>
<groupId>asm</groupId>
</dependency>
</dependencies>
</project>

View file

@ -1,136 +0,0 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>asm-parent</artifactId>
<groupId>asm</groupId>
<version>3.2</version>
<packaging>pom</packaging>
<name>ASM</name>
<description>A very small and fast Java bytecode manipulation framework</description>
<url>http://asm.objectweb.org/</url>
<organization>
<name>ObjectWeb</name>
<url>http://www.objectweb.org/</url>
</organization>
<inceptionYear>2000</inceptionYear>
<licenses>
<license>
<name>BSD</name>
<url>http://asm.objectweb.org/license.html</url>
</license>
</licenses>
<developers>
<developer>
<name>Eric Bruneton</name>
<id>ebruneton</id>
<email>Eric.Bruneton@rd.francetelecom.com</email>
<roles>
<role>Creator</role>
<role>Java Developer</role>
</roles>
</developer>
<developer>
<name>Eugene Kuleshov</name>
<id>eu</id>
<email>eu@javatx.org</email>
<roles>
<role>Java Developer</role>
</roles>
</developer>
</developers>
<scm>
<connection>scm:cvs:pserver:anonymous:@cvs.forge.objectweb.org:/cvsroot/asm:asm</connection>
<developerConnection>scm:cvs:ext:${maven.username}@cvs.forge.objectweb.org:/cvsroot/asm:asm</developerConnection>
<url>http://cvs.forge.objectweb.org/cgi-bin/viewcvs.cgi/asm/asm/</url>
</scm>
<issueManagement>
<url>http://forge.objectweb.org/tracker/?group_id=23</url>
</issueManagement>
<dependencyManagement>
<dependencies>
<dependency>
<artifactId>asm</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>asm-tree</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>asm-analysis</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>asm-commons</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>asm-util</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
<dependency>
<artifactId>asm-xml</artifactId>
<groupId>${project.groupId}</groupId>
<version>${project.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<mailingLists>
<mailingList>
<name>ASM Users List</name>
<subscribe>sympa@ow2.org?subject=subscribe%20asm</subscribe>
<unsubscribe>sympa@ow2.org?subject=unsubscribe%20asm</unsubscribe>
<post>asm@ow2.org</post>
<archive>http://www.ow2.org/wws/arc/asm</archive>
</mailingList>
<mailingList>
<name>ASM Team List</name>
<subscribe>sympa@ow2.org?subject=subscribe%20asm-team</subscribe>
<unsubscribe>sympa@ow2.org?subject=unsubscribe%20asm-team</unsubscribe>
<post>asm-team@ow2.org</post>
<archive>http://www.ow2.org/wws/arc/asm-team</archive>
</mailingList>
</mailingLists>
<distributionManagement>
<downloadUrl>http://mojo.codehaus.org/my-project</downloadUrl>
<repository>
<id>objectweb</id>
<uniqueVersion>false</uniqueVersion>
<name>ObjectWeb Maven 2.0 Repository</name>
<url>dav:https://maven.forge.objectweb.org:8002/maven2/</url>
<layout>default</layout>
</repository>
<snapshotRepository>
<id>objectweb.snapshots</id>
<uniqueVersion>false</uniqueVersion>
<name>ObjectWeb Maven 2.0 Snapshot Repository</name>
<url>dav:https://maven.forge.objectweb.org:8002/maven2-snapshot/</url>
<layout>default</layout>
</snapshotRepository>
</distributionManagement>
</project>

View file

@ -1,21 +0,0 @@
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>asm-parent</artifactId>
<groupId>asm</groupId>
<version>3.2</version>
</parent>
<name>ASM Tree</name>
<artifactId>asm-tree</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<artifactId>asm</artifactId>
<groupId>asm</groupId>
</dependency>
</dependencies>
</project>

View file

@ -1,21 +0,0 @@
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>asm-parent</artifactId>
<groupId>asm</groupId>
<version>3.2</version>
</parent>
<name>ASM Util</name>
<artifactId>asm-util</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<artifactId>asm-tree</artifactId>
<groupId>asm</groupId>
</dependency>
</dependencies>
</project>

Binary file not shown.

View file

@ -1,21 +0,0 @@
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>asm-parent</artifactId>
<groupId>asm</groupId>
<version>3.2</version>
</parent>
<name>ASM XML</name>
<artifactId>asm-xml</artifactId>
<packaging>jar</packaging>
<dependencies>
<dependency>
<artifactId>asm-util</artifactId>
<groupId>asm</groupId>
</dependency>
</dependencies>
</project>

BIN
core/lib/asm-all-3.3.1.jar Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View file

@ -24,6 +24,7 @@ class Executor extends mesos.Executor with Logging {
// Initialize cache and broadcast system (uses some properties read above)
Cache.initialize()
Serializer.initialize()
Broadcast.initialize(false)
MapOutputTracker.initialize(false)
RDDCache.initialize(false)

View file

@ -0,0 +1,48 @@
package spark
import java.io._
/**
 * A SerializationStream backed by a java.io.ObjectOutputStream, i.e. plain
 * Java serialization.
 */
class JavaSerializationStream(out: OutputStream) extends SerializationStream {
  val objOut = new ObjectOutputStream(out)
  def writeObject[T](t: T): Unit = objOut.writeObject(t)
  def flush(): Unit = objOut.flush()
  def close(): Unit = objOut.close()
}
/**
 * A DeserializationStream backed by a java.io.ObjectInputStream.
 *
 * Classes are resolved through the current thread's context class loader so
 * that dynamically-loaded classes (e.g. user code shipped to executors) can
 * be found during deserialization.
 */
class JavaDeserializationStream(in: InputStream) extends DeserializationStream {
  val objIn = new ObjectInputStream(in) {
    // Fix: call Thread.currentThread directly; the bare `currentThread`
    // (scala.Predef.currentThread) is deprecated.
    override def resolveClass(desc: ObjectStreamClass) =
      Class.forName(desc.getName, false, Thread.currentThread.getContextClassLoader)
  }
  def readObject[T](): T = objIn.readObject().asInstanceOf[T]
  def close() { objIn.close() }
}
/**
 * A Serializer built on standard Java serialization. Convenient and always
 * correct, but produces larger output than specialized serializers.
 */
class JavaSerializer extends Serializer {
  def serialize[T](t: T): Array[Byte] = {
    // Serialize into an in-memory buffer and return its contents.
    val byteStream = new ByteArrayOutputStream()
    val serStream = outputStream(byteStream)
    serStream.writeObject(t)
    serStream.close()
    byteStream.toByteArray
  }

  def deserialize[T](bytes: Array[Byte]): T =
    inputStream(new ByteArrayInputStream(bytes)).readObject[T]()

  def outputStream(s: OutputStream): SerializationStream =
    new JavaSerializationStream(s)

  def inputStream(s: InputStream): DeserializationStream =
    new JavaDeserializationStream(s)
}
/** SerializationStrategy that creates a new JavaSerializer per request. */
class JavaSerialization extends SerializationStrategy {
  def newSerializer(): Serializer = new JavaSerializer
}

View file

@ -0,0 +1,145 @@
package spark
import java.io._
import java.nio.ByteBuffer
import java.nio.channels.Channels
import scala.collection.immutable
import scala.collection.mutable
import com.esotericsoftware.kryo._
/**
 * Utilities for writing ints in a variable-length format: 7 bits of payload
 * per byte, low-order bits first, with the high bit (0x80) set on every byte
 * except the last. An Int takes between 1 and 5 bytes.
 *
 * NOTE(review): despite the name, this is a plain unsigned varint encoding,
 * not ZigZag encoding — negative numbers always take the full 5 bytes.
 */
object ZigZag {
  /**
   * Write n to out in variable-length format. Collapses the original
   * 5x-unrolled emission steps into the equivalent loop (identical output:
   * at most 5 bytes, since after four >>>7 shifts at most 4 bits remain).
   */
  def writeInt(n: Int, out: OutputStream) {
    var value = n
    while ((value & ~0x7F) != 0) {
      out.write((value & 0x7F) | 0x80)
      value >>>= 7
    }
    out.write(value)
  }

  /**
   * Read an int written by writeInt. Throws EOFException if the stream ends
   * mid-value, or SparkException if a fifth continuation bit appears (a
   * well-formed Int never needs more than 32 payload bits).
   */
  def readInt(in: InputStream): Int = {
    var offset = 0
    var result = 0
    while (offset < 32) {
      val b = in.read()
      if (b == -1) {
        throw new EOFException("End of stream")
      }
      result |= ((b & 0x7F) << offset)
      if ((b & 0x80) == 0) {
        return result
      }
      offset += 7
    }
    throw new SparkException("Malformed zigzag-encoded integer")
  }
}
/**
 * SerializationStream that writes each object as a varint length followed by
 * the Kryo-encoded payload.
 */
class KryoSerializationStream(kryo: Kryo, out: OutputStream)
extends SerializationStream {
  // Scratch buffer for one serialized object at a time. NOTE(review):
  // objects larger than 1 MB will overflow this buffer — confirm callers
  // never serialize records bigger than that.
  val buf = ByteBuffer.allocateDirect(1024*1024)
  def writeObject[T](t: T) {
    kryo.writeClassAndObject(buf, t)
    // buf.position() is the number of bytes Kryo just wrote; emit it as the
    // length prefix BEFORE flipping (flip resets position to 0).
    ZigZag.writeInt(buf.position(), out)
    buf.flip()
    // Copy the payload bytes to the underlying stream, then reset for reuse.
    Channels.newChannel(out).write(buf)
    buf.clear()
  }
  def flush() { out.flush() }
  def close() { out.close() }
}
/**
 * DeserializationStream that reads records in the format produced by
 * KryoSerializationStream: a varint length followed by the Kryo payload.
 */
class KryoDeserializationStream(kryo: Kryo, in: InputStream)
extends DeserializationStream {
  // ObjectBuffer with a 1 MB capacity, matching the writer's buffer size.
  val buf = new ObjectBuffer(kryo, 1024*1024)
  def readObject[T](): T = {
    // Read the length prefix, then exactly that many payload bytes.
    val len = ZigZag.readInt(in)
    buf.readClassAndObject(in, len).asInstanceOf[T]
  }
  def close() { in.close() }
}
/**
 * Serializer backed by a shared Kryo instance.
 *
 * NOTE(review): the ObjectBuffer below is stateful, so each KryoSerializer
 * is intended for use by a single thread (see Serializer.newInstance) —
 * confirm no instance is shared across threads.
 */
class KryoSerializer(kryo: Kryo) extends Serializer {
  // Reusable 1 MB buffer; limits the maximum size of a serialized object.
  val buf = new ObjectBuffer(kryo, 1024*1024)
  def serialize[T](t: T): Array[Byte] = {
    buf.writeClassAndObject(t)
  }
  def deserialize[T](bytes: Array[Byte]): T = {
    buf.readClassAndObject(bytes).asInstanceOf[T]
  }
  def outputStream(s: OutputStream): SerializationStream = {
    new KryoSerializationStream(kryo, s)
  }
  def inputStream(s: InputStream): DeserializationStream = {
    new KryoDeserializationStream(kryo, s)
  }
}
/**
 * Interface implemented by clients to register their own classes with Kryo.
 * Selected via the "spark.kryo.registrator" system property (see
 * KryoSerialization.createKryo).
 */
trait KryoRegistrator {
  def registerClasses(kryo: Kryo): Unit
}
/**
 * SerializationStrategy that uses the Kryo library. A single Kryo instance
 * is built once and shared by every serializer this strategy creates.
 */
class KryoSerialization extends SerializationStrategy with Logging {
  val kryo = createKryo()
  /**
   * Build and configure the shared Kryo instance: pre-register common Scala
   * and array types, then run the user's registrator if one is configured.
   *
   * NOTE(review): Kryo assigns class IDs in registration order, so writer
   * and reader must execute this same sequence — do not reorder the entries
   * below.
   */
  def createKryo(): Kryo = {
    val kryo = new Kryo()
    val toRegister: Seq[AnyRef] = Seq(
      // Arrays
      Array(1), Array(1.0), Array(1.0f), Array(1L), Array(""), Array(("", "")),
      // Specialized Tuple2s
      ("", ""), (1, 1), (1.0, 1.0), (1L, 1L),
      (1, 1.0), (1.0, 1), (1L, 1.0), (1.0, 1L), (1, 1L), (1L, 1),
      // Scala collections
      Nil, List(1), immutable.Map(1 -> 1), immutable.HashMap(1 -> 1),
      mutable.Map(1 -> 1), mutable.HashMap(1 -> 1), mutable.ArrayBuffer(1),
      // Options and Either
      Some(1), None, Left(1), Right(1),
      // Higher-dimensional tuples
      (1, 1, 1), (1, 1, 1, 1), (1, 1, 1, 1, 1)
    )
    for (obj <- toRegister) {
      kryo.register(obj.getClass)
    }
    // Optionally let user code register additional classes through a
    // KryoRegistrator named by the "spark.kryo.registrator" property.
    val regCls = System.getProperty("spark.kryo.registrator")
    if (regCls != null) {
      logInfo("Running user registrator: " + regCls)
      val reg = Class.forName(regCls).newInstance().asInstanceOf[KryoRegistrator]
      reg.registerClasses(kryo)
    }
    kryo
  }
  def newSerializer(): Serializer = new KryoSerializer(kryo)
}

View file

@ -47,9 +47,10 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
case None => createCombiner(v)
}
}
val ser = Serializer.newInstance()
for (i <- 0 until numOutputSplits) {
val file = LocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
val out = new ObjectOutputStream(new FileOutputStream(file))
val out = ser.outputStream(new FileOutputStream(file))
buckets(i).foreach(pair => out.writeObject(pair))
out.close()
}
@ -69,10 +70,11 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
val indexes = sc.parallelize(0 until numOutputSplits, numOutputSplits)
return indexes.flatMap((myId: Int) => {
val combiners = new HashMap[K, C]
val ser = Serializer.newInstance()
for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
for (i <- inputIds) {
val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, myId)
val inputStream = new ObjectInputStream(new URL(url).openStream())
val inputStream = ser.inputStream(new URL(url).openStream())
try {
while (true) {
val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]

View file

@ -0,0 +1,40 @@
package spark
import java.io.{InputStream, OutputStream}
/** A stream for writing serialized objects. */
trait SerializationStream {
  def writeObject[T](t: T): Unit
  def flush(): Unit
  def close(): Unit
}

/** A stream for reading back objects written by a SerializationStream. */
trait DeserializationStream {
  def readObject[T](): T
  def close(): Unit
}

/** Converts objects to and from byte arrays or streams. */
trait Serializer {
  def serialize[T](t: T): Array[Byte]
  def deserialize[T](bytes: Array[Byte]): T
  def outputStream(s: OutputStream): SerializationStream
  def inputStream(s: InputStream): DeserializationStream
}

/** Factory for Serializer instances; implementations are chosen at runtime
 *  through the "spark.serialization" system property. */
trait SerializationStrategy {
  def newSerializer(): Serializer
}
/**
 * Singleton entry point for obtaining Serializer instances. The concrete
 * SerializationStrategy is selected at startup from the "spark.serialization"
 * system property, defaulting to spark.JavaSerialization.
 */
object Serializer {
  var strat: SerializationStrategy = null

  /** Instantiate the configured strategy; must run before newInstance. */
  def initialize() {
    val className =
      System.getProperty("spark.serialization", "spark.JavaSerialization")
    strat = Class.forName(className).newInstance().asInstanceOf[SerializationStrategy]
  }

  /** Return a serializer ** for use by a single thread ** */
  def newInstance(): Serializer = strat.newSerializer()
}

View file

@ -0,0 +1,26 @@
package spark
import java.io._
/**
 * Wrapper around a BoundedMemoryCache that stores serialized objects as
 * byte arrays in order to reduce storage cost and GC overhead
 */
class SerializingCache extends Cache with Logging {
  val bmc = new BoundedMemoryCache

  override def put(key: Any, value: Any) {
    // Serialize eagerly so the underlying cache only ever holds byte arrays.
    bmc.put(key, Serializer.newInstance().serialize(value))
  }

  override def get(key: Any): Any = {
    // A null from the underlying cache means "not present"; pass it through.
    bmc.get(key) match {
      case null  => null
      case bytes => Serializer.newInstance().deserialize(bytes.asInstanceOf[Array[Byte]])
    }
  }
}

View file

@ -40,6 +40,7 @@ extends Logging {
// Start the scheduler, the cache and the broadcast system
scheduler.start()
Cache.initialize()
Serializer.initialize()
Broadcast.initialize(true)
MapOutputTracker.initialize(true)
RDDCache.initialize(true)

5
run
View file

@ -40,7 +40,7 @@ EXAMPLES_DIR=$FWDIR/examples
CLASSPATH="$SPARK_CLASSPATH:$CORE_DIR/target/scala_2.8.1/classes:$MESOS_CLASSPATH"
CLASSPATH+=:$FWDIR/conf
CLASSPATH+=:$CORE_DIR/lib/mesos.jar
CLASSPATH+=:$CORE_DIR/lib/asm-3.2/lib/all/asm-all-3.2.jar
CLASSPATH+=:$CORE_DIR/lib/asm-all-3.3.1.jar
CLASSPATH+=:$CORE_DIR/lib/colt.jar
CLASSPATH+=:$CORE_DIR/lib/guava-r07/guava-r07.jar
CLASSPATH+=:$CORE_DIR/lib/hadoop-0.20.2/hadoop-0.20.2-core.jar
@ -48,6 +48,9 @@ CLASSPATH+=:$CORE_DIR/lib/scalatest-1.2/scalatest-1.2.jar
CLASSPATH+=:$CORE_DIR/lib/scalacheck_2.8.0-1.7.jar
CLASSPATH+=:$CORE_DIR/lib/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
CLASSPATH+=:$CORE_DIR/lib/jetty-7.1.6.v20100715/servlet-api-2.5.jar
CLASSPATH+=:$CORE_DIR/lib/kryo-1.04-mod/kryo-1.04-mod.jar
CLASSPATH+=:$CORE_DIR/lib/kryo-1.04-mod/minlog-1.2.jar
CLASSPATH+=:$CORE_DIR/lib/kryo-1.04-mod/reflectasm-1.01.jar
CLASSPATH+=:$CORE_DIR/lib/apache-log4j-1.2.16/log4j-1.2.16.jar
CLASSPATH+=:$CORE_DIR/lib/slf4j-1.6.1/slf4j-api-1.6.1.jar
CLASSPATH+=:$CORE_DIR/lib/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar