Fixed a few test issues due to Akka 2.1, as well as SBT memory settings.

Unfortunately, in Akka 2.1, ActorSystem.awaitTermination hangs for
remote actors, and Akka also leaves a non-daemon Netty thread even when
run in daemon mode. Thus I had to comment out some of the calls to
awaitTermination, and we still have one failing test.
Matei Zaharia 2013-06-08 01:09:24 -07:00
parent dbe2887da7
commit 5b5b5aedbf
4 changed files with 35 additions and 22 deletions
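
As context for the awaitTermination hang: Akka 2.x also offers an
awaitTermination(atMost: Duration) overload that throws a TimeoutException once
the timeout expires instead of blocking forever. A bounded wait along these
lines is one conceivable workaround; the sketch below is illustrative only and
is not what this commit does.

    import java.util.concurrent.TimeoutException
    import scala.concurrent.duration._
    import akka.actor.ActorSystem

    // Possible workaround (not this commit): bound the wait so a hung
    // shutdown cannot block the caller forever.
    def shutdownWithTimeout(system: ActorSystem, atMost: FiniteDuration = 10.seconds) {
      system.shutdown()
      try {
        system.awaitTermination(atMost)
      } catch {
        case _: TimeoutException =>
          // Known Akka 2.1.x behavior with remote actors: give up waiting.
      }
    }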

@@ -42,7 +42,8 @@ class SparkEnv (
     actorSystem.shutdown()
     // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
     // down, but let's call it anyway in case it gets fixed in a later release
-    actorSystem.awaitTermination()
+    // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
+    //actorSystem.awaitTermination()
   }
 }
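
Since the leaked non-daemon Netty thread is what keeps the JVM alive, one quick
diagnostic in a test is to dump the surviving non-daemon threads after shutdown.
This is a debugging sketch, not code from this commit:

    import scala.collection.JavaConverters._

    // List threads that would keep the JVM from exiting; a lingering
    // Netty worker thread shows up here after ActorSystem.shutdown().
    def nonDaemonThreads(): Seq[Thread] =
      Thread.getAllStackTraces.keySet.asScala.toSeq
        .filter(t => t.isAlive && !t.isDaemon)

    // Example: nonDaemonThreads().foreach(t => println(t.getName))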

@@ -43,9 +43,13 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
   def stop() {
     logInfo("Shutting down local Spark cluster.")
     // Stop the workers before the master so they don't get upset that it disconnected
+    // TODO: In Akka 2.1.x, ActorSystem.awaitTermination hangs when you have remote actors!
+    // This is unfortunate, but for now we just comment it out.
     workerActorSystems.foreach(_.shutdown())
-    workerActorSystems.foreach(_.awaitTermination())
+    //workerActorSystems.foreach(_.awaitTermination())
     masterActorSystems.foreach(_.shutdown())
-    masterActorSystems.foreach(_.awaitTermination())
+    //masterActorSystems.foreach(_.awaitTermination())
     masterActorSystems.clear()
     workerActorSystems.clear()
   }
 }
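
The ordering here (workers down before the master) is worth preserving in any
workaround. As a hypothetical sketch, assuming the bounded-wait idea from above,
teardown could still wait for each system without risking an indefinite hang;
stopAll and its timeout are illustrative names, not Spark APIs:

    import java.util.concurrent.TimeoutException
    import scala.concurrent.duration._
    import akka.actor.ActorSystem

    // Shut down a group of actor systems, then wait a bounded time for each,
    // so one hung system cannot stall the whole cluster teardown.
    def stopAll(systems: Seq[ActorSystem], atMost: FiniteDuration = 10.seconds) {
      systems.foreach(_.shutdown())
      systems.foreach { system =>
        try system.awaitTermination(atMost)
        catch { case _: TimeoutException => () } // Akka 2.1.x remote-actor hang
      }
    }

    // Workers first, then masters, mirroring LocalSparkCluster.stop():
    //   stopAll(workerActorSystems); stopAll(masterActorSystems)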

@@ -96,12 +96,14 @@ object SparkBuild extends Build {
     */
     libraryDependencies ++= Seq(
-      "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011",
-      "org.scalatest" %% "scalatest" % "1.9.1" % "test",
-      "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
-      "com.novocode" % "junit-interface" % "0.8" % "test",
-      "org.easymock" % "easymock" % "3.1" % "test"
-    ),
+      "io.netty" % "netty" % "3.6.6.Final",
+      "org.eclipse.jetty" % "jetty-server" % "7.5.3.v20111011",
+      "org.scalatest" %% "scalatest" % "1.9.1" % "test",
+      "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
+      "com.novocode" % "junit-interface" % "0.8" % "test",
+      "org.easymock" % "easymock" % "3.1" % "test"
+    ),
     parallelExecution := false,
     /* Workaround for issue #206 (fixed after SBT 0.11.0) */
     watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task,
@@ -118,6 +120,9 @@ object SparkBuild extends Build {
   val slf4jVersion = "1.6.1"
+  val excludeJackson = ExclusionRule(organization = "org.codehaus.jackson")
+  val excludeNetty = ExclusionRule(organization = "org.jboss.netty")
+
   def coreSettings = sharedSettings ++ Seq(
     name := "spark-core",
     resolvers ++= Seq(
@@ -134,24 +139,27 @@ object SparkBuild extends Build {
     "org.slf4j" % "slf4j-api" % slf4jVersion,
     "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
     "com.ning" % "compress-lzf" % "0.8.4",
-    "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
+    "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION excludeAll(excludeNetty, excludeJackson),
     "asm" % "asm-all" % "3.3.1",
     "com.google.protobuf" % "protobuf-java" % "2.4.1",
     "de.javakaffee" % "kryo-serializers" % "0.20",
-    "com.typesafe.akka" %% "akka-remote" % "2.1.2",
-    "com.typesafe.akka" %% "akka-slf4j" % "2.1.2",
+    "com.typesafe.akka" %% "akka-remote" % "2.1.4" excludeAll(excludeNetty),
+    "com.typesafe.akka" %% "akka-slf4j" % "2.1.4" excludeAll(excludeNetty),
     "it.unimi.dsi" % "fastutil" % "6.4.4",
-    "io.spray" % "spray-can" % "1.1-M7",
-    "io.spray" % "spray-io" % "1.1-M7",
-    "io.spray" % "spray-routing" % "1.1-M7",
-    "io.spray" %% "spray-json" % "1.2.3",
+    "io.spray" % "spray-can" % "1.1-M7" excludeAll(excludeNetty),
+    "io.spray" % "spray-io" % "1.1-M7" excludeAll(excludeNetty),
+    "io.spray" % "spray-routing" % "1.1-M7" excludeAll(excludeNetty),
+    "io.spray" %% "spray-json" % "1.2.3" excludeAll(excludeNetty),
     "colt" % "colt" % "1.2.0",
     "org.apache.mesos" % "mesos" % "0.9.0-incubating",
     "org.scala-lang" % "scala-actors" % "2.10.1",
     "org.scala-lang" % "jline" % "2.10.1",
     "org.scala-lang" % "scala-reflect" % "2.10.1"
   ) ++ (if (HADOOP_MAJOR_VERSION == "2")
-    Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION) else None).toSeq,
+    Some("org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION excludeAll(excludeNetty, excludeJackson))
+  else
+    None
+  ).toSeq,
   unmanagedSourceDirectories in Compile <+= baseDirectory{ _ / ("src/hadoop" + HADOOP_MAJOR_VERSION + "/scala") }
 ) ++ assemblySettings ++ extraAssemblySettings ++ Twirl.settings
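
The pattern in this hunk is worth spelling out: the build pins the new io.netty
artifact directly and attaches excludeAll(excludeNetty) to every dependency
that could transitively drag in the old org.jboss.netty artifact, so only one
copy of Netty ends up on the classpath. Below is a minimal standalone sbt
sketch of the same idea, using an illustrative project definition rather than
Spark's real build file:

    import sbt._
    import Keys._

    object ExampleBuild extends Build {
      // Keep the old org.jboss.netty artifact out of the transitive graph
      // and depend on io.netty's Netty directly instead.
      val excludeNetty = ExclusionRule(organization = "org.jboss.netty")

      lazy val root = Project("example", file(".")).settings(
        libraryDependencies ++= Seq(
          "io.netty" % "netty" % "3.6.6.Final",
          "com.typesafe.akka" %% "akka-remote" % "2.1.4" excludeAll(excludeNetty)
        )
      )
    }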
@@ -175,10 +183,10 @@ object SparkBuild extends Build {
   def streamingSettings = sharedSettings ++ Seq(
     name := "spark-streaming",
     libraryDependencies ++= Seq(
-      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile",
-      "com.github.sgroschupf" % "zkclient" % "0.1",
-      "org.twitter4j" % "twitter4j-stream" % "3.0.3",
-      "com.typesafe.akka" %% "akka-zeromq" % "2.1.2"
+      "org.apache.flume" % "flume-ng-sdk" % "1.2.0" % "compile" excludeAll(excludeNetty),
+      "com.github.sgroschupf" % "zkclient" % "0.1",
+      "org.twitter4j" % "twitter4j-stream" % "3.0.3" excludeAll(excludeNetty),
+      "com.typesafe.akka" %% "akka-zeromq" % "2.1.4" excludeAll(excludeNetty)
     )
   ) ++ assemblySettings ++ extraAssemblySettings

@@ -5,4 +5,4 @@ if [ "$MESOS_HOME" != "" ]; then
 fi
 export SPARK_HOME=$(cd "$(dirname $0)/.."; pwd)
 export SPARK_TESTING=1 # To put test classes on classpath
-java -Xmx1200m -XX:MaxPermSize=250m -XX:ReservedCodeCacheSize=128m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"
+java -Xmx1200m -XX:MaxPermSize=350m -XX:ReservedCodeCacheSize=128m $EXTRA_ARGS -jar $SPARK_HOME/sbt/sbt-launch-*.jar "$@"