diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
index 3071201266..54e870a9b5 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
@@ -76,6 +76,10 @@ public class ExternalShuffleBlockResolver {
   @VisibleForTesting
   final DB db;
 
+  private final List<String> knownManagers = Arrays.asList(
+    "org.apache.spark.shuffle.sort.SortShuffleManager",
+    "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
+
   public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
       throws IOException {
     this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
@@ -149,6 +153,10 @@ public class ExternalShuffleBlockResolver {
       ExecutorShuffleInfo executorInfo) {
     AppExecId fullId = new AppExecId(appId, execId);
     logger.info("Registered executor {} with {}", fullId, executorInfo);
+    if (!knownManagers.contains(executorInfo.shuffleManager)) {
+      throw new UnsupportedOperationException(
+        "Unsupported shuffle manager of executor: " + executorInfo);
+    }
     try {
       if (db != null) {
         byte[] key = dbAppExecKey(fullId);
@@ -183,12 +191,7 @@ public class ExternalShuffleBlockResolver {
         String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
     }
 
-    if ("sort".equals(executor.shuffleManager) || "tungsten-sort".equals(executor.shuffleManager)) {
-      return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
-    } else {
-      throw new UnsupportedOperationException(
-        "Unsupported shuffle manager: " + executor.shuffleManager);
-    }
+    return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
   }
 
   /**
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
index 5bf9924185..6ba937dddb 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/sasl/SaslIntegrationSuite.java
@@ -220,7 +220,8 @@ public class SaslIntegrationSuite {
       // Register an executor so that the next steps work.
       ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
-        new String[] { System.getProperty("java.io.tmpdir") }, 1, "sort");
+        new String[] { System.getProperty("java.io.tmpdir") }, 1,
+        "org.apache.spark.shuffle.sort.SortShuffleManager");
       RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", executorInfo);
       client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS);
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
index de4840a588..35d6346474 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolverSuite.java
@@ -37,6 +37,7 @@ import static org.junit.Assert.*;
 public class ExternalShuffleBlockResolverSuite {
   private static final String sortBlock0 = "Hello!";
   private static final String sortBlock1 = "World!";
+  private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
 
   private static TestShuffleDataContext dataContext;
 
@@ -71,8 +72,8 @@ public class ExternalShuffleBlockResolverSuite {
     }
 
     // Invalid shuffle manager
-    resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
     try {
+      resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
       resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
       fail("Should have failed");
     } catch (UnsupportedOperationException e) {
@@ -81,7 +82,7 @@ public class ExternalShuffleBlockResolverSuite {
 
     // Nonexistent shuffle block
     resolver.registerExecutor("app0", "exec3",
-      dataContext.createExecutorInfo("sort"));
+      dataContext.createExecutorInfo(SORT_MANAGER));
     try {
       resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
       fail("Should have failed");
@@ -94,7 +95,7 @@ public class ExternalShuffleBlockResolverSuite {
   public void testSortShuffleBlocks() throws IOException {
     ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
     resolver.registerExecutor("app0", "exec0",
-      dataContext.createExecutorInfo("sort"));
+      dataContext.createExecutorInfo(SORT_MANAGER));
 
     InputStream block0Stream =
       resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
@@ -120,7 +121,7 @@ public class ExternalShuffleBlockResolverSuite {
     assertEquals(parsedAppId, appId);
 
     ExecutorShuffleInfo shuffleInfo =
-      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "sort");
+      new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, SORT_MANAGER);
     String shuffleJson = mapper.writeValueAsString(shuffleInfo);
     ExecutorShuffleInfo parsedShuffleInfo =
       mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
@@ -131,7 +132,7 @@ public class ExternalShuffleBlockResolverSuite {
     String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
     assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
     String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
-      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"sort\"}";
+      "\"subDirsPerLocalDir\": 7, \"shuffleManager\": " + "\"" + SORT_MANAGER + "\"}";
     assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
   }
 }
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
index fa5cd1398a..bdd218db69 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleCleanupSuite.java
@@ -37,6 +37,7 @@ public class ExternalShuffleCleanupSuite {
   // Same-thread Executor used to ensure cleanup happens synchronously in test thread.
   private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
   private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
+  private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
 
   @Test
   public void noCleanupAndCleanup() throws IOException {
@@ -44,12 +45,12 @@ public class ExternalShuffleCleanupSuite {
     ExternalShuffleBlockResolver resolver =
       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
 
-    resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
     resolver.applicationRemoved("app", false /* cleanup */);
 
     assertStillThere(dataContext);
 
-    resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo(SORT_MANAGER));
     resolver.applicationRemoved("app", true /* cleanup */);
 
     assertCleanedUp(dataContext);
@@ -69,7 +70,7 @@ public class ExternalShuffleCleanupSuite {
     ExternalShuffleBlockResolver manager =
       new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
 
-    manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
+    manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
     manager.applicationRemoved("app", true);
 
     assertTrue(cleanupCalled.get());
@@ -87,8 +88,8 @@ public class ExternalShuffleCleanupSuite {
     ExternalShuffleBlockResolver resolver =
       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
 
-    resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
-    resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo(SORT_MANAGER));
     resolver.applicationRemoved("app", true);
 
     assertCleanedUp(dataContext0);
@@ -103,8 +104,8 @@ public class ExternalShuffleCleanupSuite {
     ExternalShuffleBlockResolver resolver =
       new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
 
-    resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
-    resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
+    resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
+    resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo(SORT_MANAGER));
     resolver.applicationRemoved("app-nonexistent", true);
 
     assertStillThere(dataContext0);
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
index 067c815c30..552b5366c5 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleIntegrationSuite.java
@@ -48,8 +48,8 @@ import org.apache.spark.network.util.TransportConf;
 
 public class ExternalShuffleIntegrationSuite {
 
-  static String APP_ID = "app-id";
-  static String SORT_MANAGER = "sort";
+  private static final String APP_ID = "app-id";
+  private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
 
   // Executor 0 is sort-based
   static TestShuffleDataContext dataContext0;
@@ -184,12 +184,9 @@ public class ExternalShuffleIntegrationSuite {
     exec0Fetch.releaseBuffers();
   }
 
-  @Test
-  public void testFetchInvalidShuffle() throws Exception {
+  @Test (expected = RuntimeException.class)
+  public void testRegisterInvalidExecutor() throws Exception {
     registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
-    FetchResult execFetch = fetchBlocks("exec-1", new String[] { "shuffle_1_0_0" });
-    assertTrue(execFetch.successBlocks.isEmpty());
-    assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
   }
 
   @Test
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
index acc1168f83..a0f69ca29a 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalShuffleSecuritySuite.java
@@ -93,7 +93,8 @@ public class ExternalShuffleSecuritySuite {
     client.init(appId);
     // Registration either succeeds or throws an exception.
     client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
-      new ExecutorShuffleInfo(new String[0], 0, ""));
+      new ExecutorShuffleInfo(new String[0], 0,
+        "org.apache.spark.shuffle.sort.SortShuffleManager"));
     client.close();
   }
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index 364fad664e..4ea8a7120a 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -29,9 +29,6 @@ import org.apache.spark.{ShuffleDependency, TaskContext}
  */
 private[spark] trait ShuffleManager {
 
-  /** Return short name for the ShuffleManager */
-  val shortName: String
-
   /**
    * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
   */
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 9bfd966e33..5e977a16fe 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -79,8 +79,6 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
    */
   private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
 
-  override val shortName: String = "sort"
-
   override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 22bc76b143..1c4921666f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -182,7 +182,7 @@ private[spark] class BlockManager(
     val shuffleConfig = new ExecutorShuffleInfo(
       diskBlockManager.localDirs.map(_.toString),
       diskBlockManager.subDirsPerLocalDir,
-      shuffleManager.shortName)
+      shuffleManager.getClass.getName)
 
     val MAX_ATTEMPTS = 3
     val SLEEP_TIME_SECS = 5
diff --git a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 5a426b86d1..0e433f6c1b 100644
--- a/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -32,6 +32,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 
 class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
   private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
+  private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -87,8 +88,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     val execStateFile = s1.registeredExecutorFile
     execStateFile should not be (null)
 
-    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
-    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
 
     val blockHandler = s1.blockHandler
     val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
@@ -158,8 +159,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     val execStateFile = s1.registeredExecutorFile
     execStateFile should not be (null)
 
-    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
-    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
 
     val blockHandler = s1.blockHandler
     val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
@@ -186,7 +187,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     s1.initializeApplication(app1Data)
 
     val execStateFile = s1.registeredExecutorFile
-    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
+    val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
 
     val blockHandler = s1.blockHandler
     val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
@@ -218,7 +219,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     val app2Data: ApplicationInitializationContext =
       new ApplicationInitializationContext("user", app2Id, null)
     s2.initializeApplication(app2Data)
-    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
+    val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
     resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
     ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (Some(shuffleInfo2))
     s2.stop()
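
Reviewer note (not part of the patch): with this change the external shuffle service rejects unknown shuffle managers at registration time instead of at block-fetch time, and executors now report the fully-qualified ShuffleManager class name rather than a short name. Below is a minimal usage sketch of that behavior, mirroring the test setup in the suites above (TransportConf with SystemPropertyConfigProvider, a resolver constructed without a registered-executor DB); the class name, app id, and executor ids are illustrative only.

    import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver;
    import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
    import org.apache.spark.network.util.SystemPropertyConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class RegisterExecutorSketch {
      public static void main(String[] args) throws Exception {
        TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
        // null registeredExecutorFile: no executor-state persistence, as in the suites above.
        ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);

        String[] localDirs = new String[] { System.getProperty("java.io.tmpdir") };

        // Accepted: the fully-qualified class name of a known sort-based shuffle manager.
        resolver.registerExecutor("app-1", "exec-0",
          new ExecutorShuffleInfo(localDirs, 1, "org.apache.spark.shuffle.sort.SortShuffleManager"));

        // Rejected immediately: short names such as "sort" are no longer recognized.
        try {
          resolver.registerExecutor("app-1", "exec-1",
            new ExecutorShuffleInfo(localDirs, 1, "sort"));
        } catch (UnsupportedOperationException e) {
          System.out.println("Registration rejected: " + e.getMessage());
        }
      }
    }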