From a74ec6d7bbfe185ba995dcb02d69e90a089c293e Mon Sep 17 00:00:00 2001 From: Thomas Graves Date: Mon, 9 Oct 2017 12:56:37 -0700 Subject: [PATCH] [SPARK-22218] spark shuffle services fails to update secret on app re-attempts This patch fixes application re-attempts when running spark on yarn using the external shuffle service with security on. Currently executors will fail to launch on any application re-attempt when launched on a nodemanager that had an executor from the first attempt. The reason for this is that we aren't updating the secret key after the first application attempt. The fix here is to just remove the containsKey check to see if it already exists. In this way, we always add it and make sure it's the most recent secret. Similarly remove the check for containsKey on the remove since it's just adding an extra check that isn't really needed. Note this worked before spark 2.2 because the check used to be contains (which was looking for the value) rather than containsKey, so that never matched and it was just always adding the new secret. Patch was tested on a 10 node cluster, and a unit test was added. The test run was a wordcount where the output directory already existed. With the bug present the application attempt failed with max number of executor Failures which were all saslExceptions. With the fix present the application re-attempts fail with directory already exists, or, when you remove the directory between attempts, the re-attempts succeed. Author: Thomas Graves Closes #19450 from tgravescs/SPARK-22218. 
--- .../network/sasl/ShuffleSecretManager.java | 19 +++---- .../sasl/ShuffleSecretManagerSuite.java | 55 +++++++++++++++++++ 2 files changed, 62 insertions(+), 12 deletions(-) create mode 100644 common/network-shuffle/src/test/java/org/apache/spark/network/sasl/ShuffleSecretManagerSuite.java diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java index d2d008f8a3..7253101f41 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/sasl/ShuffleSecretManager.java @@ -47,12 +47,11 @@ public class ShuffleSecretManager implements SecretKeyHolder { * fetching shuffle files written by other executors in this application. */ public void registerApp(String appId, String shuffleSecret) { - if (!shuffleSecretMap.containsKey(appId)) { - shuffleSecretMap.put(appId, shuffleSecret); - logger.info("Registered shuffle secret for application {}", appId); - } else { - logger.debug("Application {} already registered", appId); - } + // Always put the new secret information to make sure it's the most up to date. + // Otherwise we have to specifically look at the application attempt in addition + // to the applicationId since the secrets change between application attempts on yarn. + shuffleSecretMap.put(appId, shuffleSecret); + logger.info("Registered shuffle secret for application {}", appId); } /** @@ -67,12 +66,8 @@ public class ShuffleSecretManager implements SecretKeyHolder { * This is called when the application terminates. 
*/ public void unregisterApp(String appId) { - if (shuffleSecretMap.containsKey(appId)) { - shuffleSecretMap.remove(appId); - logger.info("Unregistered shuffle secret for application {}", appId); - } else { - logger.warn("Attempted to unregister application {} when it is not registered", appId); - } + shuffleSecretMap.remove(appId); + logger.info("Unregistered shuffle secret for application {}", appId); } /** diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/ShuffleSecretManagerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/ShuffleSecretManagerSuite.java new file mode 100644 index 0000000000..46c4c33865 --- /dev/null +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/ShuffleSecretManagerSuite.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.sasl; + +import java.nio.ByteBuffer; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class ShuffleSecretManagerSuite { + static String app1 = "app1"; + static String app2 = "app2"; + static String pw1 = "password1"; + static String pw2 = "password2"; + static String pw1update = "password1update"; + static String pw2update = "password2update"; + + @Test + public void testMultipleRegisters() { + ShuffleSecretManager secretManager = new ShuffleSecretManager(); + secretManager.registerApp(app1, pw1); + assertEquals(pw1, secretManager.getSecretKey(app1)); + secretManager.registerApp(app2, ByteBuffer.wrap(pw2.getBytes())); + assertEquals(pw2, secretManager.getSecretKey(app2)); + + // now update the password for the apps and make sure it takes affect + secretManager.registerApp(app1, pw1update); + assertEquals(pw1update, secretManager.getSecretKey(app1)); + secretManager.registerApp(app2, ByteBuffer.wrap(pw2update.getBytes())); + assertEquals(pw2update, secretManager.getSecretKey(app2)); + + secretManager.unregisterApp(app1); + assertNull(secretManager.getSecretKey(app1)); + assertEquals(pw2update, secretManager.getSecretKey(app2)); + + secretManager.unregisterApp(app2); + assertNull(secretManager.getSecretKey(app2)); + assertNull(secretManager.getSecretKey(app1)); + } +}