Logging tweaks

Matei Zaharia 2012-09-28 23:28:16 -07:00
parent 815d6bd69a
commit 1d44644f4f
5 changed files with 23 additions and 17 deletions

@@ -139,7 +139,6 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
         logInfo("Registering RDD ID " + rddId + " with cache")
         registeredRddIds += rddId
         communicate(RegisterRDD(rddId, numPartitions))
-        logInfo(RegisterRDD(rddId, numPartitions) + " successful")
       }
     }
   }
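
A side note on the removed log line: RegisterRDD is a Scala case class, so concatenating it into a string picks up its generated toString. A minimal standalone sketch (this RegisterRDD is a stand-in for Spark's actual message type, not the real one):

case class RegisterRDD(rddId: Int, numPartitions: Int)

object CaseClassLogDemo {
  def main(args: Array[String]) {
    // Case classes get a readable toString for free, which is what the
    // removed line relied on to print e.g. "RegisterRDD(3,10) successful".
    println(RegisterRDD(3, 10) + " successful")
  }
}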

@@ -70,7 +70,7 @@ class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
     // By separating this from RepartitionShuffledRDD, we avoided a
     // buf.iterator.toArray call, thus avoiding building up the buffer twice.
     val buf = new ArrayBuffer[(K, V)]
-    def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) }
+    def addTupleToBuffer(k: K, v: V) { buf += ((k, v)) }
     SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
     if (ascending) {
       buf.sortWith((x, y) => x._1 < y._1).iterator
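
Two small Scala points in the replacement line: dropping the "=" turns addTupleToBuffer into a procedure returning Unit, and the doubled parentheses make (k, v) a single tuple argument to += rather than an argument list. A minimal sketch, independent of Spark:

import scala.collection.mutable.ArrayBuffer

object TupleBufferDemo {
  def main(args: Array[String]) {
    val buf = new ArrayBuffer[(String, Int)]
    // Without the inner parentheses the compiler would try to read
    // ("a", 1) as two separate arguments to +=.
    buf += (("a", 1))
    buf += (("b", 2))
    println(buf)  // ArrayBuffer((a,1), (b,2))
  }
}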


@@ -409,10 +409,11 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean
     ): Array[U] = {
-    logInfo("Starting job...")
+    val callSite = Utils.getSparkCallSite
+    logInfo("Starting job: " + callSite)
     val start = System.nanoTime
-    val result = dagScheduler.runJob(rdd, func, partitions, allowLocal)
-    logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
+    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal)
+    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
@@ -445,10 +446,11 @@ class SparkContext(
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long
     ): PartialResult[R] = {
-    logInfo("Starting job...")
+    val callSite = Utils.getSparkCallSite
+    logInfo("Starting job: " + callSite)
     val start = System.nanoTime
-    val result = dagScheduler.runApproximateJob(rdd, func, evaluator, timeout)
-    logInfo("Job finished in " + (System.nanoTime - start) / 1e9 + " s")
+    val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout)
+    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
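
The shape of the change in both methods is the same: resolve the user call site once, thread it through to the scheduler, and bracket the run with paired log lines. A minimal sketch of that timing/logging pattern outside Spark (runTimed and the call-site string are illustrative, not Spark API):

object TimedJobDemo {
  def logInfo(msg: String) { println("INFO " + msg) }

  def runTimed[T](callSite: String)(body: => T): T = {
    logInfo("Starting job: " + callSite)
    val start = System.nanoTime
    val result = body
    // Same formatting as the new log line: elapsed seconds via nanoTime.
    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
    result
  }

  def main(args: Array[String]) {
    val sum = runTimed("count at Demo.scala:12") { (1 to 1000000).sum }
    println(sum)
  }
}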


@@ -369,8 +369,13 @@ private object Utils extends Logging {
     for (el <- trace) {
       if (!finished) {
-        if (el.getClassName.contains("spark") && !el.getClassName.startsWith("spark.examples")) {
-          lastSparkMethod = el.getMethodName
+        if (el.getClassName.startsWith("spark.") && !el.getClassName.startsWith("spark.examples.")) {
+          lastSparkMethod = if (el.getMethodName == "<init>") {
+            // Spark method is a constructor; get its class name
+            el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
+          } else {
+            el.getMethodName
+          }
         }
         else {
           firstUserLine = el.getLineNumber
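
The new "<init>" branch covers call sites reached through a constructor: StackTraceElement.getMethodName returns "<init>" for constructor frames, which reads poorly in a log message, so the code substitutes the simple class name. A standalone sketch of just that branch (frameName is an illustrative wrapper around the inline logic above):

object CallSiteDemo {
  def frameName(el: StackTraceElement): String =
    if (el.getMethodName == "<init>")
      // Constructor frame: use the simple class name, e.g. "SparkContext".
      el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
    else
      el.getMethodName

  def main(args: Array[String]) {
    val ctor = new StackTraceElement("spark.SparkContext", "<init>", "SparkContext.scala", 100)
    val meth = new StackTraceElement("spark.RDD", "count", "RDD.scala", 200)
    println(frameName(ctor))  // SparkContext
    println(frameName(meth))  // count
  }
}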


@@ -188,23 +188,22 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
     missing.toList
   }

-  def runJob[T, U](
+  def runJob[T, U: ClassManifest](
       finalRdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
+      callSite: String,
       allowLocal: Boolean)
-    (implicit m: ClassManifest[U]): Array[U] =
+    : Array[U] =
   {
     if (partitions.size == 0) {
       return new Array[U](0)
     }
     val waiter = new JobWaiter(partitions.size)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
-    val callSite = Utils.getSparkCallSite
     eventQueue.put(JobSubmitted(finalRdd, func2, partitions.toArray, allowLocal, callSite, waiter))
     waiter.getResult() match {
       case JobSucceeded(results: Seq[_]) =>
         logInfo("Finished " + callSite)
         return results.asInstanceOf[Seq[U]].toArray
       case JobFailed(exception: Exception) =>
         logInfo("Failed to run " + callSite)
@@ -216,13 +215,14 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       evaluator: ApproximateEvaluator[U, R],
-      timeout: Long
-    ): PartialResult[R] =
+      callSite: String,
+      timeout: Long)
+    : PartialResult[R] =
   {
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.splits.size).toArray
-    eventQueue.put(JobSubmitted(rdd, func2, partitions, false, Utils.getSparkCallSite, listener))
+    eventQueue.put(JobSubmitted(rdd, func2, partitions, false, callSite, listener))
     return listener.getResult() // Will throw an exception if the job fails
   }