spark-instrumented-optimizer/core
Liang-Chi Hsieh 0cd436be66 [SPARK-29244][CORE] Prevent freed page in BytesToBytesMap free again
### What changes were proposed in this pull request?

When BytesToBytesMap cannot allocate a page, the page it had previously allocated has already been freed by TaskMemoryManager. In this case, we should not keep that page in the longArray field of BytesToBytesMap. Otherwise, it might be freed again in the task completion listener of UnsafeFixedWidthAggregationMap and cause a confusing error.
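
The idea can be illustrated with a minimal, self-contained sketch. This is a toy model, not Spark's code: `ToyMemoryManager`, `ToyMap`, `resetWithFailedAllocation`, and `cleanupSucceeds` are hypothetical names, and the failed allocation is simulated by simply not allocating a new page. It only shows why a stale reference to a freed page leads to a second free, and how clearing the field avoids it.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model only; these classes are hypothetical and are not Spark's real API. */
public class DoubleFreeSketch {

    /** Stand-in for a page tracker that rejects a second free of the same page. */
    static final class ToyMemoryManager {
        private final Set<Integer> livePages = new HashSet<>();
        private int nextId = 0;

        int allocatePage() {
            int id = nextId++;
            livePages.add(id);
            return id;
        }

        void freePage(int id) {
            if (!livePages.remove(id)) {
                throw new IllegalStateException("page " + id + " has already been freed");
            }
        }
    }

    /** Stand-in for a map that keeps one page reference in a field. */
    static final class ToyMap {
        private final ToyMemoryManager manager;
        private final boolean clearReferenceOnFree;
        private Integer longArray; // null means "no page held"

        ToyMap(ToyMemoryManager manager, boolean clearReferenceOnFree) {
            this.manager = manager;
            this.clearReferenceOnFree = clearReferenceOnFree;
            this.longArray = manager.allocatePage();
        }

        /** Frees the held page; the failed re-allocation is simulated by not allocating again. */
        void resetWithFailedAllocation() {
            manager.freePage(longArray);
            if (clearReferenceOnFree) {
                longArray = null; // forget the freed page so free() cannot release it twice
            }
        }

        /** Cleanup, as a completion listener might call it. */
        void free() {
            if (longArray != null) {
                manager.freePage(longArray);
                longArray = null;
            }
        }
    }

    /** Returns true if cleanup after a failed reset completes without a double free. */
    public static boolean cleanupSucceeds(boolean clearReferenceOnFree) {
        ToyMap map = new ToyMap(new ToyMemoryManager(), clearReferenceOnFree);
        map.resetWithFailedAllocation();
        try {
            map.free();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stale reference kept: " + cleanupSucceeds(false));
        System.out.println("reference cleared:    " + cleanupSucceeds(true));
    }
}
```

In this sketch, keeping the stale reference makes the later `free()` fail, while clearing it lets cleanup finish normally.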

Note that because we catch Throwable when invoking completion listeners, this error affects only the current listener and not the others. The completion listener of UnsafeFixedWidthAggregationMap only performs BytesToBytesMap.free().
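
The isolation between listeners can be sketched as follows. This is a simplified Java illustration under stated assumptions, not the actual TaskContextImpl logic: `ListenerIsolationSketch`, `invokeAll`, and `demo` are hypothetical names, and plain `Runnable` stands in for a completion listener.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: one failing listener does not stop the others. */
public class ListenerIsolationSketch {

    /** Runs every listener, collecting failures instead of stopping at the first one. */
    public static List<Throwable> invokeAll(List<Runnable> listeners) {
        List<Throwable> errors = new ArrayList<>();
        for (Runnable listener : listeners) {
            try {
                listener.run();
            } catch (Throwable t) {
                errors.add(t); // remember the failure and continue with the next listener
            }
        }
        return errors;
    }

    /** Returns {number of listeners that completed, number of collected errors}. */
    public static int[] demo() {
        AtomicInteger completed = new AtomicInteger();
        List<Runnable> listeners = new ArrayList<>();
        listeners.add(completed::incrementAndGet);
        listeners.add(() -> {
            // Simulates the listener whose cleanup hits the double free.
            throw new IllegalStateException("page has already been freed");
        });
        listeners.add(completed::incrementAndGet);

        List<Throwable> errors = invokeAll(listeners);
        return new int[] {completed.get(), errors.size()};
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("completed=" + result[0] + ", errors=" + result[1]);
    }
}
```

Only the failing listener's own work is cut short; whatever it had left to do after the exception is not executed.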

BytesToBytesMap.free() does two things: it frees allocated pages and deletes spilled files. When it tries to free an already freed page, the error is thrown and both the remaining pages and the spilled files are skipped. Executor.cleanUpAllAllocatedMemory guards against the resulting memory leak for the skipped pages.

### Why are the changes needed?

It is possible, by chance, to free an already freed page in BytesToBytesMap. Because there are guards in place when freeing a page, this produces a confusing error:

```
16:07:33.550 ERROR org.apache.spark.TaskContextImpl: Error in TaskCompletionListener
java.lang.AssertionError: Called freePage() on a memory block that has already been freed
        at org.apache.spark.memory.TaskMemoryManager.freePage(TaskMemoryManager.java:332)
        at org.apache.spark.memory.MemoryConsumer.freePage(MemoryConsumer.java:129)
        at org.apache.spark.memory.MemoryConsumer.freeArray(MemoryConsumer.java:107)
        at org.apache.spark.unsafe.map.BytesToBytesMap.free(BytesToBytesMap.java:806)
        at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.free(UnsafeFixedWidthAggregationMap.java:226)
        at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.lambda$new$0(UnsafeFixedWidthAggregationMap.java:112)
        at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:119)
        at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:119)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:132)
        at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:130)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:130)
        at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:119)
        at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMapSuite.$anonfun$new$19(UnsafeFixedWidthAggregationMapSuite.scala:425)
        at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMapSuite.$anonfun$testWithMemoryLeakDetection$1(UnsafeFixedWidthAggregationMapSuite.scala:87)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
        at org.scalatest.Transformer.apply(Transformer.scala:20)
        at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
        at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
        at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
        at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
        at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
        at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
        at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
        at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
        at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
        at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
        at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:56)
        at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
        at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
        at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
        at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
        at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
        at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
        at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
        at org.scalatest.Suite.run(Suite.scala:1147)
        at org.scalatest.Suite.run$(Suite.scala:1129)
        at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
        at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
        at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
        at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
        at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
        at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
        at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
        at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
        at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
        at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:56)
        at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
        at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:507)
        at sbt.ForkMain$Run$2.call(ForkMain.java:296)
        at sbt.ForkMain$Run$2.call(ForkMain.java:286)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[info] - SPARK-29244 *** FAILED *** (16 milliseconds)
[info]   org.apache.spark.util.TaskCompletionListenerException: Called freePage() on a memory block that has already been freed
[info]
[info] Previous exception in task: Unable to acquire 4096 bytes of memory, got 0
[info]  org.apache.spark.memory.MemoryConsumer.throwOom(MemoryConsumer.java:157)
[info]  org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:97)
[info]  org.apache.spark.unsafe.map.BytesToBytesMap.allocate(BytesToBytesMap.java:790)
[info]  org.apache.spark.unsafe.map.BytesToBytesMap.reset(BytesToBytesMap.java:893)
[info]  org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:170)
[info]  org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:249)
[info]  org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMapSuite.$anonfun$new$19(UnsafeFixedWidthAggregationMapSuite.scala:421)
[info]  org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMapSuite.$anonfun$testWithMemoryLeakDetection$1(UnsafeFixedWidthAggregationMapSuite.scala:87)
```

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Added a unit test.

Closes #25953 from viirya/SPARK-29244.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: Liang-Chi Hsieh <liangchi@uber.com>
2019-10-01 11:50:31 -07:00
benchmarks [SPARK-29297][TESTS] Compare core/mllib module benchmarks in JDK8/11 2019-09-29 21:43:58 -07:00
src [SPARK-29244][CORE] Prevent freed page in BytesToBytesMap free again 2019-10-01 11:50:31 -07:00
pom.xml [SPARK-29082][CORE] Skip delegation token generation if no credentials are available 2019-09-24 11:12:26 -07:00