Liang-Chi Hsieh 9eff1186ae [SPARK-30379][CORE] Avoid OOM when using collection accumulator
### What changes were proposed in this pull request?

This patch proposes to convert only the first few elements of a collection accumulator's value in `LiveEntityHelpers.newAccumulatorInfos`.
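
A minimal sketch of the idea (the names, cutoff, and rendering below are hypothetical, not the patch's actual code; the real change is in `LiveEntityHelpers.newAccumulatorInfos`):

```scala
import java.util.{List => JList}
import scala.collection.JavaConverters._

// Hypothetical helper: stringify at most `maxElements` items of a collection
// accumulator's value instead of rendering the whole collection.
val maxElements = 5

def toPartialString(value: Any): String = value match {
  case list: JList[_] if list.size() > maxElements =>
    // Keep the first few items and note how many were dropped.
    list.asScala.take(maxElements)
      .mkString("[", ", ", s", ... ${list.size() - maxElements} more items]")
  case other =>
    other.toString
}
```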

### Why are the changes needed?

One Spark job on our cluster uses a collection accumulator and encountered an exception like:

```
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at java.lang.StringBuilder.append(StringBuilder.java:131)
    at java.util.AbstractCollection.toString(AbstractCollection.java:462)
    at java.util.Collections$UnmodifiableCollection.toString(Collections.java:1035)
    at org.apache.spark.status.LiveEntityHelpers$$anonfun$newAccumulatorInfos$2$$anonfun$apply$3.apply(LiveEntity.scala:596)
    at org.apache.spark.status.LiveEntityHelpers$$anonfun$newAccumulatorInfos$2$$anonfun$apply$3.apply(LiveEntity.scala:596)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.status.LiveEntityHelpers$$anonfun$newAccumulatorInfos$2.apply(LiveEntity.scala:596)
    at org.apache.spark.status.LiveEntityHelpers$$anonfun$newAccumulatorInfos$2.apply(LiveEntity.scala:591)
```

`LiveEntityHelpers.newAccumulatorInfos` converts `AccumulableInfo`s to `v1.AccumulableInfo` by calling `toString` on the accumulator's value. For a collection accumulator, the string representation can take much more memory than the collection itself, for example with a collection accumulator of long values, and cause an OOM (in this job, the driver memory is 6g).
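
A back-of-the-envelope illustration (the numbers are mine, not from the report) of why the string form can dwarf the collection:

```scala
// Assume the accumulated Longs are around 10^7, i.e. ~7 decimal digits each.
// AbstractCollection.toString renders every element plus a ", " separator,
// ~9 chars at 2 bytes per char in the StringBuilder's char[], and the
// builder transiently doubles its backing array on resize.
val n = 100000000L                        // 100M accumulated values
val stringMB = n * 9 * 2 / (1024 * 1024)  // ~1.7 GB of chars alone
println(s"toString needs roughly $stringMB MB, before resize copies")
```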

The results of `newAccumulatorInfos` are used in the API and UI. For such usage, it also does not make sense to render a complete collection accumulator as one very long string.
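
As an illustration of the API side (a sketch, run in the same spark-shell; the port and path are the Spark defaults), the converted strings come back in each stage's `accumulatorUpdates`:

```scala
// Query the status REST API of the running shell's UI.
val url = s"http://localhost:4040/api/v1/applications/${sc.applicationId}/stages"
println(scala.io.Source.fromURL(url).mkString)
```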

### Does this PR introduce any user-facing change?

Yes. Collection accumulators now show only the first few elements of their values in the API and UI.

### How was this patch tested?

Unit test.

Manual test. Launched a Spark shell and ran:
```scala
val accum = sc.collectionAccumulator[Long]("Collection Accumulator Example")
sc.range(0, 10000, 1, 1).foreach(x => accum.add(x))
accum.value
```
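
Note that `accum.value` on the driver still returns the full collection; the truncation applies only to the string shown in the API and UI.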

<img width="2533" alt="Screen Shot 2019-12-30 at 2 03 43 PM" src="https://user-images.githubusercontent.com/68855/71602488-6eb2c400-2b0d-11ea-8725-dba36478198f.png">

Closes #27038 from viirya/partial-collect-accu.

Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-12-31 12:45:23 +09:00