2011-05-17 15:41:13 -04:00
|
|
|
package spark
|
|
|
|
|
2012-06-29 02:51:28 -04:00
|
|
|
import akka.actor.ActorSystem
|
2012-06-07 03:25:47 -04:00
|
|
|
|
|
|
|
import spark.storage.BlockManager
|
|
|
|
import spark.storage.BlockManagerMaster
|
|
|
|
import spark.network.ConnectionManager
|
2012-06-30 17:45:55 -04:00
|
|
|
import spark.util.AkkaUtils
|
2012-06-07 03:25:47 -04:00
|
|
|
|
2011-05-17 15:41:13 -04:00
|
|
|
/**
 * Holds all the runtime services that make up a Spark execution environment
 * (serializers, trackers, the block/connection managers and the Akka actor
 * system). One instance exists per running Spark "node" and is looked up via
 * the companion object's thread-local accessor.
 *
 * NOTE(review): several fields are nullable in practice — the test-only
 * constructor below fills most of them with null.
 */
class SparkEnv (
    val actorSystem: ActorSystem,
    val cache: Cache,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val cacheTracker: CacheTracker,
    val mapOutputTracker: MapOutputTracker,
    val shuffleFetcher: ShuffleFetcher,
    val shuffleManager: ShuffleManager,
    val blockManager: BlockManager,
    val connectionManager: ConnectionManager
  ) {

  /** No-parameter constructor for unit tests: only the serializers are real. */
  def this() =
    this(null, null, new JavaSerializer, new JavaSerializer, null, null, null, null, null, null)

  /**
   * Shuts down every service, finishing with the actor system. The order
   * matters: trackers and managers are stopped before the block manager and
   * its master, and the actor system is terminated last since the trackers
   * and the block manager master are backed by actors.
   */
  def stop() {
    mapOutputTracker.stop()
    cacheTracker.stop()
    shuffleFetcher.stop()
    shuffleManager.stop()
    blockManager.stop()
    BlockManagerMaster.stopBlockManagerMaster()
    actorSystem.shutdown()
    // Block until all actors have actually terminated.
    actorSystem.awaitTermination()
  }
}
|
2011-05-17 15:41:13 -04:00
|
|
|
|
|
|
|
/**
 * Companion holding the per-thread current [[SparkEnv]] plus the factory that
 * builds a full environment from `spark.*` system properties.
 */
object SparkEnv {

  // Each thread (e.g. each task runner) gets its own current environment.
  private val env = new ThreadLocal[SparkEnv]

  /** Installs `e` as the current environment for the calling thread. */
  def set(e: SparkEnv) {
    env.set(e)
  }

  /** Returns the calling thread's environment (null if none was set). */
  def get: SparkEnv = env.get()

  /**
   * Builds a complete environment, reading implementation classes from
   * system properties (`spark.serializer`, `spark.closure.serializer`,
   * `spark.cache.class`, `spark.shuffle.fetcher`) and instantiating them
   * reflectively via their no-arg constructors.
   *
   * @param hostname host to bind the actor system to
   * @param port     port for the actor system; 0 means "any free port"
   * @param isMaster whether this node is the master
   * @param isLocal  whether we are running in local mode
   */
  def createFromSystemProperties(
      hostname: String,
      port: Int,
      isMaster: Boolean,
      isLocal: Boolean
    ) : SparkEnv = {

    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)

    // Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
    // figure out which port number Akka actually bound to and set spark.master.port to it.
    if (isMaster && port == 0) {
      System.setProperty("spark.master.port", boundPort.toString)
    }

    def instantiate[T](className: String): T =
      Class.forName(className).newInstance().asInstanceOf[T]

    val serializer =
      instantiate[Serializer](System.getProperty("spark.serializer", "spark.KryoSerializer"))

    // The master must be started before any BlockManager is constructed.
    BlockManagerMaster.startBlockManagerMaster(actorSystem, isMaster, isLocal)

    val blockManager = new BlockManager(serializer)
    val connectionManager = blockManager.connectionManager
    val shuffleManager = new ShuffleManager()

    val closureSerializer = instantiate[Serializer](
      System.getProperty("spark.closure.serializer", "spark.JavaSerializer"))

    val cache =
      instantiate[Cache](System.getProperty("spark.cache.class", "spark.BoundedMemoryCache"))

    val cacheTracker = new CacheTracker(actorSystem, isMaster, blockManager)
    // Wire the tracker back into the block manager (mutual dependency).
    blockManager.cacheTracker = cacheTracker

    val mapOutputTracker = new MapOutputTracker(actorSystem, isMaster)

    val shuffleFetcher = instantiate[ShuffleFetcher](
      System.getProperty("spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher"))

    /*
    if (System.getProperty("spark.stream.distributed", "false") == "true") {
      val blockManagerClass = classOf[spark.storage.BlockManager].asInstanceOf[Class[_]]
      if (isLocal || !isMaster) {
        (new Thread() {
          override def run() {
            println("Wait started")
            Thread.sleep(60000)
            println("Wait ended")
            val receiverClass = Class.forName("spark.stream.TestStreamReceiver4")
            val constructor = receiverClass.getConstructor(blockManagerClass)
            val receiver = constructor.newInstance(blockManager)
            receiver.asInstanceOf[Thread].start()
          }
        }).start()
      }
    }
    */

    new SparkEnv(
      actorSystem,
      cache,
      serializer,
      closureSerializer,
      cacheTracker,
      mapOutputTracker,
      shuffleFetcher,
      shuffleManager,
      blockManager,
      connectionManager)
  }
}
|