[SPARK-4370] [Core] Limit number of Netty cores based on executor size

Author: Aaron Davidson <aaron@databricks.com>

Closes #3155 from aarondav/conf and squashes the following commits:

7045e77 [Aaron Davidson] Add mesos comment
4770f6e [Aaron Davidson] [SPARK-4370] [Core] Limit number of Netty cores based on executor size
Authored by Aaron Davidson on 2014-11-12 18:46:37 -08:00; committed by Reynold Xin
parent 23f5bdf06a
commit b9e1c2eb9b
17 changed files with 104 additions and 64 deletions
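At a high level, the patch threads each executor's allotted core count down from the scheduler backends, through SparkEnv and the BlockManager, into the Netty transport layer, so that a 4-core executor on a 32-core machine no longer spawns one Netty thread per physical core. A minimal, self-contained sketch of the convention (the object and method names here are illustrative, not Spark's API; 0 is the sentinel for "unknown"):

    object NettyCoreLimitSketch {
      // 0 means "no limit known": fall back to every core on the machine,
      // which was the previous behavior everywhere.
      def usableCores(numUsableCores: Int): Int =
        if (numUsableCores > 0) numUsableCores
        else Runtime.getRuntime.availableProcessors()

      def main(args: Array[String]): Unit = {
        println(usableCores(4)) // an executor granted 4 cores gets 4 Netty threads
        println(usableCores(0)) // driver or unknown: all machine cores, as before
      }
    }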

View file

@@ -168,9 +168,11 @@ object SparkEnv extends Logging {
       executorId: String,
       hostname: String,
       port: Int,
+      numCores: Int,
       isLocal: Boolean,
       actorSystem: ActorSystem = null): SparkEnv = {
-    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem,
+      numUsableCores = numCores)
   }

   /**
@@ -184,7 +186,8 @@ object SparkEnv extends Logging {
       isDriver: Boolean,
       isLocal: Boolean,
       listenerBus: LiveListenerBus = null,
-      defaultActorSystem: ActorSystem = null): SparkEnv = {
+      defaultActorSystem: ActorSystem = null,
+      numUsableCores: Int = 0): SparkEnv = {

     // Listener bus is only used on the driver
     if (isDriver) {
@@ -276,7 +279,7 @@ object SparkEnv extends Logging {
     val blockTransferService =
       conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
-          new NettyBlockTransferService(conf, securityManager)
+          new NettyBlockTransferService(conf, securityManager, numUsableCores)
         case "nio" =>
           new NioBlockTransferService(conf, securityManager)
       }
@@ -287,7 +290,8 @@ object SparkEnv extends Logging {

     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+      serializer, conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager,
+      numUsableCores)

     val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

View file

@@ -39,7 +39,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu
   private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
   private val useSasl: Boolean = securityManager.isAuthenticationEnabled()

-  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf)
+  private val transportConf = SparkTransportConf.fromSparkConf(sparkConf, numUsableCores = 0)
   private val blockHandler = new ExternalShuffleBlockHandler(transportConf)
   private val transportContext: TransportContext = {
     val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler

View file

@@ -57,9 +57,9 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def receiveWithLogging = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       // Make this host instead of hostPort ?
       val (hostname, _) = Utils.parseHostPort(hostPort)
-      executor = new Executor(executorId, hostname, sparkProperties, isLocal = false, actorSystem)
+      executor = new Executor(executorId, hostname, sparkProperties, cores, isLocal = false,
+        actorSystem)

     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)

View file

@@ -43,6 +43,7 @@ private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     properties: Seq[(String, String)],
+    numCores: Int,
     isLocal: Boolean = false,
     actorSystem: ActorSystem = null)
   extends Logging
@@ -83,7 +84,7 @@ private[spark] class Executor(
   if (!isLocal) {
     val port = conf.getInt("spark.executor.port", 0)
     val _env = SparkEnv.createExecutorEnv(
-      conf, executorId, slaveHostname, port, isLocal, actorSystem)
+      conf, executorId, slaveHostname, port, numCores, isLocal, actorSystem)
     SparkEnv.set(_env)
     _env.metricsSystem.registerSource(executorSource)
     _env.blockManager.initialize(conf.getAppId)

View file

@@ -19,6 +19,8 @@ package org.apache.spark.executor

 import java.nio.ByteBuffer

+import scala.collection.JavaConversions._
+
 import org.apache.mesos.protobuf.ByteString
 import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
 import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _}
@@ -50,14 +52,23 @@ private[spark] class MesosExecutorBackend
       executorInfo: ExecutorInfo,
       frameworkInfo: FrameworkInfo,
       slaveInfo: SlaveInfo) {
-    logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
+
+    // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend.
+    val cpusPerTask = executorInfo.getResourcesList
+      .find(_.getName == "cpus")
+      .map(_.getScalar.getValue.toInt)
+      .getOrElse(0)
+    val executorId = executorInfo.getExecutorId.getValue
+
+    logInfo(s"Registered with Mesos as executor ID $executorId with $cpusPerTask cpus")
     this.driver = driver
     val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray) ++
       Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
     executor = new Executor(
-      executorInfo.getExecutorId.getValue,
+      executorId,
       slaveInfo.getHostname,
-      properties)
+      properties,
+      cpusPerTask)
   }

   override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {

View file

@@ -35,13 +35,13 @@ import org.apache.spark.util.Utils
 /**
  * A BlockTransferService that uses Netty to fetch a set of blocks at a time.
  */
-class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager)
+class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManager, numCores: Int)
   extends BlockTransferService {

   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
   private val serializer = new JavaSerializer(conf)
   private val authEnabled = securityManager.isAuthenticationEnabled()
-  private val transportConf = SparkTransportConf.fromSparkConf(conf)
+  private val transportConf = SparkTransportConf.fromSparkConf(conf, numCores)

   private[this] var transportContext: TransportContext = _
   private[this] var server: TransportServer = _

View file

@@ -20,11 +20,22 @@ package org.apache.spark.network.netty
 import org.apache.spark.SparkConf
 import org.apache.spark.network.util.{TransportConf, ConfigProvider}

-/**
- * Utility for creating a [[TransportConf]] from a [[SparkConf]].
- */
 object SparkTransportConf {
-  def fromSparkConf(conf: SparkConf): TransportConf = {
+  /**
+   * Utility for creating a [[TransportConf]] from a [[SparkConf]].
+   * @param numUsableCores if nonzero, this will restrict the server and client threads to only
+   *                       use the given number of cores, rather than all of the machine's cores.
+   *                       This restriction will only occur if these properties are not already set.
+   */
+  def fromSparkConf(_conf: SparkConf, numUsableCores: Int = 0): TransportConf = {
+    val conf = _conf.clone
+    if (numUsableCores > 0) {
+      // Only set if serverThreads/clientThreads not already set.
+      conf.set("spark.shuffle.io.serverThreads",
+        conf.get("spark.shuffle.io.serverThreads", numUsableCores.toString))
+      conf.set("spark.shuffle.io.clientThreads",
+        conf.get("spark.shuffle.io.clientThreads", numUsableCores.toString))
+    }
     new TransportConf(new ConfigProvider {
       override def get(name: String): String = conf.get(name)
     })
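To make the defaulting rule above concrete, a hedged usage sketch (it assumes TransportConf exposes serverThreads()/clientThreads() accessors for these keys, which the TransportServer and TransportClientFactory hunks below suggest):

    import org.apache.spark.SparkConf
    import org.apache.spark.network.netty.SparkTransportConf

    // No explicit settings: both pools default to the executor's core count.
    val capped = SparkTransportConf.fromSparkConf(new SparkConf(false), numUsableCores = 4)
    assert(capped.serverThreads() == 4 && capped.clientThreads() == 4)

    // An explicit setting survives, since the clone is only written when the key is absent.
    val explicit = new SparkConf(false).set("spark.shuffle.io.serverThreads", "16")
    val mixed = SparkTransportConf.fromSparkConf(explicit, numUsableCores = 4)
    assert(mixed.serverThreads() == 16) // the user's value wins
    assert(mixed.clientThreads() == 4)  // the unset key falls back to the core count

Note that fromSparkConf clones the SparkConf before mutating it, so the caller's configuration is left untouched.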

View file

@@ -51,7 +51,7 @@ private[spark] class LocalActor(
   private val localExecutorHostname = "localhost"

   val executor = new Executor(
-    localExecutorId, localExecutorHostname, scheduler.conf.getAll, isLocal = true)
+    localExecutorId, localExecutorHostname, scheduler.conf.getAll, totalCores, isLocal = true)

   override def receiveWithLogging = {
     case ReviveOffers =>

View file

@@ -73,7 +73,8 @@ private[spark] class BlockManager(
     mapOutputTracker: MapOutputTracker,
     shuffleManager: ShuffleManager,
     blockTransferService: BlockTransferService,
-    securityManager: SecurityManager)
+    securityManager: SecurityManager,
+    numUsableCores: Int)
   extends BlockDataManager with Logging {

   val diskBlockManager = new DiskBlockManager(this, conf)
@@ -121,8 +122,8 @@ private[spark] class BlockManager(
   // Client to read other executors' shuffle files. This is either an external service, or just the
   // standard BlockTransferService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
-    new ExternalShuffleClient(SparkTransportConf.fromSparkConf(conf), securityManager,
-      securityManager.isAuthenticationEnabled())
+    val transConf = SparkTransportConf.fromSparkConf(conf, numUsableCores)
+    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
   } else {
     blockTransferService
   }
@@ -174,9 +175,10 @@ private[spark] class BlockManager(
       mapOutputTracker: MapOutputTracker,
       shuffleManager: ShuffleManager,
       blockTransferService: BlockTransferService,
-      securityManager: SecurityManager) = {
+      securityManager: SecurityManager,
+      numUsableCores: Int) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf),
-      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager)
+      conf, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores)
   }

   /**

View file

@@ -38,7 +38,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
   var rpcHandler: ExternalShuffleBlockHandler = _

   override def beforeAll() {
-    val transportConf = SparkTransportConf.fromSparkConf(conf)
+    val transportConf = SparkTransportConf.fromSparkConf(conf, numUsableCores = 2)
     rpcHandler = new ExternalShuffleBlockHandler(transportConf)
     val transportContext = new TransportContext(transportConf, rpcHandler)
     server = transportContext.createServer()

View file

@@ -104,11 +104,11 @@ class NettyBlockTransferSecuritySuite extends FunSuite with MockitoSugar with Sh
     when(blockManager.getBlockData(blockId)).thenReturn(blockBuffer)

     val securityManager0 = new SecurityManager(conf0)
-    val exec0 = new NettyBlockTransferService(conf0, securityManager0)
+    val exec0 = new NettyBlockTransferService(conf0, securityManager0, numCores = 1)
     exec0.init(blockManager)

     val securityManager1 = new SecurityManager(conf1)
-    val exec1 = new NettyBlockTransferService(conf1, securityManager1)
+    val exec1 = new NettyBlockTransferService(conf1, securityManager1, numCores = 1)
     exec1.init(blockManager)

     val result = fetchBlock(exec0, exec1, "1", blockId) match {

View file

@@ -62,7 +62,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val store = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
-      mapOutputTracker, shuffleManager, transfer, securityMgr)
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     store.initialize("app-id")
     allStores += store
     store
@@ -263,7 +263,7 @@ class BlockManagerReplicationSuite extends FunSuite with Matchers with BeforeAnd
     when(failableTransfer.hostName).thenReturn("some-hostname")
     when(failableTransfer.port).thenReturn(1000)
     val failableStore = new BlockManager("failable-store", actorSystem, master, serializer,
-      10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr)
+      10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
     failableStore.initialize("app-id")
     allStores += failableStore // so that this gets stopped after test
     assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))

View file

@@ -74,7 +74,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
       name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
     val transfer = new NioBlockTransferService(conf, securityMgr)
     val manager = new BlockManager(name, actorSystem, master, serializer, maxMem, conf,
-      mapOutputTracker, shuffleManager, transfer, securityMgr)
+      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
     manager.initialize("app-id")
     manager
   }
@@ -795,7 +795,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     // Use Java serializer so we can create an unserializable error.
     val transfer = new NioBlockTransferService(conf, securityMgr)
     store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, actorSystem, master,
-      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr)
+      new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr,
+      0)

     // The put should fail since a1 is not serializable.
     class UnserializableClass

View file

@@ -118,7 +118,8 @@ public class TransportClientFactory implements Closeable {
       .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs());

     // Use pooled buffers to reduce temporary buffer allocation
-    bootstrap.option(ChannelOption.ALLOCATOR, createPooledByteBufAllocator());
+    bootstrap.option(ChannelOption.ALLOCATOR, NettyUtils.createPooledByteBufAllocator(
+      conf.preferDirectBufs(), false /* allowCache */, conf.clientThreads()));

     final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();

@@ -190,34 +191,4 @@ public class TransportClientFactory implements Closeable {
       workerGroup = null;
     }
   }
-
-  /**
-   * Create a pooled ByteBuf allocator but disables the thread-local cache. Thread-local caches
-   * are disabled because the ByteBufs are allocated by the event loop thread, but released by the
-   * executor thread rather than the event loop thread. Those thread-local caches actually delay
-   * the recycling of buffers, leading to larger memory usage.
-   */
-  private PooledByteBufAllocator createPooledByteBufAllocator() {
-    return new PooledByteBufAllocator(
-      conf.preferDirectBufs() && PlatformDependent.directBufferPreferred(),
-      getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"),
-      getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"),
-      getPrivateStaticField("DEFAULT_PAGE_SIZE"),
-      getPrivateStaticField("DEFAULT_MAX_ORDER"),
-      0,  // tinyCacheSize
-      0,  // smallCacheSize
-      0   // normalCacheSize
-    );
-  }
-
-  /** Used to get defaults from Netty's private static fields. */
-  private int getPrivateStaticField(String name) {
-    try {
-      Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
-      f.setAccessible(true);
-      return f.getInt(null);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
 }

View file

@@ -72,8 +72,8 @@ public class TransportServer implements Closeable {
       NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
     EventLoopGroup workerGroup = bossGroup;

-    PooledByteBufAllocator allocator = new PooledByteBufAllocator(
-      conf.preferDirectBufs() && PlatformDependent.directBufferPreferred());
+    PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
+      conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());

     bootstrap = new ServerBootstrap()
       .group(bossGroup, workerGroup)

View file

@@ -17,9 +17,11 @@

 package org.apache.spark.network.util;

+import java.lang.reflect.Field;
 import java.util.concurrent.ThreadFactory;

 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.ServerChannel;
@@ -32,6 +34,7 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.util.internal.PlatformDependent;

 /**
  * Utilities for creating various Netty constructs based on whether we're using EPOLL or NIO.
@@ -103,4 +106,40 @@ public class NettyUtils {
     }
     return "<unknown remote>";
   }
+
+  /**
+   * Creates a pooled ByteBuf allocator, disabling the thread-local cache unless allowCache is
+   * set. Caching is disabled on the client because ByteBufs are allocated by the event loop
+   * thread but released by the executor thread rather than the event loop thread; those
+   * thread-local caches delay the recycling of buffers, leading to larger memory usage.
+   */
+  public static PooledByteBufAllocator createPooledByteBufAllocator(
+      boolean allowDirectBufs,
+      boolean allowCache,
+      int numCores) {
+    if (numCores == 0) {
+      numCores = Runtime.getRuntime().availableProcessors();
+    }
+    return new PooledByteBufAllocator(
+      allowDirectBufs && PlatformDependent.directBufferPreferred(),
+      Math.min(getPrivateStaticField("DEFAULT_NUM_HEAP_ARENA"), numCores),
+      Math.min(getPrivateStaticField("DEFAULT_NUM_DIRECT_ARENA"), allowDirectBufs ? numCores : 0),
+      getPrivateStaticField("DEFAULT_PAGE_SIZE"),
+      getPrivateStaticField("DEFAULT_MAX_ORDER"),
+      allowCache ? getPrivateStaticField("DEFAULT_TINY_CACHE_SIZE") : 0,
+      allowCache ? getPrivateStaticField("DEFAULT_SMALL_CACHE_SIZE") : 0,
+      allowCache ? getPrivateStaticField("DEFAULT_NORMAL_CACHE_SIZE") : 0
+    );
+  }
+
+  /** Used to get defaults from Netty's private static fields. */
+  private static int getPrivateStaticField(String name) {
+    try {
+      Field f = PooledByteBufAllocator.DEFAULT.getClass().getDeclaredField(name);
+      f.setAccessible(true);
+      return f.getInt(null);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
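As a hedged sketch of calling the new helper from Scala, mirroring how the TransportClientFactory and TransportServer hunks above use it (the literal arguments are illustrative):

    import org.apache.spark.network.util.NettyUtils

    // Client side: thread-local caches off, since buffers are released on a
    // different thread than they are allocated on; arenas capped at 4 cores.
    val clientAllocator = NettyUtils.createPooledByteBufAllocator(true, false, 4)

    // Server side: caches allowed; 0 cores means "use all of the machine's cores".
    val serverAllocator = NettyUtils.createPooledByteBufAllocator(true, true, 0)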

View file

@@ -73,7 +73,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
     blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
       blockManagerSize, conf, mapOutputTracker, shuffleManager,
-      new NioBlockTransferService(conf, securityMgr), securityMgr)
+      new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
     blockManager.initialize("app-id")

     tempDirectory = Files.createTempDir()