438f8fd675
## What changes were proposed in this pull request? This pull request fixes [SPARK-26114](https://issues.apache.org/jira/browse/SPARK-26114) issue that occurs when trying to reduce the number of partitions by means of coalesce without shuffling after shuffle-based transformations. The leak occurs because of not cleaning up `ExternalSorter`'s `readingIterator` field as it's done for its `map` and `buffer` fields. Additionally there are changes to the `CompletionIterator` to prevent capturing its `sub`-iterator and holding it even after the completion iterator completes. It is necessary because in some cases, e.g. in case of standard scala's `flatMap` iterator (which is used is `CoalescedRDD`'s `compute` method) the next value of the main iterator is assigned to `flatMap`'s `cur` field only after it is available. For DAGs where ShuffledRDD is a parent of CoalescedRDD it means that the data should be fetched from the map-side of the shuffle, but the process of fetching this data consumes quite a lot of memory in addition to the memory already consumed by the iterator held by `flatMap`'s `cur` field (until it is reassigned). For the following data ```scala import org.apache.hadoop.io._ import org.apache.hadoop.io.compress._ import org.apache.commons.lang._ import org.apache.spark._ // generate 100M records of sample data sc.makeRDD(1 to 1000, 1000) .flatMap(item => (1 to 100000) .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024)))) .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) ``` and the following job ```scala import org.apache.hadoop.io._ import org.apache.spark._ import org.apache.spark.storage._ val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text]) rdd .map(item => item._1.toString -> item._2.toString) .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) .coalesce(10,false) .count ``` ... executed like the following ```bash spark-shell \ --num-executors=5 \ --executor-cores=2 \ --master=yarn \ --deploy-mode=client \ --conf spark.executor.memoryOverhead=512 \ --conf spark.executor.memory=1g \ --conf spark.dynamicAllocation.enabled=false \ --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true' ``` ... executors are always failing with OutOfMemoryErrors. The main issue is multiple leaks of ExternalSorter references. For example, in case of 2 tasks per executor it is expected to be 2 simultaneous instances of ExternalSorter per executor but heap dump generated on OutOfMemoryError shows that there are more ones. ![run1-noparams-dominator-tree-externalsorter](https://user-images.githubusercontent.com/1523889/48703665-782ce580-ec05-11e8-95a9-d6c94e8285ab.png) P.S. This PR does not cover cases with CoGroupedRDDs which use ExternalAppendOnlyMap internally, which itself can lead to OutOfMemoryErrors in many places. ## How was this patch tested? - Existing unit tests - New unit tests - Job executions on the live environment Here is the screenshot before applying this patch ![run3-noparams-failure-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700395-f769eb80-ebfc-11e8-831b-e94c757d416c.png) Here is the screenshot after applying this patch ![run3-noparams-success-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700610-7a8b4180-ebfd-11e8-9761-baaf38a58e66.png) And in case of reducing the number of executors even more the job is still stable ![run3-noparams-success-ui-2x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700619-82e37c80-ebfd-11e8-98ed-a38e1f1f1fd9.png) Closes #23083 from szhem/SPARK-26114-externalsorter-leak. Authored-by: Sergey Zhemzhitsky <szhemzhitski@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> |
||
---|---|---|
.. | ||
main | ||
test |