diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2bfa1cea4b..45cec726c4 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -34,6 +34,7 @@ import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder
+import org.slf4j.MDC
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -320,7 +321,12 @@ private[spark] class Executor(
 
     val taskId = taskDescription.taskId
     val threadName = s"Executor task launch worker for task $taskId"
-    private val taskName = taskDescription.name
+    val taskName = taskDescription.name
+    val mdcProperties = taskDescription.properties.asScala
+      .filter(_._1.startsWith("mdc.")).map { item =>
+        val key = item._1.substring(4)
+        (key, item._2)
+      }.toSeq
 
     /** If specified, this task has been killed and this option contains the reason. */
     @volatile private var reasonIfKilled: Option[String] = None
@@ -395,6 +401,9 @@ private[spark] class Executor(
     }
 
     override def run(): Unit = {
+
+      setMDCForTask(taskName, mdcProperties)
+
       threadId = Thread.currentThread.getId
       Thread.currentThread.setName(threadName)
       val threadMXBean = ManagementFactory.getThreadMXBean
@@ -693,6 +702,14 @@ private[spark] class Executor(
     }
   }
 
+  private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
+    MDC.put("taskName", taskName)
+
+    mdc.foreach { case (key, value) =>
+      MDC.put(key, value)
+    }
+  }
+
   /**
   * Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
   * sending a Thread.interrupt(), and monitoring the task until it finishes.
@@ -733,6 +750,9 @@ private[spark] class Executor(
     private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)
 
     override def run(): Unit = {
+
+      setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties)
+
       val startTimeNs = System.nanoTime()
       def elapsedTimeNs = System.nanoTime() - startTimeNs
       def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs
diff --git a/docs/configuration.md b/docs/configuration.md
index fce04b9405..420942f7b7 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2955,6 +2955,12 @@ Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can config
 `log4j.properties` file in the `conf` directory. One way to start is to copy the existing
 `log4j.properties.template` located there.
 
+By default, Spark adds one record to the MDC (Mapped Diagnostic Context): `taskName`, which shows
+something like `task 1.0 in stage 0.0`. You can add `%X{taskName}` to your pattern layout in
+order to print it in the logs.
+Moreover, you can use `spark.sparkContext.setLocalProperty("mdc." + name, "value")` to add user-specific data to the MDC.
+The key in the MDC will be the string after the `mdc.` prefix.
+
 # Overriding configuration directory
 
 To specify a different configuration directory other than the default "SPARK_HOME/conf",
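
Usage sketch (not part of the patch): the documentation hunk above describes setting `mdc.`-prefixed local properties on the driver so that executor log lines can carry user data. A minimal driver-side example, assuming an existing `SparkSession` and an illustrative key `mdc.appTag`, might look like the following; only `setLocalProperty` and the `mdc.` prefix handling come from the change itself.

```scala
// Hypothetical driver-side sketch for the MDC support added by this patch.
// The SparkSession setup, the "mdc.appTag" key, and its value are illustrative
// assumptions; they are not defined anywhere in the patch.
import org.apache.spark.sql.SparkSession

object MdcUsageExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("mdc-usage-example").getOrCreate()

    // Local properties are shipped with each task; keys starting with "mdc." are
    // copied into the executor task thread's MDC with the prefix stripped, so this
    // value becomes addressable as %X{appTag} in the executor's log pattern.
    spark.sparkContext.setLocalProperty("mdc.appTag", "nightly-batch")

    // Jobs submitted after this point run tasks whose log lines can include both
    // %X{taskName} (set automatically by the executor) and %X{appTag}.
    spark.range(0, 1000).count()

    spark.stop()
  }
}
```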
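On the executor side, the values only show up if the logging pattern references them, e.g. a ConversionPattern along the lines of `%d{yy/MM/dd HH:mm:ss} %p %X{taskName} %X{appTag} %c{1}: %m%n` in `conf/log4j.properties`; the appender name and surrounding layout depend on the local template, so treat this as a sketch rather than the shipped default.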