diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 5b1ea156db..476426b674 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -6,7 +6,7 @@ import java.util.{BitSet, Comparator, Random, Timer, TimerTask, UUID} import com.google.common.collect.MapMaker -import java.util.concurrent.{Executors, ExecutorService, ThreadPoolExecutor} +import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor} import scala.collection.mutable.{ListBuffer, Map, Set} @@ -374,7 +374,9 @@ extends BroadcastRecipe with Logging { oosTracker.flush gInfo = oisTracker.readObject.asInstanceOf[SourceInfo] } catch { - case e: Exception => { } + case e: Exception => { + logInfo ("getGuideInfo had a " + e) + } } finally { if (oisTracker != null) { oisTracker.close @@ -454,8 +456,7 @@ extends BroadcastRecipe with Logging { override def run = { var threadPool = - Executors.newFixedThreadPool( - BroadcastBT.MaxTxPeers).asInstanceOf[ThreadPoolExecutor] + BroadcastBT.newDaemonFixedThreadPool (BroadcastBT.MaxTxPeers) while (hasBlocks < totalBlocks) { var numThreadsToCreate = @@ -636,8 +637,7 @@ extends BroadcastRecipe with Logging { private var setOfCompletedSources = Set[SourceInfo] () override def run = { - // TODO: Cached threadpool has 60s keep alive timer - var threadPool = Executors.newCachedThreadPool + var threadPool = BroadcastBT.newDaemonCachedThreadPool var serverSocket: ServerSocket = null serverSocket = new ServerSocket (0) @@ -722,7 +722,9 @@ extends BroadcastRecipe with Logging { SourceInfo.UnusedParam, SourceInfo.UnusedParam)) gosSource.flush } catch { - case e: Exception => { } + case e: Exception => { + logInfo ("sendStopBroadcastNotifications had a " + e) + } } finally { if (gisSource != null) { gisSource.close @@ -816,7 +818,7 @@ extends BroadcastRecipe with Logging { // TODO: Not sure if this will be able to fix the number of outgoing links // We should have a timeout mechanism on the receiver side var threadPool = - Executors.newFixedThreadPool(BroadcastBT.MaxRxPeers) + BroadcastBT.newDaemonFixedThreadPool(BroadcastBT.MaxRxPeers) var serverSocket = new ServerSocket (0) listenPort = serverSocket.getLocalPort @@ -959,7 +961,9 @@ extends BroadcastRecipe with Logging { oos.writeObject (arrayOfBlocks(blockIndex)) oos.flush } catch { - case e: Exception => { } + case e: Exception => { + logInfo ("pickAndSendBlock had a " + e) + } } logInfo ("Sent block: " + blockIndex + " to " + clientSocket) } @@ -1225,12 +1229,51 @@ extends Logging { } } + // Returns a standard ThreadFactory except all threads are daemons + private def newDaemonThreadFactory: ThreadFactory = { + new ThreadFactory { + def newThread(r: Runnable): Thread = { + var t = Executors.defaultThreadFactory.newThread(r) + t.setDaemon(true) + return t + } + } + } + + // Wrapper over newCachedThreadPool + def newDaemonCachedThreadPool: ThreadPoolExecutor = { + var threadPool = + Executors.newCachedThreadPool.asInstanceOf[ThreadPoolExecutor] + + threadPool.setThreadFactory (newDaemonThreadFactory) + + return threadPool + } + + // Wrapper over newFixedThreadPool + def newDaemonFixedThreadPool (nThreads: Int): ThreadPoolExecutor = { + var threadPool = + Executors.newFixedThreadPool (nThreads).asInstanceOf[ThreadPoolExecutor] + + threadPool.setThreadFactory (newDaemonThreadFactory) + + return threadPool + } + class TrackMultipleValues extends Thread with Logging { var stopTracker = false override def run = { - var threadPool = Executors.newCachedThreadPool + var myThreadFactory = new ThreadFactory { + def newThread(r: Runnable): Thread = { + var t = Executors.defaultThreadFactory.newThread(r) + t.setDaemon(true) + return t + } + } + + var threadPool = BroadcastBT.newDaemonCachedThreadPool var serverSocket: ServerSocket = null serverSocket = new ServerSocket (BroadcastBT.MasterTrackerPort) @@ -1246,7 +1289,6 @@ extends Logging { } catch { case e: Exception => { logInfo ("TrackMultipleValues Timeout. Stopping listening...") - // TODO: Tracking should be explicitly stopped by the SparkContext stopTracker = true } } @@ -1265,10 +1307,12 @@ extends Logging { valueToGuideMap (uuid) } else SourceInfo ("", SourceInfo.TxNotStartedRetry, SourceInfo.UnusedParam, SourceInfo.UnusedParam) - logInfo ("TrackMultipleValues:Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort) + logInfo ("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort) oos.writeObject (gInfo) } catch { - case e: Exception => { } + case e: Exception => { + logInfo ("TrackMultipleValues had a " + e) + } } finally { ois.close oos.close