diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 45cec726c4..93d1acdd2d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -401,9 +401,7 @@ private[spark] class Executor( } override def run(): Unit = { - setMDCForTask(taskName, mdcProperties) - threadId = Thread.currentThread.getId Thread.currentThread.setName(threadName) val threadMXBean = ManagementFactory.getThreadMXBean @@ -703,11 +701,11 @@ private[spark] class Executor( } private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = { + // make sure we run the task with the user-specified mdc properties only + MDC.clear() + mdc.foreach { case (key, value) => MDC.put(key, value) } + // avoid overriding the taskName by the user MDC.put("taskName", taskName) - - mdc.foreach { case (key, value) => - MDC.put(key, value) - } } /** @@ -750,9 +748,7 @@ private[spark] class Executor( private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP) override def run(): Unit = { - setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties) - val startTimeNs = System.nanoTime() def elapsedTimeNs = System.nanoTime() - startTimeNs def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs