From 33b85620f910c404873d362d27cca1223084913a Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 22 Apr 2015 11:08:59 -0700 Subject: [PATCH] [SPARK-7052][Core] Add ThreadUtils and move thread methods from Utils to ThreadUtils As per rxin 's suggestion in https://github.com/apache/spark/pull/5392/files#r28757176 What's more, there is a race condition in the global shared `daemonThreadFactoryBuilder`. `daemonThreadFactoryBuilder` may be modified by multiple threads. This PR removed the global `daemonThreadFactoryBuilder` and created a new `ThreadFactoryBuilder` every time. Author: zsxwing Closes #5631 from zsxwing/thread-utils and squashes the following commits: 9fe5b0e [zsxwing] Add ThreadUtils and move thread methods from Utils to ThreadUtils --- .../spark/ExecutorAllocationManager.scala | 8 +-- .../org/apache/spark/HeartbeatReceiver.scala | 11 ++- .../deploy/history/FsHistoryProvider.scala | 4 +- .../org/apache/spark/executor/Executor.scala | 7 +- .../spark/network/nio/ConnectionManager.scala | 12 ++-- .../apache/spark/scheduler/DAGScheduler.scala | 4 +- .../spark/scheduler/TaskResultGetter.scala | 4 +- .../CoarseGrainedSchedulerBackend.scala | 6 +- .../cluster/YarnSchedulerBackend.scala | 4 +- .../spark/scheduler/local/LocalBackend.scala | 8 +-- .../storage/BlockManagerMasterEndpoint.scala | 4 +- .../storage/BlockManagerSlaveEndpoint.scala | 4 +- .../org/apache/spark/util/ThreadUtils.scala | 67 +++++++++++++++++++ .../scala/org/apache/spark/util/Utils.scala | 29 -------- .../apache/spark/util/ThreadUtilsSuite.scala | 57 ++++++++++++++++ .../streaming/kafka/KafkaInputDStream.scala | 5 +- .../kafka/ReliableKafkaReceiver.scala | 4 +- .../receiver/ReceivedBlockHandler.scala | 4 +- .../streaming/util/WriteAheadLogManager.scala | 4 +- 19 files changed, 170 insertions(+), 76 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/util/ThreadUtils.scala create mode 100644 core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 4e7bf51fc0..b986fa87dc 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -17,12 +17,12 @@ package org.apache.spark -import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.TimeUnit import scala.collection.mutable import org.apache.spark.scheduler._ -import org.apache.spark.util.{Clock, SystemClock, Utils} +import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils} /** * An agent that dynamically allocates and removes executors based on the workload. @@ -132,8 +132,8 @@ private[spark] class ExecutorAllocationManager( private val listener = new ExecutorAllocationListener // Executor that handles the scheduling task. - private val executor = Executors.newSingleThreadScheduledExecutor( - Utils.namedThreadFactory("spark-dynamic-executor-allocation")) + private val executor = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation") /** * Verify that the settings specified through the config are valid. diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index e3bd16f1cb..68d05d5b02 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -17,7 +17,7 @@ package org.apache.spark -import java.util.concurrent.{ScheduledFuture, TimeUnit, Executors} +import java.util.concurrent.{ScheduledFuture, TimeUnit} import scala.collection.mutable @@ -25,7 +25,7 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext} import org.apache.spark.storage.BlockManagerId import org.apache.spark.scheduler.{SlaveLost, TaskScheduler} -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * A heartbeat from executors to the driver. This is a shared message used by several internal @@ -76,11 +76,10 @@ private[spark] class HeartbeatReceiver(sc: SparkContext) private var timeoutCheckingTask: ScheduledFuture[_] = null - private val timeoutCheckingThread = Executors.newSingleThreadScheduledExecutor( - Utils.namedThreadFactory("heartbeat-timeout-checking-thread")) + private val timeoutCheckingThread = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread") - private val killExecutorThread = Executors.newSingleThreadExecutor( - Utils.namedThreadFactory("kill-executor-thread")) + private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread") override def onStart(): Unit = { timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 47bdd7749e..9847d5944a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.CompressionCodec import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} import org.apache.spark.{Logging, SecurityManager, SparkConf} @@ -99,7 +99,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis */ private val replayExecutor: ExecutorService = { if (!conf.contains("spark.testing")) { - Executors.newSingleThreadExecutor(Utils.namedThreadFactory("log-replay-executor")) + ThreadUtils.newDaemonSingleThreadExecutor("log-replay-executor") } else { MoreExecutors.sameThreadExecutor() } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 327d155b38..5fc04df5d6 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -21,7 +21,7 @@ import java.io.File import java.lang.management.ManagementFactory import java.net.URL import java.nio.ByteBuffer -import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} @@ -76,7 +76,7 @@ private[spark] class Executor( } // Start worker thread pool - private val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker") + private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker") private val executorSource = new ExecutorSource(threadPool, executorId) if (!isLocal) { @@ -110,8 +110,7 @@ private[spark] class Executor( private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] // Executor for the heartbeat task. - private val heartbeater = Executors.newSingleThreadScheduledExecutor( - Utils.namedThreadFactory("driver-heartbeater")) + private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater") startDriverHeartbeater() diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala index 1a68e621ea..16e905982c 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala @@ -36,7 +36,7 @@ import io.netty.util.{Timeout, TimerTask, HashedWheelTimer} import org.apache.spark._ import org.apache.spark.network.sasl.{SparkSaslClient, SparkSaslServer} -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} import scala.util.Try import scala.util.control.NonFatal @@ -79,7 +79,7 @@ private[nio] class ConnectionManager( private val selector = SelectorProvider.provider.openSelector() private val ackTimeoutMonitor = - new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor")) + new HashedWheelTimer(ThreadUtils.namedThreadFactory("AckTimeoutMonitor")) private val ackTimeout = conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout", @@ -102,7 +102,7 @@ private[nio] class ConnectionManager( handlerThreadCount, conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable](), - Utils.namedThreadFactory("handle-message-executor")) { + ThreadUtils.namedThreadFactory("handle-message-executor")) { override def afterExecute(r: Runnable, t: Throwable): Unit = { super.afterExecute(r, t) @@ -117,7 +117,7 @@ private[nio] class ConnectionManager( ioThreadCount, conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable](), - Utils.namedThreadFactory("handle-read-write-executor")) { + ThreadUtils.namedThreadFactory("handle-read-write-executor")) { override def afterExecute(r: Runnable, t: Throwable): Unit = { super.afterExecute(r, t) @@ -134,7 +134,7 @@ private[nio] class ConnectionManager( connectThreadCount, conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable](), - Utils.namedThreadFactory("handle-connect-executor")) { + ThreadUtils.namedThreadFactory("handle-connect-executor")) { override def afterExecute(r: Runnable, t: Throwable): Unit = { super.afterExecute(r, t) @@ -160,7 +160,7 @@ private[nio] class ConnectionManager( private val registerRequests = new SynchronizedQueue[SendingConnection] implicit val futureExecContext = ExecutionContext.fromExecutor( - Utils.newDaemonCachedThreadPool("Connection manager future execution context")) + ThreadUtils.newDaemonCachedThreadPool("Connection manager future execution context")) @volatile private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 4a32f8936f..8c4bff4e83 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler import java.io.NotSerializableException import java.util.Properties -import java.util.concurrent.{TimeUnit, Executors} +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, Stack} @@ -129,7 +129,7 @@ class DAGScheduler( private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false) private val messageScheduler = - Executors.newScheduledThreadPool(1, Utils.namedThreadFactory("dag-scheduler-message")) + ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message") private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this) taskScheduler.setDAGScheduler(this) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index 3938580aee..391827c1d2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -26,7 +26,7 @@ import scala.util.control.NonFatal import org.apache.spark._ import org.apache.spark.TaskState.TaskState import org.apache.spark.serializer.SerializerInstance -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * Runs a thread pool that deserializes and remotely fetches (if necessary) task results. @@ -35,7 +35,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul extends Logging { private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4) - private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool( + private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool( THREADS, "task-result-getter") protected val serializer = new ThreadLocal[SerializerInstance] { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 63987dfb32..9656fb7685 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster -import java.util.concurrent.{TimeUnit, Executors} +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} @@ -26,7 +26,7 @@ import org.apache.spark.rpc._ import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState} import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils} +import org.apache.spark.util.{ThreadUtils, SerializableBuffer, AkkaUtils, Utils} /** * A scheduler backend that waits for coarse grained executors to connect to it through Akka. @@ -73,7 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private val addressToExecutorId = new HashMap[RpcAddress, String] private val reviveThread = - Executors.newSingleThreadScheduledExecutor(Utils.namedThreadFactory("driver-revive-thread")) + ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread") override def onStart() { // Periodically revive offers to allow delay scheduling to work diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 1406a36a66..d987c7d563 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -24,7 +24,7 @@ import org.apache.spark.rpc._ import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.ui.JettyUtils -import org.apache.spark.util.{RpcUtils, Utils} +import org.apache.spark.util.{ThreadUtils, RpcUtils} import scala.util.control.NonFatal @@ -97,7 +97,7 @@ private[spark] abstract class YarnSchedulerBackend( private var amEndpoint: Option[RpcEndpointRef] = None private val askAmThreadPool = - Utils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool") + ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool") implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool) override def receive: PartialFunction[Any, Unit] = { diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala index 50ba0b9d5a..ac5b524517 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala @@ -18,14 +18,14 @@ package org.apache.spark.scheduler.local import java.nio.ByteBuffer -import java.util.concurrent.{Executors, TimeUnit} +import java.util.concurrent.TimeUnit import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState} import org.apache.spark.TaskState.TaskState import org.apache.spark.executor.{Executor, ExecutorBackend} import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer} -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} private case class ReviveOffers() @@ -47,8 +47,8 @@ private[spark] class LocalEndpoint( private val totalCores: Int) extends ThreadSafeRpcEndpoint with Logging { - private val reviveThread = Executors.newSingleThreadScheduledExecutor( - Utils.namedThreadFactory("local-revive-thread")) + private val reviveThread = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread") private var freeCores = totalCores diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 28c73a7d54..4682167912 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ import org.apache.spark.storage.BlockManagerMessages._ -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * BlockManagerMasterEndpoint is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses @@ -51,7 +51,7 @@ class BlockManagerMasterEndpoint( // Mapping from block id to the set of block managers that have the block. private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]] - private val askThreadPool = Utils.newDaemonCachedThreadPool("block-manager-ask-thread-pool") + private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool") private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool) override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala index 8980fa8eb7..543df4e135 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala @@ -20,7 +20,7 @@ package org.apache.spark.storage import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.rpc.{RpcEnv, RpcCallContext, RpcEndpoint} -import org.apache.spark.util.Utils +import org.apache.spark.util.ThreadUtils import org.apache.spark.{Logging, MapOutputTracker, SparkEnv} import org.apache.spark.storage.BlockManagerMessages._ @@ -36,7 +36,7 @@ class BlockManagerSlaveEndpoint( extends RpcEndpoint with Logging { private val asyncThreadPool = - Utils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool") + ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool") private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool) // Operations that involve removing blocks may be slow and should be done asynchronously diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala new file mode 100644 index 0000000000..098a4b7949 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.util + +import java.util.concurrent._ + +import com.google.common.util.concurrent.ThreadFactoryBuilder + +private[spark] object ThreadUtils { + + /** + * Create a thread factory that names threads with a prefix and also sets the threads to daemon. + */ + def namedThreadFactory(prefix: String): ThreadFactory = { + new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build() + } + + /** + * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a + * unique, sequentially assigned integer. + */ + def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = { + val threadFactory = namedThreadFactory(prefix) + Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor] + } + + /** + * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a + * unique, sequentially assigned integer. + */ + def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = { + val threadFactory = namedThreadFactory(prefix) + Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor] + } + + /** + * Wrapper over newSingleThreadExecutor. + */ + def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = { + val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build() + Executors.newSingleThreadExecutor(threadFactory) + } + + /** + * Wrapper over newSingleThreadScheduledExecutor. + */ + def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = { + val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build() + Executors.newSingleThreadScheduledExecutor(threadFactory) + } +} diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 7b0de1ae55..2feb7341b1 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -35,7 +35,6 @@ import scala.util.control.{ControlThrowable, NonFatal} import com.google.common.io.{ByteStreams, Files} import com.google.common.net.InetAddresses -import com.google.common.util.concurrent.ThreadFactoryBuilder import org.apache.commons.lang3.SystemUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} @@ -897,34 +896,6 @@ private[spark] object Utils extends Logging { hostPortParseResults.get(hostPort) } - private val daemonThreadFactoryBuilder: ThreadFactoryBuilder = - new ThreadFactoryBuilder().setDaemon(true) - - /** - * Create a thread factory that names threads with a prefix and also sets the threads to daemon. - */ - def namedThreadFactory(prefix: String): ThreadFactory = { - daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build() - } - - /** - * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a - * unique, sequentially assigned integer. - */ - def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = { - val threadFactory = namedThreadFactory(prefix) - Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor] - } - - /** - * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a - * unique, sequentially assigned integer. - */ - def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = { - val threadFactory = namedThreadFactory(prefix) - Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor] - } - /** * Return the string to tell how long has passed in milliseconds. */ diff --git a/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala new file mode 100644 index 0000000000..a3aa3e953f --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.util + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import org.scalatest.FunSuite + +class ThreadUtilsSuite extends FunSuite { + + test("newDaemonSingleThreadExecutor") { + val executor = ThreadUtils.newDaemonSingleThreadExecutor("this-is-a-thread-name") + @volatile var threadName = "" + executor.submit(new Runnable { + override def run(): Unit = { + threadName = Thread.currentThread().getName() + } + }) + executor.shutdown() + executor.awaitTermination(10, TimeUnit.SECONDS) + assert(threadName === "this-is-a-thread-name") + } + + test("newDaemonSingleThreadScheduledExecutor") { + val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("this-is-a-thread-name") + try { + val latch = new CountDownLatch(1) + @volatile var threadName = "" + executor.schedule(new Runnable { + override def run(): Unit = { + threadName = Thread.currentThread().getName() + latch.countDown() + } + }, 1, TimeUnit.MILLISECONDS) + latch.await(10, TimeUnit.SECONDS) + assert(threadName === "this-is-a-thread-name") + } finally { + executor.shutdownNow() + } + } +} diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala index 4d26b640e8..cca0fac023 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala @@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.util.Utils +import org.apache.spark.util.ThreadUtils /** * Input stream that pulls messages from a Kafka Broker. @@ -111,7 +111,8 @@ class KafkaReceiver[ val topicMessageStreams = consumerConnector.createMessageStreams( topics, keyDecoder, valueDecoder) - val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") + val executorPool = + ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") try { // Start the messages handler for each partition topicMessageStreams.values.foreach { streams => diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala index c4a44c1822..ea87e96037 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala @@ -33,7 +33,7 @@ import org.I0Itec.zkclient.ZkClient import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.storage.{StorageLevel, StreamBlockId} import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver} -import org.apache.spark.util.Utils +import org.apache.spark.util.ThreadUtils /** * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss. @@ -121,7 +121,7 @@ class ReliableKafkaReceiver[ zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs, consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer) - messageHandlerThreadPool = Utils.newDaemonFixedThreadPool( + messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool( topics.values.sum, "KafkaMessageHandler") blockGenerator.start() diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index dcdc27d29c..297bf04c0c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.storage._ import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager} -import org.apache.spark.util.{Clock, SystemClock, Utils} +import org.apache.spark.util.{ThreadUtils, Clock, SystemClock} /** Trait that represents the metadata related to storage of blocks */ private[streaming] trait ReceivedBlockStoreResult { @@ -150,7 +150,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // For processing futures used in parallel block storing into block manager and write ahead log // # threads = 2, so that both writing to BM and WAL can proceed in parallel implicit private val executionContext = ExecutionContext.fromExecutorService( - Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName)) + ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName)) /** * This implementation stores the block into the block manager as well as a write ahead log. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala index 6bdfe45dc7..38a93cc3c9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -25,7 +25,7 @@ import scala.language.postfixOps import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.spark.Logging -import org.apache.spark.util.{Clock, SystemClock, Utils} +import org.apache.spark.util.{ThreadUtils, Clock, SystemClock} import WriteAheadLogManager._ /** @@ -60,7 +60,7 @@ private[streaming] class WriteAheadLogManager( if (callerName.nonEmpty) s" for $callerName" else "" private val threadpoolName = s"WriteAheadLogManager $callerNameTag" implicit private val executionContext = ExecutionContext.fromExecutorService( - Utils.newDaemonFixedThreadPool(1, threadpoolName)) + ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName)) override protected val logName = s"WriteAheadLogManager $callerNameTag" private var currentLogPath: Option[String] = None