[SPARK-7693][Core] Remove "import scala.concurrent.ExecutionContext.Implicits.global"

A lesson learnt from SPARK-7655: Spark should avoid using `scala.concurrent.ExecutionContext.Implicits.global`, because user code may also submit blocking actions to that shared pool and exhaust all of its threads. This could crash Spark. So Spark should always use its own thread pools for safety.

This PR removes every usage of `scala.concurrent.ExecutionContext.Implicits.global` and replaces each one with an appropriate dedicated thread pool.
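To make the hazard concrete, here is a minimal standalone sketch (not code from this PR; `GlobalPoolHazardDemo` and `blockingPool` are hypothetical names) of why blocking work on the shared global pool is dangerous, and the dedicated-pool alternative this PR adopts:

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

object GlobalPoolHazardDemo {
  // A dedicated, bounded pool: blocking work submitted here cannot starve
  // unrelated tasks running on other pools.
  private val blockingPool = ExecutionContext.fromExecutorService(
    Executors.newFixedThreadPool(8))

  def main(args: Array[String]): Unit = {
    // Anti-pattern: long blocking calls on the shared global pool. Enough of
    // these occupy all of its (roughly numCores) threads, and every other
    // Future scheduled on the global pool stalls behind them.
    for (_ <- 1 to 100) {
      Future(Thread.sleep(60 * 1000L))(ExecutionContext.global)
    }

    // Safer: run blocking calls on a pool this component owns.
    Future(Thread.sleep(60 * 1000L))(blockingPool)
  }
}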

Author: zsxwing <zsxwing@gmail.com>

Closes #6223 from zsxwing/SPARK-7693 and squashes the following commits:

a33ff06 [zsxwing] Decrease the max thread number from 1024 to 128
cf4b3fc [zsxwing] Remove "import scala.concurrent.ExecutionContext.Implicits.global"
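The first squashed commit caps each new pool at 128 threads instead of 1024. `ThreadUtils.newDaemonCachedThreadPool(prefix, maxThreads)` is Spark's utility for a bounded cached pool of daemon threads; the sketch below shows roughly how such a pool can be assembled from plain `java.util.concurrent` (an illustration under those assumptions, not Spark's actual `ThreadUtils` implementation):

import java.util.concurrent.{LinkedBlockingQueue, ThreadFactory, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPools {
  // Bounded "cached" pool: at most maxThreads workers, idle workers are
  // reclaimed after 60s, and excess tasks wait in an unbounded queue
  // instead of spawning ever more threads.
  def newDaemonCachedThreadPool(prefix: String, maxThreads: Int): ThreadPoolExecutor = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, prefix + "-" + counter.incrementAndGet())
        t.setDaemon(true) // daemon threads never keep the JVM alive on shutdown
        t
      }
    }
    val pool = new ThreadPoolExecutor(
      maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable], factory)
    pool.allowCoreThreadTimeOut(true) // behave like a cached pool, not a fixed one
    pool
  }
}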

(cherry picked from commit ff71d34e00)
Signed-off-by: Reynold Xin <rxin@databricks.com>
zsxwing 2015-05-17 20:37:19 -07:00 committed by Reynold Xin
parent be66d1924e
commit 2a42d2d8f2
6 changed files with 58 additions and 26 deletions

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

@@ -33,7 +33,7 @@ import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ThreadUtils, SignalLogger, Utils}

 private[spark] class CoarseGrainedExecutorBackend(
     override val rpcEnv: RpcEnv,
@@ -55,18 +55,19 @@ private[spark] class CoarseGrainedExecutorBackend(
   private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

   override def onStart() {
-    import scala.concurrent.ExecutionContext.Implicits.global
     logInfo("Connecting to driver: " + driverUrl)
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
+      // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
       ref.ask[RegisteredExecutor.type](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
-    } onComplete {
+    }(ThreadUtils.sameThread).onComplete {
+      // This is a very fast action so we can use "ThreadUtils.sameThread"
       case Success(msg) => Utils.tryLogNonFatalError {
         Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
       }
       case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
-    }
+    }(ThreadUtils.sameThread)
   }

   def extractLogUrls: Map[String, String] = {
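`ThreadUtils.sameThread`, used above, is an `ExecutionContext` that runs each callback synchronously on the thread that completes the future, which is only appropriate for cheap, non-blocking callbacks like these. A minimal stand-in looks roughly like this (illustrative only, not Spark's exact implementation):

import scala.concurrent.ExecutionContext

// Runs every task immediately on the calling thread. Safe only when the
// task is fast and never blocks; otherwise it would stall the completer.
object SameThreadContext extends ExecutionContext {
  override def execute(runnable: Runnable): Unit = runnable.run()
  override def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}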

core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

@@ -19,8 +19,10 @@ package org.apache.spark.rdd

 import java.util.concurrent.atomic.AtomicLong

+import org.apache.spark.util.ThreadUtils
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.ExecutionContext
 import scala.reflect.ClassTag

 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
@@ -66,6 +68,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
     val f = new ComplexFutureAction[Seq[T]]

     f.run {
+      // This is a blocking action so we should use "AsyncRDDActions.futureExecutionContext" which
+      // is a cached thread pool.
       val results = new ArrayBuffer[T](num)
       val totalParts = self.partitions.length
       var partsScanned = 0
@@ -101,7 +105,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
         partsScanned += numPartsToTry
       }
       results.toSeq
-    }
+    }(AsyncRDDActions.futureExecutionContext)

     f
   }
@@ -123,3 +127,8 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       (index, data) => Unit, Unit)
   }
 }
+
+private object AsyncRDDActions {
+  val futureExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("AsyncRDDActions-future", 128))
+}

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

@@ -21,8 +21,7 @@ import java.io.{BufferedOutputStream, ByteArrayOutputStream, File, InputStream,
 import java.nio.{ByteBuffer, MappedByteBuffer}

 import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{ExecutionContext, Await, Future}
 import scala.concurrent.duration._
 import scala.util.Random
@@ -77,6 +76,9 @@ private[spark] class BlockManager(
   private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo]

+  private val futureExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("block-manager-future", 128))
+
   // Actual storage of where blocks are kept
   private var externalBlockStoreInitialized = false
   private[spark] val memoryStore = new MemoryStore(this, maxMemory)
@@ -266,11 +268,13 @@ private[spark] class BlockManager(
     asyncReregisterLock.synchronized {
       if (asyncReregisterTask == null) {
         asyncReregisterTask = Future[Unit] {
+          // This is a blocking action and should run in futureExecutionContext which is a cached
+          // thread pool
           reregister()
           asyncReregisterLock.synchronized {
             asyncReregisterTask = null
           }
-        }
+        }(futureExecutionContext)
       }
     }
   }
@@ -744,7 +748,11 @@ private[spark] class BlockManager(
       case b: ByteBufferValues if putLevel.replication > 1 =>
         // Duplicate doesn't copy the bytes, but just creates a wrapper
         val bufferView = b.buffer.duplicate()
-        Future { replicate(blockId, bufferView, putLevel) }
+        Future {
+          // This is a blocking action and should run in futureExecutionContext which is a cached
+          // thread pool
+          replicate(blockId, bufferView, putLevel)
+        }(futureExecutionContext)
       case _ => null
     }
@@ -1218,6 +1226,7 @@ private[spark] class BlockManager(
     }
     metadataCleaner.cancel()
     broadcastCleaner.cancel()
+    futureExecutionContext.shutdownNow()
     logInfo("BlockManager stopped")
   }
 }
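The BlockManager change above follows a lifecycle pattern worth noting: the component owns its pool, passes it explicitly to every `Future`, and shuts it down when the component stops. A condensed, self-contained sketch of that pattern (the `Component` class and `reregisterAsync` are hypothetical, with a plain executor standing in for `ThreadUtils`):

import java.util.concurrent.Executors
import scala.concurrent.{ExecutionContext, Future}

class Component {
  // Owned by the component, never shared with user code.
  private val futureExecutionContext = ExecutionContext.fromExecutorService(
    Executors.newCachedThreadPool())

  def reregisterAsync(): Future[Unit] = Future {
    // Potentially blocking work runs on the component's own pool, so it
    // cannot exhaust a pool shared with the rest of the application.
    Thread.sleep(1000)
  }(futureExecutionContext)

  def stop(): Unit = {
    futureExecutionContext.shutdownNow() // release the threads with the component
  }
}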

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

@@ -17,13 +17,14 @@
 package org.apache.spark.storage

+import scala.collection.Iterable
+import scala.collection.generic.CanBuildFrom
 import scala.concurrent.{Await, Future}
-import scala.concurrent.ExecutionContext.Implicits.global

 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.RpcUtils
+import org.apache.spark.util.{ThreadUtils, RpcUtils}

 private[spark]
 class BlockManagerMaster(
@@ -102,8 +103,8 @@ class BlockManagerMaster(
     val future = driverEndpoint.askWithRetry[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
       case e: Exception =>
-        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
-    }
+        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       Await.result(future, timeout)
     }
@@ -114,8 +115,8 @@ class BlockManagerMaster(
     val future = driverEndpoint.askWithRetry[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
       case e: Exception =>
-        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
-    }
+        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       Await.result(future, timeout)
     }
@@ -128,8 +129,8 @@ class BlockManagerMaster(
     future.onFailure {
       case e: Exception =>
         logWarning(s"Failed to remove broadcast $broadcastId" +
-          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
-    }
+          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}", e)
+    }(ThreadUtils.sameThread)
     if (blocking) {
       Await.result(future, timeout)
     }
@@ -169,11 +170,17 @@ class BlockManagerMaster(
     val response = driverEndpoint.
       askWithRetry[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
     val (blockManagerIds, futures) = response.unzip
-    val result = Await.result(Future.sequence(futures), timeout)
-    if (result == null) {
+    implicit val sameThread = ThreadUtils.sameThread
+    val cbf =
+      implicitly[
+        CanBuildFrom[Iterable[Future[Option[BlockStatus]]],
+        Option[BlockStatus],
+        Iterable[Option[BlockStatus]]]]
+    val blockStatus = Await.result(
+      Future.sequence[Option[BlockStatus], Iterable](futures)(cbf, ThreadUtils.sameThread), timeout)
+    if (blockStatus == null) {
       throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
     }
-    val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
     blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
       status.map { s => (blockManagerId, s) }
     }.toMap
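The verbose `CanBuildFrom` plumbing in the last hunk exists because `Future.sequence` takes two implicit parameters, a `CanBuildFrom` and an `ExecutionContext`; once the context is passed explicitly, the builder must be spelled out too. A self-contained illustration against the Scala 2.10/2.11-era collections API (`SequenceDemo` is a made-up example, not part of the commit):

import scala.collection.generic.CanBuildFrom
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SequenceDemo {
  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val futures: Iterable[Future[Int]] = Seq(Future(1), Future(2), Future(3))

    // Future.sequence(futures) would resolve both implicits silently; passing
    // the ExecutionContext explicitly forces the CanBuildFrom to be explicit
    // as well, which is exactly the dance BlockManagerMaster performs above.
    val cbf = implicitly[CanBuildFrom[Iterable[Future[Int]], Int, Iterable[Int]]]
    val all: Future[Iterable[Int]] = Future.sequence[Int, Iterable](futures)(cbf, ec)
    println(Await.result(all, 10.seconds))
  }
}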

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala

@@ -78,5 +78,5 @@ case class BroadcastHashJoin(

 object BroadcastHashJoin {
   private val broadcastHashJoinExecutionContext = ExecutionContext.fromExecutorService(
-    ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 1024))
+    ThreadUtils.newDaemonCachedThreadPool("broadcast-hash-join", 128))
 }

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala

@@ -18,14 +18,14 @@
 package org.apache.spark.streaming.receiver

 import java.nio.ByteBuffer
+import java.util.concurrent.CountDownLatch

 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent._

 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.storage.StreamBlockId
-import java.util.concurrent.CountDownLatch
-import scala.concurrent._
-import ExecutionContext.Implicits.global
+import org.apache.spark.util.ThreadUtils

 /**
  * Abstract class that is responsible for supervising a Receiver in the worker.
@@ -46,6 +46,9 @@ private[streaming] abstract class ReceiverSupervisor(
   // Attach the executor to the receiver
   receiver.attachExecutor(this)

+  private val futureExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("receiver-supervisor-future", 128))
+
   /** Receiver id */
   protected val streamId = receiver.streamId
@@ -111,6 +114,7 @@ private[streaming] abstract class ReceiverSupervisor(
     stoppingError = error.orNull
     stopReceiver(message, error)
     onStop(message, error)
+    futureExecutionContext.shutdownNow()
     stopLatch.countDown()
   }
@@ -150,6 +154,8 @@ private[streaming] abstract class ReceiverSupervisor(
   /** Restart receiver with delay */
   def restartReceiver(message: String, error: Option[Throwable], delay: Int) {
     Future {
+      // This is a blocking action so we should use "futureExecutionContext" which is a cached
+      // thread pool.
       logWarning("Restarting receiver with delay " + delay + " ms: " + message,
         error.getOrElse(null))
       stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error)
@@ -158,7 +164,7 @@ private[streaming] abstract class ReceiverSupervisor(
       logInfo("Starting receiver again")
      startReceiver()
       logInfo("Receiver started again")
-    }
+    }(futureExecutionContext)
   }

   /** Check if receiver has been marked for stopping */