[SPARK-8981][CORE][FOLLOW-UP] Clean up MDC properties after running a task

### What changes were proposed in this pull request?

This PR is a followup of #26624. This PR cleans up MDC properties if the original value is empty.
Besides, this PR adds a warning and ignore the value when the user tries to override the value of `taskName`.

### Why are the changes needed?

Before this PR, running the following jobs:

```
sc.setLocalProperty("mdc.my", "ABC")
sc.parallelize(1 to 100).count()
sc.setLocalProperty("mdc.my", null)
sc.parallelize(1 to 100).count()
```

there's still MDC value "ABC" in the log of the second count job even if we've unset the value.

### Does this PR introduce _any_ user-facing change?

Yes, user will 1) no longer see the MDC values after unsetting the value; 2) see a warning if he/she tries to override the value of `taskName`.

### How was this patch tested?

Tested Manaually.

Closes #28756 from Ngone51/followup-8981.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
yi.wu 2020-06-11 14:16:12 +00:00 committed by Wenchen Fan
parent 912d45df7c
commit 91cd06bd56

View file

@ -401,9 +401,7 @@ private[spark] class Executor(
}
override def run(): Unit = {
setMDCForTask(taskName, mdcProperties)
threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
val threadMXBean = ManagementFactory.getThreadMXBean
@ -703,11 +701,11 @@ private[spark] class Executor(
}
private def setMDCForTask(taskName: String, mdc: Seq[(String, String)]): Unit = {
// make sure we run the task with the user-specified mdc properties only
MDC.clear()
mdc.foreach { case (key, value) => MDC.put(key, value) }
// avoid overriding the takName by the user
MDC.put("taskName", taskName)
mdc.foreach { case (key, value) =>
MDC.put(key, value)
}
}
/**
@ -750,9 +748,7 @@ private[spark] class Executor(
private[this] val takeThreadDump: Boolean = conf.get(TASK_REAPER_THREAD_DUMP)
override def run(): Unit = {
setMDCForTask(taskRunner.taskName, taskRunner.mdcProperties)
val startTimeNs = System.nanoTime()
def elapsedTimeNs = System.nanoTime() - startTimeNs
def timeoutExceeded(): Boolean = killTimeoutNs > 0 && elapsedTimeNs > killTimeoutNs