[SPARK-31929][WEBUI] Close leveldbiterator when leveldb.close

### What changes were proposed in this pull request?

Close LevelDBIterator when LevelDB.close() is called.

### Why are the changes needed?

This pull request prevents JNI resources from leaking from the LevelDB instance and its iterators. In the previous implementation, the JNI resources held by a LevelDBIterator are only released by its finalize() method; this behavior is also mentioned in the comments of ["LevelDBIterator.java"](https://github.com/apache/spark/blob/master/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java) by squito. But if the DB instance is already closed, the iterator's close() method is ignored. The LevelDB iterator then keeps the LevelDB files open (in the case where the table cache is filled up) until iterator.close() is called, so these JNI resources (file handles) are leaked.
This JNI resource leak causes the problem described in [SPARK-31929](https://issues.apache.org/jira/browse/SPARK-31929) on Windows: in the Spark History Server, leaked file handles for LevelDB files trigger an "IOException" when HistoryServerDiskManager tries to remove them to release disk space.
![IOException](https://user-images.githubusercontent.com/10524738/84134659-7c388680-aa7b-11ea-807f-04dcfa7886a0.JPG)
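
To illustrate the fix, here is a minimal sketch of the tracking idiom this PR adds. The `TrackingStore` and `TrackedIterator` names are hypothetical, not Spark classes: the store records a `SoftReference` to every iterator it hands out, iterators deregister themselves when closed, and the store closes any survivors before releasing its own handle.

```java
import java.lang.ref.SoftReference;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the idiom used by this PR; not the actual Spark classes.
class TrackingStore implements AutoCloseable {
  // Soft references let the GC reclaim iterators that are only referenced here.
  private final ConcurrentLinkedQueue<SoftReference<TrackedIterator>> tracker =
      new ConcurrentLinkedQueue<>();

  TrackedIterator openIterator() {
    TrackedIterator it = new TrackedIterator(this);
    tracker.add(new SoftReference<>(it));
    return it;
  }

  // Called by an iterator when it is closed, so the tracker does not grow forever.
  void notifyClosed(TrackedIterator it) {
    tracker.removeIf(ref -> it.equals(ref.get()));
  }

  @Override
  public void close() {
    // Close surviving iterators first, then the underlying (native) handle.
    for (SoftReference<TrackedIterator> ref : tracker) {
      TrackedIterator it = ref.get();
      if (it != null) {
        it.close();
      }
    }
    // ... close the native DB handle here ...
  }
}

class TrackedIterator implements AutoCloseable {
  private final TrackingStore owner;
  private boolean closed;

  TrackedIterator(TrackingStore owner) {
    this.owner = owner;
  }

  @Override
  public void close() {
    owner.notifyClosed(this);
    if (!closed) {
      closed = true; // the real iterator would also release its JNI resources here
    }
  }
}
```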

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a unit test and tested manually.
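
For reference, a hedged caller-side sketch, assuming an already-open `LevelDB` named `store` and the `CustomType1` type from `LevelDBSuite`: wrapping `closeableIterator()` in try-with-resources closes the iterator (and, with this PR, removes it from the tracker) as soon as the block exits, instead of relying on `finalize()` or `LevelDB.close()`. This is the same pattern the `AppStatusStore` change below adopts.

```java
// Sketch only: assumes an open LevelDB `store` and the CustomType1 test type.
// try-with-resources closes (and untracks) the iterator as soon as the block
// exits, instead of leaving the JNI handle to finalize() or LevelDB.close().
static String firstKey(LevelDB store) throws Exception {
  try (KVStoreIterator<CustomType1> it =
      store.view(CustomType1.class).closeableIterator()) {
    return it.hasNext() ? it.next().key : null;
  }
}
```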

Closes #28769 from zhli1142015/close-leveldbiterator-when-leveldb.close.

Authored-by: Zhen Li <zhli@microsoft.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
Commit 2ec9b86628 (parent 8d577092ed)
Zhen Li, 2020-06-16 12:59:57 -05:00, committed by Sean Owen
4 changed files with 75 additions and 2 deletions

LevelDB.java

@@ -19,8 +19,10 @@ package org.apache.spark.util.kvstore;
import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -64,6 +66,13 @@ public class LevelDB implements KVStore {
private final ConcurrentMap<String, byte[]> typeAliases;
private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
/**
* Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to
* ensure that all iterators are correctly closed before LevelDB is closed. Use soft reference
* to ensure that the iterator can be GCed, when it is only referenced here.
*/
private final ConcurrentLinkedQueue<SoftReference<LevelDBIterator<?>>> iteratorTracker;
public LevelDB(File path) throws Exception {
this(path, new KVStoreSerializer());
}
@@ -94,6 +103,8 @@ public class LevelDB implements KVStore {
aliases = new HashMap<>();
}
typeAliases = new ConcurrentHashMap<>(aliases);
iteratorTracker = new ConcurrentLinkedQueue<>();
}
@Override
@@ -189,7 +200,9 @@ public class LevelDB implements KVStore {
@Override
public Iterator<T> iterator() {
try {
return new LevelDBIterator<>(type, LevelDB.this, this);
LevelDBIterator<T> it = new LevelDBIterator<>(type, LevelDB.this, this);
iteratorTracker.add(new SoftReference<>(it));
return it;
} catch (Exception e) {
throw Throwables.propagate(e);
}
@@ -238,6 +251,14 @@ public class LevelDB implements KVStore {
}
try {
if (iteratorTracker != null) {
for (SoftReference<LevelDBIterator<?>> ref: iteratorTracker) {
LevelDBIterator<?> it = ref.get();
if (it != null) {
it.close();
}
}
}
_db.close();
} catch (IOException ioe) {
throw ioe;
@@ -252,6 +273,7 @@ public class LevelDB implements KVStore {
* with a closed DB can cause JVM crashes, so this ensures that situation does not happen.
*/
void closeIterator(LevelDBIterator<?> it) throws IOException {
notifyIteratorClosed(it);
synchronized (this._db) {
DB _db = this._db.get();
if (_db != null) {
@@ -260,6 +282,14 @@ public class LevelDB implements KVStore {
}
}
/**
* Remove iterator from iterator tracker. `LevelDBIterator` calls it to notify
* iterator is closed.
*/
void notifyIteratorClosed(LevelDBIterator<?> it) {
iteratorTracker.removeIf(ref -> it.equals(ref.get()));
}
/** Returns metadata about indices for the given type. */
LevelDBTypeInfo getTypeInfo(Class<?> type) throws Exception {
LevelDBTypeInfo ti = types.get(type);

LevelDBIterator.java

@@ -185,6 +185,7 @@ class LevelDBIterator<T> implements KVStoreIterator<T> {
@Override
public synchronized void close() throws IOException {
db.notifyIteratorClosed(this);
if (!closed) {
it.close();
closed = true;

LevelDBSuite.java

@@ -19,6 +19,7 @@ package org.apache.spark.util.kvstore;
import java.io.File;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
@@ -276,6 +277,41 @@ public class LevelDBSuite {
assertEquals(expected, results);
}
@Test
public void testCloseLevelDBIterator() throws Exception {
// SPARK-31929: test when LevelDB.close() is called, related LevelDBIterators
// are closed. And files opened by iterators are also closed.
File dbPathForCloseTest = File
.createTempFile(
"test_db_close.",
".ldb");
dbPathForCloseTest.delete();
LevelDB dbForCloseTest = new LevelDB(dbPathForCloseTest);
for (int i = 0; i < 8192; i++) {
dbForCloseTest.write(createCustomType1(i));
}
String key = dbForCloseTest
.view(CustomType1.class).iterator().next().key;
assertEquals("key0", key);
Iterator<CustomType1> it0 = dbForCloseTest
.view(CustomType1.class).max(1).iterator();
while (it0.hasNext()) {
it0.next();
}
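// Encourage GC of iterators that are no longer in use (they are only softly
// reachable from the tracker), so LevelDB.close() must cope with cleared references.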
System.gc();
Iterator<CustomType1> it1 = dbForCloseTest
.view(CustomType1.class).iterator();
assertEquals("key0", it1.next().key);
try (KVStoreIterator<CustomType1> it2 = dbForCloseTest
.view(CustomType1.class).closeableIterator()) {
assertEquals("key0", it2.next().key);
}
dbForCloseTest.close();
assertTrue(dbPathForCloseTest.exists());
FileUtils.deleteQuietly(dbPathForCloseTest);
assertTrue(!dbPathForCloseTest.exists());
}
private CustomType1 createCustomType1(int i) {
CustomType1 t = new CustomType1();
t.key = "key" + i;

AppStatusStore.scala

@@ -39,7 +39,13 @@ private[spark] class AppStatusStore(
def applicationInfo(): v1.ApplicationInfo = {
try {
// The ApplicationInfo may not be available when Spark is starting up.
store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
Utils.tryWithResource(
store.view(classOf[ApplicationInfoWrapper])
.max(1)
.closeableIterator()
) { it =>
it.next().info
}
} catch {
case _: NoSuchElementException =>
throw new NoSuchElementException("Failed to get the application information. " +