diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index cce0ea2183..8e50ea5853 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -12,6 +12,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.collection.Map
 import scala.collection.mutable.HashMap
 import scala.collection.JavaConversions.mapAsScalaMap
+import scala.util.control.Breaks._
 
 import org.apache.hadoop.io.BytesWritable
 import org.apache.hadoop.io.NullWritable
@@ -61,6 +62,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   def compute(split: Split): Iterator[T]
   @transient val dependencies: List[Dependency[_]]
 
+  // Record user function generating this RDD
+  val origin = getOriginDescription
+
   // Optionally overridden by subclasses to specify how they are partitioned
   val partitioner: Option[Partitioner] = None
 
@@ -124,6 +128,37 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
     }
   }
 
+  // Describe which spark and user functions generated this RDD. Only works if called from
+  // constructor.
+  def getOriginDescription : String = {
+    val trace = Thread.currentThread().getStackTrace().filter( el =>
+      (!el.getMethodName().contains("getStackTrace")))
+
+    // Keep crawling up the stack trace until we find the first function not inside of the spark
+    // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
+    // transformation, a SparkContext function (such as parallelize), or anything else that leads
+    // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
+    var lastSparkMethod = ""
+    var firstUserMethod = ""
+    var firstUserFile = ""
+    var firstUserLine = -1
+
+    breakable {
+      for (el <- trace) {
+        if (el.getClassName().contains("spark") && !el.getClassName().contains("spark.examples")) {
+          lastSparkMethod = el.getMethodName()
+        }
+        else {
+          firstUserMethod = el.getMethodName()
+          firstUserLine = el.getLineNumber()
+          firstUserFile = el.getFileName()
+          break
+        }
+      }
+    }
+    "%s called in %s (%s:%s)".format(lastSparkMethod, firstUserMethod, firstUserFile, firstUserLine)
+  }
+
   // Transformations (return a new RDD)
 
   def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 2e2dc295b6..4944f41e3a 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -337,7 +337,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       val missing = getMissingParentStages(stage).sortBy(_.id)
       logDebug("missing: " + missing)
       if (missing == Nil) {
-        logInfo("Submitting " + stage + ", which has no missing parents")
+        logInfo("Submitting " + stage + " from " + stage.rdd.origin +
+          ", which has no missing parents")
         submitMissingTasks(stage)
         running += stage
       } else {
@@ -452,6 +453,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
             waiting --= newlyRunnable
             running ++= newlyRunnable
             for (stage <- newlyRunnable.sortBy(_.id)) {
+              logInfo("Submitting " + stage + " from " + stage.rdd.origin +
+                " which is now runnable")
               submitMissingTasks(stage)
             }
           }
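
The sketch below is not part of the patch; it is a minimal standalone Scala illustration of the same stack-trace crawl that getOriginDescription performs. The names TinyLib and UserApp are hypothetical stand-ins for Spark internals and user code, chosen only so the example compiles on its own.

// Minimal standalone sketch (not part of the patch) of the stack-trace crawl performed by
// getOriginDescription above. TinyLib stands in for Spark code and UserApp for user code;
// both names are hypothetical and exist only for illustration.
import scala.util.control.Breaks._

object TinyLib {
  def describeOrigin(): String = {
    // Drop the getStackTrace frame itself, just as the RDD code does.
    val trace = Thread.currentThread().getStackTrace().filter(el =>
      !el.getMethodName().contains("getStackTrace"))

    var lastLibMethod = ""     // shallowest frame still inside the "library"
    var firstUserMethod = ""   // deepest frame belonging to user code
    var firstUserFile = ""
    var firstUserLine = -1

    breakable {
      for (el <- trace) {
        if (el.getClassName().contains("TinyLib")) {
          lastLibMethod = el.getMethodName()
        } else {
          firstUserMethod = el.getMethodName()
          firstUserFile = el.getFileName()
          firstUserLine = el.getLineNumber()
          break
        }
      }
    }
    "%s called in %s (%s:%s)".format(lastLibMethod, firstUserMethod, firstUserFile, firstUserLine)
  }

  // Stands in for an RDD transformation such as map or parallelize.
  def transform(): String = describeOrigin()
}

object UserApp {
  def main(args: Array[String]): Unit = {
    // Prints something like: "transform called in main (UserApp.scala:NN)"
    println(TinyLib.transform())
  }
}

The real implementation filters on the spark package (excluding spark.examples) rather than TinyLib, and records the result eagerly in the origin val, which is why the description is only meaningful when computed from the RDD constructor.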