From 2ebd1df3f17993f3cb472ec44c8832213976d99a Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Sun, 2 Nov 2014 16:26:24 -0800
Subject: [PATCH] [SPARK-4183] Close transport-related resources between
 SparkContexts

A leak of event loops may be causing test failures.

Author: Aaron Davidson

Closes #3053 from aarondav/leak and squashes the following commits:

e676d18 [Aaron Davidson] Typo!
8f96475 [Aaron Davidson] Keep original ssc semantics
7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures.
---
 .../scala/org/apache/spark/SparkEnv.scala          |  2 +-
 .../netty/NettyBlockTransferService.scala          |  5 ++-
 .../apache/spark/storage/BlockManager.scala        |  4 +++
 .../ExecutorAllocationManagerSuite.scala           | 34 +++++++++----------
 .../apache/spark/MapOutputTrackerSuite.scala       | 21 +++++++++++-
 .../SparkContextSchedulerCreationSuite.scala       |  6 ++--
 .../flume/FlumePollingStreamSuite.scala            | 15 +++++---
 .../client/TransportClientFactory.java             |  3 +-
 .../spark/network/server/TransportServer.java      |  5 +--
 .../shuffle/ExternalShuffleClient.java             |  7 ++++
 .../spark/network/shuffle/ShuffleClient.java       |  4 ++-
 .../streaming/StreamingContextSuite.scala          |  4 +++
 12 files changed, 78 insertions(+), 32 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 7fb2b91377..e2f13accdf 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -274,7 +274,7 @@ object SparkEnv extends Logging {
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)
 
     val blockTransferService =
-      conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
+      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
           new NettyBlockTransferService(conf)
         case "nio" =>
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index ec3000e722..1c4327cf13 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -106,5 +106,8 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
     result.future
   }
 
-  override def close(): Unit = server.close()
+  override def close(): Unit = {
+    server.close()
+    clientFactory.close()
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 1f8de28961..5f5dd0dc1c 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1178,6 +1178,10 @@ private[spark] class BlockManager(
 
   def stop(): Unit = {
     blockTransferService.close()
+    if (shuffleClient ne blockTransferService) {
+      // Closing should be idempotent, but maybe not for the NioBlockTransferService.
+      shuffleClient.close()
+    }
     diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
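The `ne` guard in the BlockManager.stop() hunk above matters because, when no
external shuffle service is configured, shuffleClient and blockTransferService
refer to the same object, and close() is not guaranteed to be idempotent (per
the comment, possibly not for NioBlockTransferService). A minimal sketch of
the pattern, with illustrative names that are not Spark APIs:

    import java.io.Closeable

    class TransportStopper(transfer: Closeable, shuffle: Closeable) {
      def stop(): Unit = {
        transfer.close()
        // `ne` is reference inequality: close `shuffle` only if it is a
        // distinct object, so an alias is never closed twice.
        if (shuffle ne transfer) {
          shuffle.close()
        }
      }
    }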
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index f0aa914cfe..66cf60d25f 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.storage.BlockManagerId
 
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
  */
-class ExecutorAllocationManagerSuite extends FunSuite {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
@@ -36,17 +36,21 @@ class ExecutorAllocationManagerSuite extends FunSuite {
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
     intercept[SparkException] { new SparkContext(conf) }
+    SparkEnv.get.stop() // cleanup the created environment
 
     // Only min
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
     intercept[SparkException] { new SparkContext(conf1) }
+    SparkEnv.get.stop()
 
     // Only max
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
     intercept[SparkException] { new SparkContext(conf2) }
+    SparkEnv.get.stop()
 
     // Both min and max, but min > max
     intercept[SparkException] { createSparkContext(2, 1) }
+    SparkEnv.get.stop()
 
     // Both min and max, and min == max
     val sc1 = createSparkContext(1, 1)
@@ -60,18 +64,17 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("starting state") {
-    val sc = createSparkContext()
+    sc = createSparkContext()
     val manager = sc.executorAllocationManager.get
     assert(numExecutorsPending(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(executorIds(manager).isEmpty)
     assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
     assert(removeTimes(manager).isEmpty)
-    sc.stop()
   }
 
   test("add executors") {
-    val sc = createSparkContext(1, 10)
+    sc = createSparkContext(1, 10)
     val manager = sc.executorAllocationManager.get
 
     // Keep adding until the limit is reached
@@ -112,11 +115,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(addExecutors(manager) === 0)
     assert(numExecutorsPending(manager) === 6)
     assert(numExecutorsToAdd(manager) === 1)
-    sc.stop()
   }
 
   test("remove executors") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
     (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
 
@@ -163,11 +165,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(!removeExecutor(manager, "8"))
     assert(executorsPendingToRemove(manager).isEmpty)
-    sc.stop()
   }
 
   test ("interleaving add and remove") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
 
     // Add a few executors
@@ -232,11 +233,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     onExecutorAdded(manager, "15")
     onExecutorAdded(manager, "16")
     assert(executorIds(manager).size === 10)
-    sc.stop()
   }
 
   test("starting/canceling add timer") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(8888L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -268,7 +268,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("starting/canceling remove timers") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(14444L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -313,7 +313,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop with no events") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val manager = sc.executorAllocationManager.get
     val clock = new TestClock(2020L)
     manager.setClock(clock)
@@ -339,7 +339,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop add behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -388,7 +388,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("mock polling loop remove behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)
@@ -449,7 +449,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger add executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(addTime(manager) === NOT_SET)
 
@@ -479,7 +479,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger remove executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(removeTimes(manager).isEmpty)
 
@@ -510,7 +510,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }
 
   test("listeners trigger add and remove executor callbacks correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
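The suite above now mixes in LocalSparkContext and assigns each test's context
to the inherited `sc` field instead of a local val, so the trait can stop the
context even when an assertion fails mid-test. A simplified sketch of how such
a trait can work (the real one lives in Spark's core test sources; this is an
assumption, not a copy):

    import org.apache.spark.SparkContext
    import org.scalatest.{BeforeAndAfterEach, Suite}

    trait LocalSparkContext extends BeforeAndAfterEach { self: Suite =>
      @transient var sc: SparkContext = _

      override def afterEach(): Unit = {
        try {
          if (sc != null) {
            sc.stop() // also closes the block transfer service and shuffle client
            sc = null
          }
        } finally {
          super.afterEach()
        }
      }
    }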
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index cbc0bd178d..d27880f4bc 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils
 
-class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
+class MapOutputTrackerSuite extends FunSuite {
   private val conf = new SparkConf
 
   test("master start and stop") {
@@ -37,6 +37,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.trackerActor =
       actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register shuffle and fetch") {
@@ -56,6 +57,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
       (BlockManagerId("b", "hostB", 1000), size10000)))
     tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register and unregister shuffle") {
@@ -74,6 +76,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.unregisterShuffle(10)
     assert(!tracker.containsShuffle(10))
     assert(tracker.getServerStatuses(10, 0).isEmpty)
+
+    tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("master register shuffle and unregister map output and fetch") {
@@ -97,6 +102,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     // this should cause it to fail, and the scheduler will ignore the failure due to the
     // stage already being aborted.
     intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+
+    tracker.stop()
+    actorSystem.shutdown()
   }
 
   test("remote fetch") {
@@ -136,6 +144,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
 
     // failure should be cached
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+    masterTracker.stop()
+    slaveTracker.stop()
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
   }
 
   test("remote fetch below akka frame size") {
@@ -154,6 +167,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     masterTracker.registerMapOutput(10, 0, MapStatus(
       BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
     masterActor.receive(GetMapOutputStatuses(10))
+
+//    masterTracker.stop() // this throws an exception
+    actorSystem.shutdown()
   }
 
   test("remote fetch exceeds akka frame size") {
@@ -176,5 +192,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
         BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
     }
     intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+
+//    masterTracker.stop() // this throws an exception
+    actorSystem.shutdown()
   }
 }
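Each MapOutputTrackerSuite test above now shuts down every ActorSystem it
creates; otherwise the system's threads outlive the test and can destabilize
later suites. A hedged sketch of the same discipline as a loan pattern, using
the Akka 2.x shutdown() API that Spark used at the time (the helper name is
hypothetical, not a Spark utility):

    import akka.actor.ActorSystem

    def withActorSystem[T](name: String)(body: ActorSystem => T): T = {
      val system = ActorSystem(name)
      try {
        body(system)
      } finally {
        system.shutdown() // release the dispatcher threads deterministically
      }
    }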
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index df237ba796..0390a2e4f1 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark
 
-import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
+import org.scalatest.{FunSuite, PrivateMethodTester}
 
 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}
@@ -25,12 +25,12 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 
 class SparkContextSchedulerCreationSuite
-  extends FunSuite with PrivateMethodTester with Logging with BeforeAndAfterEach {
+  extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {
 
   def createTaskScheduler(master: String): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val createTaskSchedulerMethod =
       PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
     val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 32a19787a2..475026e8eb 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -145,11 +145,16 @@ class FlumePollingStreamSuite extends TestSuiteBase {
     outputStream.register()
 
     ssc.start()
-    writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
-    assertChannelIsEmpty(channel)
-    assertChannelIsEmpty(channel2)
-    sink.stop()
-    channel.stop()
+    try {
+      writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+      assertChannelIsEmpty(channel)
+      assertChannelIsEmpty(channel2)
+    } finally {
+      sink.stop()
+      sink2.stop()
+      channel.stop()
+      channel2.stop()
+    }
   }
 
   def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
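The Flume change above moves shutdown into a finally block so the sinks and
channels stop even when writeAndVerify or an assertion throws, and it also
stops sink2 and channel2, which were previously leaked. The general shape of
that cleanup, with illustrative names only:

    def verifyThenStop[T](stops: Seq[() => Unit])(verify: => T): T = {
      try {
        verify
      } finally {
        stops.foreach(_.apply()) // runs even if verify throws a test failure
      }
    }

    // e.g. verifyThenStop(Seq(() => sink.stop(), () => channel.stop())) { ... }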
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index e7fa4f6bf3..0b4a1d8286 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -58,7 +58,7 @@ public class TransportClientFactory implements Closeable {
   private final ConcurrentHashMap<SocketAddress, TransportClient> connectionPool;
 
   private final Class<? extends Channel> socketChannelClass;
-  private final EventLoopGroup workerGroup;
+  private EventLoopGroup workerGroup;
 
   public TransportClientFactory(TransportContext context) {
     this.context = context;
@@ -150,6 +150,7 @@ public class TransportClientFactory implements Closeable {
 
     if (workerGroup != null) {
       workerGroup.shutdownGracefully();
+      workerGroup = null;
     }
   }
 }
diff --git a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
index d1a1877a98..70da48ca8e 100644
--- a/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
+++ b/network/common/src/main/java/org/apache/spark/network/server/TransportServer.java
@@ -49,6 +49,7 @@ public class TransportServer implements Closeable {
   private ChannelFuture channelFuture;
   private int port = -1;
 
+  /** Creates a TransportServer that binds to the given port, or to any available if 0. */
   public TransportServer(TransportContext context, int portToBind) {
     this.context = context;
     this.conf = context.getConf();
@@ -67,7 +68,7 @@ public class TransportServer implements Closeable {
     IOMode ioMode = IOMode.valueOf(conf.ioMode());
     EventLoopGroup bossGroup =
-        NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
+      NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
     EventLoopGroup workerGroup = bossGroup;
 
     bootstrap = new ServerBootstrap()
@@ -105,7 +106,7 @@ public class TransportServer implements Closeable {
   @Override
   public void close() {
     if (channelFuture != null) {
-      // close is a local operation and should finish with milliseconds; timeout just to be safe
+      // close is a local operation and should finish within milliseconds; timeout just to be safe
      channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
      channelFuture = null;
    }
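The event loops named in the commit message are Netty EventLoopGroups: each
one owns live threads that survive until shutdownGracefully() is called, which
is why TransportClientFactory.close() above both shuts the group down and
nulls the field. A sketch of that lifecycle against the plain Netty API
(not Spark's NettyUtils wrapper):

    import io.netty.channel.nio.NioEventLoopGroup

    object EventLoopLifecycleSketch {
      private var workerGroup: NioEventLoopGroup = new NioEventLoopGroup()

      def close(): Unit = {
        if (workerGroup != null) {
          workerGroup.shutdownGracefully() // asynchronously releases the event loop threads
          workerGroup = null               // guard against a second close(), as above
        }
      }
    }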
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
index cc2f6261ca..6bbabc44b9 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java
@@ -17,6 +17,8 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.Closeable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,4 +87,9 @@ public class ExternalShuffleClient implements ShuffleClient {
         JavaUtils.serialize(new RegisterExecutor(appId, execId, executorInfo));
     client.sendRpcSync(registerExecutorMessage, 5000 /* timeoutMs */);
   }
+
+  @Override
+  public void close() {
+    clientFactory.close();
+  }
 }
diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
index 9fa87c2c6e..d46a562394 100644
--- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
+++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java
@@ -17,8 +17,10 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.Closeable;
+
 /** Provides an interface for reading shuffle files, either from an Executor or external service. */
-public interface ShuffleClient {
+public interface ShuffleClient extends Closeable {
   /**
    * Fetch a sequence of blocks from a remote node asynchronously,
    *
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 655cec1573..f47772947d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -46,6 +46,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   after {
     if (ssc != null) {
      ssc.stop()
+      if (ssc.sc != null) {
+        // Calling ssc.stop() does not always stop the associated SparkContext.
+        ssc.sc.stop()
+      }
       ssc = null
     }
     if (sc != null) {
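With ShuffleClient extending java.io.Closeable, BlockManager.stop() can close
whichever implementation is active through one interface, whether it is the
NettyBlockTransferService itself or a standalone ExternalShuffleClient. A
minimal sketch of the new contract, with a hypothetical no-op implementation
(the trait and class names here are illustrative, not Spark's):

    import java.io.Closeable

    trait SketchShuffleClient extends Closeable {
      def fetchBlocks(host: String, port: Int, blockIds: Array[String]): Unit
    }

    class NoopShuffleClient extends SketchShuffleClient {
      override def fetchBlocks(host: String, port: Int, blockIds: Array[String]): Unit = ()
      override def close(): Unit = () // nothing to release, but the contract is now explicit
    }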