[SPARK-35610][CORE] Fix the memory leak introduced by the Executor's stop shutdown hook
### What changes were proposed in this pull request? Fixing the memory leak by deregistering the shutdown hook when the executor is stopped. This way the Garbage Collector can release the executor object early. Which is a huge win for our tests as user's classloader could be also released which keeps references to objects which are created for the jars on the classpath. ### Why are the changes needed? I have identified this leak by running the Livy tests (I know it is close to the attic but this leak causes a constant OOM there) and it is in our Spark unit tests as well. This leak can be identified by checking the number of `LeakyEntry` in case of Scala 2.12.14 (and `ZipEntry` for Scala 2.12.10) instances which with its related data can take up a considerable amount of memory (as those are created from the jars which are on the classpath). I have my own tool for instrumenting JVM code [trace-agent](https://github.com/attilapiros/trace-agent) and with that I am able to call JVM diagnostic commands at specific methods. Let me show how it in action. It has a single text file embedded into the tool's jar called action.txt. In this case actions.txt content is: {noformat} $ unzip -q -c trace-agent-0.0.7.jar actions.txt diagnostic_command org.apache.spark.repl.ReplSuite runInterpreter cmd:gcClassHistogram,limit_output_lines:8,where:beforeAndAfter,with_gc:true diagnostic_command org.apache.spark.repl.ReplSuite afterAll cmd:gcClassHistogram,limit_output_lines:8,where:after,with_gc:true {noformat} Which creates a class histogram at the beginning and at the end of `org.apache.spark.repl.ReplSuite#runInterpreter()` (after triggering a GC which might not finish as GC is done in a separate thread..) and one histogram in the end of the `org.apache.spark.repl.ReplSuite#afterAll()` method. And the histograms are the followings on master branch: ``` $ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "ZipEntry\|LeakyEntry" 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 3: 1576712 75682176 scala.reflect.io.FileZipArchive$LeakyEntry ``` Where the header of the table is: ``` num #instances #bytes class name ``` So the `LeakyEntry` in the end is about 75MB (173MB in case of Scala 2.12.10 and before for another class called `ZipEntry`) but the first item (a char/byte arrays) and the second item (strings) in the histogram also relates to this leak: ``` $ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "1:\|2:\|3:" 1: 2701 3496112 [B 2: 21855 2607192 [C 3: 4885 537264 java.lang.Class 1: 480323 55970208 [C 2: 480499 11531976 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 481825 56148024 [C 2: 481998 11567952 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 487056 57550344 [C 2: 487179 11692296 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 487054 57551008 [C 2: 487176 11692224 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 927823 107139160 [C 2: 928072 22273728 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 927793 107129328 [C 2: 928041 22272984 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1361851 155555608 [C 2: 1362261 32694264 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1361683 155493464 [C 2: 1362092 32690208 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1803074 205157728 [C 2: 1803268 43278432 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1802385 204938224 [C 2: 1802579 43261896 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2236631 253636592 [C 2: 2237029 53688696 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2236536 253603008 [C 2: 2236933 53686392 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668892 301893920 [C 2: 2669510 64068240 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668759 301846376 [C 2: 2669376 64065024 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3101238 350101048 [C 2: 3102073 74449752 java.lang.String 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3101240 350101104 [C 2: 3102075 74449800 java.lang.String 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3533785 398371760 [C 2: 3534835 84836040 java.lang.String 3: 1576712 75682176 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3533759 398367088 [C 2: 3534807 84835368 java.lang.String 3: 1576712 75682176 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3967049 446893400 [C 2: 3968314 95239536 java.lang.String 3: 1773801 85142448 scala.reflect.io.FileZipArchive$LeakyEntry [info] - SPARK-26633: ExecutorClassLoader.getResourceAsStream find REPL classes (8 seconds, 248 milliseconds) Setting default log level to "ERROR". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 1: 3966423 446709584 [C 2: 3967682 95224368 java.lang.String 3: 1773801 85142448 scala.reflect.io.FileZipArchive$LeakyEntry 1: 4399583 495097208 [C 2: 4401050 105625200 java.lang.String 3: 1970890 94602720 scala.reflect.io.FileZipArchive$LeakyEntry 1: 4399578 495070064 [C 2: 4401040 105624960 java.lang.String 3: 1970890 94602720 scala.reflect.io.FileZipArchive$LeakyEntry ``` The last three is about 700MB altogether. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I used the trace-agent tool with the same settings for the modified code: ``` $ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "1:\|2:\|3:" 1: 2701 3496112 [B 2: 21855 2607192 [C 3: 4885 537264 java.lang.Class 1: 480323 55970208 [C 2: 480499 11531976 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 481825 56148024 [C 2: 481998 11567952 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 487056 57550344 [C 2: 487179 11692296 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 487054 57551008 [C 2: 487176 11692224 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 927823 107139160 [C 2: 928072 22273728 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 927793 107129328 [C 2: 928041 22272984 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1361851 155555608 [C 2: 1362261 32694264 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1361683 155493464 [C 2: 1362092 32690208 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1803074 205157728 [C 2: 1803268 43278432 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1802385 204938224 [C 2: 1802579 43261896 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2236631 253636592 [C 2: 2237029 53688696 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2236536 253603008 [C 2: 2236933 53686392 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668892 301893920 [C 2: 2669510 64068240 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668759 301846376 [C 2: 2669376 64065024 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3101238 350101048 [C 2: 3102073 74449752 java.lang.String 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3101240 350101104 [C 2: 3102075 74449800 java.lang.String 3: 1379623 66221904 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3533785 398371760 [C 2: 3534835 84836040 java.lang.String 3: 1576712 75682176 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3533759 398367088 [C 2: 3534807 84835368 java.lang.String 3: 1576712 75682176 scala.reflect.io.FileZipArchive$LeakyEntry 1: 3967049 446893400 [C 2: 3968314 95239536 java.lang.String 3: 1773801 85142448 scala.reflect.io.FileZipArchive$LeakyEntry [info] - SPARK-26633: ExecutorClassLoader.getResourceAsStream find REPL classes (8 seconds, 248 milliseconds) Setting default log level to "ERROR". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 1: 3966423 446709584 [C 2: 3967682 95224368 java.lang.String 3: 1773801 85142448 scala.reflect.io.FileZipArchive$LeakyEntry 1: 4399583 495097208 [C 2: 4401050 105625200 java.lang.String 3: 1970890 94602720 scala.reflect.io.FileZipArchive$LeakyEntry 1: 4399578 495070064 [C 2: 4401040 105624960 java.lang.String 3: 1970890 94602720 scala.reflect.io.FileZipArchive$LeakyEntry [success] Total time: 174 s (02:54), completed Jun 2, 2021 2:00:43 PM ╭─attilazsoltpirosapiros-MBP16 ~/git/attilapiros/memoryLeak ‹SPARK-35610*› ╰─$ vim ╭─attilazsoltpirosapiros-MBP16 ~/git/attilapiros/memoryLeak ‹SPARK-35610*› ╰─$ ./build/sbt ";project repl;set Test/javaOptions += \"-javaagent:/Users/attilazsoltpiros/git/attilapiros/memoryLeak/trace-agent-0.0.7.jar\"; testOnly" |grep "1:\|2:\|3:" 1: 2685 3457368 [B 2: 21833 2606712 [C 3: 4885 537264 java.lang.Class 1: 480245 55978400 [C 2: 480421 11530104 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 480460 56005784 [C 2: 480633 11535192 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 486643 57537784 [C 2: 486766 11682384 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 486636 57538192 [C 2: 486758 11682192 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 501208 60411856 [C 2: 501180 12028320 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 501206 60412960 [C 2: 501177 12028248 java.lang.String 3: 197089 9460272 scala.reflect.io.FileZipArchive$LeakyEntry 1: 934925 108773320 [C 2: 935058 22441392 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 934912 108769528 [C 2: 935044 22441056 java.lang.String 3: 394178 18920544 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1370351 156901296 [C 2: 1370318 32887632 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1369660 156681680 [C 2: 1369627 32871048 java.lang.String 3: 591267 28380816 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1803746 205383136 [C 2: 1803917 43294008 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 1803658 205353096 [C 2: 1803828 43291872 java.lang.String 3: 788356 37841088 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2235677 253608240 [C 2: 2236068 53665632 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2235539 253560088 [C 2: 2235929 53662296 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2667775 301799240 [C 2: 2668383 64041192 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2667765 301798568 [C 2: 2668373 64040952 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2666665 301491096 [C 2: 2667285 64014840 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2666648 301490792 [C 2: 2667266 64014384 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668169 301833032 [C 2: 2668782 64050768 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry [info] - SPARK-26633: ExecutorClassLoader.getResourceAsStream find REPL classes (6 seconds, 396 milliseconds) Setting default log level to "ERROR". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 1: 2235495 253419952 [C 2: 2235887 53661288 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2668379 301800768 [C 2: 2668979 64055496 java.lang.String 3: 1182534 56761632 scala.reflect.io.FileZipArchive$LeakyEntry 1: 2236123 253522640 [C 2: 2236514 53676336 java.lang.String 3: 985445 47301360 scala.reflect.io.FileZipArchive$LeakyEntry ``` The sum of the last three numbers is about 354MB. Closes #32748 from attilapiros/SPARK-35610. Authored-by: attilapiros <piros.attila.zsolt@gmail.com> Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
parent
8041aed296
commit
806edf8f44
|
@ -72,7 +72,7 @@ private[spark] class Executor(
|
||||||
logInfo(s"Starting executor ID $executorId on host $executorHostname")
|
logInfo(s"Starting executor ID $executorId on host $executorHostname")
|
||||||
|
|
||||||
private val executorShutdown = new AtomicBoolean(false)
|
private val executorShutdown = new AtomicBoolean(false)
|
||||||
ShutdownHookManager.addShutdownHook(
|
val stopHookReference = ShutdownHookManager.addShutdownHook(
|
||||||
() => stop()
|
() => stop()
|
||||||
)
|
)
|
||||||
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
|
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
|
||||||
|
@ -312,6 +312,7 @@ private[spark] class Executor(
|
||||||
|
|
||||||
def stop(): Unit = {
|
def stop(): Unit = {
|
||||||
if (!executorShutdown.getAndSet(true)) {
|
if (!executorShutdown.getAndSet(true)) {
|
||||||
|
ShutdownHookManager.removeShutdownHook(stopHookReference)
|
||||||
env.metricsSystem.report()
|
env.metricsSystem.report()
|
||||||
try {
|
try {
|
||||||
metricsPoller.stop()
|
metricsPoller.stop()
|
||||||
|
|
Loading…
Reference in a new issue