[SPARK-7052][Core] Add ThreadUtils and move thread methods from Utils to ThreadUtils
As per rxin's suggestion in https://github.com/apache/spark/pull/5392/files#r28757176. In addition, there was a race condition in the globally shared `daemonThreadFactoryBuilder`: it could be mutated by multiple threads at once, so concurrent callers of `Utils.namedThreadFactory` could observe each other's name formats. This PR removes the global `daemonThreadFactoryBuilder` and creates a fresh `ThreadFactoryBuilder` on every call.

Author: zsxwing <zsxwing@gmail.com>

Closes #5631 from zsxwing/thread-utils and squashes the following commits:

9fe5b0e [zsxwing] Add ThreadUtils and move thread methods from Utils to ThreadUtils
This commit is contained in:
parent bdc5c16e76
commit 33b85620f9
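The race described in the commit message comes from Guava's `ThreadFactoryBuilder` being a mutable, non-thread-safe builder. Below is a minimal, illustrative sketch of the hazard, not part of the patch; the object and method names (`SharedBuilderRace`, `racyNamedThreadFactory`) are hypothetical, and it assumes Guava on the classpath:

```scala
import com.google.common.util.concurrent.ThreadFactoryBuilder

// Sketch of the old Utils pattern: one shared builder mutated by every caller.
object SharedBuilderRace {
  private val sharedBuilder = new ThreadFactoryBuilder().setDaemon(true)

  // Like the old Utils.namedThreadFactory: mutates shared state, then builds.
  def racyNamedThreadFactory(prefix: String) =
    sharedBuilder.setNameFormat(prefix + "-%d").build()

  def main(args: Array[String]): Unit = {
    // If one thread is preempted between setNameFormat() and build(), the
    // factory it builds can carry the other thread's name format, so a
    // "pool-a" pool may create threads named "pool-b-0", "pool-b-1", ...
    val t1 = new Thread(new Runnable {
      override def run(): Unit = racyNamedThreadFactory("pool-a")
    })
    val t2 = new Thread(new Runnable {
      override def run(): Unit = racyNamedThreadFactory("pool-b")
    })
    t1.start(); t2.start()
    t1.join(); t2.join()
  }
}
```

Creating a fresh `ThreadFactoryBuilder` per call, as the new `ThreadUtils` does, removes the shared mutable state entirely.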
@@ -17,12 +17,12 @@

 package org.apache.spark

-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.TimeUnit

 import scala.collection.mutable

 import org.apache.spark.scheduler._
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}

 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -132,8 +132,8 @@ private[spark] class ExecutorAllocationManager(
   private val listener = new ExecutorAllocationListener

   // Executor that handles the scheduling task.
-  private val executor = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+  private val executor =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")

   /**
    * Verify that the settings specified through the config are valid.
@@ -17,7 +17,7 @@

 package org.apache.spark

-import java.util.concurrent.{ScheduledFuture, TimeUnit, Executors}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}

 import scala.collection.mutable
@@ -25,7 +25,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}

 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -76,11 +76,10 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)

   private var timeoutCheckingTask: ScheduledFuture[_] = null

-  private val timeoutCheckingThread = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("heartbeat-timeout-checking-thread"))
+  private val timeoutCheckingThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")

-  private val killExecutorThread = Executors.newSingleThreadExecutor(
-    Utils.namedThreadFactory("kill-executor-thread"))
+  private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")

   override def onStart(): Unit = {
     timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}

@@ -99,7 +99,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private val replayExecutor: ExecutorService = {
     if (!conf.contains("spark.testing")) {
-      Executors.newSingleThreadExecutor(Utils.namedThreadFactory("log-replay-executor"))
+      ThreadUtils.newDaemonSingleThreadExecutor("log-replay-executor")
     } else {
       MoreExecutors.sameThreadExecutor()
     }
@@ -21,7 +21,7 @@ import java.io.File
 import java.lang.management.ManagementFactory
 import java.net.URL
 import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -76,7 +76,7 @@ private[spark] class Executor(
   }

   // Start worker thread pool
-  private val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
+  private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
   private val executorSource = new ExecutorSource(threadPool, executorId)

   if (!isLocal) {
@@ -110,8 +110,7 @@ private[spark] class Executor(
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]

   // Executor for the heartbeat task.
-  private val heartbeater = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("driver-heartbeater"))
+  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")

   startDriverHeartbeater()
@@ -36,7 +36,7 @@ import io.netty.util.{Timeout, TimerTask, HashedWheelTimer}

 import org.apache.spark._
 import org.apache.spark.network.sasl.{SparkSaslClient, SparkSaslServer}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}

 import scala.util.Try
 import scala.util.control.NonFatal
@@ -79,7 +79,7 @@ private[nio] class ConnectionManager(

   private val selector = SelectorProvider.provider.openSelector()
   private val ackTimeoutMonitor =
-    new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))
+    new HashedWheelTimer(ThreadUtils.namedThreadFactory("AckTimeoutMonitor"))

   private val ackTimeout =
     conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
@@ -102,7 +102,7 @@ private[nio] class ConnectionManager(
     handlerThreadCount,
     conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-message-executor")) {
+    ThreadUtils.namedThreadFactory("handle-message-executor")) {

     override def afterExecute(r: Runnable, t: Throwable): Unit = {
       super.afterExecute(r, t)
@@ -117,7 +117,7 @@ private[nio] class ConnectionManager(
     ioThreadCount,
     conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-read-write-executor")) {
+    ThreadUtils.namedThreadFactory("handle-read-write-executor")) {

     override def afterExecute(r: Runnable, t: Throwable): Unit = {
       super.afterExecute(r, t)
@@ -134,7 +134,7 @@ private[nio] class ConnectionManager(
     connectThreadCount,
     conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-connect-executor")) {
+    ThreadUtils.namedThreadFactory("handle-connect-executor")) {

     override def afterExecute(r: Runnable, t: Throwable): Unit = {
       super.afterExecute(r, t)
@@ -160,7 +160,7 @@ private[nio] class ConnectionManager(
   private val registerRequests = new SynchronizedQueue[SendingConnection]

   implicit val futureExecContext = ExecutionContext.fromExecutor(
-    Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
+    ThreadUtils.newDaemonCachedThreadPool("Connection manager future execution context"))

   @volatile
   private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler

 import java.io.NotSerializableException
 import java.util.Properties
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, Stack}
@@ -129,7 +129,7 @@ class DAGScheduler(
   private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)

   private val messageScheduler =
-    Executors.newScheduledThreadPool(1, Utils.namedThreadFactory("dag-scheduler-message"))
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}

 /**
  * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
@@ -35,7 +35,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
   extends Logging {

   private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
-  private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
+  private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool(
     THREADS, "task-result-getter")

   protected val serializer = new ThreadLocal[SerializerInstance] {
@@ -17,7 +17,7 @@

 package org.apache.spark.scheduler.cluster

-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger

 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -26,7 +26,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.util.{ThreadUtils, SerializableBuffer, AkkaUtils, Utils}

 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -73,7 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   private val addressToExecutorId = new HashMap[RpcAddress, String]

   private val reviveThread =
-    Executors.newSingleThreadScheduledExecutor(Utils.namedThreadFactory("driver-revive-thread"))
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

   override def onStart() {
     // Periodically revive offers to allow delay scheduling to work
@@ -24,7 +24,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{RpcUtils, Utils}
+import org.apache.spark.util.{ThreadUtils, RpcUtils}

 import scala.util.control.NonFatal
@@ -97,7 +97,7 @@ private[spark] abstract class YarnSchedulerBackend(
     private var amEndpoint: Option[RpcEndpointRef] = None

     private val askAmThreadPool =
-      Utils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
+      ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
     implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)

     override def receive: PartialFunction[Any, Unit] = {
@@ -18,14 +18,14 @@
 package org.apache.spark.scheduler.local

 import java.nio.ByteBuffer
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.TimeUnit

 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}

 private case class ReviveOffers()
@@ -47,8 +47,8 @@ private[spark] class LocalEndpoint(
     private val totalCores: Int)
   extends ThreadSafeRpcEndpoint with Logging {

-  private val reviveThread = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("local-revive-thread"))
+  private val reviveThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread")

   private var freeCores = totalCores
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}

 /**
  * BlockManagerMasterEndpoint is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses
@@ -51,7 +51,7 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]

-  private val askThreadPool = Utils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
+  private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
   private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import scala.concurrent.{ExecutionContext, Future}

 import org.apache.spark.rpc.{RpcEnv, RpcCallContext, RpcEndpoint}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
 import org.apache.spark.{Logging, MapOutputTracker, SparkEnv}
 import org.apache.spark.storage.BlockManagerMessages._
@@ -36,7 +36,7 @@ class BlockManagerSlaveEndpoint(
   extends RpcEndpoint with Logging {

   private val asyncThreadPool =
-    Utils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")
+    ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")
   private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)

   // Operations that involve removing blocks may be slow and should be done asynchronously
core/src/main/scala/org/apache/spark/util/ThreadUtils.scala (new file, 67 lines)
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.util
+
+import java.util.concurrent._
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
+private[spark] object ThreadUtils {
+
+  /**
+   * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
+   */
+  def namedThreadFactory(prefix: String): ThreadFactory = {
+    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()
+  }
+
+  /**
+   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
+
+  /**
+   * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
+   * unique, sequentially assigned integer.
+   */
+  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
+    val threadFactory = namedThreadFactory(prefix)
+    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
+  }
+
+  /**
+   * Wrapper over newSingleThreadExecutor.
+   */
+  def newDaemonSingleThreadExecutor(threadName: String): ExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+    Executors.newSingleThreadExecutor(threadFactory)
+  }
+
+  /**
+   * Wrapper over newSingleThreadScheduledExecutor.
+   */
+  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService = {
+    val threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadName).build()
+    Executors.newSingleThreadScheduledExecutor(threadFactory)
+  }
+}
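For orientation, here is a hedged usage sketch of the new API (the object name `ThreadUtilsExample` is illustrative, not from the patch). Note the naming convention: the pool variants append a sequential `-%d` ID to the prefix, while the single-thread variants use the supplied name verbatim, since there is only one thread to name.

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.util.ThreadUtils

object ThreadUtilsExample {
  def main(args: Array[String]): Unit = {
    // Cached pool; threads are daemons named "example-worker-0", "example-worker-1", ...
    val pool = ThreadUtils.newDaemonCachedThreadPool("example-worker")
    pool.execute(new Runnable {
      override def run(): Unit = println(Thread.currentThread().getName)
    })

    // Single scheduled daemon thread, as HeartbeatReceiver uses for its periodic check.
    val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("example-scheduler")
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = println("tick")
    }, 0, 100, TimeUnit.MILLISECONDS)

    Thread.sleep(500) // let a few ticks fire; daemon threads won't keep the JVM alive
    pool.shutdown()
    scheduler.shutdownNow()
  }
}
```

Because every executor here hands out daemon threads, a call site that forgets to shut its pool down no longer prevents JVM exit, which is why the updated call sites could drop their explicit `Executors` plus `Utils.namedThreadFactory` plumbing.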
@@ -35,7 +35,6 @@ import scala.util.control.{ControlThrowable, NonFatal}

 import com.google.common.io.{ByteStreams, Files}
 import com.google.common.net.InetAddresses
-import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.commons.lang3.SystemUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
@@ -897,34 +896,6 @@ private[spark] object Utils extends Logging {
     hostPortParseResults.get(hostPort)
   }

-  private val daemonThreadFactoryBuilder: ThreadFactoryBuilder =
-    new ThreadFactoryBuilder().setDaemon(true)
-
-  /**
-   * Create a thread factory that names threads with a prefix and also sets the threads to daemon.
-   */
-  def namedThreadFactory(prefix: String): ThreadFactory = {
-    daemonThreadFactoryBuilder.setNameFormat(prefix + "-%d").build()
-  }
-
-  /**
-   * Wrapper over newCachedThreadPool. Thread names are formatted as prefix-ID, where ID is a
-   * unique, sequentially assigned integer.
-   */
-  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor = {
-    val threadFactory = namedThreadFactory(prefix)
-    Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
-  }
-
-  /**
-   * Wrapper over newFixedThreadPool. Thread names are formatted as prefix-ID, where ID is a
-   * unique, sequentially assigned integer.
-   */
-  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor = {
-    val threadFactory = namedThreadFactory(prefix)
-    Executors.newFixedThreadPool(nThreads, threadFactory).asInstanceOf[ThreadPoolExecutor]
-  }
-
   /**
    * Return the string to tell how long has passed in milliseconds.
    */
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.util
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import org.scalatest.FunSuite
+
+class ThreadUtilsSuite extends FunSuite {
+
+  test("newDaemonSingleThreadExecutor") {
+    val executor = ThreadUtils.newDaemonSingleThreadExecutor("this-is-a-thread-name")
+    @volatile var threadName = ""
+    executor.submit(new Runnable {
+      override def run(): Unit = {
+        threadName = Thread.currentThread().getName()
+      }
+    })
+    executor.shutdown()
+    executor.awaitTermination(10, TimeUnit.SECONDS)
+    assert(threadName === "this-is-a-thread-name")
+  }
+
+  test("newDaemonSingleThreadScheduledExecutor") {
+    val executor = ThreadUtils.newDaemonSingleThreadScheduledExecutor("this-is-a-thread-name")
+    try {
+      val latch = new CountDownLatch(1)
+      @volatile var threadName = ""
+      executor.schedule(new Runnable {
+        override def run(): Unit = {
+          threadName = Thread.currentThread().getName()
+          latch.countDown()
+        }
+      }, 1, TimeUnit.MILLISECONDS)
+      latch.await(10, TimeUnit.SECONDS)
+      assert(threadName === "this-is-a-thread-name")
+    } finally {
+      executor.shutdownNow()
+    }
+  }
+}
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils

 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -111,7 +111,8 @@ class KafkaReceiver[
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)

-    val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
+    val executorPool =
+      ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
     try {
       // Start the messages handler for each partition
       topicMessageStreams.values.foreach { streams =>
@@ -33,7 +33,7 @@ import org.I0Itec.zkclient.ZkClient
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.storage.{StorageLevel, StreamBlockId}
 import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils

 /**
  * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
@@ -121,7 +121,7 @@ class ReliableKafkaReceiver[
     zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
       consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

-    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
+    messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
       topics.values.sum, "KafkaMessageHandler")

     blockGenerator.start()
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.storage._
 import org.apache.spark.streaming.util.{WriteAheadLogFileSegment, WriteAheadLogManager}
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}

 /** Trait that represents the metadata related to storage of blocks */
 private[streaming] trait ReceivedBlockStoreResult {
@@ -150,7 +150,7 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
   // For processing futures used in parallel block storing into block manager and write ahead log
   // # threads = 2, so that both writing to BM and WAL can proceed in parallel
   implicit private val executionContext = ExecutionContext.fromExecutorService(
-    Utils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))
+    ThreadUtils.newDaemonFixedThreadPool(2, this.getClass.getSimpleName))

   /**
    * This implementation stores the block into the block manager as well as a write ahead log.
@@ -25,7 +25,7 @@ import scala.language.postfixOps
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.Logging
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{ThreadUtils, Clock, SystemClock}
 import WriteAheadLogManager._

 /**
@@ -60,7 +60,7 @@ private[streaming] class WriteAheadLogManager(
     if (callerName.nonEmpty) s" for $callerName" else ""
   private val threadpoolName = s"WriteAheadLogManager $callerNameTag"
   implicit private val executionContext = ExecutionContext.fromExecutorService(
-    Utils.newDaemonFixedThreadPool(1, threadpoolName))
+    ThreadUtils.newDaemonSingleThreadExecutor(threadpoolName))
   override protected val logName = s"WriteAheadLogManager $callerNameTag"

   private var currentLogPath: Option[String] = None