Merge branch 'matei-logging'
This commit is contained in:
commit
b6debf5da1
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -7,3 +7,4 @@ third_party/libmesos.so
|
|||
third_party/libmesos.dylib
|
||||
conf/java-opts
|
||||
conf/spark-env.sh
|
||||
conf/log4j.properties
|
||||
|
|
3
Makefile
3
Makefile
|
@ -12,6 +12,9 @@ JARS += third_party/scalatest-1.2/scalatest-1.2.jar
|
|||
JARS += third_party/scalacheck_2.8.0-1.7.jar
|
||||
JARS += third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
|
||||
JARS += third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
|
||||
JARS += third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
|
||||
JARS += third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
|
||||
JARS += third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
|
||||
CLASSPATH = $(subst $(SPACE),:,$(JARS))
|
||||
|
||||
SCALA_SOURCES = src/examples/*.scala src/scala/spark/*.scala src/scala/spark/repl/*.scala
|
||||
|
|
8
conf/log4j.properties
Normal file
8
conf/log4j.properties
Normal file
|
@ -0,0 +1,8 @@
|
|||
# Set everything to be logged to the console
|
||||
log4j.rootCategory=INFO, console
|
||||
log4j.appender.console=org.apache.log4j.ConsoleAppender
|
||||
log4j.appender.console.layout=org.apache.log4j.PatternLayout
|
||||
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
|
||||
|
||||
# Ignore messages below warning level from Jetty, because it's a bit verbose
|
||||
log4j.logger.org.eclipse.jetty=WARN
|
31
run
31
run
|
@ -28,21 +28,24 @@ fi
|
|||
export JAVA_OPTS
|
||||
|
||||
# Build up classpath
|
||||
SPARK_CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes"
|
||||
SPARK_CLASSPATH+=:$FWDIR/third_party/mesos.jar
|
||||
SPARK_CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar
|
||||
SPARK_CLASSPATH+=:$FWDIR/third_party/colt.jar
|
||||
SPARK_CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
|
||||
SPARK_CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
|
||||
SPARK_CLASSPATH+=:third_party/scalatest-1.2/scalatest-1.2.jar
|
||||
SPARK_CLASSPATH+=:third_party/scalacheck_2.8.0-1.7.jar
|
||||
SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
|
||||
SPARK_CLASSPATH+=:third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
|
||||
CLASSPATH="$SPARK_CLASSPATH:$FWDIR/build/classes"
|
||||
CLASSPATH+=:$FWDIR/conf
|
||||
CLASSPATH+=:$FWDIR/third_party/mesos.jar
|
||||
CLASSPATH+=:$FWDIR/third_party/asm-3.2/lib/all/asm-all-3.2.jar
|
||||
CLASSPATH+=:$FWDIR/third_party/colt.jar
|
||||
CLASSPATH+=:$FWDIR/third_party/guava-r06/guava-r06.jar
|
||||
CLASSPATH+=:$FWDIR/third_party/hadoop-0.20.0/hadoop-0.20.0-core.jar
|
||||
CLASSPATH+=:$FWDIR/third_party/scalatest-1.2/scalatest-1.2.jar
|
||||
CLASSPATH+=:$FWDIR/third_party/scalacheck_2.8.0-1.7.jar
|
||||
CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
|
||||
CLASSPATH+=:$FWDIR/third_party/jetty-7.1.6.v20100715/servlet-api-2.5.jar
|
||||
CLASSPATH+=:$FWDIR/third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
|
||||
CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
|
||||
CLASSPATH+=:$FWDIR/third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
|
||||
for jar in $FWDIR/third_party/hadoop-0.20.0/lib/*.jar; do
|
||||
SPARK_CLASSPATH+=:$jar
|
||||
CLASSPATH+=:$jar
|
||||
done
|
||||
export SPARK_CLASSPATH
|
||||
export CLASSPATH=$SPARK_CLASSPATH # Needed for spark-shell
|
||||
export CLASSPATH # Needed for spark-shell
|
||||
|
||||
if [ -n "$SCALA_HOME" ]; then
|
||||
SCALA=${SCALA_HOME}/bin/scala
|
||||
|
@ -50,4 +53,4 @@ else
|
|||
SCALA=scala
|
||||
fi
|
||||
|
||||
exec $SCALA -cp $SPARK_CLASSPATH $@
|
||||
exec $SCALA -cp $CLASSPATH $@
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
#!/bin/sh
|
||||
echo "In spark-executor"
|
||||
FWDIR="`dirname $0`"
|
||||
echo Framework dir: $FWDIR
|
||||
echo "Running spark-executor with framework dir = $FWDIR"
|
||||
exec $FWDIR/run spark.Executor
|
||||
|
|
|
@ -33,7 +33,7 @@ trait BroadcastRecipe {
|
|||
// TODO: Right, now no parallelization between multiple broadcasts
|
||||
@serializable
|
||||
class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
|
||||
extends BroadcastRecipe {
|
||||
extends BroadcastRecipe with Logging {
|
||||
|
||||
def value = value_
|
||||
|
||||
|
@ -92,7 +92,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
|
|||
}
|
||||
|
||||
val time = (System.nanoTime - start) / 1e9
|
||||
println( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s")
|
||||
logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -149,7 +149,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
|
|||
|
||||
@serializable
|
||||
class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
|
||||
extends BroadcastRecipe {
|
||||
extends BroadcastRecipe with Logging {
|
||||
|
||||
def value = value_
|
||||
|
||||
|
@ -179,7 +179,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
|
|||
fileIn.close
|
||||
|
||||
val time = (System.nanoTime - start) / 1e9
|
||||
println( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s")
|
||||
logInfo("Reading Broadcasted variable " + uuid + " took " + time + " s")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -188,7 +188,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
|
|||
@serializable
|
||||
case class SourceInfo (val hostAddress: String, val listenPort: Int,
|
||||
val totalBlocks: Int, val totalBytes: Int, val replicaID: Int)
|
||||
extends Comparable [SourceInfo]{
|
||||
extends Comparable[SourceInfo]{
|
||||
|
||||
var currentLeechers = 0
|
||||
var receptionFailed = false
|
||||
|
@ -231,7 +231,7 @@ private object Broadcast {
|
|||
}
|
||||
}
|
||||
|
||||
private object BroadcastCS {
|
||||
private object BroadcastCS extends Logging {
|
||||
val values = new MapMaker ().softValues ().makeMap[UUID, Any]
|
||||
// val valueInfos = new MapMaker ().softValues ().makeMap[UUID, Any]
|
||||
|
||||
|
@ -286,15 +286,15 @@ private object BroadcastCS {
|
|||
guideMR = new GuideMultipleRequests
|
||||
guideMR.setDaemon (true)
|
||||
guideMR.start
|
||||
println (System.currentTimeMillis + ": " + "GuideMultipleRequests started")
|
||||
logInfo("GuideMultipleRequests started")
|
||||
}
|
||||
serveMR = new ServeMultipleRequests
|
||||
serveMR.setDaemon (true)
|
||||
serveMR.start
|
||||
|
||||
println (System.currentTimeMillis + ": " + "ServeMultipleRequests started")
|
||||
logInfo("ServeMultipleRequests started")
|
||||
|
||||
println (System.currentTimeMillis + ": " + "BroadcastCS object has been initialized")
|
||||
logInfo("BroadcastCS object has been initialized")
|
||||
|
||||
initialized = true
|
||||
}
|
||||
|
@ -352,7 +352,7 @@ private object BroadcastCS {
|
|||
// Connect to Master and send this worker's Information
|
||||
val clientSocketToMaster =
|
||||
new Socket(BroadcastCS.masterHostAddress, BroadcastCS.masterListenPort)
|
||||
println (System.currentTimeMillis + ": " + "Connected to Master's guiding object")
|
||||
logInfo("Connected to Master's guiding object")
|
||||
// TODO: Guiding object connection is reusable
|
||||
val oisMaster =
|
||||
new ObjectInputStream (clientSocketToMaster.getInputStream)
|
||||
|
@ -371,11 +371,11 @@ private object BroadcastCS {
|
|||
}
|
||||
totalBytes = sourceInfo.totalBytes
|
||||
|
||||
println (System.currentTimeMillis + ": " + "Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
|
||||
logInfo("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
|
||||
|
||||
retByteArray = receiveSingleTransmission (sourceInfo)
|
||||
|
||||
println (System.currentTimeMillis + ": " + "I got this from receiveSingleTransmission: " + retByteArray)
|
||||
logInfo("I got this from receiveSingleTransmission: " + retByteArray)
|
||||
|
||||
// TODO: Update sourceInfo to add error notifactions for Master
|
||||
if (retByteArray == null) { sourceInfo.receptionFailed = true }
|
||||
|
@ -414,8 +414,8 @@ private object BroadcastCS {
|
|||
oisSource =
|
||||
new ObjectInputStream (clientSocketToSource.getInputStream)
|
||||
|
||||
println (System.currentTimeMillis + ": " + "Inside receiveSingleTransmission")
|
||||
println (System.currentTimeMillis + ": " + "totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
|
||||
logInfo("Inside receiveSingleTransmission")
|
||||
logInfo("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
|
||||
retByteArray = new Array[Byte] (totalBytes)
|
||||
for (i <- 0 until totalBlocks) {
|
||||
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
|
||||
|
@ -426,14 +426,14 @@ private object BroadcastCS {
|
|||
hasBlocksLock.synchronized {
|
||||
hasBlocksLock.notifyAll
|
||||
}
|
||||
println (System.currentTimeMillis + ": " + "Received block: " + i + " " + bcBlock)
|
||||
logInfo("Received block: " + i + " " + bcBlock)
|
||||
}
|
||||
assert (hasBlocks == totalBlocks)
|
||||
println (System.currentTimeMillis + ": " + "After the receive loop")
|
||||
logInfo("After the receive loop")
|
||||
} catch {
|
||||
case e: Exception => {
|
||||
retByteArray = null
|
||||
println (System.currentTimeMillis + ": " + "receiveSingleTransmission had a " + e)
|
||||
logInfo("receiveSingleTransmission had a " + e)
|
||||
}
|
||||
} finally {
|
||||
if (oisSource != null) { oisSource.close }
|
||||
|
@ -446,13 +446,13 @@ private object BroadcastCS {
|
|||
return retByteArray
|
||||
}
|
||||
|
||||
class TrackMultipleValues extends Thread {
|
||||
class TrackMultipleValues extends Thread with Logging {
|
||||
override def run = {
|
||||
var threadPool = Executors.newCachedThreadPool
|
||||
var serverSocket: ServerSocket = null
|
||||
|
||||
serverSocket = new ServerSocket (BroadcastCS.masterListenPort)
|
||||
println (System.currentTimeMillis + ": " + "TrackMultipleVariables" + serverSocket + " " + listenPort)
|
||||
logInfo("TrackMultipleVariables" + serverSocket + " " + listenPort)
|
||||
|
||||
var keepAccepting = true
|
||||
try {
|
||||
|
@ -463,11 +463,11 @@ private object BroadcastCS {
|
|||
clientSocket = serverSocket.accept
|
||||
} catch {
|
||||
case e: Exception => {
|
||||
println ("TrackMultipleValues Timeout. Stopping listening...")
|
||||
logInfo("TrackMultipleValues Timeout. Stopping listening...")
|
||||
keepAccepting = false
|
||||
}
|
||||
}
|
||||
println (System.currentTimeMillis + ": " + "TrackMultipleValues:Got new request:" + clientSocket)
|
||||
logInfo("TrackMultipleValues:Got new request:" + clientSocket)
|
||||
if (clientSocket != null) {
|
||||
try {
|
||||
threadPool.execute (new Runnable {
|
||||
|
@ -506,14 +506,14 @@ private object BroadcastCS {
|
|||
|
||||
}
|
||||
|
||||
class GuideMultipleRequests extends Thread {
|
||||
class GuideMultipleRequests extends Thread with Logging {
|
||||
override def run = {
|
||||
var threadPool = Executors.newCachedThreadPool
|
||||
var serverSocket: ServerSocket = null
|
||||
|
||||
serverSocket = new ServerSocket (BroadcastCS.masterListenPort)
|
||||
// listenPort = BroadcastCS.masterListenPort
|
||||
println (System.currentTimeMillis + ": " + "GuideMultipleRequests" + serverSocket + " " + listenPort)
|
||||
logInfo("GuideMultipleRequests" + serverSocket + " " + listenPort)
|
||||
|
||||
var keepAccepting = true
|
||||
try {
|
||||
|
@ -524,12 +524,12 @@ private object BroadcastCS {
|
|||
clientSocket = serverSocket.accept
|
||||
} catch {
|
||||
case e: Exception => {
|
||||
println ("GuideMultipleRequests Timeout. Stopping listening...")
|
||||
logInfo("GuideMultipleRequests Timeout. Stopping listening...")
|
||||
keepAccepting = false
|
||||
}
|
||||
}
|
||||
if (clientSocket != null) {
|
||||
println (System.currentTimeMillis + ": " + "Guide:Accepted new client connection:" + clientSocket)
|
||||
logInfo("Guide:Accepted new client connection:" + clientSocket)
|
||||
try {
|
||||
threadPool.execute (new GuideSingleRequest (clientSocket))
|
||||
} catch {
|
||||
|
@ -543,7 +543,8 @@ private object BroadcastCS {
|
|||
}
|
||||
}
|
||||
|
||||
class GuideSingleRequest (val clientSocket: Socket) extends Runnable {
|
||||
class GuideSingleRequest (val clientSocket: Socket)
|
||||
extends Runnable with Logging {
|
||||
private val oos = new ObjectOutputStream (clientSocket.getOutputStream)
|
||||
private val ois = new ObjectInputStream (clientSocket.getInputStream)
|
||||
|
||||
|
@ -552,21 +553,21 @@ private object BroadcastCS {
|
|||
|
||||
def run = {
|
||||
try {
|
||||
println (System.currentTimeMillis + ": " + "new GuideSingleRequest is running")
|
||||
logInfo("new GuideSingleRequest is running")
|
||||
// Connecting worker is sending in its hostAddress and listenPort it will
|
||||
// be listening to. ReplicaID is 0 and other fields are invalid (-1)
|
||||
var sourceInfo = ois.readObject.asInstanceOf[SourceInfo]
|
||||
|
||||
// Select a suitable source and send it back to the worker
|
||||
selectedSourceInfo = selectSuitableSource (sourceInfo)
|
||||
println (System.currentTimeMillis + ": " + "Sending selectedSourceInfo:" + selectedSourceInfo)
|
||||
logInfo("Sending selectedSourceInfo:" + selectedSourceInfo)
|
||||
oos.writeObject (selectedSourceInfo)
|
||||
oos.flush
|
||||
|
||||
// Add this new (if it can finish) source to the PQ of sources
|
||||
thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress,
|
||||
sourceInfo.listenPort, totalBlocks, totalBytes, 0)
|
||||
println (System.currentTimeMillis + ": " + "Adding possible new source to pqOfSources: " + thisWorkerInfo)
|
||||
logInfo("Adding possible new source to pqOfSources: " + thisWorkerInfo)
|
||||
pqOfSources.synchronized {
|
||||
pqOfSources.add (thisWorkerInfo)
|
||||
}
|
||||
|
@ -642,14 +643,14 @@ private object BroadcastCS {
|
|||
}
|
||||
}
|
||||
|
||||
class ServeMultipleRequests extends Thread {
|
||||
class ServeMultipleRequests extends Thread with Logging {
|
||||
override def run = {
|
||||
var threadPool = Executors.newCachedThreadPool
|
||||
var serverSocket: ServerSocket = null
|
||||
|
||||
serverSocket = new ServerSocket (0)
|
||||
listenPort = serverSocket.getLocalPort
|
||||
println (System.currentTimeMillis + ": " + "ServeMultipleRequests" + serverSocket + " " + listenPort)
|
||||
logInfo("ServeMultipleRequests" + serverSocket + " " + listenPort)
|
||||
|
||||
listenPortLock.synchronized {
|
||||
listenPortLock.notifyAll
|
||||
|
@ -664,12 +665,12 @@ private object BroadcastCS {
|
|||
clientSocket = serverSocket.accept
|
||||
} catch {
|
||||
case e: Exception => {
|
||||
println ("ServeMultipleRequests Timeout. Stopping listening...")
|
||||
logInfo("ServeMultipleRequests Timeout. Stopping listening...")
|
||||
keepAccepting = false
|
||||
}
|
||||
}
|
||||
if (clientSocket != null) {
|
||||
println (System.currentTimeMillis + ": " + "Serve:Accepted new client connection:" + clientSocket)
|
||||
logInfo("Serve:Accepted new client connection:" + clientSocket)
|
||||
try {
|
||||
threadPool.execute (new ServeSingleRequest (clientSocket))
|
||||
} catch {
|
||||
|
@ -683,23 +684,24 @@ private object BroadcastCS {
|
|||
}
|
||||
}
|
||||
|
||||
class ServeSingleRequest (val clientSocket: Socket) extends Runnable {
|
||||
class ServeSingleRequest (val clientSocket: Socket)
|
||||
extends Runnable with Logging {
|
||||
private val oos = new ObjectOutputStream (clientSocket.getOutputStream)
|
||||
private val ois = new ObjectInputStream (clientSocket.getInputStream)
|
||||
|
||||
def run = {
|
||||
try {
|
||||
println (System.currentTimeMillis + ": " + "new ServeSingleRequest is running")
|
||||
logInfo("new ServeSingleRequest is running")
|
||||
sendObject
|
||||
} catch {
|
||||
// TODO: Need to add better exception handling here
|
||||
// If something went wrong, e.g., the worker at the other end died etc.
|
||||
// then close everything up
|
||||
case e: Exception => {
|
||||
println (System.currentTimeMillis + ": " + "ServeSingleRequest had a " + e)
|
||||
logInfo("ServeSingleRequest had a " + e)
|
||||
}
|
||||
} finally {
|
||||
println (System.currentTimeMillis + ": " + "ServeSingleRequest is closing streams and sockets")
|
||||
logInfo("ServeSingleRequest is closing streams and sockets")
|
||||
ois.close
|
||||
oos.close
|
||||
clientSocket.close
|
||||
|
@ -726,7 +728,7 @@ private object BroadcastCS {
|
|||
} catch {
|
||||
case e: Exception => { }
|
||||
}
|
||||
println (System.currentTimeMillis + ": " + "Send block: " + i + " " + arrayOfBlocks(i))
|
||||
logInfo("Send block: " + i + " " + arrayOfBlocks(i))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -734,7 +736,7 @@ private object BroadcastCS {
|
|||
}
|
||||
}
|
||||
|
||||
private object BroadcastCH {
|
||||
private object BroadcastCH extends Logging {
|
||||
val values = new MapMaker ().softValues ().makeMap[UUID, Any]
|
||||
|
||||
private var initialized = false
|
||||
|
|
|
@ -8,7 +8,7 @@ import org.objectweb.asm.commons.EmptyVisitor
|
|||
import org.objectweb.asm.Opcodes._
|
||||
|
||||
|
||||
object ClosureCleaner {
|
||||
object ClosureCleaner extends Logging {
|
||||
private def getClassReader(cls: Class[_]): ClassReader = {
|
||||
new ClassReader(cls.getResourceAsStream(
|
||||
cls.getName.replaceFirst("^.*\\.", "") + ".class"))
|
||||
|
@ -72,13 +72,13 @@ object ClosureCleaner {
|
|||
val field = cls.getDeclaredField(fieldName)
|
||||
field.setAccessible(true)
|
||||
val value = field.get(obj)
|
||||
//println("1: Setting " + fieldName + " on " + cls + " to " + value);
|
||||
//logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
|
||||
field.set(outer, value)
|
||||
}
|
||||
}
|
||||
|
||||
if (outer != null) {
|
||||
//println("2: Setting $outer on " + func.getClass + " to " + outer);
|
||||
//logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
|
||||
val field = func.getClass.getDeclaredField("$outer")
|
||||
field.setAccessible(true)
|
||||
field.set(func, outer)
|
||||
|
@ -101,7 +101,7 @@ object ClosureCleaner {
|
|||
val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
|
||||
val obj = newCtor.newInstance().asInstanceOf[AnyRef];
|
||||
if (outer != null) {
|
||||
//println("3: Setting $outer on " + cls + " to " + outer);
|
||||
//logInfo("3: Setting $outer on " + cls + " to " + outer);
|
||||
val field = cls.getDeclaredField("$outer")
|
||||
field.setAccessible(true)
|
||||
field.set(obj, outer)
|
||||
|
|
|
@ -5,10 +5,14 @@ import java.util.concurrent.{Executors, ExecutorService}
|
|||
import mesos.{ExecutorArgs, ExecutorDriver, MesosExecutorDriver}
|
||||
import mesos.{TaskDescription, TaskState, TaskStatus}
|
||||
|
||||
object Executor {
|
||||
/**
|
||||
* The Mesos executor for Spark.
|
||||
*/
|
||||
object Executor extends Logging {
|
||||
def main(args: Array[String]) {
|
||||
System.loadLibrary("mesos")
|
||||
|
||||
// Create a new Executor implementation that will run our tasks
|
||||
val exec = new mesos.Executor() {
|
||||
var classLoader: ClassLoader = null
|
||||
var threadPool: ExecutorService = null
|
||||
|
@ -27,7 +31,7 @@ object Executor {
|
|||
classLoader = this.getClass.getClassLoader
|
||||
val classUri = System.getProperty("spark.repl.class.uri")
|
||||
if (classUri != null) {
|
||||
println("Using REPL class URI: " + classUri)
|
||||
logInfo("Using REPL class URI: " + classUri)
|
||||
classLoader = new repl.ExecutorClassLoader(classUri, classLoader)
|
||||
}
|
||||
Thread.currentThread.setContextClassLoader(classLoader)
|
||||
|
@ -43,7 +47,7 @@ object Executor {
|
|||
val arg = desc.getArg
|
||||
threadPool.execute(new Runnable() {
|
||||
def run() = {
|
||||
println("Running task ID " + taskId)
|
||||
logInfo("Running task ID " + taskId)
|
||||
try {
|
||||
Accumulators.clear
|
||||
val task = Utils.deserialize[Task[Any]](arg, classLoader)
|
||||
|
@ -52,12 +56,11 @@ object Executor {
|
|||
val result = new TaskResult(value, accumUpdates)
|
||||
d.sendStatusUpdate(new TaskStatus(
|
||||
taskId, TaskState.TASK_FINISHED, Utils.serialize(result)))
|
||||
println("Finished task ID " + taskId)
|
||||
logInfo("Finished task ID " + taskId)
|
||||
} catch {
|
||||
case e: Exception => {
|
||||
// TODO: Handle errors in tasks less dramatically
|
||||
System.err.println("Exception in task ID " + taskId + ":")
|
||||
e.printStackTrace
|
||||
logError("Exception in task ID " + taskId, e)
|
||||
System.exit(1)
|
||||
}
|
||||
}
|
||||
|
@ -66,6 +69,7 @@ object Executor {
|
|||
}
|
||||
}
|
||||
|
||||
// Start it running and connect it to the slave
|
||||
new MesosExecutorDriver(exec).run()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,8 +4,10 @@ import java.util.concurrent._
|
|||
|
||||
import scala.collection.mutable.Map
|
||||
|
||||
// A simple Scheduler implementation that runs tasks locally in a thread pool.
|
||||
private class LocalScheduler(threads: Int) extends Scheduler {
|
||||
/**
|
||||
* A simple Scheduler implementation that runs tasks locally in a thread pool.
|
||||
*/
|
||||
private class LocalScheduler(threads: Int) extends Scheduler with Logging {
|
||||
var threadPool: ExecutorService =
|
||||
Executors.newFixedThreadPool(threads, DaemonThreadFactory)
|
||||
|
||||
|
@ -20,25 +22,24 @@ private class LocalScheduler(threads: Int) extends Scheduler {
|
|||
for (i <- 0 until tasks.length) {
|
||||
futures(i) = threadPool.submit(new Callable[TaskResult[T]]() {
|
||||
def call(): TaskResult[T] = {
|
||||
println("Running task " + i)
|
||||
logInfo("Running task " + i)
|
||||
try {
|
||||
// Serialize and deserialize the task so that accumulators are
|
||||
// changed to thread-local ones; this adds a bit of unnecessary
|
||||
// overhead but matches how the Nexus Executor works
|
||||
Accumulators.clear
|
||||
val bytes = Utils.serialize(tasks(i))
|
||||
println("Size of task " + i + " is " + bytes.size + " bytes")
|
||||
logInfo("Size of task " + i + " is " + bytes.size + " bytes")
|
||||
val task = Utils.deserialize[Task[T]](
|
||||
bytes, currentThread.getContextClassLoader)
|
||||
val value = task.run
|
||||
val accumUpdates = Accumulators.values
|
||||
println("Finished task " + i)
|
||||
logInfo("Finished task " + i)
|
||||
new TaskResult[T](value, accumUpdates)
|
||||
} catch {
|
||||
case e: Exception => {
|
||||
// TODO: Do something nicer here
|
||||
System.err.println("Exception in task " + i + ":")
|
||||
e.printStackTrace()
|
||||
logError("Exception in task " + i, e)
|
||||
System.exit(1)
|
||||
null
|
||||
}
|
||||
|
@ -58,7 +59,10 @@ private class LocalScheduler(threads: Int) extends Scheduler {
|
|||
override def numCores() = threads
|
||||
}
|
||||
|
||||
// A ThreadFactory that creates daemon threads
|
||||
|
||||
/**
|
||||
* A ThreadFactory that creates daemon threads
|
||||
*/
|
||||
private object DaemonThreadFactory extends ThreadFactory {
|
||||
override def newThread(r: Runnable): Thread = {
|
||||
val t = new Thread(r);
|
||||
|
|
49
src/scala/spark/Logging.scala
Normal file
49
src/scala/spark/Logging.scala
Normal file
|
@ -0,0 +1,49 @@
|
|||
package spark
|
||||
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
/**
|
||||
* Utility trait for classes that want to log data. Creates a SLF4J logger
|
||||
* for the class and allows logging messages at different levels using
|
||||
* methods that only evaluate parameters lazily if the log level is enabled.
|
||||
*/
|
||||
trait Logging {
|
||||
// Make the log field transient so that objects with Logging can
|
||||
// be serialized and used on another machine
|
||||
@transient private var log_ : Logger = null
|
||||
|
||||
// Method to get or create the logger for this object
|
||||
def log: Logger = {
|
||||
if (log_ == null) {
|
||||
var className = this.getClass().getName()
|
||||
// Ignore trailing $'s in the class names for Scala objects
|
||||
if (className.endsWith("$"))
|
||||
className = className.substring(0, className.length - 1)
|
||||
log_ = LoggerFactory.getLogger(className)
|
||||
}
|
||||
return log_
|
||||
}
|
||||
|
||||
// Log methods that take only a String
|
||||
def logInfo(msg: => String) = if (log.isInfoEnabled) log.info(msg)
|
||||
|
||||
def logDebug(msg: => String) = if (log.isDebugEnabled) log.debug(msg)
|
||||
|
||||
def logWarning(msg: => String) = if (log.isWarnEnabled) log.warn(msg)
|
||||
|
||||
def logError(msg: => String) = if (log.isErrorEnabled) log.error(msg)
|
||||
|
||||
// Log methods that take Throwables (Exceptions/Errors) too
|
||||
def logInfo(msg: => String, throwable: Throwable) =
|
||||
if (log.isInfoEnabled) log.info(msg)
|
||||
|
||||
def logDebug(msg: => String, throwable: Throwable) =
|
||||
if (log.isDebugEnabled) log.debug(msg)
|
||||
|
||||
def logWarning(msg: => String, throwable: Throwable) =
|
||||
if (log.isWarnEnabled) log.warn(msg, throwable)
|
||||
|
||||
def logError(msg: => String, throwable: Throwable) =
|
||||
if (log.isErrorEnabled) log.error(msg, throwable)
|
||||
}
|
|
@ -23,7 +23,7 @@ import mesos._
|
|||
// all the offers to the ParallelOperation and have it load-balance.
|
||||
private class MesosScheduler(
|
||||
master: String, frameworkName: String, execArg: Array[Byte])
|
||||
extends NScheduler with spark.Scheduler
|
||||
extends NScheduler with spark.Scheduler with Logging
|
||||
{
|
||||
// Lock used by runTasks to ensure only one thread can be in it
|
||||
val runTasksMutex = new Object()
|
||||
|
@ -101,7 +101,7 @@ extends NScheduler with spark.Scheduler
|
|||
}
|
||||
|
||||
override def registered(d: SchedulerDriver, frameworkId: String) {
|
||||
println("Registered as framework ID " + frameworkId)
|
||||
logInfo("Registered as framework ID " + frameworkId)
|
||||
registeredLock.synchronized {
|
||||
isRegistered = true
|
||||
registeredLock.notifyAll()
|
||||
|
@ -137,7 +137,7 @@ extends NScheduler with spark.Scheduler
|
|||
case None => {}
|
||||
}
|
||||
} catch {
|
||||
case e: Exception => e.printStackTrace
|
||||
case e: Exception => logError("Exception in resourceOffer", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -157,11 +157,11 @@ extends NScheduler with spark.Scheduler
|
|||
activeOps(opId).statusUpdate(status)
|
||||
}
|
||||
case None =>
|
||||
println("TID " + status.getTaskId + " already finished")
|
||||
logInfo("TID " + status.getTaskId + " already finished")
|
||||
}
|
||||
|
||||
} catch {
|
||||
case e: Exception => e.printStackTrace
|
||||
case e: Exception => logError("Exception in statusUpdate", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -173,12 +173,11 @@ extends NScheduler with spark.Scheduler
|
|||
try {
|
||||
activeOp.error(code, message)
|
||||
} catch {
|
||||
case e: Exception => e.printStackTrace
|
||||
case e: Exception => logError("Exception in error callback", e)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
val msg = "Mesos error: %s (error code: %d)".format(message, code)
|
||||
System.err.println(msg)
|
||||
logError("Mesos error: %s (error code: %d)".format(message, code))
|
||||
System.exit(1)
|
||||
}
|
||||
}
|
||||
|
@ -205,7 +204,7 @@ trait ParallelOperation {
|
|||
|
||||
class SimpleParallelOperation[T: ClassManifest](
|
||||
sched: MesosScheduler, tasks: Array[Task[T]], val opId: Int)
|
||||
extends ParallelOperation
|
||||
extends ParallelOperation with Logging
|
||||
{
|
||||
// Maximum time to wait to run a task in a preferred location (in ms)
|
||||
val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
|
||||
|
@ -261,9 +260,11 @@ extends ParallelOperation
|
|||
val taskId = sched.newTaskId()
|
||||
sched.taskIdToOpId(taskId) = opId
|
||||
tidToIndex(taskId) = i
|
||||
printf("Starting task %d as opId %d, TID %s on slave %s: %s (%s)",
|
||||
i, opId, taskId, offer.getSlaveId, offer.getHost,
|
||||
if(checkPref) "preferred" else "non-preferred")
|
||||
val preferred = if(checkPref) "preferred" else "non-preferred"
|
||||
val message =
|
||||
"Starting task %d as opId %d, TID %s on slave %s: %s (%s)".format(
|
||||
i, opId, taskId, offer.getSlaveId, offer.getHost, preferred)
|
||||
logInfo(message)
|
||||
tasks(i).markStarted(offer)
|
||||
launched(i) = true
|
||||
tasksLaunched += 1
|
||||
|
@ -273,7 +274,7 @@ extends ParallelOperation
|
|||
params.put("cpus", "" + desiredCpus)
|
||||
params.put("mem", "" + desiredMem)
|
||||
val serializedTask = Utils.serialize(tasks(i))
|
||||
println("... Serialized size: " + serializedTask.size)
|
||||
//logInfo("Serialized size: " + serializedTask.size)
|
||||
return Some(new TaskDescription(taskId, offer.getSlaveId,
|
||||
"task_" + taskId, params, serializedTask))
|
||||
}
|
||||
|
@ -298,36 +299,39 @@ extends ParallelOperation
|
|||
|
||||
def taskFinished(status: TaskStatus) {
|
||||
val tid = status.getTaskId
|
||||
print("Finished opId " + opId + " TID " + tid)
|
||||
if (!finished(tidToIndex(tid))) {
|
||||
val index = tidToIndex(tid)
|
||||
if (!finished(index)) {
|
||||
tasksFinished += 1
|
||||
logInfo("Finished opId %d TID %d (progress: %d/%d)".format(
|
||||
opId, tid, tasksFinished, numTasks))
|
||||
// Deserialize task result
|
||||
val result = Utils.deserialize[TaskResult[T]](status.getData)
|
||||
results(tidToIndex(tid)) = result.value
|
||||
results(index) = result.value
|
||||
// Update accumulators
|
||||
Accumulators.add(callingThread, result.accumUpdates)
|
||||
// Mark finished and stop if we've finished all the tasks
|
||||
finished(tidToIndex(tid)) = true
|
||||
finished(index) = true
|
||||
// Remove TID -> opId mapping from sched
|
||||
sched.taskIdToOpId.remove(tid)
|
||||
tasksFinished += 1
|
||||
|
||||
println(", finished " + tasksFinished + "/" + numTasks)
|
||||
if (tasksFinished == numTasks)
|
||||
setAllFinished()
|
||||
} else {
|
||||
printf("... Task %s had already finished, so ignoring it\n", tidToIndex(tid))
|
||||
logInfo("Ignoring task-finished event for TID " + tid +
|
||||
" because task " + index + " is already finished")
|
||||
}
|
||||
}
|
||||
|
||||
def taskLost(status: TaskStatus) {
|
||||
val tid = status.getTaskId
|
||||
println("Lost opId " + opId + " TID " + tid)
|
||||
if (!finished(tidToIndex(tid))) {
|
||||
launched(tidToIndex(tid)) = false
|
||||
val index = tidToIndex(tid)
|
||||
if (!finished(index)) {
|
||||
logInfo("Lost opId " + opId + " TID " + tid)
|
||||
launched(index) = false
|
||||
sched.taskIdToOpId.remove(tid)
|
||||
tasksLaunched -= 1
|
||||
} else {
|
||||
printf("Task %s had already finished, so ignoring it\n", tidToIndex(tid))
|
||||
logInfo("Ignoring task-lost event for TID " + tid +
|
||||
" because task " + index + " is already finished")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -93,27 +93,27 @@ extends Task[U] {
|
|||
|
||||
class ForeachTask[T: ClassManifest](
|
||||
rdd: RDD[T], split: Split, func: T => Unit)
|
||||
extends RDDTask[Unit, T](rdd, split) {
|
||||
extends RDDTask[Unit, T](rdd, split) with Logging {
|
||||
override def run() {
|
||||
println("Processing " + split)
|
||||
logInfo("Processing " + split)
|
||||
rdd.iterator(split).foreach(func)
|
||||
}
|
||||
}
|
||||
|
||||
class CollectTask[T](
|
||||
rdd: RDD[T], split: Split)(implicit m: ClassManifest[T])
|
||||
extends RDDTask[Array[T], T](rdd, split) {
|
||||
extends RDDTask[Array[T], T](rdd, split) with Logging {
|
||||
override def run(): Array[T] = {
|
||||
println("Processing " + split)
|
||||
logInfo("Processing " + split)
|
||||
rdd.iterator(split).toArray(m)
|
||||
}
|
||||
}
|
||||
|
||||
class ReduceTask[T: ClassManifest](
|
||||
rdd: RDD[T], split: Split, f: (T, T) => T)
|
||||
extends RDDTask[Option[T], T](rdd, split) {
|
||||
extends RDDTask[Option[T], T](rdd, split) with Logging {
|
||||
override def run(): Option[T] = {
|
||||
println("Processing " + split)
|
||||
logInfo("Processing " + split)
|
||||
val iter = rdd.iterator(split)
|
||||
if (iter.hasNext)
|
||||
Some(iter.reduceLeft(f))
|
||||
|
@ -183,7 +183,7 @@ extends RDD[T](prev.sparkContext) {
|
|||
|
||||
class CachedRDD[T](
|
||||
prev: RDD[T])(implicit m: ClassManifest[T])
|
||||
extends RDD[T](prev.sparkContext) {
|
||||
extends RDD[T](prev.sparkContext) with Logging {
|
||||
val id = CachedRDD.newId()
|
||||
@transient val cacheLocs = Map[Split, List[String]]()
|
||||
|
||||
|
@ -217,7 +217,7 @@ extends RDD[T](prev.sparkContext) {
|
|||
}
|
||||
}
|
||||
// If we got here, we have to load the split
|
||||
println("Loading and caching " + split)
|
||||
logInfo("Loading and caching " + split)
|
||||
val array = prev.iterator(split).toArray(m)
|
||||
cache.put(key, array)
|
||||
loading.synchronized {
|
||||
|
|
|
@ -6,7 +6,7 @@ import java.util.UUID
|
|||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.actors.Actor._
|
||||
|
||||
class SparkContext(master: String, frameworkName: String) {
|
||||
class SparkContext(master: String, frameworkName: String) extends Logging {
|
||||
Broadcast.initialize(true)
|
||||
|
||||
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int) =
|
||||
|
@ -56,10 +56,10 @@ class SparkContext(master: String, frameworkName: String) {
|
|||
|
||||
private[spark] def runTaskObjects[T: ClassManifest](tasks: Seq[Task[T]])
|
||||
: Array[T] = {
|
||||
println("Running " + tasks.length + " tasks in parallel")
|
||||
logInfo("Running " + tasks.length + " tasks in parallel")
|
||||
val start = System.nanoTime
|
||||
val result = scheduler.runTasks(tasks.toArray)
|
||||
println("Tasks finished in " + (System.nanoTime - start) / 1e9 + " s")
|
||||
logInfo("Tasks finished in " + (System.nanoTime - start) / 1e9 + " s")
|
||||
return result
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,8 @@ import org.eclipse.jetty.server.handler.DefaultHandler
|
|||
import org.eclipse.jetty.server.handler.HandlerList
|
||||
import org.eclipse.jetty.server.handler.ResourceHandler
|
||||
|
||||
import spark.Logging
|
||||
|
||||
|
||||
/**
|
||||
* Exception type thrown by ClassServer when it is in the wrong state
|
||||
|
@ -21,7 +23,7 @@ class ServerStateException(message: String) extends Exception(message)
|
|||
* class files created as the user types in lines of code. This is just a
|
||||
* wrapper around a Jetty embedded HTTP server.
|
||||
*/
|
||||
class ClassServer(classDir: File) {
|
||||
class ClassServer(classDir: File) extends Logging {
|
||||
private var server: Server = null
|
||||
private var port: Int = -1
|
||||
|
||||
|
@ -37,6 +39,7 @@ class ClassServer(classDir: File) {
|
|||
server.setHandler(handlerList)
|
||||
server.start()
|
||||
port = server.getConnectors()(0).getLocalPort()
|
||||
logDebug("ClassServer started at " + uri)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -31,7 +31,6 @@ extends ClassLoader(parent) {
|
|||
|
||||
override def findClass(name: String): Class[_] = {
|
||||
try {
|
||||
//println("repl.ExecutorClassLoader resolving " + name)
|
||||
val pathInDirectory = name.replace('.', '/') + ".class"
|
||||
val inputStream = {
|
||||
if (fileSystem != null)
|
||||
|
@ -92,7 +91,6 @@ extends ClassAdapter(cv) {
|
|||
// This is the constructor, time to clean it; just output some new
|
||||
// instructions to mv that create the object and set the static MODULE$
|
||||
// field in the class to point to it, but do nothing otherwise.
|
||||
//println("Cleaning constructor of " + className)
|
||||
mv.visitCode()
|
||||
mv.visitVarInsn(ALOAD, 0) // load this
|
||||
mv.visitMethodInsn(INVOKESPECIAL, "java/lang/Object", "<init>", "()V")
|
||||
|
|
|
@ -90,6 +90,7 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
|
|||
|
||||
val SPARK_DEBUG_REPL: Boolean = (System.getenv("SPARK_DEBUG_REPL") == "1")
|
||||
|
||||
/** Local directory to save .class files too */
|
||||
val outputDir = {
|
||||
val rootDir = new File(System.getProperty("spark.repl.classdir",
|
||||
System.getProperty("java.io.tmpdir")))
|
||||
|
@ -108,22 +109,26 @@ class SparkInterpreter(val settings: Settings, out: PrintWriter) {
|
|||
dir = null
|
||||
} catch { case e: IOException => ; }
|
||||
}
|
||||
if (SPARK_DEBUG_REPL)
|
||||
if (SPARK_DEBUG_REPL) {
|
||||
println("Output directory: " + dir)
|
||||
}
|
||||
dir
|
||||
}
|
||||
|
||||
/** directory to save .class files to */
|
||||
/** Scala compiler virtual directory for outputDir */
|
||||
//val virtualDirectory = new VirtualDirectory("(memory)", None)
|
||||
val virtualDirectory = new PlainFile(outputDir)
|
||||
|
||||
/** Jetty server that will serve our classes to worker nodes */
|
||||
val classServer = new ClassServer(outputDir)
|
||||
|
||||
// Start the classServer and remember its URI in a spark system property */
|
||||
// Start the classServer and store its URI in a spark system property
|
||||
// (which will be passed to executors so that they can connect to it)
|
||||
classServer.start()
|
||||
println("ClassServer started, URI = " + classServer.uri)
|
||||
System.setProperty("spark.repl.class.uri", classServer.uri)
|
||||
if (SPARK_DEBUG_REPL) {
|
||||
println("ClassServer started, URI = " + classServer.uri)
|
||||
}
|
||||
|
||||
/** reporter */
|
||||
object reporter extends ConsoleReporter(settings, null, out) {
|
||||
|
|
BIN
third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
vendored
Normal file
BIN
third_party/apache-log4j-1.2.16/log4j-1.2.16.jar
vendored
Normal file
Binary file not shown.
BIN
third_party/hadoop-0.20.0/lib/slf4j-api-1.4.3.jar
vendored
BIN
third_party/hadoop-0.20.0/lib/slf4j-api-1.4.3.jar
vendored
Binary file not shown.
Binary file not shown.
BIN
third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
vendored
Normal file
BIN
third_party/slf4j-1.6.1/slf4j-api-1.6.1.jar
vendored
Normal file
Binary file not shown.
BIN
third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
vendored
Normal file
BIN
third_party/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar
vendored
Normal file
Binary file not shown.
Loading…
Reference in a new issue