From 4d3471dd077e9e9c9038707eb5ba3fb8539c05e0 Mon Sep 17 00:00:00 2001
From: Denny
Date: Mon, 10 Sep 2012 15:39:58 -0700
Subject: [PATCH] Fix serialization bugs and add local cluster tests

---
 core/src/main/scala/spark/SparkContext.scala  |  2 +-
 .../spark/scheduler/ShuffleMapTask.scala      |  2 +-
 .../test/scala/spark/FileServerSuite.scala    | 33 +++++++++++++++++--
 3 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 7a1bf692e4..2bd07f10d4 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -330,7 +330,7 @@ class SparkContext(
 
     // Fetch the file locally in case the task is executed locally
     val filename = new File(path.split("/").last)
-    Utils.fetchFile(path, new File(""))
+    Utils.fetchFile(path, new File("."))
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
   }
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 3687bb990c..a551bcc782 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -141,7 +141,7 @@ class ShuffleMapTask(
     val jarSetNumBytes = in.readInt()
     val jarSetBytes = new Array[Byte](jarSetNumBytes)
     in.readFully(jarSetBytes)
-    fileSet = ShuffleMapTask.deserializeFileSet(jarSetBytes)
+    jarSet = ShuffleMapTask.deserializeFileSet(jarSetBytes)
 
     partition = in.readInt()
     generation = in.readLong()
diff --git a/core/src/test/scala/spark/FileServerSuite.scala b/core/src/test/scala/spark/FileServerSuite.scala
index 05517e8be4..500af1eb90 100644
--- a/core/src/test/scala/spark/FileServerSuite.scala
+++ b/core/src/test/scala/spark/FileServerSuite.scala
@@ -33,7 +33,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
     }
   }
 
-  test("Distributing files") {
+  test("Distributing files locally") {
     sc = new SparkContext("local[4]", "test")
     sc.addFile(tmpFile.toString)
     val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
@@ -47,7 +47,7 @@
     assert(result.toSet === Set((1,200), (2,300), (3,500)))
   }
 
-  test ("Dynamically adding JARS") {
+  test ("Dynamically adding JARS locally") {
     sc = new SparkContext("local[4]", "test")
     val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
     sc.addJar(sampleJarFile)
@@ -60,5 +60,34 @@
     }.collect()
     assert(result.toSet === Set((1,2), (2,7), (3,121)))
   }
+
+  test("Distributing files on a standalone cluster") {
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc.addFile(tmpFile.toString)
+    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+    val result = sc.parallelize(testData).reduceByKey {
+      val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
+      val fileVal = in.readLine().toInt
+      in.close()
+      _ * fileVal + _ * fileVal
+    }.collect
+    println(result)
+    assert(result.toSet === Set((1,200), (2,300), (3,500)))
+  }
+
+
+  test ("Dynamically adding JARS on a standalone cluster") {
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
+    sc.addJar(sampleJarFile)
+    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+    val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+      val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
+      val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+      val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+      a + b
+    }.collect()
+    assert(result.toSet === Set((1,2), (2,7), (3,121)))
+  }
 
 }
\ No newline at end of file