[SPARK-36772] FinalizeShuffleMerge fails with an exception due to attempt id not matching

### What changes were proposed in this pull request?
Remove the appAttemptId from TransportConf and instead pass it through SparkEnv, setting it directly on the BlockStoreClient in both the driver and the executors.

### Why are the changes needed?
Push-based shuffle fails whenever an attemptId is set in the SparkConf, because the attemptId is not set correctly on the driver side: the driver's shuffle client ends up with the default attempt id of -1, so its FinalizeShuffleMerge requests do not match the attempt id the shuffle services have registered for the application.
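
For context on the failure mode: the YARN shuffle service checks the attempt id carried by a `FinalizeShuffleMerge` message against the attempt id it has registered for the application, and rejects mismatches. The snippet below is a toy Scala model of that check, not the actual server code; it only illustrates why a driver stuck on the default attempt id of -1 fails against every merger.

```scala
// Illustrative only: a toy model of the server-side attempt-id validation
// that rejects FinalizeShuffleMerge when the driver's attempt id mismatches.
object AttemptIdCheck extends App {
  // Attempt id the shuffle service saw when the app attempt registered (YARN: 1, 2, ...).
  val registeredAttemptId = 1

  def finalizeShuffleMerge(appId: String, attemptId: Int): Unit = {
    if (attemptId != registeredAttemptId) {
      throw new IllegalArgumentException(
        s"Attempt id $attemptId in FinalizeShuffleMerge does not match " +
          s"attempt id $registeredAttemptId registered for $appId")
    }
    println(s"Finalized shuffle merge for $appId, attempt $attemptId")
  }

  finalizeShuffleMerge("app-20210918-0001", 1)  // succeeds
  finalizeShuffleMerge("app-20210918-0001", -1) // throws: the pre-fix driver behavior
}
```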

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested in our YARN cluster. Without this PR, the driver fails to finalize the shuffle merge on all the mergers; with the patch, the driver finalizes the shuffle merge successfully and push-based shuffle works as expected.
Also added a unit test to verify that the attemptId is set in the BlockStoreClient on the driver.

Closes #34018 from zhouyejoe/SPARK-36772.

Authored-by: Ye Zhou <yezhou@linkedin.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
Ye Zhou authored on 2021-09-18 15:51:57 +08:00, committed by Gengliang Wang
parent 32b8512912
commit cabc36b54d
7 changed files with 63 additions and 13 deletions

```diff
@@ -427,11 +427,4 @@ public class TransportConf {
   public int ioExceptionsThresholdDuringMerge() {
     return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
-
-  /**
-   * The application attemptID assigned from Hadoop YARN.
-   */
-  public int appAttemptId() {
-    return conf.getInt("spark.app.attempt.id", -1);
-  }
 }
```

```diff
@@ -46,6 +46,8 @@ public abstract class BlockStoreClient implements Closeable {
   protected volatile TransportClientFactory clientFactory;
   protected String appId;
+  // Store the application attemptId
+  private String appAttemptId;
   protected TransportConf transportConf;

   /**
@@ -124,6 +126,16 @@ public abstract class BlockStoreClient implements Closeable {
     assert appId != null : "Called before init()";
   }

+  // Set the application attemptId
+  public void setAppAttemptId(String appAttemptId) {
+    this.appAttemptId = appAttemptId;
+  }
+
+  // Get the application attemptId
+  public String getAppAttemptId() {
+    return this.appAttemptId;
+  }
+
   /**
    * Request the local disk directories for executors which are located at the same host with
    * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
```
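
Taken together with the SparkContext and CoarseGrainedExecutorBackend hunks further down, the flow is: the driver learns the attempt id from the task scheduler (which starts after SparkEnv, and thus after the client, already exists) and pushes it into the client through this setter, and each executor does the same from its propagated conf. A minimal stand-alone sketch of that flow, with a hypothetical stub standing in for the Java class above:

```scala
// Illustrative stub of the new accessor pair; the real class is the Java
// BlockStoreClient shown in the diff above.
class StubBlockStoreClient {
  private var appAttemptId: String = _
  def setAppAttemptId(id: String): Unit = { appAttemptId = id }
  def getAppAttemptId: String = appAttemptId
}

object AttemptIdWiring extends App {
  val client = new StubBlockStoreClient
  // Driver side: the attempt id is only known once the task scheduler starts.
  val applicationAttemptId: Option[String] = Some("1") // YARN attempt ids: "1", "2", ...
  applicationAttemptId.foreach(client.setAppAttemptId)
  // Executor side does the same with the id propagated through the driver conf.
  assert(client.getAppAttemptId == "1")
}
```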

```diff
@@ -52,6 +52,10 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
   private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
   private final long registrationTimeoutMs;
+  // Push based shuffle requires a comparable Id to distinguish the shuffle data among multiple
+  // application attempts. This variable is derived from the String typed appAttemptId. If no
+  // appAttemptId is set, the default comparableAppAttemptId is -1.
+  private int comparableAppAttemptId = -1;

   /**
    * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
@@ -83,6 +87,26 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
     clientFactory = context.createClientFactory(bootstraps);
   }

+  @Override
+  public void setAppAttemptId(String appAttemptId) {
+    super.setAppAttemptId(appAttemptId);
+    setComparableAppAttemptId(appAttemptId);
+  }
+
+  private void setComparableAppAttemptId(String appAttemptId) {
+    // For now, push based shuffle only supports running in YARN.
+    // Application attemptId in YARN is integer and it can be safely parsed
+    // to integer here. For the application attemptId from other cluster set up
+    // which is not numeric, it needs to generate this comparableAppAttemptId
+    // from the String typed appAttemptId through some other customized logic.
+    try {
+      this.comparableAppAttemptId = Integer.parseInt(appAttemptId);
+    } catch (NumberFormatException e) {
+      logger.warn("Push based shuffle requires comparable application attemptId, " +
+        "but the appAttemptId {} cannot be parsed to Integer", appAttemptId, e);
+    }
+  }
+
   @Override
   public void fetchBlocks(
       String host,
@@ -146,7 +170,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
           assert inputListener instanceof BlockPushingListener :
             "Expecting a BlockPushingListener, but got " + inputListener.getClass();
           TransportClient client = clientFactory.createClient(host, port);
-          new OneForOneBlockPusher(client, appId, transportConf.appAttemptId(), inputBlockId,
+          new OneForOneBlockPusher(client, appId, comparableAppAttemptId, inputBlockId,
             (BlockPushingListener) inputListener, buffersWithId).start();
         } else {
           logger.info("This clientFactory was closed. Skipping further block push retries.");
@@ -178,8 +202,8 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
       try {
         TransportClient client = clientFactory.createClient(host, port);
         ByteBuffer finalizeShuffleMerge =
-          new FinalizeShuffleMerge(appId, transportConf.appAttemptId(), shuffleId,
-            shuffleMergeId).toByteBuffer();
+          new FinalizeShuffleMerge(
+            appId, comparableAppAttemptId, shuffleId, shuffleMergeId).toByteBuffer();
         client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
           @Override
           public void onSuccess(ByteBuffer response) {
```
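
The parsing rule in `setComparableAppAttemptId` is easy to restate in isolation. A small sketch of just that behavior (illustrative, not the class itself):

```scala
// Mirrors setComparableAppAttemptId: numeric YARN attempt ids parse to an
// int; anything else keeps the -1 default rather than failing hard.
object ComparableAttemptId extends App {
  def toComparableAttemptId(appAttemptId: String): Int =
    try appAttemptId.toInt
    catch { case _: NumberFormatException => -1 }

  assert(toComparableAttemptId("2") == 2)            // YARN attempt ids are integers
  assert(toComparableAttemptId("attempt-abc") == -1) // non-YARN ids need customized logic
}
```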

```diff
@@ -583,7 +583,10 @@ class SparkContext(config: SparkConf) extends Logging {
     _applicationId = _taskScheduler.applicationId()
     _applicationAttemptId = _taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
-    _applicationAttemptId.foreach(attemptId => _conf.set(APP_ATTEMPT_ID, attemptId))
+    _applicationAttemptId.foreach { attemptId =>
+      _conf.set(APP_ATTEMPT_ID, attemptId)
+      _env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+    }
     if (_conf.get(UI_REVERSE_PROXY)) {
       val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") +
         "/proxy/" + _applicationId
```

```diff
@@ -470,7 +470,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       driverConf.set(EXECUTOR_ID, arguments.executorId)
       val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
         arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
+      // Set the application attemptId in the BlockStoreClient if available.
+      val appAttemptId = env.conf.get(APP_ATTEMPT_ID)
+      appAttemptId.foreach(attemptId =>
+        env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+      )
       val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
       env.rpcEnv.setupEndpoint("Executor", backend)
       arguments.workerUrl.foreach { url =>
```
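
Note that `APP_ATTEMPT_ID` is an optional config entry, which is why `env.conf.get(APP_ATTEMPT_ID)` yields an `Option` here: executors set the attempt id on their `BlockStoreClient` only when the cluster manager actually reported one, and otherwise the `comparableAppAttemptId` keeps its -1 default.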

```diff
@@ -1325,6 +1325,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
       }
     }
   }
+
+  test("SPARK-36772: Store application attemptId in BlockStoreClient for push based shuffle") {
+    val conf = new SparkConf().setAppName("testAppAttemptId")
+      .setMaster("pushbasedshuffleclustermanager")
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED.key, "true")
+    conf.set(IS_TESTING.key, "true")
+    conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+    sc = new SparkContext(conf)
+    val env = SparkEnv.get
+    assert(env.blockManager.blockStoreClient.getAppAttemptId.equals("1"))
+  }
 }

 object SparkContextSuite {
```
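
The "1" asserted above is supplied by the test-only cluster manager in the next hunk, whose task scheduler reports `Some("1")` as the application attempt id.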

```diff
@@ -3961,7 +3961,9 @@ private class PushBasedClusterManager extends ExternalClusterManager {
   override def createTaskScheduler(
       sc: SparkContext,
-      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true)
+      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true) {
+    override def applicationAttemptId(): Option[String] = Some("1")
+  }

   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
     val sc = scheduler.asInstanceOf[TaskSchedulerImpl]
```