Merge pull request #623 from rxin/master

Automatically configure Netty port.
Matei Zaharia 2013-05-24 16:48:52 -07:00
commit 24e41aa423
10 changed files with 129 additions and 112 deletions
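At its core, the change replaces the fixed, cluster-wide shuffle port (the old spark.shuffle.sender.port default of 6653) with an ephemeral one: each shuffle sender binds to port 0, lets the OS pick a free port, and advertises the chosen port to peers through BlockManagerId. A minimal sketch of the underlying mechanism, using plain ServerSocket rather than Netty, purely for illustration:

import java.net.ServerSocket

// Bind to port 0: the kernel picks any free ephemeral port.
val socket = new ServerSocket(0)
// Read back the port that was actually assigned; in this commit the
// equivalent value travels to peers via BlockManagerId.nettyPort.
val boundPort = socket.getLocalPort
println("bound to ephemeral port " + boundPort)
socket.close()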

@ -1,51 +1,83 @@
package spark.network.netty;

import java.net.InetSocketAddress;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioServerSocketChannel;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server that accepts the path of a file and echoes back its content.
 */
class FileServer {

  private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());

  private ServerBootstrap bootstrap = null;
  private ChannelFuture channelFuture = null;
  private int port = 0;
  private Thread blockingThread = null;

  public FileServer(PathResolver pResolver, int port) {
    InetSocketAddress addr = new InetSocketAddress(port);

    // Configure the server.
    bootstrap = new ServerBootstrap();
    bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
      .channel(OioServerSocketChannel.class)
      .option(ChannelOption.SO_BACKLOG, 100)
      .option(ChannelOption.SO_RCVBUF, 1500)
      .childHandler(new FileServerChannelInitializer(pResolver));

    // Start the server.
    channelFuture = bootstrap.bind(addr);
    this.port = addr.getPort();
  }

  /**
   * Start the file server asynchronously in a new thread.
   */
  public void start() {
    try {
      blockingThread = new Thread() {
        public void run() {
          try {
            Channel channel = channelFuture.sync().channel();
            channel.closeFuture().sync();
          } catch (InterruptedException e) {
            LOG.error("File server start got interrupted", e);
          }
        }
      };
      blockingThread.setDaemon(true);
      blockingThread.start();
    } finally {
      bootstrap.shutdown();
    }
  }

  public int getPort() {
    return port;
  }

  public void stop() {
    if (blockingThread != null) {
      blockingThread.stop();
      blockingThread = null;
    }
    if (channelFuture != null) {
      channelFuture.channel().closeFuture();
      channelFuture = null;
    }
    if (bootstrap != null) {
      bootstrap.shutdown();
      bootstrap = null;
    }
  }
}
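One caveat in the constructor above: when 0 is requested, new InetSocketAddress(0) still reports port 0, so getPort() returns the requested rather than the kernel-assigned port. A hedged sketch, not part of this commit, of how the bound port can be read off the channel once the bind completes:

import java.net.InetSocketAddress
import io.netty.channel.ChannelFuture

// Hypothetical helper: after bind() completes, the channel's local address
// carries the port the kernel actually assigned, even when 0 was requested.
def boundPort(cf: ChannelFuture): Int =
  cf.sync().channel().localAddress().asInstanceOf[InetSocketAddress].getPort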

@ -5,23 +5,22 @@ import java.io.File
import spark.Logging
private[spark] class ShuffleSender(portIn: Int, val pResolver: PathResolver) extends Logging {

  Runtime.getRuntime().addShutdownHook(
    new Thread() {
      override def run() {
        server.stop()
      }
    }
  )

  val server = new FileServer(pResolver, portIn)
  server.start()

  def stop() {
    server.stop()
  }

  def port: Int = server.getPort()
}
/**
* An application for testing the shuffle sender as a standalone program.
*/
private[spark] object ShuffleSender {
def main(args: Array[String]) {
@ -50,7 +49,5 @@ private[spark] object ShuffleSender {
}
}
val sender = new ShuffleSender(port, pResovler)
}
}
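With the server now started from the constructor, a caller can ask for any free port and read back the one that was actually chosen. An illustrative sketch; the stub resolver below is invented and not part of the commit:

// Illustrative stub; a real PathResolver maps shuffle block IDs to files.
val resolver = new PathResolver {
  override def getAbsolutePath(blockId: String): String = null
}
val sender = new ShuffleSender(0, resolver)  // 0 = bind to any free port
val chosenPort = sender.port                 // the port the server ended up on
sender.stop()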

@ -272,8 +272,7 @@ object BlockFetcherIterator {
logDebug("Sending request for %d blocks (%s) from %s".format(
req.blocks.size, Utils.memoryBytesToString(req.size), req.address.host))
val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort)
val cpier = new ShuffleCopier
cpier.getBlocks(cmId, req.blocks, putResult)
logDebug("Sent request for remote blocks " + req.blocks + " from " + req.address.host )

@ -94,11 +94,16 @@ private[spark] class BlockManager(
private[storage] val diskStore: DiskStore =
new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
// If we use Netty for shuffle, start a new Netty-based shuffle sender service.
private val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
private val nettyPortConfig = System.getProperty("spark.shuffle.sender.port", "0").toInt
private val nettyPort = if (useNetty) diskStore.startShuffleBlockSender(nettyPortConfig) else 0
val connectionManager = new ConnectionManager(0)
implicit val futureExecContext = connectionManager.futureExecContext
val blockManagerId = BlockManagerId(
  executorId, connectionManager.id.host, connectionManager.id.port, nettyPort)
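Two details above are easy to miss: spark.shuffle.sender.port now defaults to "0" (meaning "pick any free port"), and nettyPort is 0 whenever the Netty shuffle path is disabled, so a zero in BlockManagerId doubles as "no Netty sender here". A hedged restatement of the decision, with startSender standing in for diskStore.startShuffleBlockSender:

// Sketch only: startSender returns the port the FileServer actually bound.
def resolveNettyPort(startSender: Int => Int): Int = {
  val useNetty = System.getProperty("spark.shuffle.use.netty", "false").toBoolean
  val requested = System.getProperty("spark.shuffle.sender.port", "0").toInt
  if (useNetty) startSender(requested) else 0
}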
// Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory
// for receiving shuffle outputs)
@ -266,7 +271,6 @@ private[spark] class BlockManager(
}
}
/**
* Get locations of an array of blocks.
*/
@ -274,7 +278,7 @@ private[spark] class BlockManager(
val startTimeMs = System.currentTimeMillis
val locations = master.getLocations(blockIds).toArray
logDebug("Got multiple block location in " + Utils.getUsedTimeMs(startTimeMs))
locations
}
/**
@ -971,8 +975,7 @@ private[spark] object BlockManager extends Logging {
assert (env != null || blockManagerMaster != null)
val locationBlockIds: Seq[Seq[BlockManagerId]] =
if (env != null) {
env.blockManager.getLocationBlockIds(blockIds)
} else {
blockManagerMaster.getLocations(blockIds)
}

@ -7,18 +7,19 @@ import spark.Utils
/**
 * This class represents a unique identifier for a BlockManager.
 *
 * The first two constructors of this class are made private to ensure that
 * BlockManagerId objects can be created only using the apply method in
 * the companion object. This allows de-duplication of ID objects.
 * Also, constructor parameters are private to ensure that they cannot
 * be modified from outside this class.
 */
private[spark] class BlockManagerId private (
    private var executorId_ : String,
    private var host_ : String,
    private var port_ : Int,
    private var nettyPort_ : Int
  ) extends Externalizable {

  private def this() = this(null, null, 0, 0) // For deserialization only
def executorId: String = executorId_
@ -39,28 +40,32 @@ private[spark] class BlockManagerId private (
def port: Int = port_
def nettyPort: Int = nettyPort_
override def writeExternal(out: ObjectOutput) {
out.writeUTF(executorId_)
out.writeUTF(host_)
out.writeInt(port_)
out.writeInt(nettyPort_)
}
override def readExternal(in: ObjectInput) {
executorId_ = in.readUTF()
host_ = in.readUTF()
port_ = in.readInt()
nettyPort_ = in.readInt()
}
@throws(classOf[IOException])
private def readResolve(): Object = BlockManagerId.getCachedBlockManagerId(this)
override def toString = "BlockManagerId(%s, %s, %d)".format(executorId, host, port)
override def toString = "BlockManagerId(%s, %s, %d, %d)".format(executorId, host, port, nettyPort)
override def hashCode: Int = (executorId.hashCode * 41 + host.hashCode) * 41 + port + nettyPort
override def equals(that: Any) = that match {
case id: BlockManagerId =>
executorId == id.executorId && port == id.port && host == id.host && nettyPort == id.nettyPort
case _ =>
false
}
@ -69,8 +74,17 @@ private[spark] class BlockManagerId private (
private[spark] object BlockManagerId {

  /**
   * Returns a [[spark.storage.BlockManagerId]] for the given configuration.
   *
   * @param execId ID of the executor.
   * @param host Host name of the block manager.
   * @param port Port of the block manager.
   * @param nettyPort Optional port for the Netty-based shuffle sender.
   * @return A new [[spark.storage.BlockManagerId]].
   */
  def apply(execId: String, host: String, port: Int, nettyPort: Int) =
    getCachedBlockManagerId(new BlockManagerId(execId, host, port, nettyPort))

  def apply(in: ObjectInput) = {
    val obj = new BlockManagerId()
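Because the new field participates in equality, hashing, serialization, and the apply-side cache, a quick round-trip illustrates the intended behavior (values invented for illustration):

val a = BlockManagerId("exec-1", "host-a", 7077, 45123)
val b = BlockManagerId("exec-1", "host-a", 7077, 45123)
val c = BlockManagerId("exec-1", "host-a", 7077, 0)

assert(a eq b)  // getCachedBlockManagerId de-duplicates equal IDs
assert(a != c)  // a different nettyPort now means a different ID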

@ -82,22 +82,15 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
val MAX_DIR_CREATION_ATTEMPTS: Int = 10
val subDirsPerLocalDir = System.getProperty("spark.diskStore.subDirectories", "64").toInt
var shuffleSender : ShuffleSender = null
// Create one local directory for each path mentioned in spark.local.dir; then, inside this
// directory, create multiple subdirectories that we will hash files into, in order to avoid
// having really large inodes at the top level.
val localDirs = createLocalDirs()
val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
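A hedged sketch of the layout described in the comment above: a block ID hashes to one of the top-level local dirs, then to one of the subdirectories inside it, keeping any single directory from accumulating too many files (the exact formula here is illustrative):

// Sketch only: maps a block ID to (local dir index, subdirectory index).
def place(blockId: String, numLocalDirs: Int, subDirsPerLocalDir: Int): (Int, Int) = {
  val hash = math.abs(blockId.hashCode)
  val dirId = hash % numLocalDirs
  val subDirId = (hash / numLocalDirs) % subDirsPerLocalDir
  (dirId, subDirId)
}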
addShutdownHook()
def getBlockWriter(blockId: String, serializer: Serializer, bufferSize: Int)
: BlockObjectWriter = {
new DiskBlockObjectWriter(blockId, serializer, bufferSize)
@ -274,8 +267,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
localDirs.foreach { localDir =>
if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
}
if (shuffleSender != null) {
  shuffleSender.stop
}
} catch {
case t: Throwable => logError("Exception while deleting local spark dirs", t)
}
@ -283,39 +277,17 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
})
}
  private[storage] def startShuffleBlockSender(port: Int): Int = {
    val pResolver = new PathResolver {
      override def getAbsolutePath(blockId: String): String = {
        if (!blockId.startsWith("shuffle_")) {
          return null
        }
        DiskStore.this.getFile(blockId).getAbsolutePath()
      }
    }
    shuffleSender = new ShuffleSender(port, pResolver)
    logInfo("Created ShuffleSender binding to port : " + shuffleSender.port)
    shuffleSender.port
  }
}
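The resolver above only serves shuffle files; every other block ID resolves to null. A self-contained stand-in mirroring that contract (the directory below is invented for illustration):

import java.io.File

val demoResolver = new PathResolver {
  override def getAbsolutePath(blockId: String): String = {
    if (!blockId.startsWith("shuffle_")) {
      return null
    }
    new File("/tmp/spark-demo", blockId).getAbsolutePath
  }
}
assert(demoResolver.getAbsolutePath("rdd_5_0") == null)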

@ -8,7 +8,7 @@ import spark.storage.BlockManagerId
import spark.util.AkkaUtils
class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
test("compressSize") {
assert(MapOutputTracker.compressSize(0L) === 0)
assert(MapOutputTracker.compressSize(1L) === 1)
@ -45,13 +45,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0),
  Array(compressedSize1000, compressedSize10000)))
tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0),
  Array(compressedSize10000, compressedSize1000)))
val statuses = tracker.getServerStatuses(10, 0)
assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000, 0), size1000),
  (BlockManagerId("b", "hostB", 1000, 0), size10000)))
tracker.stop()
}
@ -64,14 +64,14 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize10000 = MapOutputTracker.compressSize(10000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("a", "hostA", 1000, 0),
  Array(compressedSize1000, compressedSize1000, compressedSize1000)))
tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("b", "hostB", 1000, 0),
  Array(compressedSize10000, compressedSize1000, compressedSize1000)))
// As if we had two simultaneous fetch failures
tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
tracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
// The remaining reduce task might try to grab the output despite the shuffle failure;
// this should cause it to fail, and the scheduler will ignore the failure due to the
@ -88,12 +88,12 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val masterTracker = new MapOutputTracker()
masterTracker.trackerActor = actorSystem.actorOf(
Props(new MapOutputTrackerActor(masterTracker)), "MapOutputTracker")
val (slaveSystem, _) = AkkaUtils.createActorSystem("spark-slave", hostname, 0)
val slaveTracker = new MapOutputTracker()
slaveTracker.trackerActor = slaveSystem.actorFor(
"akka://spark@localhost:" + boundPort + "/user/MapOutputTracker")
masterTracker.registerShuffle(10, 1)
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
@ -102,13 +102,13 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
val compressedSize1000 = MapOutputTracker.compressSize(1000L)
val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
masterTracker.registerMapOutput(10, 0, new MapStatus(
  BlockManagerId("a", "hostA", 1000, 0), Array(compressedSize1000)))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
  Seq((BlockManagerId("a", "hostA", 1000, 0), size1000)))
masterTracker.unregisterMapOutput(10, 0, BlockManagerId("a", "hostA", 1000, 0))
masterTracker.incrementGeneration()
slaveTracker.updateGeneration(masterTracker.getGeneration)
intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }

@ -326,5 +326,5 @@ object ShuffleSuite {
x + y
}
class NonJavaSerializableClass(val value: Int) extends Serializable
}

@ -44,7 +44,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
override def submitTasks(taskSet: TaskSet) = {
// normally done by TaskSetManager
taskSet.tasks.foreach(_.generation = mapOutputTracker.getGeneration)
taskSets += taskSet
}
override def setListener(listener: TaskSchedulerListener) = {}
override def defaultParallelism() = 2
@ -164,7 +164,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
}
}
}
/** Sends the rdd to the scheduler for scheduling. */
private def submit(
rdd: RDD[_],
@ -174,7 +174,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
listener: JobListener = listener) {
runEvent(JobSubmitted(rdd, func, partitions, allowLocal, null, listener))
}
/** Sends TaskSetFailed to the scheduler. */
private def failed(taskSet: TaskSet, message: String) {
runEvent(TaskSetFailed(taskSet, message))
@ -209,11 +209,11 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
runEvent(JobSubmitted(rdd, jobComputeFunc, Array(0), true, null, listener))
assert(results === Map(0 -> 42))
}
test("run trivial job w/ dependency") {
val baseRdd = makeRdd(1, Nil)
val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
submit(finalRdd, Array(0))
complete(taskSets(0), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
@ -250,7 +250,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
complete(taskSets(1), Seq((Success, 42)))
assert(results === Map(0 -> 42))
}
test("run trivial shuffle with fetch failure") {
val shuffleMapRdd = makeRdd(2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
@ -398,6 +398,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
new MapStatus(makeBlockManagerId(host), Array.fill[Byte](reduces)(2))
private def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)
BlockManagerId("exec-" + host, host, 12345, 0)
}

@ -99,9 +99,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
}
test("BlockManagerId object caching") {
val id1 = BlockManagerId("e1", "XXX", 1)
val id2 = BlockManagerId("e1", "XXX", 1) // this should return the same object as id1
val id3 = BlockManagerId("e1", "XXX", 2) // this should return a different object
val id1 = BlockManagerId("e1", "XXX", 1, 0)
val id2 = BlockManagerId("e1", "XXX", 1, 0) // this should return the same object as id1
val id3 = BlockManagerId("e1", "XXX", 2, 0) // this should return a different object
assert(id2 === id1, "id2 is not same as id1")
assert(id2.eq(id1), "id2 is not the same object as id1")
assert(id3 != id1, "id3 is same as id1")