[SPARK-36603][CORE] Use WeakReference not SoftReference in LevelDB

### What changes were proposed in this pull request?

Use WeakReference not SoftReference in LevelDB

### Why are the changes needed?

(See discussion at https://github.com/apache/spark/pull/28769#issuecomment-906722390 )

"The soft reference to iterator introduced in this pr unfortunately ended up causing iterators to not be closed when they go out of scope (which would have happened earlier in the finalize)

This is because java is more conservative in cleaning up SoftReference's.
The net result was we ended up having 50k files for SHS while typically they get compacted away to 200 odd files.

Changing from SoftReference to WeakReference should make it much more aggresive in cleanup and prevent the issue - which we observed in a 3.1 SHS"

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #33859 from srowen/SPARK-36603.

Authored-by: Sean Owen <srowen@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 89e907f76c)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
Sean Owen 2021-08-29 09:29:23 -07:00 committed by Dongjoon Hyun
parent e3edb65bf0
commit b76471c5df

View file

@ -19,7 +19,8 @@ package org.apache.spark.util.kvstore;
import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -69,10 +70,10 @@ public class LevelDB implements KVStore {
/**
* Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to
* ensure that all iterators are correctly closed before LevelDB is closed. Use soft reference
* ensure that all iterators are correctly closed before LevelDB is closed. Use weak references
* to ensure that the iterator can be GCed, when it is only referenced here.
*/
private final ConcurrentLinkedQueue<SoftReference<LevelDBIterator<?>>> iteratorTracker;
private final ConcurrentLinkedQueue<Reference<LevelDBIterator<?>>> iteratorTracker;
public LevelDB(File path) throws Exception {
this(path, new KVStoreSerializer());
@ -250,7 +251,7 @@ public class LevelDB implements KVStore {
public Iterator<T> iterator() {
try {
LevelDBIterator<T> it = new LevelDBIterator<>(type, LevelDB.this, this);
iteratorTracker.add(new SoftReference<>(it));
iteratorTracker.add(new WeakReference<>(it));
return it;
} catch (Exception e) {
throw Throwables.propagate(e);
@ -301,7 +302,7 @@ public class LevelDB implements KVStore {
try {
if (iteratorTracker != null) {
for (SoftReference<LevelDBIterator<?>> ref: iteratorTracker) {
for (Reference<LevelDBIterator<?>> ref: iteratorTracker) {
LevelDBIterator<?> it = ref.get();
if (it != null) {
it.close();