Fix serialization bugs and added local cluster tests
This commit is contained in:
parent
94a7e82ba1
commit
4d3471dd07
|
@ -330,7 +330,7 @@ class SparkContext(
|
|||
|
||||
// Fetch the file locally in case the task is executed locally
|
||||
val filename = new File(path.split("/").last)
|
||||
Utils.fetchFile(path, new File(""))
|
||||
Utils.fetchFile(path, new File("."))
|
||||
|
||||
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
|
||||
}
|
||||
|
|
|
@ -141,7 +141,7 @@ class ShuffleMapTask(
|
|||
val jarSetNumBytes = in.readInt()
|
||||
val jarSetBytes = new Array[Byte](jarSetNumBytes)
|
||||
in.readFully(jarSetBytes)
|
||||
fileSet = ShuffleMapTask.deserializeFileSet(jarSetBytes)
|
||||
jarSet = ShuffleMapTask.deserializeFileSet(jarSetBytes)
|
||||
|
||||
partition = in.readInt()
|
||||
generation = in.readLong()
|
||||
|
|
|
@ -33,7 +33,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
|
|||
}
|
||||
}
|
||||
|
||||
test("Distributing files") {
|
||||
test("Distributing files locally") {
|
||||
sc = new SparkContext("local[4]", "test")
|
||||
sc.addFile(tmpFile.toString)
|
||||
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
|
||||
|
@ -47,7 +47,7 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
|
|||
assert(result.toSet === Set((1,200), (2,300), (3,500)))
|
||||
}
|
||||
|
||||
test ("Dynamically adding JARS") {
|
||||
test ("Dynamically adding JARS locally") {
|
||||
sc = new SparkContext("local[4]", "test")
|
||||
val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
|
||||
sc.addJar(sampleJarFile)
|
||||
|
@ -60,5 +60,34 @@ class FileServerSuite extends FunSuite with BeforeAndAfter {
|
|||
}.collect()
|
||||
assert(result.toSet === Set((1,2), (2,7), (3,121)))
|
||||
}
|
||||
|
||||
test("Distributing files on a standalone cluster") {
|
||||
sc = new SparkContext("local-cluster[1,1,512]", "test")
|
||||
sc.addFile(tmpFile.toString)
|
||||
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
|
||||
val result = sc.parallelize(testData).reduceByKey {
|
||||
val in = new java.io.BufferedReader(new java.io.FileReader(tmpFile))
|
||||
val fileVal = in.readLine().toInt
|
||||
in.close()
|
||||
_ * fileVal + _ * fileVal
|
||||
}.collect
|
||||
println(result)
|
||||
assert(result.toSet === Set((1,200), (2,300), (3,500)))
|
||||
}
|
||||
|
||||
|
||||
test ("Dynamically adding JARS on a standalone cluster") {
|
||||
sc = new SparkContext("local-cluster[1,1,512]", "test")
|
||||
val sampleJarFile = getClass().getClassLoader().getResource("uncommons-maths-1.2.2.jar").getFile()
|
||||
sc.addJar(sampleJarFile)
|
||||
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
|
||||
val result = sc.parallelize(testData).reduceByKey { (x,y) =>
|
||||
val fac = Thread.currentThread.getContextClassLoader().loadClass("org.uncommons.maths.Maths").getDeclaredMethod("factorial", classOf[Int])
|
||||
val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
|
||||
val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
|
||||
a + b
|
||||
}.collect()
|
||||
assert(result.toSet === Set((1,2), (2,7), (3,121)))
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in a new issue