Be more aggressive and defensive in all uses of SelectionKey in select loop

This commit is contained in:
Mridul Muralidharan 2013-05-01 00:30:30 +05:30
parent 0f45477be1
commit 3b748ced22

View file

@ -232,24 +232,37 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
while(!keyInterestChangeRequests.isEmpty) {
val (key, ops) = keyInterestChangeRequests.dequeue
val connection = connectionsByKey.getOrElse(key, null)
if (connection != null) {
val lastOps = key.interestOps()
key.interestOps(ops)
// hot loop - prevent materialization of string if trace not enabled.
if (isTraceEnabled()) {
def intToOpStr(op: Int): String = {
val opStrs = ArrayBuffer[String]()
if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
try {
if (key.isValid) {
val connection = connectionsByKey.getOrElse(key, null)
if (connection != null) {
val lastOps = key.interestOps()
key.interestOps(ops)
// hot loop - prevent materialization of string if trace not enabled.
if (isTraceEnabled()) {
def intToOpStr(op: Int): String = {
val opStrs = ArrayBuffer[String]()
if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
}
logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() +
"] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
}
}
logTrace("Changed key for connection to [" + connection.getRemoteConnectionManagerId() +
"] changed from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
} else {
logInfo("Key not valid ? " + key)
throw new CancelledKeyException()
}
} catch {
case e: CancelledKeyException => {
logInfo("key already cancelled ? " + key, e)
triggerForceCloseByException(key, e)
}
}
}