Use AtomicInteger for numRunningTasks

This commit is contained in:
Andrew Or 2014-01-04 11:16:30 -08:00
parent 2db7884f6f
commit 4de9c9554c

View file

@ -17,8 +17,9 @@
package org.apache.spark package org.apache.spark
import collection.mutable import java.util.concurrent.atomic.AtomicInteger
import serializer.Serializer
import scala.collection.mutable
import akka.actor._ import akka.actor._
import akka.remote.RemoteActorRefProvider import akka.remote.RemoteActorRefProvider
@ -60,7 +61,7 @@ class SparkEnv private[spark] (
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
// Number of tasks currently running across all threads // Number of tasks currently running across all threads
@volatile private var _numRunningTasks = 0 private val _numRunningTasks = new AtomicInteger(0)
// A general, soft-reference map for metadata needed during HadoopRDD split computation // A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
@ -93,15 +94,9 @@ class SparkEnv private[spark] (
/** /**
* Return the number of tasks currently running across all threads * Return the number of tasks currently running across all threads
*/ */
def numRunningTasks: Int = _numRunningTasks def numRunningTasks: Int = _numRunningTasks.intValue()
def incrementNumRunningTasks(): Int = _numRunningTasks.incrementAndGet()
def incrementNumRunningTasks() = synchronized { def decrementNumRunningTasks(): Int = _numRunningTasks.decrementAndGet()
_numRunningTasks += 1
}
def decrementNumRunningTasks() = synchronized {
_numRunningTasks -= 1
}
} }
object SparkEnv extends Logging { object SparkEnv extends Logging {