[SPARK-3607] ConnectionManager threads.max configs on the thread pools don't work

Hi all - cleaned up the code to get rid of the unused parameter and added some discussion of the ThreadPoolExecutor parameters to explain why we can use a single threadCount instead of providing a min/max.

Author: Ilya Ganelin <ilya.ganelin@capitalone.com>

Closes #3664 from ilganeli/SPARK-3607C and squashes the following commits:

3c05690 [Ilya Ganelin] Updated documentation and refactored code to extract shared variables
This commit is contained in:
Ilya Ganelin 2014-12-18 12:53:18 -08:00 committed by Reynold Xin
parent d9956f86ad
commit 3720057b8e

View file

@ -83,9 +83,21 @@ private[nio] class ConnectionManager(
private val ackTimeout = conf.getInt("spark.core.connection.ack.wait.timeout", 60)
// Get the thread counts from the Spark Configuration.
//
// Even though the ThreadPoolExecutor constructor takes both a minimum and maximum value,
// we only query for the minimum value because we are using LinkedBlockingDeque.
//
// The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is
// an unbounded queue) no more than corePoolSize threads will ever be created, so only the "min"
// parameter is necessary.
private val handlerThreadCount = conf.getInt("spark.core.connection.handler.threads.min", 20)
private val ioThreadCount = conf.getInt("spark.core.connection.io.threads.min", 4)
private val connectThreadCount = conf.getInt("spark.core.connection.connect.threads.min", 1)
private val handleMessageExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.handler.threads.min", 20),
conf.getInt("spark.core.connection.handler.threads.max", 60),
handlerThreadCount,
handlerThreadCount,
conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable](),
Utils.namedThreadFactory("handle-message-executor")) {
@ -96,12 +108,11 @@ private[nio] class ConnectionManager(
logError("Error in handleMessageExecutor is not handled properly", t)
}
}
}
private val handleReadWriteExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.io.threads.min", 4),
conf.getInt("spark.core.connection.io.threads.max", 32),
ioThreadCount,
ioThreadCount,
conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable](),
Utils.namedThreadFactory("handle-read-write-executor")) {
@ -112,14 +123,13 @@ private[nio] class ConnectionManager(
logError("Error in handleReadWriteExecutor is not handled properly", t)
}
}
}
// Use a different, yet smaller, thread pool - infrequently used with very short lived tasks :
// which should be executed asap
private val handleConnectExecutor = new ThreadPoolExecutor(
conf.getInt("spark.core.connection.connect.threads.min", 1),
conf.getInt("spark.core.connection.connect.threads.max", 8),
connectThreadCount,
connectThreadCount,
conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable](),
Utils.namedThreadFactory("handle-connect-executor")) {
@ -130,7 +140,6 @@ private[nio] class ConnectionManager(
logError("Error in handleConnectExecutor is not handled properly", t)
}
}
}
private val serverChannel = ServerSocketChannel.open()