Set default logging to WARN for Spark streaming examples.

This programmatically sets the log level to WARN by default for the streaming
examples. If the user has already specified a log4j.properties file,
that file takes precedence over this default.
Patrick Wendell 2014-01-08 20:54:24 -08:00
parent 04d83fc37f
commit 35f80da21a
21 changed files with 80 additions and 34 deletions
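As the commit message notes, a user-supplied log4j.properties on the classpath takes precedence over this WARN default. A minimal sketch of such an override file, assuming standard log4j 1.x configuration syntax (the appender name and conversion pattern below are illustrative, not part of this commit):

    # Illustrative example only: any valid log4j.properties on the classpath
    # attaches an appender to the root logger and disables the WARN default.
    log4j.rootCategory=INFO, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

With an appender attached to the root logger this way, Logger.getRootLogger.getAllAppenders.hasMoreElements returns true, so the setStreamingLogLevels() helper added below leaves the user's configuration untouched.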

@@ -47,6 +47,8 @@ public final class JavaFlumeEventCount {
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
String master = args[0];
String host = args[1];
int port = Integer.parseInt(args[2]);

@@ -59,6 +59,8 @@ public final class JavaKafkaWordCount {
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
// Create the context with a 1 second batch size
JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
new Duration(2000), System.getenv("SPARK_HOME"),

@@ -53,6 +53,8 @@ public final class JavaNetworkWordCount {
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
new Duration(1000), System.getenv("SPARK_HOME"),

@@ -41,6 +41,8 @@ public final class JavaQueueStream {
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
// Create the context
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));

@@ -18,17 +18,13 @@
package org.apache.spark.streaming.examples
import scala.collection.mutable.LinkedList
import scala.util.Random
import scala.reflect.ClassTag
import scala.util.Random
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.actorRef2Scala
import akka.actor.{Actor, ActorRef, Props, actorRef2Scala}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
import org.apache.spark.streaming.receivers.Receiver
import org.apache.spark.util.AkkaUtils
@@ -147,6 +143,8 @@ object ActorWordCount {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Seq(master, host, port) = args.toSeq
// Create the context and set the batch size

@@ -17,10 +17,10 @@
package org.apache.spark.streaming.examples
import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
/**
* Produces a count of events received from Flume.
@@ -44,6 +44,8 @@ object FlumeEventCount {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(master, host, IntParam(port)) = args
val batchInterval = Milliseconds(2000)

@@ -20,7 +20,6 @@ package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
/**
* Counts words in new text files created in the given directory
* Usage: HdfsWordCount <master> <directory>
@@ -38,6 +37,8 @@ object HdfsWordCount {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
// Create the context
val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

@@ -23,8 +23,8 @@ import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.util.RawTextHelper._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.util.RawTextHelper._
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
@@ -40,12 +40,13 @@ import org.apache.spark.streaming.kafka._
*/
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 5) {
System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(master, zkQuorum, group, topics, numThreads) = args
val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),

@@ -17,12 +17,8 @@
package org.apache.spark.streaming.examples
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.{MqttClient, MqttClientPersistence, MqttException, MqttMessage, MqttTopic}
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
@@ -43,6 +39,8 @@ object MQTTPublisher {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Seq(brokerUrl, topic) = args.toSeq
try {

@@ -39,6 +39,8 @@ object NetworkWordCount {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

@@ -17,12 +17,12 @@
package org.apache.spark.streaming.examples
import scala.collection.mutable.SynchronizedQueue
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import scala.collection.mutable.SynchronizedQueue
object QueueStream {
def main(args: Array[String]) {
@@ -30,7 +30,9 @@ object QueueStream {
System.err.println("Usage: QueueStream <master>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
// Create the context
val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))

@@ -17,11 +17,10 @@
package org.apache.spark.streaming.examples
import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.util.RawTextHelper
import org.apache.spark.util.IntParam
/**
* Receives text from multiple rawNetworkStreams and counts how many '\n' delimited
@@ -45,6 +44,8 @@ object RawNetworkGrep {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
// Create the context

@@ -39,6 +39,8 @@ object StatefulNetworkWordCount {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
val currentCount = values.foldLeft(0)(_ + _)

@@ -0,0 +1,21 @@
package org.apache.spark.streaming.examples
import org.apache.spark.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark's default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}

@@ -17,12 +17,12 @@
package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.twitter.algebird._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.twitter._
/**
@@ -51,6 +51,8 @@ object TwitterAlgebirdCMS {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
// CMS parameters
val DELTA = 1E-3
val EPS = 0.01

@@ -17,10 +17,11 @@
package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
import com.twitter.algebird.HyperLogLog._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
/**
@@ -44,6 +45,8 @@ object TwitterAlgebirdHLL {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
/** Bit size parameter for HyperLogLog, trades off accuracy vs size */
val BIT_SIZE = 12
val (master, filters) = (args.head, args.tail)

@@ -36,6 +36,8 @@ object TwitterPopularTags {
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val (master, filters) = (args.head, args.tail)
val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),

@@ -76,6 +76,7 @@ object ZeroMQWordCount {
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val Seq(master, url, topic) = args.toSeq
// Create the context and set the batch size

@@ -17,9 +17,10 @@
package org.apache.spark.streaming.examples.clickstream
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.examples.StreamingExamples
/** Analyses a streaming dataset of web page views. This class demonstrates several types of
* operators available in Spark streaming.
@@ -36,6 +37,7 @@ object PageViewStream {
" errorRatePerZipCode, activeUserCount, popularUsersSeen")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
val metric = args(0)
val host = args(1)
val port = args(2).toInt

@@ -333,7 +333,7 @@ abstract class DStream[T: ClassTag] (
var numForgotten = 0
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
generatedRDDs --= oldRDDs.keys
logInfo("Cleared " + oldRDDs.size + " RDDs that were older than " +
logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
dependencies.foreach(_.clearOldMetadata(time))
}

@@ -105,18 +105,18 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def generateJobs(time: Time): Seq[Job] = {
this.synchronized {
logInfo("Generating jobs for time " + time)
logDebug("Generating jobs for time " + time)
val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time))
logInfo("Generated " + jobs.length + " jobs for time " + time)
logDebug("Generated " + jobs.length + " jobs for time " + time)
jobs
}
}
def clearOldMetadata(time: Time) {
this.synchronized {
logInfo("Clearing old metadata for time " + time)
logDebug("Clearing old metadata for time " + time)
outputStreams.foreach(_.clearOldMetadata(time))
logInfo("Cleared old metadata for time " + time)
logDebug("Cleared old metadata for time " + time)
}
}