diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index 8e7ecf500e..69b8b25454 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -378,13 +378,13 @@ public class TransportConf {
    * Class name of the implementation of MergedShuffleFileManager that merges the blocks
    * pushed to it when push-based shuffle is enabled. By default, push-based shuffle is disabled at
    * a cluster level because this configuration is set to
-   * 'org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager'.
+   * 'org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager'.
    * To turn on push-based shuffle at a cluster level, set the configuration to
    * 'org.apache.spark.network.shuffle.RemoteBlockPushResolver'.
    */
   public String mergedShuffleFileManagerImpl() {
     return conf.get("spark.shuffle.server.mergedShuffleFileManagerImpl",
-      "org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager");
+      "org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager");
   }
 
   /**
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
index 71741f2cba..1e413f6b2f 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -594,64 +594,6 @@ public class ExternalBlockHandler extends RpcHandler
     }
   }
 
-  /**
-   * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
-   * is not enabled.
-   *
-   * @since 3.1.0
-   */
-  public static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
-
-    // This constructor is needed because we use this constructor to instantiate an implementation
-    // of MergedShuffleFileManager using reflection.
-    // See YarnShuffleService#newMergedShuffleFileManagerInstance.
-    public NoOpMergedShuffleFileManager(TransportConf transportConf) {}
-
-    @Override
-    public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
-      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
-    }
-
-    @Override
-    public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
-      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
-    }
-
-    @Override
-    public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
-      // No-Op. Do nothing.
-    }
-
-    @Override
-    public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
-      // No-Op. Do nothing.
-    }
-
-    @Override
-    public ManagedBuffer getMergedBlockData(
-        String appId,
-        int shuffleId,
-        int shuffleMergeId,
-        int reduceId,
-        int chunkId) {
-      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
-    }
-
-    @Override
-    public MergedBlockMeta getMergedBlockMeta(
-        String appId,
-        int shuffleId,
-        int shuffleMergeId,
-        int reduceId) {
-      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
-    }
-
-    @Override
-    public String[] getMergedBlockDirs(String appId) {
-      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
-    }
-  }
-
   @Override
   public void channelActive(TransportClient client) {
     metrics.activeConnections.inc();
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
new file mode 100644
index 0000000000..f47bfc3077
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
+import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+import org.apache.spark.network.util.TransportConf;
+
+/**
+ * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
+ * is not enabled.
+ *
+ * @since 3.1.0
+ */
+public class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
+
+  // This constructor is needed because we use this constructor to instantiate an implementation
+  // of MergedShuffleFileManager using reflection.
+  // See YarnShuffleService#newMergedShuffleFileManagerInstance.
+  public NoOpMergedShuffleFileManager(TransportConf transportConf) {}
+
+  @Override
+  public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+    throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+  }
+
+  @Override
+  public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
+    throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+  }
+
+  @Override
+  public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
+    // No-Op. Do nothing.
+  }
+
+  @Override
+  public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+    // No-Op. Do nothing.
+  }
+
+  @Override
+  public ManagedBuffer getMergedBlockData(
+      String appId,
+      int shuffleId,
+      int shuffleMergeId,
+      int reduceId,
+      int chunkId) {
+    throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+  }
+
+  @Override
+  public MergedBlockMeta getMergedBlockMeta(
+      String appId,
+      int shuffleId,
+      int shuffleMergeId,
+      int reduceId) {
+    throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+  }
+
+  @Override
+  public String[] getMergedBlockDirs(String appId) {
+    throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+  }
+}
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index cb6d5d0ca2..ac163692c4 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.api.*;
 import org.apache.spark.network.shuffle.MergedShuffleFileManager;
+import org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager;
 import org.apache.spark.network.util.LevelDBProvider;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBIterator;
@@ -284,7 +285,7 @@ public class YarnShuffleService extends AuxiliaryService {
       return mergeManagerSubClazz.getConstructor(TransportConf.class).newInstance(conf);
     } catch (Exception e) {
       logger.error("Unable to create an instance of {}", mergeManagerImplClassName);
-      return new ExternalBlockHandler.NoOpMergedShuffleFileManager(conf);
+      return new NoOpMergedShuffleFileManager(conf);
     }
   }
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index fb4097304d..b2025aa349 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -43,7 +43,7 @@ import org.scalatest.matchers.should.Matchers._
 import org.apache.spark.SecurityManager
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.config._
-import org.apache.spark.network.shuffle.{ExternalBlockHandler, RemoteBlockPushResolver, ShuffleTestAccessor}
+import org.apache.spark.network.shuffle.{NoOpMergedShuffleFileManager, RemoteBlockPushResolver, ShuffleTestAccessor}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.Utils
@@ -434,9 +434,9 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
   test("create default merged shuffle file manager instance") {
     val mockConf = mock(classOf[TransportConf])
     when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
-      "org.apache.spark.network.shuffle.ExternalBlockHandler$NoOpMergedShuffleFileManager")
+      "org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager")
     val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
-    assert(mergeMgr.isInstanceOf[ExternalBlockHandler.NoOpMergedShuffleFileManager])
+    assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
   }
 
   test("create remote block push resolver instance") {
@@ -452,6 +452,6 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
     when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
       "org.apache.spark.network.shuffle.NotExistent")
     val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
-    assert(mergeMgr.isInstanceOf[ExternalBlockHandler.NoOpMergedShuffleFileManager])
+    assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
   }
 }