// (stray VCS timestamp from extraction) 2010-06-17 15:49:42 -04:00
package spark
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentHashMap
import java.util.HashSet
// (stray VCS timestamp from extraction) 2010-08-18 18:25:57 -04:00
import java.util.Random
// (stray VCS timestamp from extraction) 2010-06-17 15:49:42 -04:00
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
// (stray VCS timestamp from extraction) 2010-07-25 23:53:46 -04:00
import mesos._
// (stray VCS timestamp from extraction) 2010-06-17 15:49:42 -04:00
import com.google.common.collect.MapMaker
/**
 * A Resilient Distributed Dataset (RDD): an immutable, partitioned collection
 * of elements that can be operated on in parallel. Subclasses define the set
 * of splits (partitions), how to iterate over one split, and the preferred
 * locations for scheduling a split.
 *
 * NOTE(review): assumes SparkContext.clean prepares a closure for shipping to
 * workers — confirm against SparkContext.
 */
@serializable
abstract class RDD[T: ClassManifest](
    @transient sc: SparkContext) {
  // The set of partitions making up this dataset.
  def splits: Array[Split]

  // Iterate over the elements of one split.
  def iterator(split: Split): Iterator[T]

  // Locality hints for scheduling a task on the given split.
  def preferredLocations(split: Split): Seq[String]

  // Hook called when a task for the given split starts on a slot; no-op by default.
  def taskStarted(split: Split, slot: SlaveOffer) {}

  def sparkContext = sc

  // Transformations: lazy, they only wrap this RDD.
  def map[U: ClassManifest](f: T => U) = new MappedRDD(this, sc.clean(f))
  def filter(f: T => Boolean) = new FilteredRDD(this, sc.clean(f))
  def aggregateSplit() = new SplitRDD(this)
  def cache() = new CachedRDD(this)
  def sample(withReplacement: Boolean, frac: Double, seed: Int) =
    new SampledRDD(this, withReplacement, frac, seed)

  // Actions: these launch tasks on the cluster.

  def foreach(f: T => Unit) {
    val cleanF = sc.clean(f)
    val tasks = splits.map(s => new ForeachTask(this, s, cleanF)).toArray
    sc.runTaskObjects(tasks)
  }

  def collect(): Array[T] = {
    val tasks = splits.map(s => new CollectTask(this, s))
    val results = sc.runTaskObjects(tasks)
    Array.concat(results: _*)
  }

  def toArray(): Array[T] = collect()

  /**
   * Reduce the elements of this RDD with the given associative function.
   * Throws UnsupportedOperationException on an empty collection.
   */
  def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)
    // BUG FIX: cleanF was previously computed but the raw `f` was shipped in
    // the ReduceTasks and used for the final merge, defeating closure
    // cleaning. Use the cleaned closure in both places.
    val tasks = splits.map(s => new ReduceTask(this, s, cleanF))
    val results = new ArrayBuffer[T]
    for (option <- sc.runTaskObjects(tasks); elem <- option)
      results += elem
    if (results.size == 0)
      throw new UnsupportedOperationException("empty collection")
    else
      return results.reduceLeft(cleanF)
  }

  // Return the first `num` elements, scanning splits sequentially on the caller.
  def take(num: Int): Array[T] = {
    if (num == 0)
      return new Array[T](0)
    val buf = new ArrayBuffer[T]
    for (split <- splits; elem <- iterator(split)) {
      buf += elem
      if (buf.length == num)
        return buf.toArray
    }
    return buf.toArray
  }

  // First element; throws UnsupportedOperationException if the RDD is empty.
  def first: T = take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }

  // Number of elements; an empty RDD counts as 0 (reduce on empty throws).
  def count(): Long =
    try { map(x => 1L).reduce(_ + _) }
    catch { case e: UnsupportedOperationException => 0L }

  def union(other: RDD[T]) = new UnionRDD(sc, this, other)
  def cartesian[U: ClassManifest](other: RDD[U]) = new CartesianRDD(sc, this, other)

  // Alias for union.
  def ++(other: RDD[T]) = this.union(other)
}
/**
 * Base class for tasks that operate on a single split of an RDD. Forwards
 * locality hints and start notifications to the underlying RDD.
 */
@serializable
abstract class RDDTask[U: ClassManifest, T: ClassManifest](
    val rdd: RDD[T], val split: Split)
    extends Task[U] {
  override def preferredLocations() = rdd.preferredLocations(split)
  override def markStarted(slot: SlaveOffer) { rdd.taskStarted(split, slot) }
}
// Task that applies a side-effecting function to every element of one split.
class ForeachTask[T: ClassManifest](
    rdd: RDD[T], split: Split, func: T => Unit)
    extends RDDTask[Unit, T](rdd, split) with Logging {
  override def run() {
    logInfo("Processing " + split)
    for (elem <- rdd.iterator(split))
      func(elem)
  }
}
// Task that materializes one split of an RDD as an array.
class CollectTask[T](
    rdd: RDD[T], split: Split)(implicit m: ClassManifest[T])
    extends RDDTask[Array[T], T](rdd, split) with Logging {
  override def run(): Array[T] = {
    logInfo("Processing " + split)
    rdd.iterator(split).toArray(m)
  }
}
// Task that reduces one split of an RDD with a binary function.
// Returns None when the split is empty.
class ReduceTask[T: ClassManifest](
    rdd: RDD[T], split: Split, f: (T, T) => T)
    extends RDDTask[Option[T], T](rdd, split) with Logging {
  override def run(): Option[T] = {
    logInfo("Processing " + split)
    val elems = rdd.iterator(split)
    if (!elems.hasNext)
      None
    else
      Some(elems.reduceLeft(f))
  }
}
// RDD produced by map(): applies f lazily to each element of the parent.
// Splits, locality and task notifications all delegate to the parent.
class MappedRDD[U: ClassManifest, T: ClassManifest](
    prev: RDD[T], f: T => U)
    extends RDD[U](prev.sparkContext) {
  override def splits = prev.splits
  override def preferredLocations(split: Split) = prev.preferredLocations(split)
  override def iterator(split: Split) = prev.iterator(split).map(f)
  override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
}
// RDD produced by filter(): lazily keeps the parent's elements matching f.
// Splits, locality and task notifications all delegate to the parent.
class FilteredRDD[T: ClassManifest](
    prev: RDD[T], f: T => Boolean)
    extends RDD[T](prev.sparkContext) {
  override def splits = prev.splits
  override def preferredLocations(split: Split) = prev.preferredLocations(split)
  override def iterator(split: Split) = prev.iterator(split).filter(f)
  override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
}
// RDD produced by aggregateSplit(): each output element is an entire parent
// split materialized as a single array (one element per split).
class SplitRDD[T: ClassManifest](prev: RDD[T])
    extends RDD[Array[T]](prev.sparkContext) {
  override def splits = prev.splits
  override def preferredLocations(split: Split) = prev.preferredLocations(split)
  override def iterator(split: Split) = Iterator.fromArray(Array(prev.iterator(split).toArray))
  override def taskStarted(split: Split, slot: SlaveOffer) = prev.taskStarted(split, slot)
}
// Split wrapper pairing a parent split with a per-split RNG seed,
// so sampling is deterministic per split for a given top-level seed.
@serializable class SeededSplit(val prev: Split, val seed: Int) extends Split {}
// RDD produced by sample(): a random sample of the parent, with or without
// replacement. Each split gets its own seed derived from the master seed.
class SampledRDD[T: ClassManifest](
    prev: RDD[T], withReplacement: Boolean, frac: Double, seed: Int)
    extends RDD[T](prev.sparkContext) {
  // Wrap every parent split with a deterministic per-split seed.
  @transient val splits_ = {
    val seedGen = new Random(seed)
    prev.splits.map(x => new SeededSplit(x, seedGen.nextInt))
  }

  override def splits = splits_.asInstanceOf[Array[Split]]

  override def preferredLocations(split: Split) =
    prev.preferredLocations(split.asInstanceOf[SeededSplit].prev)

  override def iterator(splitIn: Split) = {
    val split = splitIn.asInstanceOf[SeededSplit]
    val rand = new Random(split.seed);
    if (withReplacement) {
      // Sampling with replacement: materialize the split and draw
      // ceil(frac * n) elements uniformly; every index stays a candidate on
      // every draw, even if sampleSize < n.
      // (TODO: use reservoir sampling to make this more efficient?)
      val oldData = prev.iterator(split.prev).toArray
      val sampleSize = (oldData.size * frac).ceil.toInt
      val sampledData =
        for (i <- 1 to sampleSize) yield oldData(rand.nextInt(oldData.size))
      sampledData.iterator
    } else {
      // Sampling without replacement: independent Bernoulli(frac) trial
      // per element, streaming.
      prev.iterator(split.prev).filter(x => (rand.nextDouble <= frac))
    }
  }

  override def taskStarted(split: Split, slot: SlaveOffer) =
    prev.taskStarted(split.asInstanceOf[SeededSplit].prev, slot)
}
/**
 * RDD produced by cache(): the first time a split is computed on a worker it
 * is materialized into a process-wide soft-value cache; later iterations
 * reuse the cached array. Concurrent loads of the same split are coordinated
 * through the CachedRDD.loading set with wait/notifyAll.
 */
