From cabc36b54d7f6633d8b128e511e7049c475b919d Mon Sep 17 00:00:00 2001
From: Ye Zhou
Date: Sat, 18 Sep 2021 15:51:57 +0800
Subject: [PATCH] [SPARK-36772] FinalizeShuffleMerge fails with an exception
 due to attempt id not matching

### What changes were proposed in this pull request?
Remove the appAttemptId from TransportConf, and instead propagate it through SparkEnv into the BlockStoreClient.

### Why are the changes needed?
Push based shuffle will fail if an attemptId is set in the SparkConf, because the attemptId is not set correctly in the Driver.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested within our YARN cluster. Without this PR, the Driver fails to finalize the shuffle merge on all the mergers. With this patch, the Driver can successfully finalize the shuffle merge and push based shuffle works fine. Also added a unit test to verify that the attemptId is set in the BlockStoreClient in the Driver.

Closes #34018 from zhouyejoe/SPARK-36772.

Authored-by: Ye Zhou
Signed-off-by: Gengliang Wang
---
 .../spark/network/util/TransportConf.java     |  7 -----
 .../network/shuffle/BlockStoreClient.java     | 12 ++++++++
 .../shuffle/ExternalBlockStoreClient.java     | 30 +++++++++++++++++--
 .../scala/org/apache/spark/SparkContext.scala |  5 +++-
 .../CoarseGrainedExecutorBackend.scala        |  6 +++-
 .../org/apache/spark/SparkContextSuite.scala  | 12 ++++++++
 .../spark/scheduler/DAGSchedulerSuite.scala   |  4 ++-
 7 files changed, 63 insertions(+), 13 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index bc507a4405..f73e3ce2e0 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -427,11 +427,4 @@ public class TransportConf {
   public int ioExceptionsThresholdDuringMerge() {
     return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
-
-  /**
-   * The application attemptID assigned from Hadoop YARN.
-   */
-  public int appAttemptId() {
-    return conf.getInt("spark.app.attempt.id", -1);
-  }
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
index 6dc5fd5a70..253fb7aca1 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -46,6 +46,8 @@ public abstract class BlockStoreClient implements Closeable {
 
   protected volatile TransportClientFactory clientFactory;
   protected String appId;
+  // Store the application attemptId
+  private String appAttemptId;
   protected TransportConf transportConf;
 
   /**
@@ -124,6 +126,16 @@ public abstract class BlockStoreClient implements Closeable {
     assert appId != null : "Called before init()";
   }
 
+  // Set the application attemptId
+  public void setAppAttemptId(String appAttemptId) {
+    this.appAttemptId = appAttemptId;
+  }
+
+  // Get the application attemptId
+  public String getAppAttemptId() {
+    return this.appAttemptId;
+  }
+
   /**
    * Request the local disk directories for executors which are located at the same host with
    * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
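For context (not part of the patch itself): the hunks above replace the int-typed `TransportConf.appAttemptId()` with a String-typed field plus accessor pair on `BlockStoreClient`. A minimal Scala sketch of that pair, with invented names standing in for the real Java class:

```scala
// Minimal stand-in for the new BlockStoreClient accessor pair; the real class
// is the Java one patched above and carries much more state.
class AttemptIdClient {
  // Kept as the raw String reported by the cluster manager; subclasses that
  // need a numeric id derive one themselves (see ExternalBlockStoreClient).
  private var appAttemptId: String = _

  def setAppAttemptId(attemptId: String): Unit = { appAttemptId = attemptId }
  def getAppAttemptId: String = appAttemptId
}

object AttemptIdClientDemo extends App {
  val client = new AttemptIdClient
  client.setAppAttemptId("1") // the Driver does this once the attempt id is known
  assert(client.getAppAttemptId == "1")
}
```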
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index 4c0e9f301a..d2df77658c 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -52,6 +52,10 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
   private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
   private final long registrationTimeoutMs;
+  // Push based shuffle requires a comparable id to distinguish the shuffle data among multiple
+  // application attempts. This variable is derived from the String typed appAttemptId. If no
+  // appAttemptId is set, the default comparableAppAttemptId is -1.
+  private int comparableAppAttemptId = -1;
 
   /**
    * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
@@ -83,6 +87,26 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
     clientFactory = context.createClientFactory(bootstraps);
   }
 
+  @Override
+  public void setAppAttemptId(String appAttemptId) {
+    super.setAppAttemptId(appAttemptId);
+    setComparableAppAttemptId(appAttemptId);
+  }
+
+  private void setComparableAppAttemptId(String appAttemptId) {
+    // For now, push based shuffle only supports running on YARN.
+    // The application attemptId in YARN is an integer, so it can be safely
+    // parsed to an integer here. For a non-numeric application attemptId from
+    // another cluster setup, the comparableAppAttemptId needs to be generated
+    // from the String typed appAttemptId through some other customized logic.
+    try {
+      this.comparableAppAttemptId = Integer.parseInt(appAttemptId);
+    } catch (NumberFormatException e) {
+      logger.warn("Push based shuffle requires a comparable application attemptId, " +
+        "but the appAttemptId {} cannot be parsed to an Integer", appAttemptId, e);
+    }
+  }
+
   @Override
   public void fetchBlocks(
       String host,
@@ -146,7 +170,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
           assert inputListener instanceof BlockPushingListener :
             "Expecting a BlockPushingListener, but got " + inputListener.getClass();
           TransportClient client = clientFactory.createClient(host, port);
-          new OneForOneBlockPusher(client, appId, transportConf.appAttemptId(), inputBlockId,
+          new OneForOneBlockPusher(client, appId, comparableAppAttemptId, inputBlockId,
            (BlockPushingListener) inputListener, buffersWithId).start();
         } else {
           logger.info("This clientFactory was closed. Skipping further block push retries.");
@@ -178,8 +202,8 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
     try {
       TransportClient client = clientFactory.createClient(host, port);
       ByteBuffer finalizeShuffleMerge =
-        new FinalizeShuffleMerge(appId, transportConf.appAttemptId(), shuffleId,
-          shuffleMergeId).toByteBuffer();
+        new FinalizeShuffleMerge(
+          appId, comparableAppAttemptId, shuffleId, shuffleMergeId).toByteBuffer();
       client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
         @Override
         public void onSuccess(ByteBuffer response) {
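The parsing contract introduced above (numeric YARN attempt ids parse to an int, anything else falls back to -1 with a warning) can be exercised in isolation. A self-contained sketch mirroring `setComparableAppAttemptId`; the object and method names are illustrative, not Spark APIs:

```scala
// Mirrors ExternalBlockStoreClient.setComparableAppAttemptId: YARN attempt ids
// are numeric strings, so Integer.parseInt is safe there; any other id keeps
// the default of -1 (the real client logs a warning instead of printing).
object ComparableAttemptId {
  val Default = -1

  def toComparable(appAttemptId: String): Int =
    try Integer.parseInt(appAttemptId)
    catch {
      case _: NumberFormatException =>
        println(s"appAttemptId '$appAttemptId' is not numeric; keeping $Default")
        Default
    }

  def main(args: Array[String]): Unit = {
    assert(toComparable("1") == 1)               // YARN-style attempt id
    assert(toComparable("attempt-1") == Default) // non-numeric id falls back
  }
}
```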
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 698a0227ed..3404a0f8ee 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -583,7 +583,10 @@ class SparkContext(config: SparkConf) extends Logging {
     _applicationId = _taskScheduler.applicationId()
     _applicationAttemptId = _taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
-    _applicationAttemptId.foreach(attemptId => _conf.set(APP_ATTEMPT_ID, attemptId))
+    _applicationAttemptId.foreach { attemptId =>
+      _conf.set(APP_ATTEMPT_ID, attemptId)
+      _env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+    }
     if (_conf.get(UI_REVERSE_PROXY)) {
       val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") +
         "/proxy/" + _applicationId
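The Driver-side change above hinges on `_applicationAttemptId` being an `Option[String]`: `foreach` runs the body only when the cluster manager actually reports an attempt id (YARN does; local mode does not), so the conf and the BlockStoreClient are updated together or not at all. A hypothetical sketch of that pattern, with function parameters standing in for `SparkConf` and `BlockStoreClient`:

```scala
// Sketch of the Option-based wiring in SparkContext: setConf and setOnClient
// are stand-ins for _conf.set(APP_ATTEMPT_ID, _) and
// blockStoreClient.setAppAttemptId(_); nothing runs when the Option is empty.
object DriverWiringSketch {
  def wire(
      applicationAttemptId: Option[String],
      setConf: String => Unit,
      setOnClient: String => Unit): Unit = {
    applicationAttemptId.foreach { attemptId =>
      setConf(attemptId)     // previously the only consumer of the attempt id
      setOnClient(attemptId) // new in this patch: the client learns it too
    }
  }

  def main(args: Array[String]): Unit = {
    var conf: Option[String] = None
    var client: Option[String] = None
    wire(Some("1"), id => conf = Some(id), id => client = Some(id))
    assert(conf.contains("1") && client.contains("1"))
    wire(None, id => conf = Some(id), id => client = Some(id)) // no-op
  }
}
```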
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c87e61ae70..43887a7f0c 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -470,7 +470,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       driverConf.set(EXECUTOR_ID, arguments.executorId)
       val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
         arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
-
+      // Set the application attemptId in the BlockStoreClient if available.
+      val appAttemptId = env.conf.get(APP_ATTEMPT_ID)
+      appAttemptId.foreach(attemptId =>
+        env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+      )
       val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
       env.rpcEnv.setupEndpoint("Executor", backend)
       arguments.workerUrl.foreach { url =>
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index fcac90b56f..bc809f11cc 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1325,6 +1325,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       }
     }
   }
+
+  test("SPARK-36772: Store application attemptId in BlockStoreClient for push based shuffle") {
+    val conf = new SparkConf().setAppName("testAppAttemptId")
+      .setMaster("pushbasedshuffleclustermanager")
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED.key, "true")
+    conf.set(IS_TESTING.key, "true")
+    conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+    sc = new SparkContext(conf)
+    val env = SparkEnv.get
+    assert(env.blockManager.blockStoreClient.getAppAttemptId.equals("1"))
+  }
+
 }
 
 object SparkContextSuite {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 312d1f8316..deddaea4df 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3961,7 +3961,9 @@ private class PushBasedClusterManager extends ExternalClusterManager {
 
   override def createTaskScheduler(
       sc: SparkContext,
-      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true)
+      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true) {
+    override def applicationAttemptId(): Option[String] = Some("1")
+  }
 
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
     val sc = scheduler.asInstanceOf[TaskSchedulerImpl]
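The test wiring works because the stub scheduler pins `applicationAttemptId` to `Some("1")`, which SparkContext then copies into the BlockStoreClient, making the `getAppAttemptId` assertion above deterministic. A simplified sketch of that hook; the trait and names here are invented stand-ins for `TaskScheduler` and the SparkContext plumbing:

```scala
// Simplified model of the test hook: PinnedScheduler plays the role of the
// TaskSchedulerImpl subclass in DAGSchedulerSuite, and propagate() plays the
// role of SparkContext copying the attempt id into the BlockStoreClient.
trait AttemptIdSource {
  def applicationAttemptId(): Option[String]
}

class PinnedScheduler extends AttemptIdSource {
  override def applicationAttemptId(): Option[String] = Some("1")
}

object TestHookSketch {
  def propagate(source: AttemptIdSource, setOnClient: String => Unit): Unit =
    source.applicationAttemptId().foreach(setOnClient)

  def main(args: Array[String]): Unit = {
    var seenByClient: Option[String] = None
    propagate(new PinnedScheduler, id => seenByClient = Some(id))
    assert(seenByClient.contains("1")) // what the new SparkContextSuite test checks
  }
}
```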