Every thread is a daemon thread => Program shuts down ASAP once its done.

Some log messages added to blank exception handlers.
This commit is contained in:
Mosharaf Chowdhury 2010-11-09 16:44:42 -08:00
parent 6cb76d9708
commit 50612ab3a9

View file

@ -6,7 +6,7 @@ import java.util.{BitSet, Comparator, Random, Timer, TimerTask, UUID}
import com.google.common.collect.MapMaker
import java.util.concurrent.{Executors, ExecutorService, ThreadPoolExecutor}
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import scala.collection.mutable.{ListBuffer, Map, Set}
@ -374,7 +374,9 @@ extends BroadcastRecipe with Logging {
oosTracker.flush
gInfo = oisTracker.readObject.asInstanceOf[SourceInfo]
} catch {
case e: Exception => { }
case e: Exception => {
logInfo ("getGuideInfo had a " + e)
}
} finally {
if (oisTracker != null) {
oisTracker.close
@ -454,8 +456,7 @@ extends BroadcastRecipe with Logging {
override def run = {
var threadPool =
Executors.newFixedThreadPool(
BroadcastBT.MaxTxPeers).asInstanceOf[ThreadPoolExecutor]
BroadcastBT.newDaemonFixedThreadPool (BroadcastBT.MaxTxPeers)
while (hasBlocks < totalBlocks) {
var numThreadsToCreate =
@ -636,8 +637,7 @@ extends BroadcastRecipe with Logging {
private var setOfCompletedSources = Set[SourceInfo] ()
override def run = {
// TODO: Cached threadpool has 60s keep alive timer
var threadPool = Executors.newCachedThreadPool
var threadPool = BroadcastBT.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (0)
@ -722,7 +722,9 @@ extends BroadcastRecipe with Logging {
SourceInfo.UnusedParam, SourceInfo.UnusedParam))
gosSource.flush
} catch {
case e: Exception => { }
case e: Exception => {
logInfo ("sendStopBroadcastNotifications had a " + e)
}
} finally {
if (gisSource != null) {
gisSource.close
@ -816,7 +818,7 @@ extends BroadcastRecipe with Logging {
// TODO: Not sure if this will be able to fix the number of outgoing links
// We should have a timeout mechanism on the receiver side
var threadPool =
Executors.newFixedThreadPool(BroadcastBT.MaxRxPeers)
BroadcastBT.newDaemonFixedThreadPool(BroadcastBT.MaxRxPeers)
var serverSocket = new ServerSocket (0)
listenPort = serverSocket.getLocalPort
@ -959,7 +961,9 @@ extends BroadcastRecipe with Logging {
oos.writeObject (arrayOfBlocks(blockIndex))
oos.flush
} catch {
case e: Exception => { }
case e: Exception => {
logInfo ("pickAndSendBlock had a " + e)
}
}
logInfo ("Sent block: " + blockIndex + " to " + clientSocket)
}
@ -1225,12 +1229,51 @@ extends Logging {
}
}
// Returns a standard ThreadFactory except all threads are daemons
private def newDaemonThreadFactory: ThreadFactory = {
new ThreadFactory {
def newThread(r: Runnable): Thread = {
var t = Executors.defaultThreadFactory.newThread(r)
t.setDaemon(true)
return t
}
}
}
// Wrapper over newCachedThreadPool
def newDaemonCachedThreadPool: ThreadPoolExecutor = {
var threadPool =
Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor]
threadPool.setThreadFactory (newDaemonThreadFactory)
return threadPool
}
// Wrapper over newFixedThreadPool
def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = {
var threadPool =
Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor]
threadPool.setThreadFactory (newDaemonThreadFactory)
return threadPool
}
class TrackMultipleValues
extends Thread with Logging {
var stopTracker = false
override def run = {
var threadPool = Executors.newCachedThreadPool
var myThreadFactory = new ThreadFactory {
def newThread(r: Runnable): Thread = {
var t = Executors.defaultThreadFactory.newThread(r)
t.setDaemon(true)
return t
}
}
var threadPool = BroadcastBT.newDaemonCachedThreadPool
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket (BroadcastBT.MasterTrackerPort)
@ -1246,7 +1289,6 @@ extends Logging {
} catch {
case e: Exception => {
logInfo ("TrackMultipleValues Timeout. Stopping listening...")
// TODO: Tracking should be explicitly stopped by the SparkContext
stopTracker = true
}
}
@ -1265,10 +1307,12 @@ extends Logging {
valueToGuideMap (uuid)
} else SourceInfo ("", SourceInfo.TxNotStartedRetry,
SourceInfo.UnusedParam, SourceInfo.UnusedParam)
logInfo ("TrackMultipleValues:Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort)
logInfo ("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort)
oos.writeObject (gInfo)
} catch {
case e: Exception => { }
case e: Exception => {
logInfo ("TrackMultipleValues had a " + e)
}
} finally {
ois.close
oos.close