class CachedRDD[T](
    prev: RDD[T])(implicit m: ClassManifest[T])
    extends RDD[T](prev.sparkContext) with Logging {
  // Unique ID used to namespace this RDD's splits in the shared cache.
  val id = CachedRDD.newId()
  // Master-side record of which hosts hold each split (built from taskStarted).
  @transient val cacheLocs = Map[Split, List[String]]()

  override def splits = prev.splits

  override def preferredLocations(split: Split) = {
    // Prefer hosts known to have cached the split; otherwise fall back to
    // the parent's locality hints.
    if (cacheLocs.contains(split))
      cacheLocs(split)
    else
      prev.preferredLocations(split)
  }

  override def iterator(split: Split): Iterator[T] = {
    val key = id + "::" + split.toString
    logInfo("CachedRDD split key is " + key)
    val cache = CachedRDD.cache
    val loading = CachedRDD.loading
    val cachedVal = cache.get(key)
    if (cachedVal != null) {
      // Split is in cache, so just return its values
      return Iterator.fromArray(cachedVal.asInstanceOf[Array[T]])
    } else {
      // Mark the split as loading (unless someone else marks it first)
      loading.synchronized {
        if (loading.contains(key)) {
          // Another thread is loading this split: wait until it finishes,
          // then serve the value it put in the cache.
          while (loading.contains(key)) {
            // BUG FIX: the old bare `catch { case _ => }` swallowed every
            // Throwable; only a spurious InterruptedException should be
            // ignored here (the while loop re-checks the condition).
            try { loading.wait() } catch { case _: InterruptedException => }
          }
          return Iterator.fromArray(cache.get(key).asInstanceOf[Array[T]])
        } else {
          loading.add(key)
        }
      }
      // If we got here, we have to load the split ourselves.
      logInfo("Loading and caching " + split)
      val array = prev.iterator(split).toArray(m)
      cache.put(key, array)
      // Unmark the split and wake up any threads waiting for it.
      loading.synchronized {
        loading.remove(key)
        loading.notifyAll()
      }
      return Iterator.fromArray(array)
    }
  }

  override def taskStarted(split: Split, slot: SlaveOffer) {
    // Record the host as a cache location for this split (master side).
    val oldList = cacheLocs.getOrElse(split, Nil)
    val host = slot.getHost
    if (!oldList.contains(host))
      cacheLocs(split) = host :: oldList
  }
}
// Companion holding the ID generator (master side) plus the worker-side
// split cache and the set of split keys currently being loaded.
private object CachedRDD {
  // Generates IDs for cached RDDs (on master)
  val nextId = new AtomicLong(0)
  def newId() = nextId.getAndIncrement()

  // Stores materialized splits locally (on workers); soft values let the
  // JVM reclaim cached arrays under memory pressure.
  val cache = new MapMaker().softValues().makeMap[String, AnyRef]()

  // Remembers which splits are currently being loaded (on workers)
  val loading = new HashSet[String]
}
// One split of a UnionRDD: knows how to iterate itself and where it would
// prefer to run (both delegate to whichever parent RDD it wraps).
@serializable
abstract class UnionSplit[T: ClassManifest] extends Split {
  def iterator(): Iterator[T]
  def preferredLocations(): Seq[String]
}
// Concrete UnionSplit bound to one split of one parent RDD.
@serializable
class UnionSplitImpl[T: ClassManifest](
    rdd: RDD[T], split: Split)
    extends UnionSplit[T] {
  override def iterator() = rdd.iterator(split)
  override def preferredLocations() = rdd.preferredLocations(split)
}
// RDD produced by union(): concatenation of two parents of the same type.
@serializable
class UnionRDD[T: ClassManifest](
    sc: SparkContext, rdd1: RDD[T], rdd2: RDD[T])
    extends RDD[T](sc) {
  // Both parents' splits, each wrapped so it remembers which parent it
  // came from. Not serialized to workers (@transient).
  @transient val splits_ : Array[UnionSplit[T]] = {
    val wrapped1 = rdd1.splits.map(s => new UnionSplitImpl(rdd1, s))
    val wrapped2 = rdd2.splits.map(s => new UnionSplitImpl(rdd2, s))
    (wrapped1 ++ wrapped2).toArray
  }

  override def splits = splits_.asInstanceOf[Array[Split]]

  override def iterator(s: Split): Iterator[T] =
    s.asInstanceOf[UnionSplit[T]].iterator()

  override def preferredLocations(s: Split): Seq[String] =
    s.asInstanceOf[UnionSplit[T]].preferredLocations()
}
// A split of a CartesianRDD: one split from each parent RDD.
@serializable class CartesianSplit(val s1: Split, val s2: Split) extends Split {}
// RDD produced by cartesian(): all pairs (x, y) with x from rdd1 and y from
// rdd2. Has one split per pair of parent splits (the full cross product).
@serializable
class CartesianRDD[T: ClassManifest, U: ClassManifest](
    sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U])
    extends RDD[Pair[T, U]](sc) {
  // Cross product of the parents' splits.
  @transient val splits_ = {
    rdd2.splits.map(y => rdd1.splits.map(x => new CartesianSplit(x, y))).flatten
  }

  override def splits = splits_.asInstanceOf[Array[Split]]

  override def preferredLocations(split: Split) = {
    val curr = split.asInstanceOf[CartesianSplit]
    rdd1.preferredLocations(curr.s1) ++ rdd2.preferredLocations(curr.s2)
  }

  override def iterator(split: Split) = {
    val curr = split.asInstanceOf[CartesianSplit]
    // NOTE: re-iterates rdd2's split once per element of rdd1's split.
    for (x <- rdd1.iterator(curr.s1); y <- rdd2.iterator(curr.s2)) yield (x, y)
  }

  override def taskStarted(split: Split, slot: SlaveOffer) = {
    val curr = split.asInstanceOf[CartesianSplit]
    rdd1.taskStarted(curr.s1, slot)
    rdd2.taskStarted(curr.s2, slot)
  }
}