Merge branch 'http-repl-class-serving'

This commit is contained in:
Matei Zaharia 2010-09-28 22:43:04 -07:00
commit e5e9edeeb3
8 changed files with 135 additions and 24 deletions

View file

@ -10,6 +10,8 @@ JARS += third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
JARS += third_party/hadoop-0.20.0/lib/commons-logging-1.0.4.jar
JARS += third_party/scalatest-1.2/scalatest-1.2.jar
JARS += third_party/scalacheck_2.8.0-1.7.jar
JARS += third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
CLASSPATH = $(subst $(SPACE),:,$(JARS))
SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala

2
run
View file

@ -36,6 +36,8 @@ SPARK_CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
SPARK_CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
SPARK_CLASSPATH+=:third_party/scalatest-1.2/scalatest-1.2.jar
SPARK_CLASSPATH+=:third_party/scalacheck_2.8.0-1.7.jar
SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
SPARK_CLASSPATH+=:$jar
done

View file

@ -25,10 +25,10 @@ object Executor {
// If the REPL is in use, create a ClassLoader that will be able to
// read new classes defined by the REPL as the user types code
classLoader = this.getClass.getClassLoader
val classDir = System.getProperty("spark.repl.current.classdir")
if (classDir != null) {
println("Using REPL classdir: " + classDir)
classLoader = new repl.ExecutorClassLoader(classDir, classLoader)
val classUri = System.getProperty("spark.repl.class.uri")
if (classUri != null) {
println("Using REPL class URI: " + classUri)
classLoader = new repl.ExecutorClassLoader(classUri, classLoader)
}
Thread.currentThread.setContextClassLoader(classLoader)

View file

@ -0,0 +1,74 @@
package spark.repl
import java.io.File
import java.net.InetAddress
import org.eclipse.jetty.server.Server
import org.eclipse.jetty.server.handler.DefaultHandler
import org.eclipse.jetty.server.handler.HandlerList
import org.eclipse.jetty.server.handler.ResourceHandler
/**
 * Exception type thrown by ClassServer when it is in the wrong state
 * for an operation: start() called while the server is already running,
 * or stop()/uri called while it is stopped.
 */
class ServerStateException(message: String) extends Exception(message)
/**
* An HTTP server used by the interpreter to allow worker nodes to access
* class files created as the user types in lines of code. This is just a
* wrapper around a Jetty embedded HTTP server.
*/
class ClassServer(classDir: File) {
  // Jetty server instance; null whenever the server is stopped.
  private var server: Server = null
  // Port the server is bound to, or -1 whenever the server is stopped.
  private var port: Int = -1

  /**
   * Start the server on an ephemeral port, serving the contents of
   * classDir over HTTP.
   *
   * @throws ServerStateException if the server is already started
   */
  def start() {
    if (server != null) {
      throw new ServerStateException("Server is already started")
    } else {
      server = new Server(0) // port 0: let Jetty pick a free port
      val resHandler = new ResourceHandler
      resHandler.setResourceBase(classDir.getAbsolutePath)
      val handlerList = new HandlerList
      handlerList.setHandlers(Array(resHandler, new DefaultHandler))
      server.setHandler(handlerList)
      server.start()
      // Ask the (single) connector which port it actually bound to
      port = server.getConnectors()(0).getLocalPort()
    }
  }

  /**
   * Stop the server and release its port.
   *
   * @throws ServerStateException if the server is not running
   */
  def stop() {
    if (server == null) {
      throw new ServerStateException("Server is already stopped")
    } else {
      server.stop()
      port = -1
      server = null
    }
  }

  /**
   * Get the URI of this HTTP server (http://host:port).
   *
   * @throws ServerStateException if the server is not running
   */
  def uri: String = {
    if (server == null) {
      throw new ServerStateException("Server is not started")
    } else {
      "http://" + getLocalIpAddress + ":" + port
    }
  }

  /**
   * Get the local host's IP address in textual form (dotted-quad for
   * IPv4). Using getHostAddress instead of manually joining the raw
   * address bytes avoids producing a bogus "dotted" string on hosts
   * whose local address is IPv6 (16 bytes rather than 4).
   */
  private def getLocalIpAddress: String = {
    InetAddress.getLocalHost().getHostAddress()
  }
}

View file

@ -1,7 +1,7 @@
package spark.repl
import java.io.{ByteArrayOutputStream, InputStream}
import java.net.{URI, URL, URLClassLoader}
import java.net.{URI, URL, URLClassLoader, URLEncoder}
import java.util.concurrent.{Executors, ExecutorService}
import org.apache.hadoop.conf.Configuration
@ -12,18 +12,35 @@ import org.objectweb.asm.commons.EmptyVisitor
import org.objectweb.asm.Opcodes._
// A ClassLoader that reads classes from a Hadoop FileSystem URL, used to load
// classes defined by the interpreter when the REPL is in use
class ExecutorClassLoader(classDir: String, parent: ClassLoader)
/**
* A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
* used to load classes defined by the interpreter when the REPL is used
*/
class ExecutorClassLoader(classUri: String, parent: ClassLoader)
extends ClassLoader(parent) {
val fileSystem = FileSystem.get(new URI(classDir), new Configuration())
val directory = new URI(classDir).getPath
val uri = new URI(classUri)
val directory = uri.getPath
// Hadoop FileSystem object for our URI, if it isn't using HTTP
var fileSystem: FileSystem = {
if (uri.getScheme() == "http")
null
else
FileSystem.get(uri, new Configuration())
}
override def findClass(name: String): Class[_] = {
try {
//println("repl.ExecutorClassLoader resolving " + name)
val path = new Path(directory, name.replace('.', '/') + ".class")
val bytes = readAndTransformClass(name, fileSystem.open(path))
val pathInDirectory = name.replace('.', '/') + ".class"
val inputStream = {
if (fileSystem != null)
fileSystem.open(new Path(directory, pathInDirectory))
else
new URL(classUri + "/" + urlEncode(pathInDirectory)).openStream()
}
val bytes = readAndTransformClass(name, inputStream)
inputStream.close()
return defineClass(name, bytes, 0, bytes.length)
} catch {
case e: Exception => throw new ClassNotFoundException(name, e)
@ -57,6 +74,13 @@ extends ClassLoader(parent) {
return bos.toByteArray
}
}
/**
 * URL-encode a string one path segment at a time, so that the '/'
 * separators themselves are preserved un-encoded.
 */
def urlEncode(str: String): String = {
  val encodedSegments = str.split('/').map(segment => URLEncoder.encode(segment, "UTF-8"))
  encodedSegments.mkString("/")
}
}
class ConstructorCleaner(className: String, cv: ClassVisitor)

View file

@ -90,32 +90,40 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
/** directory to save .class files to */
//val virtualDirectory = new VirtualDirectory("(memory)", None)
val virtualDirectory = {
val outputDir = {
val rootDir = new File(System.getProperty("spark.repl.classdir",
System.getProperty("java.io.tmpdir")))
var attempts = 0
val maxAttempts = 10
var outputDir: File = null
while (outputDir == null) {
var dir: File = null
while (dir == null) {
attempts += 1
if (attempts > maxAttempts) {
throw new IOException("Failed to create a temp directory " +
"after " + maxAttempts + " attempts!")
}
try {
outputDir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
if (outputDir.exists() || !outputDir.mkdirs())
outputDir = null
dir = new File(rootDir, "spark-" + UUID.randomUUID.toString)
if (dir.exists() || !dir.mkdirs())
dir = null
} catch { case e: IOException => ; }
}
System.setProperty("spark.repl.current.classdir",
"file://" + outputDir.getAbsolutePath + "/")
if (SPARK_DEBUG_REPL)
println("Output directory: " + outputDir)
new PlainFile(outputDir)
println("Output directory: " + dir)
dir
}
/** directory to save .class files to */
//val virtualDirectory = new VirtualDirectory("(memory)", None)
val virtualDirectory = new PlainFile(outputDir)
/** Jetty server that will serve our classes to worker nodes */
val classServer = new ClassServer(outputDir)
// Start the classServer and remember its URI in a spark system property
classServer.start()
println("ClassServer started, URI = " + classServer.uri)
System.setProperty("spark.repl.class.uri", classServer.uri)
/** reporter */
object reporter extends ConsoleReporter(settings, null, out) {
@ -714,6 +722,7 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
*/
def close() {
reporter.flush
classServer.stop()
}
/** A traverser that finds all mentioned identifiers, i.e. things

Binary file not shown.

Binary file not shown.