[SPARK-8981][CORE][TEST-HADOOP3.2][TEST-JAVA11] Add MDC support in Executor

### What changes were proposed in this pull request?
Added MDC support in all thread pools.
ThreadUtils creates new pools that pass the caller's MDC along to the worker threads.
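Roughly, the pools capture the submitting thread's MDC context and restore it inside the worker thread before running the task. A minimal sketch of that idea (not the actual ThreadUtils code; `runWithCallerMdc` is a made-up helper for illustration):

```scala
import java.util.concurrent.ExecutorService
import org.slf4j.MDC

// Capture the caller's MDC at submission time and reinstate it in the worker thread,
// restoring whatever the worker had before once the task body finishes.
def runWithCallerMdc(pool: ExecutorService)(body: => Unit): Unit = {
  val callerMdc = MDC.getCopyOfContextMap  // may be null if the caller set nothing
  pool.execute(new Runnable {
    override def run(): Unit = {
      val previous = MDC.getCopyOfContextMap
      if (callerMdc != null) MDC.setContextMap(callerMdc) else MDC.clear()
      try body finally {
        if (previous != null) MDC.setContextMap(previous) else MDC.clear()
      }
    }
  })
}
```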

### Why are the changes needed?
In many cases it is very hard to tell which action the logs in the executor came from,
especially when the driver does multi-threaded work and submits actions in parallel.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
No tests were added because no new functionality was added; this is a thread-pool change, and all existing tests pass.

Closes #26624 from igreenfield/master.

Authored-by: Izek Greenfield <igreenfield@axiomsl.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Izek Greenfield 2020-05-20 07:41:00 +00:00 committed by Wenchen Fan
parent b7947e0285
commit eaf7a2a4ed
2 changed files with 27 additions and 1 deletion

@@ -34,6 +34,7 @@ import scala.concurrent.duration._
import scala.util.control.NonFatal
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.slf4j.MDC
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -320,7 +321,12 @@ private[spark] class Executor(
val taskId = taskDescription.taskId
val threadName = s"Executor task launch worker for task $taskId"
-  private val taskName = taskDescription.name
+  val taskName = taskDescription.name
val mdcProperties = taskDescription.properties.asScala
.filter(_._1.startsWith("mdc.")).map { item =>
val key = item._1.substring(4)
(key, item._2)
}.toSeq
/** If specified, this task has been killed and this option contains the reason. */
@volatile private var reasonIfKilled: Option[String] = None
@@ -395,6 +401,9 @@ private[spark] class Executor(
}
override def run(): Unit = {
setMDCForTask(taskName, mdcProperties)
threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
val threadMXBean = ManagementFactory.getThreadMXBean
@@ -693,6 +702,14 @@ private[spark] class Executor(
}
}
private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
MDC.put("taskName", taskName)
mdc.foreach { case (key, value) =>
MDC.put(key, value)
}
}
/**
* Supervises the killing / cancellation of a task by sending the interrupted flag, optionally
* sending a Thread.interrupt(), and monitoring the task until it finishes.
@@ -733,6 +750,9 @@ private[spark] class Executor(
private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)
override def run(): Unit = {
setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties)
val startTimeNs = System.nanoTime()
def elapsedTimeNs = System.nanoTime() - startTimeNs
def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs

@@ -2955,6 +2955,12 @@ Spark uses [log4j](http://logging.apache.org/log4j/) for logging. You can config
`log4j.properties` file in the `conf` directory. One way to start is to copy the existing
`log4j.properties.template` located there.
By default, Spark adds 1 record to the MDC (Mapped Diagnostic Context): `taskName`, which shows something
like `task 1.0 in stage 0.0`. You can add `%X{taskName}` to your patternLayout in
order to print it in the logs.
Moreover, you can use `spark.sparkContext.setLocalProperty("mdc." + name, "value")` to add user specific data into MDC.
The key in MDC will be the string after the `mdc.` prefix.
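As an illustrative sketch of this usage (the `mdc.appId` key and the appender name below are examples, not something defined by Spark):

```scala
// Driver side: assuming `spark` is an existing SparkSession, attach a custom entry
// under the "mdc." prefix so it shows up in executor logs under the key "appId".
spark.sparkContext.setLocalProperty("mdc.appId", "my-application")

// Example conf/log4j.properties pattern printing the built-in taskName entry plus the custom key:
// log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: [%X{taskName}] [%X{appId}] %m%n
```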
# Overriding configuration directory
To specify a different configuration directory other than the default "SPARK_HOME/conf",