[SPARK-12101][CORE] Fix thread pools that cannot cache tasks in Worker and AppClient
`SynchronousQueue` cannot cache any task, so thread pools built on it cannot reuse idle threads. This issue is similar to #9978, and the fix is simple: just use the fixed `ThreadUtils.newDaemonCachedThreadPool` instead. Author: Shixiong Zhu <shixiong@databricks.com> Closes #10108 from zsxwing/fix-threadpool.
This commit is contained in:
parent
7bc9e1db2c
commit
649be4fa45
|
@@ -68,12 +68,10 @@ private[spark] class AppClient(
|
|||
// A thread pool for registering with masters. Because registering with a master is a blocking
|
||||
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
|
||||
// time so that we can register with all masters.
|
||||
private val registerMasterThreadPool = new ThreadPoolExecutor(
|
||||
0,
|
||||
masterRpcAddresses.length, // Make sure we can register with all masters at the same time
|
||||
60L, TimeUnit.SECONDS,
|
||||
new SynchronousQueue[Runnable](),
|
||||
ThreadUtils.namedThreadFactory("appclient-register-master-threadpool"))
|
||||
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
|
||||
"appclient-register-master-threadpool",
|
||||
masterRpcAddresses.length // Make sure we can register with all masters at the same time
|
||||
)
|
||||
|
||||
// A scheduled executor for scheduling the registration actions
|
||||
private val registrationRetryThread =
|
||||
|
|
|
@@ -146,12 +146,10 @@ private[deploy] class Worker(
|
|||
// A thread pool for registering with masters. Because registering with a master is a blocking
|
||||
// action, this thread pool must be able to create "masterRpcAddresses.size" threads at the same
|
||||
// time so that we can register with all masters.
|
||||
private val registerMasterThreadPool = new ThreadPoolExecutor(
|
||||
0,
|
||||
masterRpcAddresses.size, // Make sure we can register with all masters at the same time
|
||||
60L, TimeUnit.SECONDS,
|
||||
new SynchronousQueue[Runnable](),
|
||||
ThreadUtils.namedThreadFactory("worker-register-master-threadpool"))
|
||||
private val registerMasterThreadPool = ThreadUtils.newDaemonCachedThreadPool(
|
||||
"worker-register-master-threadpool",
|
||||
masterRpcAddresses.size // Make sure we can register with all masters at the same time
|
||||
)
|
||||
|
||||
var coresUsed = 0
|
||||
var memoryUsed = 0
|
||||
|
|
|
@@ -25,8 +25,6 @@ import scala.collection.mutable
|
|||
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder
|
||||
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
import org.apache.hadoop.yarn.api.records._
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient
|
||||
|
@@ -40,7 +38,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
|
|||
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
|
||||
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
|
||||
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
|
||||
import org.apache.spark.util.Utils
|
||||
import org.apache.spark.util.ThreadUtils
|
||||
|
||||
/**
|
||||
* YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
|
||||
|
@@ -117,13 +115,9 @@ private[yarn] class YarnAllocator(
|
|||
// Resource capability requested for each executors
|
||||
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
|
||||
|
||||
private val launcherPool = new ThreadPoolExecutor(
|
||||
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
|
||||
sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25), Integer.MAX_VALUE,
|
||||
1, TimeUnit.MINUTES,
|
||||
new LinkedBlockingQueue[Runnable](),
|
||||
new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
|
||||
launcherPool.allowCoreThreadTimeOut(true)
|
||||
private val launcherPool = ThreadUtils.newDaemonCachedThreadPool(
|
||||
"ContainerLauncher",
|
||||
sparkConf.getInt("spark.yarn.containerLauncherMaxThreads", 25))
|
||||
|
||||
// For testing
|
||||
private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
|
||||
|
|
Loading…
Reference in a new issue