[SPARK-36460][SHUFFLE] Pull out NoOpMergedShuffleFileManager inner class outside
### What changes were proposed in this pull request?
Pull out NoOpMergedShuffleFileManager inner class outside. This is required since passing a dollar sign ($) for the config (`spark.shuffle.server.mergedShuffleFileManagerImpl`) value can be an issue. Currently `spark.shuffle.server.mergedShuffleFileManagerImpl` is by default set to `org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager`. After this change, the default value will be set to `org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager`.
### Why are the changes needed?
Passing `$` for the config value can be an issue.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Modified existing unit tests.
Closes #33688 from venkata91/SPARK-36460.
Authored-by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit df0de83c46)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
This commit is contained in:
parent
38dd42a50c
commit
1a432fe6bb
|
@ -378,13 +378,13 @@ public class TransportConf {
|
||||||
* Class name of the implementation of MergedShuffleFileManager that merges the blocks
|
* Class name of the implementation of MergedShuffleFileManager that merges the blocks
|
||||||
* pushed to it when push-based shuffle is enabled. By default, push-based shuffle is disabled at
|
* pushed to it when push-based shuffle is enabled. By default, push-based shuffle is disabled at
|
||||||
* a cluster level because this configuration is set to
|
* a cluster level because this configuration is set to
|
||||||
* 'org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager'.
|
* 'org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager'.
|
||||||
* To turn on push-based shuffle at a cluster level, set the configuration to
|
* To turn on push-based shuffle at a cluster level, set the configuration to
|
||||||
* 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'.
|
* 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'.
|
||||||
*/
|
*/
|
||||||
public String mergedShuffleFileManagerImpl() {
|
public String mergedShuffleFileManagerImpl() {
|
||||||
return conf.get("spark.shuffle.server.mergedShuffleFileManagerImpl",
|
return conf.get("spark.shuffle.server.mergedShuffleFileManagerImpl",
|
||||||
"org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager");
|
"org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -594,64 +594,6 @@ public class ExternalBlockHandler extends RpcHandler
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
|
|
||||||
* is not enabled.
|
|
||||||
*
|
|
||||||
* @since 3.1.0
|
|
||||||
*/
|
|
||||||
public static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
|
|
||||||
|
|
||||||
// This constructor is needed because we use this constructor to instantiate an implementation
|
|
||||||
// of MergedShuffleFileManager using reflection.
|
|
||||||
// See YarnShuffleService#newMergedShuffleFileManagerInstance.
|
|
||||||
public NoOpMergedShuffleFileManager(TransportConf transportConf) {}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
|
|
||||||
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
|
|
||||||
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
|
|
||||||
// No-Op. Do nothing.
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
|
|
||||||
// No-Op. Do nothing.
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public ManagedBuffer getMergedBlockData(
|
|
||||||
String appId,
|
|
||||||
int shuffleId,
|
|
||||||
int shuffleMergeId,
|
|
||||||
int reduceId,
|
|
||||||
int chunkId) {
|
|
||||||
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public MergedBlockMeta getMergedBlockMeta(
|
|
||||||
String appId,
|
|
||||||
int shuffleId,
|
|
||||||
int shuffleMergeId,
|
|
||||||
int reduceId) {
|
|
||||||
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String[] getMergedBlockDirs(String appId) {
|
|
||||||
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void channelActive(TransportClient client) {
|
public void channelActive(TransportClient client) {
|
||||||
metrics.activeConnections.inc();
|
metrics.activeConnections.inc();
|
||||||
|
|
|
@ -0,0 +1,86 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.spark.network.shuffle;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.spark.network.buffer.ManagedBuffer;
|
||||||
|
import org.apache.spark.network.client.StreamCallbackWithID;
|
||||||
|
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
|
||||||
|
import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
|
||||||
|
import org.apache.spark.network.shuffle.protocol.MergeStatuses;
|
||||||
|
import org.apache.spark.network.shuffle.protocol.PushBlockStream;
|
||||||
|
import org.apache.spark.network.util.TransportConf;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
|
||||||
|
* is not enabled.
|
||||||
|
*
|
||||||
|
* @since 3.1.0
|
||||||
|
*/
|
||||||
|
public class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
|
||||||
|
|
||||||
|
// This constructor is needed because we use this constructor to instantiate an implementation
|
||||||
|
// of MergedShuffleFileManager using reflection.
|
||||||
|
// See YarnShuffleService#newMergedShuffleFileManagerInstance.
|
||||||
|
public NoOpMergedShuffleFileManager(TransportConf transportConf) {}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
|
||||||
|
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
|
||||||
|
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
|
||||||
|
// No-Op. Do nothing.
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
|
||||||
|
// No-Op. Do nothing.
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ManagedBuffer getMergedBlockData(
|
||||||
|
String appId,
|
||||||
|
int shuffleId,
|
||||||
|
int shuffleMergeId,
|
||||||
|
int reduceId,
|
||||||
|
int chunkId) {
|
||||||
|
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MergedBlockMeta getMergedBlockMeta(
|
||||||
|
String appId,
|
||||||
|
int shuffleId,
|
||||||
|
int shuffleMergeId,
|
||||||
|
int reduceId) {
|
||||||
|
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String[] getMergedBlockDirs(String appId) {
|
||||||
|
throw new UnsupportedOperationException("Cannot handle shuffle block merge");
|
||||||
|
}
|
||||||
|
}
|
|
@ -43,6 +43,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.server.api.*;
|
import org.apache.hadoop.yarn.server.api.*;
|
||||||
import org.apache.spark.network.shuffle.MergedShuffleFileManager;
|
import org.apache.spark.network.shuffle.MergedShuffleFileManager;
|
||||||
|
import org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager;
|
||||||
import org.apache.spark.network.util.LevelDBProvider;
|
import org.apache.spark.network.util.LevelDBProvider;
|
||||||
import org.iq80.leveldb.DB;
|
import org.iq80.leveldb.DB;
|
||||||
import org.iq80.leveldb.DBIterator;
|
import org.iq80.leveldb.DBIterator;
|
||||||
|
@ -284,7 +285,7 @@ public class YarnShuffleService extends AuxiliaryService {
|
||||||
return mergeManagerSubClazz.getConstructor(TransportConf.class).newInstance(conf);
|
return mergeManagerSubClazz.getConstructor(TransportConf.class).newInstance(conf);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.error("Unable to create an instance of {}", mergeManagerImplClassName);
|
logger.error("Unable to create an instance of {}", mergeManagerImplClassName);
|
||||||
return new ExternalBlockHandler.NoOpMergedShuffleFileManager(conf);
|
return new NoOpMergedShuffleFileManager(conf);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ import org.scalatest.matchers.should.Matchers._
|
||||||
import org.apache.spark.SecurityManager
|
import org.apache.spark.SecurityManager
|
||||||
import org.apache.spark.SparkFunSuite
|
import org.apache.spark.SparkFunSuite
|
||||||
import org.apache.spark.internal.config._
|
import org.apache.spark.internal.config._
|
||||||
import org.apache.spark.network.shuffle.{ExternalBlockHandler, RemoteBlockPushResolver, ShuffleTestAccessor}
|
import org.apache.spark.network.shuffle.{NoOpMergedShuffleFileManager, RemoteBlockPushResolver, ShuffleTestAccessor}
|
||||||
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
|
import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
|
||||||
import org.apache.spark.network.util.TransportConf
|
import org.apache.spark.network.util.TransportConf
|
||||||
import org.apache.spark.util.Utils
|
import org.apache.spark.util.Utils
|
||||||
|
@ -434,9 +434,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
|
||||||
test("create default merged shuffle file manager instance") {
|
test("create default merged shuffle file manager instance") {
|
||||||
val mockConf = mock(classOf[TransportConf])
|
val mockConf = mock(classOf[TransportConf])
|
||||||
when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
|
when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
|
||||||
"org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager")
|
"org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager")
|
||||||
val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
|
val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
|
||||||
assert(mergeMgr.isInstanceOf[ExternalBlockHandler.NoOpMergedShuffleFileManager])
|
assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
|
||||||
}
|
}
|
||||||
|
|
||||||
test("create remote block push resolver instance") {
|
test("create remote block push resolver instance") {
|
||||||
|
@ -452,6 +452,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
|
||||||
when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
|
when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
|
||||||
"org.apache.spark.network.shuffle.NotExistent")
|
"org.apache.spark.network.shuffle.NotExistent")
|
||||||
val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
|
val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
|
||||||
assert(mergeMgr.isInstanceOf[ExternalBlockHandler.NoOpMergedShuffleFileManager])
|
assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue