Removing initLogging entirely
This commit is contained in:
parent
1cbef081e3
commit
18181e6c41
|
@ -46,6 +46,7 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
|
|||
if (server != null) {
|
||||
throw new ServerStateException("Server is already started")
|
||||
} else {
|
||||
log.info("Starting HTTP Server")
|
||||
server = new Server()
|
||||
val connector = new SocketConnector
|
||||
connector.setMaxIdleTime(60*1000)
|
||||
|
|
|
@ -33,6 +33,7 @@ trait Logging {
|
|||
// Method to get or create the logger for this object
|
||||
protected def log: Logger = {
|
||||
if (log_ == null) {
|
||||
initializeIfNecessary()
|
||||
var className = this.getClass.getName
|
||||
// Ignore trailing $'s in the class names for Scala objects
|
||||
if (className.endsWith("$")) {
|
||||
|
@ -89,9 +90,15 @@ trait Logging {
|
|||
log.isTraceEnabled
|
||||
}
|
||||
|
||||
// Method for ensuring that logging is initialized, to avoid having multiple
|
||||
// threads do it concurrently (as SLF4J initialization is not thread safe).
|
||||
protected def initLogging() {
|
||||
private def initializeIfNecessary() {
|
||||
Logging.initLock.synchronized {
|
||||
if (!Logging.initialized) {
|
||||
initializeLogging()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def initializeLogging() {
|
||||
// If Log4j doesn't seem initialized, load a default properties file
|
||||
val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
|
||||
if (!log4jInitialized) {
|
||||
|
@ -101,7 +108,17 @@ trait Logging {
|
|||
case Some(url) => PropertyConfigurator.configure(url)
|
||||
case None => System.err.println(s"Spark was unable to load $defaultLogProps")
|
||||
}
|
||||
log.info(s"Using Spark's default log4j profile: $defaultLogProps")
|
||||
}
|
||||
Logging.initialized = true
|
||||
|
||||
// Force a call into slf4j to initialize it avoids this happening from mutliple threads
|
||||
// and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
|
||||
log
|
||||
}
|
||||
}
|
||||
|
||||
object Logging {
|
||||
@transient @volatile private var initialized = false
|
||||
@transient val initLock = new Object()
|
||||
}
|
|
@ -88,9 +88,6 @@ class SparkContext(
|
|||
scala.collection.immutable.Map())
|
||||
extends Logging {
|
||||
|
||||
// Ensure logging is initialized before we spawn any threads
|
||||
initLogging()
|
||||
|
||||
// Set Spark driver host and port system properties
|
||||
if (System.getProperty("spark.driver.host") == null) {
|
||||
System.setProperty("spark.driver.host", Utils.localHostName())
|
||||
|
|
|
@ -48,8 +48,6 @@ private[spark] class Executor(
|
|||
|
||||
private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
|
||||
|
||||
initLogging()
|
||||
|
||||
// No ip or host:port - just hostname
|
||||
Utils.checkHost(slaveHostname, "Expected executed slave to be a hostname")
|
||||
// must not have port specified.
|
||||
|
|
|
@ -26,7 +26,6 @@ import scala.util.matching.Regex
|
|||
import org.apache.spark.Logging
|
||||
|
||||
private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
|
||||
initLogging()
|
||||
|
||||
val DEFAULT_PREFIX = "*"
|
||||
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
|
||||
|
|
|
@ -63,7 +63,6 @@ import org.apache.spark.metrics.source.Source
|
|||
* [options] is the specific property of this source or sink.
|
||||
*/
|
||||
private[spark] class MetricsSystem private (val instance: String) extends Logging {
|
||||
initLogging()
|
||||
|
||||
val confFile = System.getProperty("spark.metrics.conf")
|
||||
val metricsConfig = new MetricsConfig(Option(confFile))
|
||||
|
|
|
@ -50,8 +50,6 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
|
|||
|
||||
private val akkaTimeout = AkkaUtils.askTimeout
|
||||
|
||||
initLogging()
|
||||
|
||||
val slaveTimeout = System.getProperty("spark.storage.blockManagerSlaveTimeoutMs",
|
||||
"" + (BlockManager.getHeartBeatFrequencyFromSystemProperties * 3)).toLong
|
||||
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.spark.util.Utils
|
|||
* TODO: Use event model.
|
||||
*/
|
||||
private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
|
||||
initLogging()
|
||||
|
||||
blockManager.connectionManager.onReceiveMessage(onBlockMessageReceive)
|
||||
|
||||
|
@ -101,8 +100,6 @@ private[spark] class BlockManagerWorker(val blockManager: BlockManager) extends
|
|||
private[spark] object BlockManagerWorker extends Logging {
|
||||
private var blockManagerWorker: BlockManagerWorker = null
|
||||
|
||||
initLogging()
|
||||
|
||||
def startBlockManagerWorker(manager: BlockManager) {
|
||||
blockManagerWorker = new BlockManagerWorker(manager)
|
||||
}
|
||||
|
|
|
@ -37,8 +37,6 @@ class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockM
|
|||
|
||||
def length = blockMessages.length
|
||||
|
||||
initLogging()
|
||||
|
||||
def set(bufferMessage: BufferMessage) {
|
||||
val startTime = System.currentTimeMillis
|
||||
val newBlockMessages = new ArrayBuffer[BlockMessage]()
|
||||
|
|
|
@ -60,8 +60,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
|
|||
def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out, None)
|
||||
def this() = this(None, new JPrintWriter(Console.out, true), None)
|
||||
|
||||
initLogging()
|
||||
|
||||
var in: InteractiveReader = _ // the input stream from which commands come
|
||||
var settings: Settings = _
|
||||
var intp: SparkIMain = _
|
||||
|
|
|
@ -56,8 +56,6 @@ abstract class DStream[T: ClassTag] (
|
|||
@transient protected[streaming] var ssc: StreamingContext
|
||||
) extends Serializable with Logging {
|
||||
|
||||
initLogging()
|
||||
|
||||
// =======================================================================
|
||||
// Methods that should be implemented by subclasses of DStream
|
||||
// =======================================================================
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.spark.Logging
|
|||
import org.apache.spark.streaming.scheduler.Job
|
||||
|
||||
final private[streaming] class DStreamGraph extends Serializable with Logging {
|
||||
initLogging()
|
||||
|
||||
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
|
||||
private val outputStreams = new ArrayBuffer[DStream[_]]()
|
||||
|
|
|
@ -95,8 +95,6 @@ class StreamingContext private (
|
|||
*/
|
||||
def this(path: String) = this(null, CheckpointReader.read(path), null)
|
||||
|
||||
initLogging()
|
||||
|
||||
if (sc_ == null && cp_ == null) {
|
||||
throw new Exception("Spark Streaming cannot be initialized with " +
|
||||
"both SparkContext and checkpoint as null")
|
||||
|
|
|
@ -88,8 +88,6 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe
|
|||
*/
|
||||
abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {
|
||||
|
||||
initLogging()
|
||||
|
||||
lazy protected val env = SparkEnv.get
|
||||
|
||||
lazy protected val actor = env.actorSystem.actorOf(
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.spark.streaming.util.{ManualClock, RecurringTimer, Clock}
|
|||
private[streaming]
|
||||
class JobGenerator(jobScheduler: JobScheduler) extends Logging {
|
||||
|
||||
initLogging()
|
||||
val ssc = jobScheduler.ssc
|
||||
val clockClass = System.getProperty(
|
||||
"spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
|
||||
|
|
|
@ -30,8 +30,6 @@ import org.apache.spark.streaming._
|
|||
private[streaming]
|
||||
class JobScheduler(val ssc: StreamingContext) extends Logging {
|
||||
|
||||
initLogging()
|
||||
|
||||
val jobSets = new ConcurrentHashMap[Time, JobSet]
|
||||
val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
|
||||
val executor = Executors.newFixedThreadPool(numConcurrentJobs)
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.conf.Configuration
|
|||
|
||||
private[streaming]
|
||||
object MasterFailureTest extends Logging {
|
||||
initLogging()
|
||||
|
||||
@volatile var killed = false
|
||||
@volatile var killCount = 0
|
||||
|
@ -331,7 +330,6 @@ class TestOutputStream[T: ClassTag](
|
|||
*/
|
||||
private[streaming]
|
||||
class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread with Logging {
|
||||
initLogging()
|
||||
|
||||
override def run() {
|
||||
try {
|
||||
|
@ -366,7 +364,6 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread
|
|||
private[streaming]
|
||||
class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
|
||||
extends Thread with Logging {
|
||||
initLogging()
|
||||
|
||||
override def run() {
|
||||
val localTestDir = Files.createTempDir()
|
||||
|
|
Loading…
Reference in a new issue