diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index f3d87ee5c4..f8ba3d2d1a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -556,6 +556,9 @@ class DAGScheduler( case JobFailed(exception: Exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) + // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. + val callerStackTrace = Thread.currentThread().getStackTrace.tail + exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) throw exception } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 6bc45f249f..4f2b0fa162 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -875,6 +875,21 @@ class DAGSchedulerSuite assertDataStructuresEmpty } + test("Spark exceptions should include call site in stack trace") { + val e = intercept[SparkException] { + sc.parallelize(1 to 10, 2).map { _ => throw new RuntimeException("uh-oh!") }.count() + } + + // Does not include message, ONLY stack trace. + val stackTraceString = e.getStackTraceString + + // should actually include the RDD operation that invoked the method: + assert(stackTraceString.contains("org.apache.spark.rdd.RDD.count")) + + // should include the FunSuite setup: + assert(stackTraceString.contains("org.scalatest.FunSuite")) + } + /** * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations. * Note that this checks only the host and not the executor ID.