diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index bc507a4405..f73e3ce2e0 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -427,11 +427,4 @@ public class TransportConf {
   public int ioExceptionsThresholdDuringMerge() {
     return conf.getInt("spark.shuffle.push.server.ioExceptionsThresholdDuringMerge", 4);
   }
-
-  /**
-   * The application attemptID assigned from Hadoop YARN.
-   */
-  public int appAttemptId() {
-    return conf.getInt("spark.app.attempt.id", -1);
-  }
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
index 6dc5fd5a70..253fb7aca1 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -46,6 +46,8 @@ public abstract class BlockStoreClient implements Closeable {
 
   protected volatile TransportClientFactory clientFactory;
   protected String appId;
+  // Store the application attemptId
+  private String appAttemptId;
   protected TransportConf transportConf;
 
   /**
@@ -124,6 +126,16 @@ public abstract class BlockStoreClient implements Closeable {
     assert appId != null : "Called before init()";
   }
 
+  // Set the application attemptId
+  public void setAppAttemptId(String appAttemptId) {
+    this.appAttemptId = appAttemptId;
+  }
+
+  // Get the application attemptId
+  public String getAppAttemptId() {
+    return this.appAttemptId;
+  }
+
   /**
    * Request the local disk directories for executors which are located at the same host with
    * the current BlockStoreClient(it can be ExternalBlockStoreClient or NettyBlockTransferService).
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index 4c0e9f301a..d2df77658c 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -52,6 +52,10 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
   private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
   private final long registrationTimeoutMs;
+  // Push based shuffle requires a comparable Id to distinguish the shuffle data among multiple
+  // application attempts. This variable is derived from the String typed appAttemptId. If no
+  // appAttemptId is set, the default comparableAppAttemptId is -1.
+  private int comparableAppAttemptId = -1;
 
   /**
    * Creates an external shuffle client, with SASL optionally enabled. If SASL is not enabled,
@@ -83,6 +87,26 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
     clientFactory = context.createClientFactory(bootstraps);
  }
 
+  @Override
+  public void setAppAttemptId(String appAttemptId) {
+    super.setAppAttemptId(appAttemptId);
+    setComparableAppAttemptId(appAttemptId);
+  }
+
+  private void setComparableAppAttemptId(String appAttemptId) {
+    // For now, push based shuffle only supports running in YARN.
+    // The application attemptId in YARN is an integer and can be safely parsed
+    // here. For cluster managers whose attemptId is not numeric, this
+    // comparableAppAttemptId needs to be generated from the String typed
+    // appAttemptId through some other customized logic.
+    try {
+      this.comparableAppAttemptId = Integer.parseInt(appAttemptId);
+    } catch (NumberFormatException e) {
+      logger.warn("Push based shuffle requires comparable application attemptId, " +
+        "but the appAttemptId {} cannot be parsed to Integer", appAttemptId, e);
+    }
+  }
+
   @Override
   public void fetchBlocks(
       String host,
@@ -146,7 +170,7 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
           assert inputListener instanceof BlockPushingListener :
             "Expecting a BlockPushingListener, but got " + inputListener.getClass();
           TransportClient client = clientFactory.createClient(host, port);
-          new OneForOneBlockPusher(client, appId, transportConf.appAttemptId(), inputBlockId,
+          new OneForOneBlockPusher(client, appId, comparableAppAttemptId, inputBlockId,
             (BlockPushingListener) inputListener, buffersWithId).start();
         } else {
           logger.info("This clientFactory was closed. Skipping further block push retries.");
@@ -178,8 +202,8 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
     try {
       TransportClient client = clientFactory.createClient(host, port);
       ByteBuffer finalizeShuffleMerge =
-        new FinalizeShuffleMerge(appId, transportConf.appAttemptId(), shuffleId,
-          shuffleMergeId).toByteBuffer();
+        new FinalizeShuffleMerge(
+          appId, comparableAppAttemptId, shuffleId, shuffleMergeId).toByteBuffer();
       client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
         @Override
         public void onSuccess(ByteBuffer response) {
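Note (editorial, not part of the patch): the comment in setComparableAppAttemptId leaves the non-YARN case abstract. A minimal Scala sketch of what such "customized logic" could look like, using a hypothetical deriveComparableAttemptId helper; the ordering caveat in the comments is the important part:

```scala
// Hypothetical sketch, not part of this patch: deriving a comparable Int
// attempt id when a cluster manager's attemptId is not numeric.
def deriveComparableAttemptId(appAttemptId: String): Int =
  try {
    // The supported YARN case: attempt ids are plain integers ("1", "2", ...).
    appAttemptId.toInt
  } catch {
    case _: NumberFormatException =>
      // Fallback: a stable, non-negative hash of the string id. This yields a
      // usable Int but does not preserve "newer attempt compares greater"
      // ordering across attempts, which the merged-shuffle server relies on to
      // reject stale attempts, so a real implementation would need an
      // ordering-preserving mapping instead.
      appAttemptId.hashCode & Int.MaxValue
  }
```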
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 698a0227ed..3404a0f8ee 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -583,7 +583,10 @@ class SparkContext(config: SparkConf) extends Logging {
     _applicationId = _taskScheduler.applicationId()
     _applicationAttemptId = _taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
-    _applicationAttemptId.foreach(attemptId => _conf.set(APP_ATTEMPT_ID, attemptId))
+    _applicationAttemptId.foreach { attemptId =>
+      _conf.set(APP_ATTEMPT_ID, attemptId)
+      _env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+    }
     if (_conf.get(UI_REVERSE_PROXY)) {
       val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") +
         "/proxy/" + _applicationId
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index c87e61ae70..43887a7f0c 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -470,7 +470,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       driverConf.set(EXECUTOR_ID, arguments.executorId)
       val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
         arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
-
+      // Set the application attemptId in the BlockStoreClient if available.
+      val appAttemptId = env.conf.get(APP_ATTEMPT_ID)
+      appAttemptId.foreach(attemptId =>
+        env.blockManager.blockStoreClient.setAppAttemptId(attemptId)
+      )
       val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile)
       env.rpcEnv.setupEndpoint("Executor", backend)
       arguments.workerUrl.foreach { url =>
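For reference, both hunks above use foreach because APP_ATTEMPT_ID is an optional string config: the driver stores the attempt id under spark.app.attempt.id (the same key the removed TransportConf accessor used to read), and the executor path reads it back from the conf shipped to it. Its definition in org.apache.spark.internal.config looks roughly like this (approximate sketch; the exact doc text may differ):

```scala
// Approximate sketch of the config entry; the key is confirmed by the
// TransportConf hunk above, the builder chain is the standard Spark pattern.
private[spark] val APP_ATTEMPT_ID =
  ConfigBuilder("spark.app.attempt.id")
    .internal()
    .doc("The application attempt id assigned from Hadoop YARN.")
    .stringConf
    .createOptional
```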
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index fcac90b56f..bc809f11cc 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -1325,6 +1325,18 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       }
     }
   }
+
+  test("SPARK-36772: Store application attemptId in BlockStoreClient for push based shuffle") {
+    val conf = new SparkConf().setAppName("testAppAttemptId")
+      .setMaster("pushbasedshuffleclustermanager")
+    conf.set(PUSH_BASED_SHUFFLE_ENABLED.key, "true")
+    conf.set(IS_TESTING.key, "true")
+    conf.set(SHUFFLE_SERVICE_ENABLED.key, "true")
+    sc = new SparkContext(conf)
+    val env = SparkEnv.get
+    assert(env.blockManager.blockStoreClient.getAppAttemptId.equals("1"))
+  }
+
 }
 
 object SparkContextSuite {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 312d1f8316..deddaea4df 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3961,7 +3961,9 @@ private class PushBasedClusterManager extends ExternalClusterManager {
 
   override def createTaskScheduler(
       sc: SparkContext,
-      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true)
+      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal = true) {
+    override def applicationAttemptId(): Option[String] = Some("1")
+  }
 
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
     val sc = scheduler.asInstanceOf[TaskSchedulerImpl]
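Usage note (editorial): the new test's setMaster("pushbasedshuffleclustermanager") resolves because the test module registers PushBasedClusterManager as an ExternalClusterManager, discovered by SparkContext via ServiceLoader (a META-INF/services entry). With the applicationAttemptId() override above returning Some("1"), SparkContext now forwards "1" to the BlockStoreClient, which is exactly what the new assertion checks. A self-contained sketch of such a manager, assuming it compiles under a package with private[spark] access; the names here are hypothetical stand-ins, not the suite's actual class:

```scala
package org.apache.spark.scheduler

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.local.LocalSchedulerBackend

// Hypothetical stand-in for the real PushBasedClusterManager in
// DAGSchedulerSuite: it answers a fixed application attempt id, which
// SparkContext forwards to the BlockStoreClient during setup.
private class FixedAttemptIdClusterManager extends ExternalClusterManager {
  override def canCreate(masterURL: String): Boolean =
    masterURL == "fixedattemptidclustermanager"

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc, 1, isLocal = true) {
      // Mirrors the DAGSchedulerSuite override: a non-empty, numeric attempt id.
      override def applicationAttemptId(): Option[String] = Some("1")
    }

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend =
    new LocalSchedulerBackend(sc.getConf, scheduler.asInstanceOf[TaskSchedulerImpl], 1)

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
```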