diff --git a/core/src/main/scala/spark/api/python/PythonRDD.scala b/core/src/main/scala/spark/api/python/PythonRDD.scala
index 39758e94f4..ab8351e55e 100644
--- a/core/src/main/scala/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/spark/api/python/PythonRDD.scala
@@ -238,6 +238,11 @@ private[spark] object PythonRDD {
   }
 
   def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) {
+    import scala.collection.JavaConverters._
+    writeIteratorToPickleFile(items.asScala, filename)
+  }
+
+  def writeIteratorToPickleFile[T](items: Iterator[T], filename: String) {
     val file = new DataOutputStream(new FileOutputStream(filename))
     for (item <- items) {
       writeAsPickle(item, file)
@@ -245,8 +250,10 @@ private[spark] object PythonRDD {
     file.close()
   }
 
-  def takePartition[T](rdd: RDD[T], partition: Int): java.util.Iterator[T] =
-    rdd.context.runJob(rdd, ((x: Iterator[T]) => x), Seq(partition), true).head
+  def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
+    implicit val cm: ClassManifest[T] = rdd.elementClassManifest
+    rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
+  }
 }
 
 private object Pickle {
diff --git a/core/src/test/scala/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
index f4e7ec39fe..dd19442dcb 100644
--- a/core/src/test/scala/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/spark/MapOutputTrackerSuite.scala
@@ -79,8 +79,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
   test("remote fetch") {
     try {
       System.clearProperty("spark.driver.host")  // In case some previous test had set it
-      val (actorSystem, boundPort) =
-        AkkaUtils.createActorSystem("test", "localhost", 0)
+      val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", "localhost", 0)
       System.setProperty("spark.driver.port", boundPort.toString)
       val masterTracker = new MapOutputTracker(actorSystem, true)
       val slaveTracker = new MapOutputTracker(actorSystem, false)
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index 89a3687386..fe7deb10d6 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -14,7 +14,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
     assert(dups.distinct().count() === 4)
     assert(dups.distinct.count === 4)  // Can distinct and count be called without parentheses?
-    assert(dups.distinct().collect === dups.distinct().collect)
+    assert(dups.distinct.collect === dups.distinct().collect)
     assert(dups.distinct(2).collect === dups.distinct().collect)
     assert(nums.reduce(_ + _) === 10)
     assert(nums.fold(0)(_ + _) === 10)
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 61fcbbd376..3e9d7d36da 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -196,12 +196,3 @@ def _start_update_server():
     thread.daemon = True
     thread.start()
     return server
-
-
-def _test():
-    import doctest
-    doctest.testmod()
-
-
-if __name__ == "__main__":
-    _test()
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index 93876fa738..def810dd46 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -37,12 +37,3 @@ class Broadcast(object):
     def __reduce__(self):
         self._pickle_registry.add(self)
         return (_from_id, (self.bid, ))
-
-
-def _test():
-    import doctest
-    doctest.testmod()
-
-
-if __name__ == "__main__":
-    _test()
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6831f9b7f8..657fe6f989 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -256,8 +256,10 @@ def _test():
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     globs['tempdir'] = tempfile.mkdtemp()
     atexit.register(lambda: shutil.rmtree(globs['tempdir']))
-    doctest.testmod(globs=globs)
+    (failure_count, test_count) = doctest.testmod(globs=globs)
     globs['sc'].stop()
+    if failure_count:
+        exit(-1)
 
 
 if __name__ == "__main__":
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 41ea6e6e14..4cda6cf661 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -372,6 +372,10 @@ class RDD(object):
         items = []
         for partition in range(self._jrdd.splits().size()):
             iterator = self.ctx._takePartition(self._jrdd.rdd(), partition)
+            # Each item in the iterator is a string, a Python object, or a batch
+            # of Python objects. Regardless, it is sufficient to take `num` of
+            # these objects in order to collect `num` Python objects:
+            iterator = iterator.take(num)
             items.extend(self._collect_iterator_through_file(iterator))
             if len(items) >= num:
                 break
@@ -748,8 +752,10 @@ def _test():
     # The small batch size here ensures that we see multiple batches,
     # even in these small test examples:
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
-    doctest.testmod(globs=globs)
+    (failure_count, test_count) = doctest.testmod(globs=globs)
     globs['sc'].stop()
+    if failure_count:
+        exit(-1)
 
 
 if __name__ == "__main__":
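For reference, a minimal usage sketch of the RDD.take() path this patch changes. The SparkContext variable `sc` and the sample data below are illustrative assumptions for the sketch, not part of the patch itself:

    # Sketch only: `sc` is an assumed, already-constructed SparkContext.
    # After this change, take() fetches each scanned partition's iterator from
    # the JVM and limits it to `num` items (or batches) before pickling them
    # back to Python, rather than serializing the whole partition.
    rdd = sc.parallelize(range(1000), 4)
    print(rdd.take(2))  # expected: [0, 1]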