[SPARK-27677][CORE] Disable by default fetching of disk persisted RDD blocks via external shuffle service

## What changes were proposed in this pull request?

In this PR the default of the config `spark.shuffle.service.fetch.rdd.enabled` is changed to **false** to avoid breaking compatibility with older external shuffle service installations. Because the external shuffle service is deployed separately, and disk-persisted RDD block fetching both introduced new network messages (`RemoveBlocks` and `BlocksRemoved`) and changed the behaviour of the already existing fetching by extending it to RDD blocks, enabling it by default could break older deployments.

## How was this patch tested?

With existing unit tests.

Closes #24697 from attilapiros/minor-ext-shuffle-fetch-disabled.

Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
“attilapiros” 2019-05-24 11:58:26 -07:00 committed by Marcelo Vanzin
parent de13f70ce1
commit 1e87694f2b
4 changed files with 5 additions and 3 deletions

View file

@@ -112,7 +112,7 @@ public class ExternalShuffleBlockResolver {
Executor directoryCleaner) throws IOException {
this.conf = conf;
this.rddFetchEnabled =
Boolean.valueOf(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "true"));
Boolean.valueOf(conf.get(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "false"));
this.registeredExecutorFile = registeredExecutorFile;
String indexCacheSize = conf.get("spark.shuffle.service.index.cache.size", "100m");
CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =

View file

@@ -107,6 +107,7 @@ public class ExternalShuffleIntegrationSuite {
HashMap<String, String> config = new HashMap<>();
config.put("spark.shuffle.io.maxRetries", "0");
config.put(Constants.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, "true");
conf = new TransportConf("shuffle", new MapConfigProvider(config));
handler = new ExternalShuffleBlockHandler(
new OneForOneStreamManager(),

View file

@@ -376,7 +376,7 @@ package object config {
"persisted blocks are considered idle after " +
"'spark.dynamicAllocation.executorIdleTimeout' and will be released accordingly.")
.booleanConf
.createWithDefault(true)
.createWithDefault(false)
private[spark] val SHUFFLE_SERVICE_DB_ENABLED =
ConfigBuilder("spark.shuffle.service.db.enabled")

View file

@@ -97,7 +97,8 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
}
test("SPARK-25888: using external shuffle service fetching disk persisted blocks") {
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val confWithRddFetchEnabled = conf.clone.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetchEnabled)
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
try {