[SPARK-20640][CORE] Make rpc timeout and retry for shuffle registration configurable.

## What changes were proposed in this pull request?

Currently the shuffle service registration timeout and retry count are hardcoded. This works well for small workloads, but under heavy load, when the shuffle service is busy transferring large amounts of data, it can respond to registration requests with significant delay. As a result, executors often fail to register with the shuffle service, eventually failing the job. This change makes the two parameters configurable.
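
For reference, the two settings introduced here can be set like any other Spark conf. A minimal sketch (values are illustrative, not recommendations; the defaults are 5000 ms and 3 attempts, per the diff below):

```scala
import org.apache.spark.SparkConf

// Hedged example: raise the registration timeout and retry budget for a
// cluster whose shuffle service responds slowly under heavy load.
val conf = new SparkConf()
  .set("spark.shuffle.registration.timeout", "30000")  // milliseconds; default 5000
  .set("spark.shuffle.registration.maxAttempts", "5")  // default 3
```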

## How was this patch tested?

* Updated `BlockManagerSuite` to verify that the registration timeout and max attempts configurations actually take effect.

cc sitalkedia

Author: Li Yichao <lyc@zhihu.com>

Closes #18092 from liyichao/SPARK-20640.
Li Yichao authored on 2017-06-21 21:54:29 +08:00; committed by Wenchen Fan.
Commit d107b3b910 (parent 9ce714dca2): 9 changed files with 109 additions and 15 deletions.


```diff
@@ -49,6 +49,7 @@ public class ExternalShuffleClient extends ShuffleClient {
   private final TransportConf conf;
   private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
+  private final long registrationTimeoutMs;
 
   protected TransportClientFactory clientFactory;
   protected String appId;
@@ -60,10 +61,12 @@ public class ExternalShuffleClient extends ShuffleClient {
   public ExternalShuffleClient(
       TransportConf conf,
       SecretKeyHolder secretKeyHolder,
-      boolean authEnabled) {
+      boolean authEnabled,
+      long registrationTimeoutMs) {
     this.conf = conf;
     this.secretKeyHolder = secretKeyHolder;
     this.authEnabled = authEnabled;
+    this.registrationTimeoutMs = registrationTimeoutMs;
   }
 
   protected void checkInit() {
@@ -132,7 +135,7 @@ public class ExternalShuffleClient extends ShuffleClient {
     checkInit();
     try (TransportClient client = clientFactory.createUnmanagedClient(host, port)) {
       ByteBuffer registerMessage = new RegisterExecutor(appId, execId, executorInfo).toByteBuffer();
-      client.sendRpcSync(registerMessage, 5000 /* timeoutMs */);
+      client.sendRpcSync(registerMessage, registrationTimeoutMs);
     }
   }
```


```diff
@@ -60,8 +60,9 @@ public class MesosExternalShuffleClient extends ExternalShuffleClient {
   public MesosExternalShuffleClient(
       TransportConf conf,
       SecretKeyHolder secretKeyHolder,
-      boolean authEnabled) {
-    super(conf, secretKeyHolder, authEnabled);
+      boolean authEnabled,
+      long registrationTimeoutMs) {
+    super(conf, secretKeyHolder, authEnabled, registrationTimeoutMs);
   }
 
   public void registerDriverWithShuffleService(
```


```diff
@@ -133,7 +133,7 @@ public class ExternalShuffleIntegrationSuite {
     final Semaphore requestsRemaining = new Semaphore(0);
 
-    ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false);
+    ExternalShuffleClient client = new ExternalShuffleClient(clientConf, null, false, 5000);
     client.init(APP_ID);
     client.fetchBlocks(TestUtils.getLocalHost(), port, execId, blockIds,
       new BlockFetchingListener() {
@@ -242,7 +242,7 @@ public class ExternalShuffleIntegrationSuite {
   private static void registerExecutor(String executorId, ExecutorShuffleInfo executorInfo)
       throws IOException, InterruptedException {
-    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false);
+    ExternalShuffleClient client = new ExternalShuffleClient(conf, null, false, 5000);
     client.init(APP_ID);
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(),
       executorId, executorInfo);
```


```diff
@@ -97,7 +97,7 @@ public class ExternalShuffleSecuritySuite {
     }
 
     ExternalShuffleClient client =
-        new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true);
+        new ExternalShuffleClient(testConf, new TestSecretKeyHolder(appId, secretKey), true, 5000);
     client.init(appId);
     // Registration either succeeds or throws an exception.
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
```


```diff
@@ -303,6 +303,19 @@ package object config {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(100 * 1024 * 1024)
 
+  private[spark] val SHUFFLE_REGISTRATION_TIMEOUT =
+    ConfigBuilder("spark.shuffle.registration.timeout")
+      .doc("Timeout in milliseconds for registration to the external shuffle service.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefault(5000)
+
+  private[spark] val SHUFFLE_REGISTRATION_MAX_ATTEMPTS =
+    ConfigBuilder("spark.shuffle.registration.maxAttempts")
+      .doc("When we fail to register to the external shuffle service, we will " +
+        "retry for maxAttempts times.")
+      .intConf
+      .createWithDefault(3)
+
   private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
     ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
       .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
```


```diff
@@ -31,7 +31,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
@@ -174,7 +174,8 @@ private[spark] class BlockManager(
   // standard BlockTransferService to directly connect to other Executors.
   private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
     val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
-    new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled())
+    new ExternalShuffleClient(transConf, securityManager,
+      securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
   } else {
     blockTransferService
   }
@@ -254,7 +255,7 @@ private[spark] class BlockManager(
       diskBlockManager.subDirsPerLocalDir,
       shuffleManager.getClass.getName)
 
-    val MAX_ATTEMPTS = 3
+    val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS)
     val SLEEP_TIME_SECS = 5
 
     for (i <- 1 to MAX_ATTEMPTS) {
```
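
`MAX_ATTEMPTS` feeds the pre-existing registration retry loop in `BlockManager`. The loop body is outside this hunk, so the sketch below is a reconstruction for context, not verbatim source; identifiers such as `shuffleServerId` and `shuffleConfig` are assumed from the surrounding method:

```scala
// Reconstruction: retry registration up to the configured number of
// attempts, sleeping between failures.
for (i <- 1 to MAX_ATTEMPTS) {
  try {
    // sendRpcSync inside registerWithShuffleServer now waits
    // spark.shuffle.registration.timeout ms instead of a hardcoded 5000.
    shuffleClient.asInstanceOf[ExternalShuffleClient].registerWithShuffleServer(
      shuffleServerId.host, shuffleServerId.port, shuffleServerId.executorId, shuffleConfig)
    return
  } catch {
    case e: Exception if i < MAX_ATTEMPTS =>
      logError(s"Failed to connect to external shuffle server, will retry " +
        s"${MAX_ATTEMPTS - i} more times after waiting $SLEEP_TIME_SECS seconds...", e)
      Thread.sleep(SLEEP_TIME_SECS * 1000)
  }
}
```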


```diff
@@ -20,13 +20,15 @@ package org.apache.spark.storage
 import java.io.File
 import java.nio.ByteBuffer
 
+import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.concurrent.Future
-import scala.language.implicitConversions
-import scala.language.postfixOps
+import scala.language.{implicitConversions, postfixOps}
 import scala.reflect.ClassTag
 
+import org.apache.commons.lang3.RandomUtils
 import org.mockito.{Matchers => mc}
 import org.mockito.Mockito.{mock, times, verify, when}
 import org.scalatest._
@@ -38,10 +40,13 @@ import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.UnifiedMemoryManager
-import org.apache.spark.network.{BlockDataManager, BlockTransferService}
+import org.apache.spark.network.{BlockDataManager, BlockTransferService, TransportContext}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
-import org.apache.spark.network.netty.NettyBlockTransferService
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.netty.{NettyBlockTransferService, SparkTransportConf}
+import org.apache.spark.network.server.{NoOpRpcHandler, TransportServer, TransportServerBootstrap}
 import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterExecutor}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.LiveListenerBus
 import org.apache.spark.security.{CryptoStreamUtils, EncryptionFunSuite}
@@ -1281,6 +1286,61 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(master.getLocations("item").isEmpty)
   }
 
+  test("SPARK-20640: Shuffle registration timeout and maxAttempts conf are working") {
+    val tryAgainMsg = "test_spark_20640_try_again"
+    // a server which delays response 50ms and must try twice for success.
+    def newShuffleServer(port: Int): (TransportServer, Int) = {
+      val attempts = new mutable.HashMap[String, Int]()
+      val handler = new NoOpRpcHandler {
+        override def receive(
+            client: TransportClient,
+            message: ByteBuffer,
+            callback: RpcResponseCallback): Unit = {
+          val msgObj = BlockTransferMessage.Decoder.fromByteBuffer(message)
+          msgObj match {
+            case exec: RegisterExecutor =>
+              Thread.sleep(50)
+              val attempt = attempts.getOrElse(exec.execId, 0) + 1
+              attempts(exec.execId) = attempt
+              if (attempt < 2) {
+                callback.onFailure(new Exception(tryAgainMsg))
+                return
+              }
+              callback.onSuccess(ByteBuffer.wrap(new Array[Byte](0)))
+          }
+        }
+      }
+
+      val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores = 0)
+      val transCtx = new TransportContext(transConf, handler, true)
+      (transCtx.createServer(port, Seq.empty[TransportServerBootstrap].asJava), port)
+    }
+
+    val candidatePort = RandomUtils.nextInt(1024, 65536)
+    val (server, shufflePort) = Utils.startServiceOnPort(candidatePort,
+      newShuffleServer, conf, "ShuffleServer")
+
+    conf.set("spark.shuffle.service.enabled", "true")
+    conf.set("spark.shuffle.service.port", shufflePort.toString)
+    conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "40")
+    conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+    var e = intercept[SparkException]{
+      makeBlockManager(8000, "executor1")
+    }.getMessage
+    assert(e.contains("TimeoutException"))
+
+    conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
+    conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "1")
+    e = intercept[SparkException]{
+      makeBlockManager(8000, "executor2")
+    }.getMessage
+    assert(e.contains(tryAgainMsg))
+
+    conf.set(SHUFFLE_REGISTRATION_TIMEOUT.key, "1000")
+    conf.set(SHUFFLE_REGISTRATION_MAX_ATTEMPTS.key, "2")
+    makeBlockManager(8000, "executor3")
+    server.close()
+  }
+
   class MockBlockTransferService(val maxFailures: Int) extends BlockTransferService {
     var numCalls = 0
```
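
The three `makeBlockManager` calls exercise both knobs against the stub server, which sleeps 50 ms before answering and deliberately fails each executor's first attempt: with a 40 ms timeout and one attempt, registration dies with a `TimeoutException`; with a 1000 ms timeout but still one attempt, the server's injected first-attempt failure (`tryAgainMsg`) surfaces; with two attempts allowed, the retry succeeds and the block manager comes up.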


```diff
@@ -638,6 +638,20 @@ Apart from these, the following properties are also available, and may be useful
     underestimating shuffle block size when fetch shuffle blocks.
   </td>
 </tr>
+<tr>
+  <td><code>spark.shuffle.registration.timeout</code></td>
+  <td>5000</td>
+  <td>
+    Timeout in milliseconds for registration to the external shuffle service.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.shuffle.registration.maxAttempts</code></td>
+  <td>3</td>
+  <td>
+    When we fail to register to the external shuffle service, we will retry for maxAttempts times.
+  </td>
+</tr>
 <tr>
   <td><code>spark.io.encryption.enabled</code></td>
   <td>false</td>
```
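
Since `spark.shuffle.registration.timeout` is declared via `timeConf(TimeUnit.MILLISECONDS)`, it should also accept Spark's usual time-suffixed strings, like other time configs; this is an assumption based on `timeConf` behavior, not something this diff exercises:

```scala
// Assumes a SparkConf in scope; "10s" should parse to 10000 ms.
conf.set("spark.shuffle.registration.timeout", "10s")
```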


```diff
@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
 import org.apache.mesos.SchedulerDriver
 
 import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
+import org.apache.spark.internal.config
 import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcEndpointAddress
@@ -150,7 +151,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     new MesosExternalShuffleClient(
       SparkTransportConf.fromSparkConf(conf, "shuffle"),
       securityManager,
-      securityManager.isAuthenticationEnabled())
+      securityManager.isAuthenticationEnabled(),
+      conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
   }
 
   private var nextMesosTaskId = 0
```