Changes on top of Prashant's patch.

Closes #316
Patrick Wendell 2014-01-03 17:32:25 -08:00
parent bc311bb826
commit 9e6f3bdcda
10 changed files with 42 additions and 72 deletions


@@ -13,7 +13,7 @@ This README file only contains basic setup instructions.
 ## Building
 Spark requires Scala 2.10. The project is built using Simple Build Tool (SBT),
-which can be obtained from [here](http://www.scala-sbt.org). To build Spark and its example programs, run:
+which can be obtained [here](http://www.scala-sbt.org). To build Spark and its example programs, run:
     sbt assembly
@@ -38,24 +38,11 @@ locally with one thread, or "local[N]" to run locally with N threads.
 ## Running tests
-### With sbt (Much faster to run compared to maven)
-Once you have built spark with `sbt assembly` mentioned in [Building](#Building) section. Test suits can be run as follows using sbt.
+Testing first requires [Building](#Building) Spark. Once Spark is built, tests
+can be run using:
 `sbt test`
-### With maven.
-1. Export these necessary environment variables as follows.
-`export SCALA_HOME=<scala distribution>`
-`export MAVEN_OPTS="-Xmx1512m -XX:MaxPermSize=512m"`
-2. Build assembly by
-`mvn package -DskipTests`
-3. Run tests
-`mvn test`
 ## A Note About Hadoop Versions
 Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported


@@ -173,7 +173,8 @@ class SparkContext(
       value <- Option(System.getenv(key))) {
     executorEnvs(key) = value
   }
-  // A workaround for SPARK_TESTING and SPARK_HOME
+  // Convert java options to env vars as a work around
+  // since we can't set env vars directly in sbt.
   for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing"))
     value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
     executorEnvs(envKey) = value
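The fallback in this hunk (environment variable first, then the JVM system property that sbt sets) is easy to exercise on its own. A minimal sketch, assuming nothing beyond the standard library; `executorEnvs` here is just a local mutable map standing in for the SparkContext field:

```scala
import scala.collection.mutable

// Sketch: prefer the environment variable, fall back to the system property
// that the build sets (e.g. -Dspark.home / -Dspark.testing under sbt).
object EnvFallbackSketch {
  def main(args: Array[String]): Unit = {
    val executorEnvs = mutable.Map[String, String]()
    for {
      (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing"))
      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))
    } {
      executorEnvs(envKey) = value
    }
    println(executorEnvs)   // empty if neither source defines the key
  }
}
```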


@@ -30,8 +30,7 @@ import org.apache.spark.util.Utils
 class DriverSuite extends FunSuite with Timeouts {
   test("driver should exit after finishing") {
-    val sparkHome = Option(System.getenv("SPARK_HOME"))
-      .orElse(Option(System.getProperty("spark.home"))).get
+    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
     // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
     val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
     forAll(masters) { (master: String) =>
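For reference, `sys.env` and `sys.props` are the Scala standard-library views over the Java calls in the removed lines; a small sketch showing that the new one-liner resolves the same value as the old two-line form (the `.get` in the test still throws if neither setting is present, which is what the build-side change further down guards against):

```scala
// Sketch: both lookups agree; only the surface API differs.
object SparkHomeLookupSketch {
  def main(args: Array[String]): Unit = {
    val viaScalaApi = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
    val viaJavaApi  = Option(System.getenv("SPARK_HOME"))
      .orElse(Option(System.getProperty("spark.home")))
    assert(viaScalaApi == viaJavaApi)
    println(viaScalaApi.getOrElse("neither SPARK_HOME nor spark.home is set"))
  }
}
```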


@@ -27,54 +27,39 @@ import org.scalatest.FunSuite
 class FileServerSuite extends FunSuite with LocalSparkContext {
   @transient var tmpFile: File = _
-  @transient var testJarFile: String = _
+  @transient var tmpJarUrl: String = _
   override def beforeAll() {
     super.beforeAll()
-    val buffer = new Array[Byte](10240)
-    val tmpdir = new File(Files.createTempDir(), "test")
-    tmpdir.mkdir()
-    val tmpJarEntry = new File(tmpdir, "FileServerSuite2.txt")
-    val pw = new PrintWriter(tmpJarEntry)
-    pw.println("test String in the file named FileServerSuite2.txt")
+    val tmpDir = new File(Files.createTempDir(), "test")
+    tmpDir.mkdir()
+    val textFile = new File(tmpDir, "FileServerSuite.txt")
+    val pw = new PrintWriter(textFile)
+    pw.println("100")
     pw.close()
-    // The ugliest code possible, was translated from java.
-    val tmpFile2 = new File(tmpdir, "test.jar")
-    val stream = new FileOutputStream(tmpFile2)
-    val jar = new JarOutputStream(stream, new java.util.jar.Manifest())
-    val jarAdd = new JarEntry(tmpJarEntry.getName)
-    jarAdd.setTime(tmpJarEntry.lastModified)
-    jar.putNextEntry(jarAdd)
-    val in = new FileInputStream(tmpJarEntry)
+    val jarFile = new File(tmpDir, "test.jar")
+    val jarStream = new FileOutputStream(jarFile)
+    val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
+    val jarEntry = new JarEntry(textFile.getName)
+    jar.putNextEntry(jarEntry)
+    val in = new FileInputStream(textFile)
+    val buffer = new Array[Byte](10240)
     var nRead = 0
     while (nRead <= 0) {
       nRead = in.read(buffer, 0, buffer.length)
       jar.write(buffer, 0, nRead)
     }
     in.close()
     jar.close()
-    stream.close()
-    testJarFile = tmpFile2.toURI.toURL.toString
-  }
-  override def beforeEach() {
-    super.beforeEach()
-    // Create a sample text file
-    val tmpdir = new File(Files.createTempDir(), "test")
-    tmpdir.mkdir()
-    tmpFile = new File(tmpdir, "FileServerSuite.txt")
-    val pw = new PrintWriter(tmpFile)
-    pw.println("100")
-    pw.close()
-  }
-  override def afterEach() {
-    super.afterEach()
-    // Clean up downloaded file
-    if (tmpFile.exists) {
-      tmpFile.delete()
-    }
+    jarStream.close()
+    tmpFile = textFile
+    tmpJarUrl = jarFile.toURI.toURL.toString
   }
   test("Distributing files locally") {
@@ -108,10 +93,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   test ("Dynamically adding JARS locally") {
     sc = new SparkContext("local[4]", "test")
-    sc.addJar(testJarFile)
+    sc.addJar(tmpJarUrl)
     val testData = Array((1, 1))
-    sc.parallelize(testData).foreach { (x) =>
-      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite2.txt") == null) {
+    sc.parallelize(testData).foreach { x =>
+      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
         throw new SparkException("jar not added")
       }
     }
@@ -133,10 +118,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   test ("Dynamically adding JARS on a standalone cluster") {
     sc = new SparkContext("local-cluster[1,1,512]", "test")
-    sc.addJar(testJarFile)
+    sc.addJar(tmpJarUrl)
     val testData = Array((1,1))
-    sc.parallelize(testData).foreach { (x) =>
-      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite2.txt") == null) {
+    sc.parallelize(testData).foreach { x =>
+      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
         throw new SparkException("jar not added")
       }
     }
@@ -144,10 +129,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   test ("Dynamically adding JARS on a standalone cluster using local: URL") {
     sc = new SparkContext("local-cluster[1,1,512]", "test")
-    sc.addJar(testJarFile.replace("file", "local"))
+    sc.addJar(tmpJarUrl.replace("file", "local"))
     val testData = Array((1,1))
-    sc.parallelize(testData).foreach { (x) =>
-      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite2.txt") == null) {
+    sc.parallelize(testData).foreach { x =>
+      if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) {
         throw new SparkException("jar not added")
       }
     }
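The `beforeAll` rewrite in the first hunk of this suite (write a small text file into a temp directory, then package it into a jar) can be reproduced standalone. A minimal sketch with placeholder file names, using a plain read-to-EOF copy loop instead of the suite's single-read loop and the JDK temp dir instead of Guava's `Files.createTempDir()`:

```scala
import java.io.{File, FileInputStream, FileOutputStream, PrintWriter}
import java.util.jar.{JarEntry, JarOutputStream, Manifest}

// Sketch: build a jar containing one text entry, as the test fixture does.
object JarPackagingSketch {
  def main(args: Array[String]): Unit = {
    val tmpDir = new File(System.getProperty("java.io.tmpdir"), "jar-sketch")
    tmpDir.mkdirs()

    val textFile = new File(tmpDir, "payload.txt")
    val pw = new PrintWriter(textFile)
    pw.println("100")
    pw.close()

    val jarFile = new File(tmpDir, "test.jar")
    val jar = new JarOutputStream(new FileOutputStream(jarFile), new Manifest())
    jar.putNextEntry(new JarEntry(textFile.getName))

    val in = new FileInputStream(textFile)
    val buffer = new Array[Byte](10240)
    var nRead = in.read(buffer)
    while (nRead > 0) {          // read returns -1 at end of stream
      jar.write(buffer, 0, nRead)
      nRead = in.read(buffer)
    }
    in.close()
    jar.close()
    println(s"wrote ${jarFile.getAbsolutePath}")
  }
}
```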


@@ -19,18 +19,14 @@ package org.apache.spark.deploy.worker
 import java.io.File
-import scala.util.Try
 import org.scalatest.FunSuite
 import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription}
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
-    val sparkHome = Try(sys.env("SPARK_HOME")).toOption
-      .orElse(Option(System.getProperty("spark.home"))).get
+    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.env.get("spark.home")).get
     val appDesc = new ApplicationDescription("app name", 8, 500, Command("foo", Seq(),Map()),
       sparkHome, "appUiUrl")
     val appId = "12345-worker321-9876"


@@ -45,7 +45,8 @@ DISTDIR="$FWDIR/dist"
 export TERM=dumb   # Prevents color codes in SBT output
 if ! test `which sbt` ;then
-    echo -e "You need sbt installed and available on path, please follow the instructions here: http://www.scala-sbt.org/release/docs/Getting-Started/Setup.html"
+    echo -e "You need sbt installed and available on your path."
+    echo -e "Download sbt from http://www.scala-sbt.org/"
     exit -1;
 fi


@@ -72,6 +72,7 @@ object SparkBuild extends Build {
   val sparkHome = System.getProperty("user.dir")
   System.setProperty("spark.home", sparkHome)
   System.setProperty("spark.testing", "1")
+
   // Allows build configuration to be set through environment variables
   lazy val hadoopVersion = Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION)
   lazy val isNewHadoop = Properties.envOrNone("SPARK_IS_NEW_HADOOP") match {
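A sketch of the handshake this commit depends on: the build publishes `spark.home` and `spark.testing` as JVM system properties (so forked test JVMs need no environment variables), while build-level knobs such as `SPARK_HADOOP_VERSION` remain environment-driven via `Properties.envOrElse`. The default version string below is a placeholder, not the project's actual default:

```scala
import scala.util.Properties

// Sketch: system properties for test-time settings, env vars for build knobs.
object BuildConfigSketch {
  def main(args: Array[String]): Unit = {
    // What the build does so tests can locate Spark without SPARK_HOME being set.
    System.setProperty("spark.home", System.getProperty("user.dir"))
    System.setProperty("spark.testing", "1")

    // Environment variable with a fallback default, as in SparkBuild.
    val hadoopVersion = Properties.envOrElse("SPARK_HADOOP_VERSION", "<default>")
    println(s"spark.home=${sys.props("spark.home")}, hadoop=$hadoopVersion")
  }
}
```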

Two binary files changed (not shown).


@@ -40,7 +40,7 @@ Public classes:
 import sys
 import os
-sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.egg"))
+sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.8.1-src.zip"))
 from pyspark.conf import SparkConf