HdfsFile.scala: added a try/catch block to exit gracefully for corrupted gzip files

MesosScheduler.scala: formatted the slaveOffer() output to include the serialized task size
RDD.scala: added support for aggregating RDDs on a per-split basis
(aggregateSplit()) as well as for sampling without replacement (sample())
This commit is contained in:
Justin Ma 2010-08-18 15:25:57 -07:00
parent 75b2ca10c3
commit 156bccbe23
3 changed files with 35 additions and 2 deletions

View file

@@ -44,7 +44,12 @@ extends RDD[String, HdfsSplit](sc) {
override def hasNext: Boolean = {
if (!gotNext) {
finished = !reader.next(lineNum, text)
try {
finished = !reader.next(lineNum, text)
} catch {
case eofe: java.io.EOFException =>
finished = true
}
gotNext = true
}
!finished

View file

@@ -223,7 +223,8 @@ extends ParallelOperation
{
val taskId = sched.newTaskId()
tidToIndex(taskId) = i
printf("Starting task %d as TID %s on slave %s: %s (%s)\n",
//printf("Starting task %d as TID %s on slave %s: %s (%s)\n",
printf("Starting task %d as TID %s on slave %s: %s (%s)",
i, taskId, offer.getSlaveId, offer.getHost,
if(checkPref) "preferred" else "non-preferred")
tasks(i).markStarted(offer)
@@ -235,6 +236,7 @@ extends ParallelOperation
params.put("cpus", "" + desiredCpus)
params.put("mem", "" + desiredMem)
val serializedTask = Utils.serialize(tasks(i))
println("... Serialized size: " + serializedTask.size)
return Some(new TaskDescription(taskId, offer.getSlaveId,
"task_" + taskId, params, serializedTask))
}

View file

@@ -3,6 +3,7 @@ package spark
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentHashMap
import java.util.HashSet
import java.util.Random
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
@@ -24,7 +25,9 @@ abstract class RDD[T: ClassManifest, Split](
def map[U: ClassManifest](f: T => U) = new MappedRDD(this, sc.clean(f))
def filter(f: T => Boolean) = new FilteredRDD(this, sc.clean(f))
def aggregateSplit() = new SplitRDD(this)
def cache() = new CachedRDD(this)
def sample(frac: Double, seed: Int) = new SampledRDD(this, frac, seed)
def foreach(f: T => Unit) {
val cleanF = sc.clean(f)
@@ -77,6 +80,7 @@ abstract class RDD[T: ClassManifest, Split](
new UnionRDD(sc, this, other)
def ++[OtherSplit](other: RDD[T, OtherSplit]) = this.union(other)
}
@serializable
@@ -136,6 +140,28 @@ extends RDD[T, Split](prev.sparkContext) {
override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
}
/**
 * RDD that aggregates each split of `prev` into a single element: every
 * partition yields exactly one Array[T] holding that partition's contents.
 */
class SplitRDD[T: ClassManifest, Split](
prev: RDD[T, Split])
extends RDD[Array[T], Split](prev.sparkContext) {
  // Partitioning and locality are inherited unchanged from the parent RDD.
  override def splits = prev.splits
  override def preferredLocations(split: Split) = prev.preferredLocations(split)
  // Materialize the whole split, then expose it as a one-element iterator.
  override def iterator(split: Split) = {
    val contents = prev.iterator(split).toArray
    Iterator.fromArray(Array(contents))
  }
  override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
}
/** Pairs a parent split with the deterministic RNG seed used to sample it. */
@serializable
class SeededSplit[Split](val prev: Split, val seed: Int)
/**
 * RDD that samples `prev` without replacement: each element is kept
 * independently with probability `frac` (Bernoulli sampling), using a
 * per-split seed derived deterministically from the master `seed`.
 */
class SampledRDD[T: ClassManifest, Split](
prev: RDD[T, Split], frac: Double, seed: Int)
extends RDD[T, SeededSplit[Split]](prev.sparkContext) {
  // One derived seed per parent split, drawn in split order from the master seed.
  @transient val splits_ = {
    val rg = new Random(seed)
    prev.splits.map(x => new SeededSplit(x, rg.nextInt))
  }

  override def splits = splits_

  override def preferredLocations(split: SeededSplit[Split]) =
    prev.preferredLocations(split.prev)

  // Lazily draw one uniform value per element as the iterator is consumed,
  // keeping the element when the draw falls at or below `frac`.
  override def iterator(split: SeededSplit[Split]) = {
    val rg = new Random(split.seed)
    prev.iterator(split.prev).filter(x => rg.nextDouble <= frac)
  }

  override def taskStarted(split: SeededSplit[Split], slot: SlaveOffer) =
    prev.taskStarted(split.prev, slot)
}
class CachedRDD[T, Split](
prev: RDD[T, Split])(implicit m: ClassManifest[T])
extends RDD[T, Split](prev.sparkContext) {