[SPARK-14731][shuffle]Revert SPARK-12130 to make 2.0 shuffle service compatible with 1.x
## What changes were proposed in this pull request? SPARK-12130 made the 2.0 shuffle service incompatible with 1.x. So, following the discussion at [http://apache-spark-developers-list.1001551.n3.nabble.com/YARN-Shuffle-service-and-its-compatibility-td17222.html](http://apache-spark-developers-list.1001551.n3.nabble.com/YARN-Shuffle-service-and-its-compatibility-td17222.html), we should maintain compatibility between Spark 1.x's and Spark 2.x's shuffle service. I put the string comparison into the executor's registration step to avoid a string comparison in getBlockData every time. ## How was this patch tested? N/A Author: Lianhui Wang <lianhuiwang09@gmail.com> Closes #12568 from lianhuiwang/SPARK-14731.
This commit is contained in:
parent
425f691646
commit
6bfe42a3be
|
@ -76,6 +76,10 @@ public class ExternalShuffleBlockResolver {
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
final DB db;
|
final DB db;
|
||||||
|
|
||||||
|
private final List<String> knownManagers = Arrays.asList(
|
||||||
|
"org.apache.spark.shuffle.sort.SortShuffleManager",
|
||||||
|
"org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
|
||||||
|
|
||||||
public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
|
public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
|
this(conf, registeredExecutorFile, Executors.newSingleThreadExecutor(
|
||||||
|
@ -149,6 +153,10 @@ public class ExternalShuffleBlockResolver {
|
||||||
ExecutorShuffleInfo executorInfo) {
|
ExecutorShuffleInfo executorInfo) {
|
||||||
AppExecId fullId = new AppExecId(appId, execId);
|
AppExecId fullId = new AppExecId(appId, execId);
|
||||||
logger.info("Registered executor {} with {}", fullId, executorInfo);
|
logger.info("Registered executor {} with {}", fullId, executorInfo);
|
||||||
|
if (!knownManagers.contains(executorInfo.shuffleManager)) {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"Unsupported shuffle manager of executor: " + executorInfo);
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
if (db != null) {
|
if (db != null) {
|
||||||
byte[] key = dbAppExecKey(fullId);
|
byte[] key = dbAppExecKey(fullId);
|
||||||
|
@ -183,12 +191,7 @@ public class ExternalShuffleBlockResolver {
|
||||||
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
|
String.format("Executor is not registered (appId=%s, execId=%s)", appId, execId));
|
||||||
}
|
}
|
||||||
|
|
||||||
if ("sort".equals(executor.shuffleManager) || "tungsten-sort".equals(executor.shuffleManager)) {
|
|
||||||
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
|
return getSortBasedShuffleBlockData(executor, shuffleId, mapId, reduceId);
|
||||||
} else {
|
|
||||||
throw new UnsupportedOperationException(
|
|
||||||
"Unsupported shuffle manager: " + executor.shuffleManager);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -220,7 +220,8 @@ public class SaslIntegrationSuite {
|
||||||
|
|
||||||
// Register an executor so that the next steps work.
|
// Register an executor so that the next steps work.
|
||||||
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
|
ExecutorShuffleInfo executorInfo = new ExecutorShuffleInfo(
|
||||||
new String[] { System.getProperty("java.io.tmpdir") }, 1, "sort");
|
new String[] { System.getProperty("java.io.tmpdir") }, 1,
|
||||||
|
"org.apache.spark.shuffle.sort.SortShuffleManager");
|
||||||
RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", executorInfo);
|
RegisterExecutor regmsg = new RegisterExecutor("app-1", "0", executorInfo);
|
||||||
client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS);
|
client1.sendRpcSync(regmsg.toByteBuffer(), TIMEOUT_MS);
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ import static org.junit.Assert.*;
|
||||||
public class ExternalShuffleBlockResolverSuite {
|
public class ExternalShuffleBlockResolverSuite {
|
||||||
private static final String sortBlock0 = "Hello!";
|
private static final String sortBlock0 = "Hello!";
|
||||||
private static final String sortBlock1 = "World!";
|
private static final String sortBlock1 = "World!";
|
||||||
|
private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
|
||||||
|
|
||||||
private static TestShuffleDataContext dataContext;
|
private static TestShuffleDataContext dataContext;
|
||||||
|
|
||||||
|
@ -71,8 +72,8 @@ public class ExternalShuffleBlockResolverSuite {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Invalid shuffle manager
|
// Invalid shuffle manager
|
||||||
resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
|
|
||||||
try {
|
try {
|
||||||
|
resolver.registerExecutor("app0", "exec2", dataContext.createExecutorInfo("foobar"));
|
||||||
resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
|
resolver.getBlockData("app0", "exec2", "shuffle_1_1_0");
|
||||||
fail("Should have failed");
|
fail("Should have failed");
|
||||||
} catch (UnsupportedOperationException e) {
|
} catch (UnsupportedOperationException e) {
|
||||||
|
@ -81,7 +82,7 @@ public class ExternalShuffleBlockResolverSuite {
|
||||||
|
|
||||||
// Nonexistent shuffle block
|
// Nonexistent shuffle block
|
||||||
resolver.registerExecutor("app0", "exec3",
|
resolver.registerExecutor("app0", "exec3",
|
||||||
dataContext.createExecutorInfo("sort"));
|
dataContext.createExecutorInfo(SORT_MANAGER));
|
||||||
try {
|
try {
|
||||||
resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
|
resolver.getBlockData("app0", "exec3", "shuffle_1_1_0");
|
||||||
fail("Should have failed");
|
fail("Should have failed");
|
||||||
|
@ -94,7 +95,7 @@ public class ExternalShuffleBlockResolverSuite {
|
||||||
public void testSortShuffleBlocks() throws IOException {
|
public void testSortShuffleBlocks() throws IOException {
|
||||||
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
|
ExternalShuffleBlockResolver resolver = new ExternalShuffleBlockResolver(conf, null);
|
||||||
resolver.registerExecutor("app0", "exec0",
|
resolver.registerExecutor("app0", "exec0",
|
||||||
dataContext.createExecutorInfo("sort"));
|
dataContext.createExecutorInfo(SORT_MANAGER));
|
||||||
|
|
||||||
InputStream block0Stream =
|
InputStream block0Stream =
|
||||||
resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
|
resolver.getBlockData("app0", "exec0", "shuffle_0_0_0").createInputStream();
|
||||||
|
@ -120,7 +121,7 @@ public class ExternalShuffleBlockResolverSuite {
|
||||||
assertEquals(parsedAppId, appId);
|
assertEquals(parsedAppId, appId);
|
||||||
|
|
||||||
ExecutorShuffleInfo shuffleInfo =
|
ExecutorShuffleInfo shuffleInfo =
|
||||||
new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, "sort");
|
new ExecutorShuffleInfo(new String[]{"/bippy", "/flippy"}, 7, SORT_MANAGER);
|
||||||
String shuffleJson = mapper.writeValueAsString(shuffleInfo);
|
String shuffleJson = mapper.writeValueAsString(shuffleInfo);
|
||||||
ExecutorShuffleInfo parsedShuffleInfo =
|
ExecutorShuffleInfo parsedShuffleInfo =
|
||||||
mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
|
mapper.readValue(shuffleJson, ExecutorShuffleInfo.class);
|
||||||
|
@ -131,7 +132,7 @@ public class ExternalShuffleBlockResolverSuite {
|
||||||
String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
|
String legacyAppIdJson = "{\"appId\":\"foo\", \"execId\":\"bar\"}";
|
||||||
assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
|
assertEquals(appId, mapper.readValue(legacyAppIdJson, AppExecId.class));
|
||||||
String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
|
String legacyShuffleJson = "{\"localDirs\": [\"/bippy\", \"/flippy\"], " +
|
||||||
"\"subDirsPerLocalDir\": 7, \"shuffleManager\": \"sort\"}";
|
"\"subDirsPerLocalDir\": 7, \"shuffleManager\": " + "\"" + SORT_MANAGER + "\"}";
|
||||||
assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
|
assertEquals(shuffleInfo, mapper.readValue(legacyShuffleJson, ExecutorShuffleInfo.class));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,7 @@ public class ExternalShuffleCleanupSuite {
|
||||||
// Same-thread Executor used to ensure cleanup happens synchronously in test thread.
|
// Same-thread Executor used to ensure cleanup happens synchronously in test thread.
|
||||||
private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
|
private Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
|
||||||
private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
|
private TransportConf conf = new TransportConf("shuffle", new SystemPropertyConfigProvider());
|
||||||
|
private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void noCleanupAndCleanup() throws IOException {
|
public void noCleanupAndCleanup() throws IOException {
|
||||||
|
@ -44,12 +45,12 @@ public class ExternalShuffleCleanupSuite {
|
||||||
|
|
||||||
ExternalShuffleBlockResolver resolver =
|
ExternalShuffleBlockResolver resolver =
|
||||||
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
|
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
|
||||||
resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
|
resolver.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
|
||||||
resolver.applicationRemoved("app", false /* cleanup */);
|
resolver.applicationRemoved("app", false /* cleanup */);
|
||||||
|
|
||||||
assertStillThere(dataContext);
|
assertStillThere(dataContext);
|
||||||
|
|
||||||
resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo("shuffleMgr"));
|
resolver.registerExecutor("app", "exec1", dataContext.createExecutorInfo(SORT_MANAGER));
|
||||||
resolver.applicationRemoved("app", true /* cleanup */);
|
resolver.applicationRemoved("app", true /* cleanup */);
|
||||||
|
|
||||||
assertCleanedUp(dataContext);
|
assertCleanedUp(dataContext);
|
||||||
|
@ -69,7 +70,7 @@ public class ExternalShuffleCleanupSuite {
|
||||||
ExternalShuffleBlockResolver manager =
|
ExternalShuffleBlockResolver manager =
|
||||||
new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
|
new ExternalShuffleBlockResolver(conf, null, noThreadExecutor);
|
||||||
|
|
||||||
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo("shuffleMgr"));
|
manager.registerExecutor("app", "exec0", dataContext.createExecutorInfo(SORT_MANAGER));
|
||||||
manager.applicationRemoved("app", true);
|
manager.applicationRemoved("app", true);
|
||||||
|
|
||||||
assertTrue(cleanupCalled.get());
|
assertTrue(cleanupCalled.get());
|
||||||
|
@ -87,8 +88,8 @@ public class ExternalShuffleCleanupSuite {
|
||||||
ExternalShuffleBlockResolver resolver =
|
ExternalShuffleBlockResolver resolver =
|
||||||
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
|
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
|
||||||
|
|
||||||
resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
|
resolver.registerExecutor("app", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
|
||||||
resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo("shuffleMgr"));
|
resolver.registerExecutor("app", "exec1", dataContext1.createExecutorInfo(SORT_MANAGER));
|
||||||
resolver.applicationRemoved("app", true);
|
resolver.applicationRemoved("app", true);
|
||||||
|
|
||||||
assertCleanedUp(dataContext0);
|
assertCleanedUp(dataContext0);
|
||||||
|
@ -103,8 +104,8 @@ public class ExternalShuffleCleanupSuite {
|
||||||
ExternalShuffleBlockResolver resolver =
|
ExternalShuffleBlockResolver resolver =
|
||||||
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
|
new ExternalShuffleBlockResolver(conf, null, sameThreadExecutor);
|
||||||
|
|
||||||
resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo("shuffleMgr"));
|
resolver.registerExecutor("app-0", "exec0", dataContext0.createExecutorInfo(SORT_MANAGER));
|
||||||
resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo("shuffleMgr"));
|
resolver.registerExecutor("app-1", "exec0", dataContext1.createExecutorInfo(SORT_MANAGER));
|
||||||
|
|
||||||
resolver.applicationRemoved("app-nonexistent", true);
|
resolver.applicationRemoved("app-nonexistent", true);
|
||||||
assertStillThere(dataContext0);
|
assertStillThere(dataContext0);
|
||||||
|
|
|
@ -48,8 +48,8 @@ import org.apache.spark.network.util.TransportConf;
|
||||||
|
|
||||||
public class ExternalShuffleIntegrationSuite {
|
public class ExternalShuffleIntegrationSuite {
|
||||||
|
|
||||||
static String APP_ID = "app-id";
|
private static final String APP_ID = "app-id";
|
||||||
static String SORT_MANAGER = "sort";
|
private static final String SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager";
|
||||||
|
|
||||||
// Executor 0 is sort-based
|
// Executor 0 is sort-based
|
||||||
static TestShuffleDataContext dataContext0;
|
static TestShuffleDataContext dataContext0;
|
||||||
|
@ -184,12 +184,9 @@ public class ExternalShuffleIntegrationSuite {
|
||||||
exec0Fetch.releaseBuffers();
|
exec0Fetch.releaseBuffers();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test (expected = RuntimeException.class)
|
||||||
public void testFetchInvalidShuffle() throws Exception {
|
public void testRegisterInvalidExecutor() throws Exception {
|
||||||
registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
|
registerExecutor("exec-1", dataContext0.createExecutorInfo("unknown sort manager"));
|
||||||
FetchResult execFetch = fetchBlocks("exec-1", new String[] { "shuffle_1_0_0" });
|
|
||||||
assertTrue(execFetch.successBlocks.isEmpty());
|
|
||||||
assertEquals(Sets.newHashSet("shuffle_1_0_0"), execFetch.failedBlocks);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -93,7 +93,8 @@ public class ExternalShuffleSecuritySuite {
|
||||||
client.init(appId);
|
client.init(appId);
|
||||||
// Registration either succeeds or throws an exception.
|
// Registration either succeeds or throws an exception.
|
||||||
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
|
client.registerWithShuffleServer(TestUtils.getLocalHost(), server.getPort(), "exec0",
|
||||||
new ExecutorShuffleInfo(new String[0], 0, ""));
|
new ExecutorShuffleInfo(new String[0], 0,
|
||||||
|
"org.apache.spark.shuffle.sort.SortShuffleManager"));
|
||||||
client.close();
|
client.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,9 +29,6 @@ import org.apache.spark.{ShuffleDependency, TaskContext}
|
||||||
*/
|
*/
|
||||||
private[spark] trait ShuffleManager {
|
private[spark] trait ShuffleManager {
|
||||||
|
|
||||||
/** Return short name for the ShuffleManager */
|
|
||||||
val shortName: String
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
|
* Register a shuffle with the manager and obtain a handle for it to pass to tasks.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -79,8 +79,6 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
|
||||||
*/
|
*/
|
||||||
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
|
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()
|
||||||
|
|
||||||
override val shortName: String = "sort"
|
|
||||||
|
|
||||||
override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
|
override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -182,7 +182,7 @@ private[spark] class BlockManager(
|
||||||
val shuffleConfig = new ExecutorShuffleInfo(
|
val shuffleConfig = new ExecutorShuffleInfo(
|
||||||
diskBlockManager.localDirs.map(_.toString),
|
diskBlockManager.localDirs.map(_.toString),
|
||||||
diskBlockManager.subDirsPerLocalDir,
|
diskBlockManager.subDirsPerLocalDir,
|
||||||
shuffleManager.shortName)
|
shuffleManager.getClass.getName)
|
||||||
|
|
||||||
val MAX_ATTEMPTS = 3
|
val MAX_ATTEMPTS = 3
|
||||||
val SLEEP_TIME_SECS = 5
|
val SLEEP_TIME_SECS = 5
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
|
||||||
|
|
||||||
class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
|
class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach {
|
||||||
private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
|
private[yarn] var yarnConfig: YarnConfiguration = new YarnConfiguration
|
||||||
|
private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
|
||||||
|
|
||||||
override def beforeEach(): Unit = {
|
override def beforeEach(): Unit = {
|
||||||
super.beforeEach()
|
super.beforeEach()
|
||||||
|
@ -87,8 +88,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
|
||||||
|
|
||||||
val execStateFile = s1.registeredExecutorFile
|
val execStateFile = s1.registeredExecutorFile
|
||||||
execStateFile should not be (null)
|
execStateFile should not be (null)
|
||||||
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
|
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
|
||||||
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
|
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
|
||||||
|
|
||||||
val blockHandler = s1.blockHandler
|
val blockHandler = s1.blockHandler
|
||||||
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
|
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
|
||||||
|
@ -158,8 +159,8 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
|
||||||
|
|
||||||
val execStateFile = s1.registeredExecutorFile
|
val execStateFile = s1.registeredExecutorFile
|
||||||
execStateFile should not be (null)
|
execStateFile should not be (null)
|
||||||
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
|
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
|
||||||
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
|
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
|
||||||
|
|
||||||
val blockHandler = s1.blockHandler
|
val blockHandler = s1.blockHandler
|
||||||
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
|
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
|
||||||
|
@ -186,7 +187,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
|
||||||
s1.initializeApplication(app1Data)
|
s1.initializeApplication(app1Data)
|
||||||
|
|
||||||
val execStateFile = s1.registeredExecutorFile
|
val execStateFile = s1.registeredExecutorFile
|
||||||
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, "sort")
|
val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
|
||||||
|
|
||||||
val blockHandler = s1.blockHandler
|
val blockHandler = s1.blockHandler
|
||||||
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
|
val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
|
||||||
|
@ -218,7 +219,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
|
||||||
val app2Data: ApplicationInitializationContext =
|
val app2Data: ApplicationInitializationContext =
|
||||||
new ApplicationInitializationContext("user", app2Id, null)
|
new ApplicationInitializationContext("user", app2Id, null)
|
||||||
s2.initializeApplication(app2Data)
|
s2.initializeApplication(app2Data)
|
||||||
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, "hash")
|
val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
|
||||||
resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
|
resolver2.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
|
||||||
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (Some(shuffleInfo2))
|
ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (Some(shuffleInfo2))
|
||||||
s2.stop()
|
s2.stop()
|
||||||
|
|
Loading…
Reference in a new issue