Merge branch 'mesos'

This commit is contained in:
haitao.yao 2013-02-05 14:09:45 +08:00
commit f609182e5b
7 changed files with 21 additions and 25 deletions

View file

@ -238,6 +238,11 @@ private[spark] object PythonRDD {
}
def writeIteratorToPickleFile[T](items: java.util.Iterator[T], filename: String) {
import scala.collection.JavaConverters._
writeIteratorToPickleFile(items.asScala, filename)
}
def writeIteratorToPickleFile[T](items: Iterator[T], filename: String) {
val file = new DataOutputStream(new FileOutputStream(filename))
for (item <- items) {
writeAsPickle(item, file)
@ -245,8 +250,10 @@ private[spark] object PythonRDD {
file.close()
}
def takePartition[T](rdd: RDD[T], partition: Int): java.util.Iterator[T] =
rdd.context.runJob(rdd, ((x: Iterator[T]) => x), Seq(partition), true).head
def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
implicit val cm : ClassManifest[T] = rdd.elementClassManifest
rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
}
}
private object Pickle {

View file

@ -79,8 +79,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("remote fetch") {
try {
System.clearProperty("spark.driver.host") // In case some previous test had set it
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("test", "localhost", 0)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", "localhost", 0)
System.setProperty("spark.driver.port", boundPort.toString)
val masterTracker = new MapOutputTracker(actorSystem, true)
val slaveTracker = new MapOutputTracker(actorSystem, false)

View file

@ -14,7 +14,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
assert(dups.distinct().count() === 4)
assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?
assert(dups.distinct().collect === dups.distinct().collect)
assert(dups.distinct.collect === dups.distinct().collect)
assert(dups.distinct(2).collect === dups.distinct().collect)
assert(nums.reduce(_ + _) === 10)
assert(nums.fold(0)(_ + _) === 10)

View file

@ -196,12 +196,3 @@ def _start_update_server():
thread.daemon = True
thread.start()
return server
def _test():
import doctest
doctest.testmod()
if __name__ == "__main__":
_test()

View file

@ -37,12 +37,3 @@ class Broadcast(object):
def __reduce__(self):
self._pickle_registry.add(self)
return (_from_id, (self.bid, ))
def _test():
import doctest
doctest.testmod()
if __name__ == "__main__":
_test()

View file

@ -256,8 +256,10 @@ def _test():
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
globs['tempdir'] = tempfile.mkdtemp()
atexit.register(lambda: shutil.rmtree(globs['tempdir']))
doctest.testmod(globs=globs)
(failure_count, test_count) = doctest.testmod(globs=globs)
globs['sc'].stop()
if failure_count:
exit(-1)
if __name__ == "__main__":

View file

@ -372,6 +372,10 @@ class RDD(object):
items = []
for partition in range(self._jrdd.splits().size()):
iterator = self.ctx._takePartition(self._jrdd.rdd(), partition)
# Each item in the iterator is a string, Python object, batch of
# Python objects. Regardless, it is sufficient to take `num`
# of these objects in order to collect `num` Python objects:
iterator = iterator.take(num)
items.extend(self._collect_iterator_through_file(iterator))
if len(items) >= num:
break
@ -748,8 +752,10 @@ def _test():
# The small batch size here ensures that we see multiple batches,
# even in these small test examples:
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
doctest.testmod(globs=globs)
(failure_count, test_count) = doctest.testmod(globs=globs)
globs['sc'].stop()
if failure_count:
exit(-1)
if __name__ == "__main__":