[SPARK-4183] Close transport-related resources between SparkContexts

A leak of event loops may be causing test failures.

Author: Aaron Davidson <aaron@databricks.com>

Closes #3053 from aarondav/leak and squashes the following commits:

e676d18 [Aaron Davidson] Typo!
8f96475 [Aaron Davidson] Keep original ssc semantics
7e49f10 [Aaron Davidson] A leak of event loops may be causing test failures.
Authored by Aaron Davidson on 2014-11-02 16:26:24 -08:00, committed by Patrick Wendell
parent 9081b9f9f7
commit 2ebd1df3f1
12 changed files with 78 additions and 32 deletions
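
For context on the leak itself: the transport layer sits on top of Netty event loop groups, and each group owns a pool of threads that stays alive until the group is explicitly shut down. The sketch below is illustrative only (plain Netty rather than Spark's wrappers) and shows the resource that accumulates when a transport server or client factory is never closed between SparkContexts:

import io.netty.channel.nio.NioEventLoopGroup

// Illustrative sketch: an event loop group allocates worker threads lazily and keeps
// them alive until shutdownGracefully() is called, so a factory that never closes its
// group leaks one thread pool for every SparkContext a test run creates.
object EventLoopLeakSketch {
  def main(args: Array[String]): Unit = {
    val group = new NioEventLoopGroup()      // allocates the thread pool
    group.execute(new Runnable {             // forces at least one loop thread to start
      override def run(): Unit = println("running on " + Thread.currentThread().getName)
    })
    group.shutdownGracefully()               // without this, the loop threads linger
  }
}

The changes below attack that from both ends: the Netty server and client factory now release their event loops on close(), close() is made safe to call more than once, and the test suites stop their SparkContexts (and actor systems) after every test.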


@@ -274,7 +274,7 @@ object SparkEnv extends Logging {
     val shuffleMemoryManager = new ShuffleMemoryManager(conf)

     val blockTransferService =
-      conf.get("spark.shuffle.blockTransferService", "nio").toLowerCase match {
+      conf.get("spark.shuffle.blockTransferService", "netty").toLowerCase match {
         case "netty" =>
           new NettyBlockTransferService(conf)
         case "nio" =>
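
This hunk changes the default transfer service from nio to netty. A job or test that still wants the old implementation can pin it explicitly; a minimal example using the configuration key shown above (the app name is hypothetical):

import org.apache.spark.SparkConf

// Explicitly select the NIO transfer service; "netty" is now the default.
val conf = new SparkConf()
  .setAppName("transfer-service-example")
  .set("spark.shuffle.blockTransferService", "nio")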


@@ -106,5 +106,8 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
     result.future
   }

-  override def close(): Unit = server.close()
+  override def close(): Unit = {
+    server.close()
+    clientFactory.close()
+  }
 }


@@ -1178,6 +1178,10 @@ private[spark] class BlockManager(
   def stop(): Unit = {
     blockTransferService.close()
+    if (shuffleClient ne blockTransferService) {
+      // Closing should be idempotent, but maybe not for the NioBlockTransferService.
+      shuffleClient.close()
+    }
     diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
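
The `ne` (reference inequality) guard matters because the shuffle client can be the block transfer service itself rather than a separate object, and closing the same instance twice is only safe if close() is idempotent, which the comment above notes may not hold for the NIO implementation. A self-contained sketch of the pattern, using hypothetical stand-in types rather than Spark's:

import java.io.Closeable

object GuardedCloseSketch {
  // Stand-in for a transfer service that owns transport resources.
  class FakeTransferService extends Closeable {
    override def close(): Unit = println("transfer service closed")
  }

  // Mirrors the shape of BlockManager.stop(): close the transfer service, then close
  // the shuffle client only when it is genuinely a different resource.
  def stop(transferService: FakeTransferService, shuffleClient: Closeable): Unit = {
    transferService.close()
    if (shuffleClient ne transferService) {
      shuffleClient.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val shared = new FakeTransferService
    stop(shared, shared)                                     // aliased: closed exactly once
    stop(new FakeTransferService, new FakeTransferService)   // distinct: both closed
  }
}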


@@ -25,7 +25,7 @@ import org.apache.spark.storage.BlockManagerId
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
  */
-class ExecutorAllocationManagerSuite extends FunSuite {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._

@@ -36,17 +36,21 @@ class ExecutorAllocationManagerSuite extends FunSuite {
       .setAppName("test-executor-allocation-manager")
       .set("spark.dynamicAllocation.enabled", "true")
     intercept[SparkException] { new SparkContext(conf) }
+    SparkEnv.get.stop() // cleanup the created environment

     // Only min
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
     intercept[SparkException] { new SparkContext(conf1) }
+    SparkEnv.get.stop()

     // Only max
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
     intercept[SparkException] { new SparkContext(conf2) }
+    SparkEnv.get.stop()

     // Both min and max, but min > max
     intercept[SparkException] { createSparkContext(2, 1) }
+    SparkEnv.get.stop()

     // Both min and max, and min == max
     val sc1 = createSparkContext(1, 1)

@@ -60,18 +64,17 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("starting state") {
-    val sc = createSparkContext()
+    sc = createSparkContext()
     val manager = sc.executorAllocationManager.get
     assert(numExecutorsPending(manager) === 0)
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(executorIds(manager).isEmpty)
     assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
     assert(removeTimes(manager).isEmpty)
-    sc.stop()
   }

   test("add executors") {
-    val sc = createSparkContext(1, 10)
+    sc = createSparkContext(1, 10)
     val manager = sc.executorAllocationManager.get

     // Keep adding until the limit is reached

@@ -112,11 +115,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(addExecutors(manager) === 0)
     assert(numExecutorsPending(manager) === 6)
     assert(numExecutorsToAdd(manager) === 1)
-    sc.stop()
   }

   test("remove executors") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get
     (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }

@@ -163,11 +165,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     assert(executorsPendingToRemove(manager).isEmpty)
     assert(!removeExecutor(manager, "8"))
     assert(executorsPendingToRemove(manager).isEmpty)
-    sc.stop()
   }

   test ("interleaving add and remove") {
-    val sc = createSparkContext(5, 10)
+    sc = createSparkContext(5, 10)
     val manager = sc.executorAllocationManager.get

     // Add a few executors

@@ -232,11 +233,10 @@ class ExecutorAllocationManagerSuite extends FunSuite {
     onExecutorAdded(manager, "15")
     onExecutorAdded(manager, "16")
     assert(executorIds(manager).size === 10)
-    sc.stop()
   }

   test("starting/canceling add timer") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(8888L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)

@@ -268,7 +268,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("starting/canceling remove timers") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val clock = new TestClock(14444L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)

@@ -313,7 +313,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("mock polling loop with no events") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val manager = sc.executorAllocationManager.get
     val clock = new TestClock(2020L)
     manager.setClock(clock)

@@ -339,7 +339,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("mock polling loop add behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)

@@ -388,7 +388,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("mock polling loop remove behavior") {
-    val sc = createSparkContext(1, 20)
+    sc = createSparkContext(1, 20)
     val clock = new TestClock(2020L)
     val manager = sc.executorAllocationManager.get
     manager.setClock(clock)

@@ -449,7 +449,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("listeners trigger add executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(addTime(manager) === NOT_SET)

@@ -479,7 +479,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("listeners trigger remove executors correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(removeTimes(manager).isEmpty)

@@ -510,7 +510,7 @@ class ExecutorAllocationManagerSuite extends FunSuite {
   }

   test("listeners trigger add and remove executor callbacks correctly") {
-    val sc = createSparkContext(2, 10)
+    sc = createSparkContext(2, 10)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
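
The suite above (and SparkContextSchedulerCreationSuite further down) now assigns to a shared sc field provided by the LocalSparkContext mixin instead of declaring a local val and remembering to call sc.stop() at the end of every test; the MapOutputTrackerSuite hunks that follow drop the mixin instead, since those tests manage bare actor systems and now shut them down by hand. A minimal sketch of what such a mixin looks like; this is an assumed shape, and Spark's actual test trait may differ in detail:

import org.apache.spark.SparkContext
import org.scalatest.{BeforeAndAfterEach, Suite}

// Sketch of a LocalSparkContext-style mixin: tests assign to `sc`, and the context is
// always stopped after each test so transport resources cannot leak into the next one.
trait LocalSparkContextSketch extends BeforeAndAfterEach { self: Suite =>
  @transient var sc: SparkContext = _

  override def afterEach(): Unit = {
    try {
      if (sc != null) {
        sc.stop()
      }
      sc = null
    } finally {
      super.afterEach()
    }
  }
}

The practical difference is that cleanup now runs even when an assertion fails mid-test, which the removed per-test sc.stop() calls did not guarantee.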


@@ -28,7 +28,7 @@ import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AkkaUtils

-class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
+class MapOutputTrackerSuite extends FunSuite {
   private val conf = new SparkConf

   test("master start and stop") {

@@ -37,6 +37,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.trackerActor =
       actorSystem.actorOf(Props(new MapOutputTrackerMasterActor(tracker, conf)))
     tracker.stop()
+    actorSystem.shutdown()
   }

   test("master register shuffle and fetch") {

@@ -56,6 +57,7 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     assert(statuses.toSeq === Seq((BlockManagerId("a", "hostA", 1000), size1000),
       (BlockManagerId("b", "hostB", 1000), size10000)))
     tracker.stop()
+    actorSystem.shutdown()
   }

   test("master register and unregister shuffle") {

@@ -74,6 +76,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     tracker.unregisterShuffle(10)
     assert(!tracker.containsShuffle(10))
     assert(tracker.getServerStatuses(10, 0).isEmpty)
+
+    tracker.stop()
+    actorSystem.shutdown()
   }

   test("master register shuffle and unregister map output and fetch") {

@@ -97,6 +102,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     // this should cause it to fail, and the scheduler will ignore the failure due to the
     // stage already being aborted.
     intercept[FetchFailedException] { tracker.getServerStatuses(10, 1) }
+
+    tracker.stop()
+    actorSystem.shutdown()
   }

   test("remote fetch") {

@@ -136,6 +144,11 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     // failure should be cached
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }
+
+    masterTracker.stop()
+    slaveTracker.stop()
+    actorSystem.shutdown()
+    slaveSystem.shutdown()
   }

   test("remote fetch below akka frame size") {

@@ -154,6 +167,9 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     masterTracker.registerMapOutput(10, 0, MapStatus(
       BlockManagerId("88", "mph", 1000), Array.fill[Long](10)(0)))
     masterActor.receive(GetMapOutputStatuses(10))
+
+    // masterTracker.stop() // this throws an exception
+    actorSystem.shutdown()
   }

   test("remote fetch exceeds akka frame size") {

@@ -176,5 +192,8 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
         BlockManagerId("999", "mps", 1000), Array.fill[Long](4000000)(0)))
     }
     intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+
+    // masterTracker.stop() // this throws an exception
+    actorSystem.shutdown()
   }
 }
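
Each test above now repeats tracker.stop() and actorSystem.shutdown() by hand, and the cleanup is skipped if an assertion throws first. A loan-pattern helper along the following lines (hypothetical, not part of this change) would make the shutdown automatic:

import akka.actor.ActorSystem

object ActorSystemFixture {
  // Hypothetical helper: run a test body against a fresh ActorSystem and always
  // shut it down, even if the body throws.
  def withActorSystem[T](name: String)(body: ActorSystem => T): T = {
    val system = ActorSystem(name)
    try body(system) finally system.shutdown()
  }
}

The commented-out masterTracker.stop() lines above also hint at a second issue: stopping the tracker after the master actor has already processed messages throws, so only the actor system shutdown is kept in those two tests.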


@@ -17,7 +17,7 @@
 package org.apache.spark

-import org.scalatest.{BeforeAndAfterEach, FunSuite, PrivateMethodTester}
+import org.scalatest.{FunSuite, PrivateMethodTester}

 import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{SimrSchedulerBackend, SparkDeploySchedulerBackend}

@@ -25,12 +25,12 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend

 class SparkContextSchedulerCreationSuite
-  extends FunSuite with PrivateMethodTester with Logging with BeforeAndAfterEach {
+  extends FunSuite with LocalSparkContext with PrivateMethodTester with Logging {

   def createTaskScheduler(master: String): TaskSchedulerImpl = {
     // Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
     // real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val createTaskSchedulerMethod =
       PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
     val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)


@@ -145,11 +145,16 @@ class FlumePollingStreamSuite extends TestSuiteBase {
     outputStream.register()
     ssc.start()

-    writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
-    assertChannelIsEmpty(channel)
-    assertChannelIsEmpty(channel2)
-    sink.stop()
-    channel.stop()
+    try {
+      writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+      assertChannelIsEmpty(channel)
+      assertChannelIsEmpty(channel2)
+    } finally {
+      sink.stop()
+      sink2.stop()
+      channel.stop()
+      channel2.stop()
+    }
   }

   def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,


@@ -58,7 +58,7 @@ public class TransportClientFactory implements Closeable {
   private final ConcurrentHashMap<SocketAddress, TransportClient> connectionPool;

   private final Class<? extends Channel> socketChannelClass;
-  private final EventLoopGroup workerGroup;
+  private EventLoopGroup workerGroup;

   public TransportClientFactory(TransportContext context) {
     this.context = context;

@@ -150,6 +150,7 @@ public class TransportClientFactory implements Closeable {
     if (workerGroup != null) {
       workerGroup.shutdownGracefully();
+      workerGroup = null;
     }
   }


@@ -49,6 +49,7 @@ public class TransportServer implements Closeable {
   private ChannelFuture channelFuture;
   private int port = -1;

+  /** Creates a TransportServer that binds to the given port, or to any available if 0. */
  public TransportServer(TransportContext context, int portToBind) {
    this.context = context;
    this.conf = context.getConf();

@@ -67,7 +68,7 @@
     IOMode ioMode = IOMode.valueOf(conf.ioMode());
     EventLoopGroup bossGroup =
       NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
     EventLoopGroup workerGroup = bossGroup;

     bootstrap = new ServerBootstrap()

@@ -105,7 +106,7 @@
   @Override
   public void close() {
     if (channelFuture != null) {
-      // close is a local operation and should finish with milliseconds; timeout just to be safe
+      // close is a local operation and should finish within milliseconds; timeout just to be safe
       channelFuture.channel().close().awaitUninterruptibly(10, TimeUnit.SECONDS);
       channelFuture = null;
     }


@@ -17,6 +17,8 @@
 package org.apache.spark.network.shuffle;

+import java.io.Closeable;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -85,4 +87,9 @@ public class ExternalShuffleClient implements ShuffleClient {
         JavaUtils.serialize(new RegisterExecutor(appId, execId, executorInfo));
     client.sendRpcSync(registerExecutorMessage, 5000 /* timeoutMs */);
   }
+
+  @Override
+  public void close() {
+    clientFactory.close();
+  }
 }


@@ -17,8 +17,10 @@
 package org.apache.spark.network.shuffle;

+import java.io.Closeable;
+
 /** Provides an interface for reading shuffle files, either from an Executor or external service. */
-public interface ShuffleClient {
+public interface ShuffleClient extends Closeable {
   /**
    * Fetch a sequence of blocks from a remote node asynchronously,
    *
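
Extending Closeable lets callers treat any ShuffleClient, Netty-based or external, as a generic resource; BlockManager.stop() above is the immediate beneficiary, since it can call shuffleClient.close() without knowing which implementation it holds. An illustrative loan-pattern helper (not part of Spark) showing the kind of uniform cleanup this enables:

import java.io.Closeable

object CloseableResources {
  // Run `body` against any Closeable resource (such as a ShuffleClient) and
  // close it exactly once afterwards, even if the body throws.
  def withResource[T <: Closeable, R](resource: T)(body: T => R): R = {
    try body(resource) finally resource.close()
  }
}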


@@ -46,6 +46,10 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
   after {
     if (ssc != null) {
       ssc.stop()
+      if (ssc.sc != null) {
+        // Calling ssc.stop() does not always stop the associated SparkContext.
+        ssc.sc.stop()
+      }
       ssc = null
     }
     if (sc != null) {