Fixed compile time warnings and formatting post merge.
This commit is contained in:
parent
44fd30d3fb
commit
d092a8cc6a
|
@ -17,9 +17,7 @@
|
|||
|
||||
package org.apache.spark.deploy.master
|
||||
|
||||
private[spark] object RecoveryState
|
||||
extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
|
||||
|
||||
private[spark] object RecoveryState extends Enumeration {
|
||||
type MasterState = Value
|
||||
|
||||
val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
|
||||
|
|
|
@ -553,7 +553,7 @@ abstract class RDD[T: ClassTag](
|
|||
(f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
|
||||
new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)
|
||||
|
||||
def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
|
||||
def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
|
||||
(rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
|
||||
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
|
||||
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)
|
||||
|
@ -563,7 +563,7 @@ abstract class RDD[T: ClassTag](
|
|||
(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
|
||||
new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false)
|
||||
|
||||
def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
|
||||
def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
|
||||
(rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
|
||||
(f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
|
||||
new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)
|
||||
|
|
|
@ -55,8 +55,7 @@ class MetadataCleaner(cleanerType: MetadataCleanerType.MetadataCleanerType, clea
|
|||
}
|
||||
}
|
||||
|
||||
object MetadataCleanerType extends Enumeration("MapOutputTracker", "SparkContext", "HttpBroadcast", "DagScheduler", "ResultTask",
|
||||
"ShuffleMapTask", "BlockManager", "DiskBlockManager", "BroadcastVars") {
|
||||
object MetadataCleanerType extends Enumeration {
|
||||
|
||||
val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
|
||||
SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
|
||||
|
|
|
@ -39,7 +39,7 @@ class BlockIdSuite extends FunSuite {
|
|||
fail()
|
||||
} catch {
|
||||
case e: IllegalStateException => // OK
|
||||
case _ => fail()
|
||||
case _: Throwable => fail()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -79,23 +79,25 @@ class ReplSuite extends FunSuite {
|
|||
}
|
||||
|
||||
test("simple foreach with accumulator") {
|
||||
val output = runInterpreter("local", """
|
||||
|val accum = sc.accumulator(0)
|
||||
|sc.parallelize(1 to 10).foreach(x => accum += x)
|
||||
|accum.value
|
||||
""".stripMargin)
|
||||
val output = runInterpreter("local",
|
||||
"""
|
||||
|val accum = sc.accumulator(0)
|
||||
|sc.parallelize(1 to 10).foreach(x => accum += x)
|
||||
|accum.value
|
||||
""".stripMargin)
|
||||
assertDoesNotContain("error:", output)
|
||||
assertDoesNotContain("Exception", output)
|
||||
assertContains("res1: Int = 55", output)
|
||||
}
|
||||
|
||||
test("external vars") {
|
||||
val output = runInterpreter("local", """
|
||||
|var v = 7
|
||||
|sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
|
||||
|v = 10
|
||||
|sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
|
||||
""".stripMargin)
|
||||
val output = runInterpreter("local",
|
||||
"""
|
||||
|var v = 7
|
||||
|sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
|
||||
|v = 10
|
||||
|sc.parallelize(1 to 10).map(x => v).collect.reduceLeft(_+_)
|
||||
""".stripMargin)
|
||||
assertDoesNotContain("error:", output)
|
||||
assertDoesNotContain("Exception", output)
|
||||
assertContains("res0: Int = 70", output)
|
||||
|
@ -103,35 +105,38 @@ class ReplSuite extends FunSuite {
|
|||
}
|
||||
|
||||
test("external classes") {
|
||||
val output = runInterpreter("local", """
|
||||
|class C {
|
||||
|def foo = 5
|
||||
|}
|
||||
|sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
|
||||
""".stripMargin)
|
||||
val output = runInterpreter("local",
|
||||
"""
|
||||
|class C {
|
||||
|def foo = 5
|
||||
|}
|
||||
|sc.parallelize(1 to 10).map(x => (new C).foo).collect.reduceLeft(_+_)
|
||||
""".stripMargin)
|
||||
assertDoesNotContain("error:", output)
|
||||
assertDoesNotContain("Exception", output)
|
||||
assertContains("res0: Int = 50", output)
|
||||
}
|
||||
|
||||
test("external functions") {
|
||||
val output = runInterpreter("local", """
|
||||
|def double(x: Int) = x + x
|
||||
|sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
|
||||
""".stripMargin)
|
||||
val output = runInterpreter("local",
|
||||
"""
|
||||
|def double(x: Int) = x + x
|
||||
|sc.parallelize(1 to 10).map(x => double(x)).collect.reduceLeft(_+_)
|
||||
""".stripMargin)
|
||||
assertDoesNotContain("error:", output)
|
||||
assertDoesNotContain("Exception", output)
|
||||
assertContains("res0: Int = 110", output)
|
||||
}
|
||||
|
||||
test("external functions that access vars") {
|
||||
val output = runInterpreter("local", """
|
||||
|var v = 7
|
||||
|def getV() = v
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
|v = 10
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
""".stripMargin)
|
||||
val output = runInterpreter("local",
|
||||
"""
|
||||
|var v = 7
|
||||
|def getV() = v
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
|v = 10
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
""".stripMargin)
|
||||
assertDoesNotContain("error:", output)
|
||||
assertDoesNotContain("Exception", output)
|
||||
assertContains("res0: Int = 70", output)
|
||||
|
@ -142,13 +147,14 @@ class ReplSuite extends FunSuite {
|
|||
// Test that the value that a broadcast var had when it was created is used,
|
||||
// even if that variable is then modified in the driver program
|
||||
// TODO: This doesn't actually work for arrays when we run in local mode!
|
||||
val output = runInterpreter("local", """
|
||||
|var array = new Array[Int](5)
|
||||
|val broadcastArray = sc.broadcast(array)
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
|array(0) = 5
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
""".stripMargin)
|
||||
val output = runInterpreter("local",
|
||||
"""
|
||||
|var array = new Array[Int](5)
|
||||
|val broadcastArray = sc.broadcast(array)
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
|array(0) = 5
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
""".stripMargin)
|
||||
assertDoesNotContain("error:", output)
|
||||
assertDoesNotContain("Exception", output)
|
||||
assertContains("res0: Array[Int] = Array(0, 0, 0, 0, 0)", output)
|
||||
|
@ -162,12 +168,13 @@ class ReplSuite extends FunSuite {
|
|||
out.write("What's up?\n")
|
||||
out.write("Goodbye\n")
|
||||
out.close()
|
||||
val output = runInterpreter("local", """
|
||||
|var file = sc.textFile("%s/input").cache()
|
||||
|file.count()
|
||||
|file.count()
|
||||
|file.count()
|
||||
""".stripMargin.format(tempDir.getAbsolutePath))
|
||||
val output = runInterpreter("local",
|
||||
"""
|
||||
|var file = sc.textFile("%s/input").cache()
|
||||
|file.count()
|
||||
|file.count()
|
||||
|file.count()
|
||||
""".stripMargin.format(tempDir.getAbsolutePath))
|
||||
assertDoesNotContain("error:", output)
|
||||
assertDoesNotContain("Exception", output)
|
||||
assertContains("res0: Long = 3", output)
|
||||
|
@ -176,18 +183,19 @@ class ReplSuite extends FunSuite {
|
|||
}
|
||||
|
||||
test("local-cluster mode") {
|
||||
val output = runInterpreter("local-cluster[1,1,512]", """
|
||||
|var v = 7
|
||||
|def getV() = v
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
|v = 10
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
|var array = new Array[Int](5)
|
||||
|val broadcastArray = sc.broadcast(array)
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
|array(0) = 5
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
""".stripMargin)
|
||||
val output = runInterpreter("local-cluster[1,1,512]",
|
||||
"""
|
||||
|var v = 7
|
||||
|def getV() = v
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
|v = 10
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
|var array = new Array[Int](5)
|
||||
|val broadcastArray = sc.broadcast(array)
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
|array(0) = 5
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
""".stripMargin)
|
||||
assertDoesNotContain("error:", output)
|
||||
assertDoesNotContain("Exception", output)
|
||||
assertContains("res0: Int = 70", output)
|
||||
|
@ -198,18 +206,19 @@ class ReplSuite extends FunSuite {
|
|||
|
||||
if (System.getenv("MESOS_NATIVE_LIBRARY") != null) {
|
||||
test("running on Mesos") {
|
||||
val output = runInterpreter("localquiet", """
|
||||
|var v = 7
|
||||
|def getV() = v
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
|v = 10
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
|var array = new Array[Int](5)
|
||||
|val broadcastArray = sc.broadcast(array)
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
|array(0) = 5
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
""".stripMargin)
|
||||
val output = runInterpreter("localquiet",
|
||||
"""
|
||||
|var v = 7
|
||||
|def getV() = v
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
|v = 10
|
||||
|sc.parallelize(1 to 10).map(x => getV()).collect.reduceLeft(_+_)
|
||||
|var array = new Array[Int](5)
|
||||
|val broadcastArray = sc.broadcast(array)
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
|array(0) = 5
|
||||
|sc.parallelize(0 to 4).map(x => broadcastArray.value(x)).collect
|
||||
""".stripMargin)
|
||||
assertDoesNotContain("error:", output)
|
||||
assertDoesNotContain("Exception", output)
|
||||
assertContains("res0: Int = 70", output)
|
||||
|
|
Loading…
Reference in a new issue