Added standalone cluster repl suite

This commit is contained in:
Prashant Sharma 2013-04-15 19:49:40 +05:30
parent f31e41c270
commit 19b0256ae4
3 changed files with 100 additions and 47 deletions

View file

@ -1,50 +1,14 @@
package spark.repl
import java.io._
import java.net.URLClassLoader
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import java.io.FileWriter
import org.scalatest.FunSuite
import com.google.common.io.Files
class ReplSuite extends FunSuite {
def runInterpreter(master: String, input: String): String = {
val in = new BufferedReader(new StringReader(input + "\n"))
val out = new StringWriter()
val cl = getClass.getClassLoader
var paths = new ArrayBuffer[String]
if (cl.isInstanceOf[URLClassLoader]) {
val urlLoader = cl.asInstanceOf[URLClassLoader]
for (url <- urlLoader.getURLs) {
if (url.getProtocol == "file") {
paths += url.getFile
}
}
}
val interp = new SparkILoop(in, new PrintWriter(out), master)
spark.repl.Main.interp = interp
val separator = System.getProperty("path.separator")
interp.process(Array("-classpath", paths.mkString(separator)))
if (interp != null)
interp.closeInterpreter();
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port")
return out.toString
}
class ReplSuite extends FunSuite with ReplSuiteMixin {
def assertContains(message: String, output: String) {
assert(output contains message,
"Interpreter output did not contain '" + message + "':\n" + output)
}
def assertDoesNotContain(message: String, output: String) {
assert(!(output contains message),
"Interpreter output contained '" + message + "':\n" + output)
}
test ("simple foreach with accumulator") {
test("simple foreach with accumulator") {
val output = runInterpreter("local", """
val accum = sc.accumulator(0)
sc.parallelize(1 to 10).foreach(x => accum += x)
@ -55,7 +19,7 @@ class ReplSuite extends FunSuite {
assertContains("res1: Int = 55", output)
}
test ("external vars") {
test("external vars") {
val output = runInterpreter("local", """
var v = 7
sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
@ -68,7 +32,7 @@ class ReplSuite extends FunSuite {
assertContains("res1: Int = 100", output)
}
test ("external classes") {
test("external classes") {
val output = runInterpreter("local", """
class C {
def foo = 5
@ -80,7 +44,7 @@ class ReplSuite extends FunSuite {
assertContains("res0: Int = 50", output)
}
test ("external functions") {
test("external functions") {
val output = runInterpreter("local", """
def double(x: Int) = x + x
sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
@ -90,7 +54,7 @@ class ReplSuite extends FunSuite {
assertContains("res0: Int = 110", output)
}
test ("external functions that access vars") {
test("external functions that access vars") {
val output = runInterpreter("local", """
var v = 7
def getV() = v
@ -104,7 +68,7 @@ class ReplSuite extends FunSuite {
assertContains("res1: Int = 100", output)
}
test ("broadcast vars") {
test("broadcast vars") {
// Test that the value that a broadcast var had when it was created is used,
// even if that variable is then modified in the driver program
// TODO: This doesn't actually work for arrays when we run in local mode!
@ -121,7 +85,7 @@ class ReplSuite extends FunSuite {
assertContains("res2: Array[Int] = Array(5, 0, 0, 0, 0)", output)
}
test ("interacting with files") {
test("interacting with files") {
val tempDir = Files.createTempDir()
val out = new FileWriter(tempDir + "/input")
out.write("Hello world!\n")
@ -142,7 +106,7 @@ class ReplSuite extends FunSuite {
}
if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
test ("running on Mesos") {
test("running on Mesos") {
val output = runInterpreter("localquiet", """
var v = 7
def getV() = v
@ -163,4 +127,5 @@ class ReplSuite extends FunSuite {
assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
}
}
}

View file

@ -0,0 +1,56 @@
package spark.repl
import java.io.BufferedReader
import java.io.PrintWriter
import java.io.StringReader
import java.io.StringWriter
import java.net.URLClassLoader
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.future
import spark.deploy.master.Master
import spark.deploy.worker.Worker
trait ReplSuiteMixin {
  /** Boots an in-process standalone master and worker so suites can run against
   *  a `spark://127.0.1.2:7089` cluster URL.
   *  NOTE(review): relies on Thread.sleep to let the master bind before the
   *  worker registers — timing-dependent; confirm 2s is enough on CI hosts.
   */
  def setupStandaloneCluster() {
    future { Master.main(Array("-i", "127.0.1.2", "-p", "7089")) }
    Thread.sleep(2000) // wait for the master to come up before the worker connects
    future { Worker.main(Array("spark://127.0.1.2:7089", "--webui-port", "0")) }
  }

  /** Feeds `input` to a fresh Spark REPL connected to `master` and returns
   *  everything the interpreter printed (stdout captured via a StringWriter).
   */
  def runInterpreter(master: String, input: String): String = {
    val in = new BufferedReader(new StringReader(input + "\n"))
    val out = new StringWriter()
    // Forward the test JVM's file-based classpath entries to the interpreter so
    // user snippets can resolve Spark classes. `val` — the buffer is mutated,
    // never reassigned.
    val paths = new ArrayBuffer[String]
    getClass.getClassLoader match {
      case urlLoader: URLClassLoader =>
        for (url <- urlLoader.getURLs if url.getProtocol == "file") {
          paths += url.getFile
        }
      case _ => // non-URL classloader: nothing to forward
    }
    val interp = new SparkILoop(in, new PrintWriter(out), master)
    spark.repl.Main.interp = interp
    val separator = System.getProperty("path.separator")
    interp.process(Array("-classpath", paths.mkString(separator)))
    // `interp` is a freshly constructed val, so the original null guard was dead code.
    interp.closeInterpreter()
    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
    System.clearProperty("spark.master.port")
    out.toString
  }

  /** Asserts that the captured interpreter output contains `message`. */
  def assertContains(message: String, output: String) {
    assert(output contains message,
      "Interpreter output did not contain '" + message + "':\n" + output)
  }

  /** Asserts that the captured interpreter output does NOT contain `message`. */
  def assertDoesNotContain(message: String, output: String) {
    assert(!(output contains message),
      "Interpreter output contained '" + message + "':\n" + output)
  }
}

View file

@ -0,0 +1,32 @@
package spark.repl
import org.scalatest.FunSuite
/** REPL tests that run against an in-process standalone cluster rather than
 *  local mode, exercising the master/worker registration path.
 */
class StandaloneClusterReplSuite extends FunSuite with ReplSuiteMixin {
  // Side-effecting empty-paren method: call it with parentheses. The bare
  // reference compiled in Scala 2 but is non-idiomatic (and rejected by Scala 3).
  setupStandaloneCluster()

  test("simple collect") {
    val output = runInterpreter("spark://127.0.1.2:7089", """
      var x = 123
      val data = sc.parallelize(1 to 3).map(_ + x)
      data.take(3)
      """)
    assertDoesNotContain("error:", output)
    assertDoesNotContain("Exception", output)
    // 1+123, 2+123, 3+123 must all appear in the collected result.
    assertContains("124", output)
    assertContains("125", output)
    assertContains("126", output)
  }

  test("simple foreach with accumulator") {
    val output = runInterpreter("spark://127.0.1.2:7089", """
      val accum = sc.accumulator(0)
      sc.parallelize(1 to 10).foreach(x => accum += x)
      accum.value
      """)
    assertDoesNotContain("error:", output)
    assertDoesNotContain("Exception", output)
    assertContains("res1: Int = 55", output) // sum of 1..10
  }
}