From 2ec9b866285fc059cae6816033babca64b4da7ec Mon Sep 17 00:00:00 2001 From: Zhen Li Date: Tue, 16 Jun 2020 12:59:57 -0500 Subject: [PATCH] [SPARK-31929][WEBUI] Close leveldbiterator when leveldb.close ### What changes were proposed in this pull request? Close LevelDBIterator when LevelDB.close() is called. ### Why are the changes needed? This pull request prevents JNI resources from leaking from a LevelDB instance and its iterators. In the previous implementation, JNI resources held by a LevelDBIterator were cleaned up by its finalize() method. This behavior is also mentioned in the comments of ["LevelDBIterator.java"](https://github.com/apache/spark/blob/master/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java) by squito. But if the DB instance is already closed, the iterator's close call is ignored. A LevelDB iterator keeps LevelDB files open (in the case where the table cache is filled up) until iterator.close() is called, so these JNI resources (file handles) are leaked. This JNI resource leak causes the problem described in [SPARK-31929](https://issues.apache.org/jira/browse/SPARK-31929) on Windows: in the Spark history server, leaked file handles for LevelDB files trigger an "IOException" when HistoryServerDiskManager tries to remove them to release disk space. ![IOException](https://user-images.githubusercontent.com/10524738/84134659-7c388680-aa7b-11ea-807f-04dcfa7886a0.JPG) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a unit test and tested manually. Closes #28769 from zhli1142015/close-leveldbiterator-when-leveldb.close. 
Authored-by: Zhen Li Signed-off-by: Sean Owen --- .../apache/spark/util/kvstore/LevelDB.java | 32 ++++++++++++++++- .../spark/util/kvstore/LevelDBIterator.java | 1 + .../spark/util/kvstore/LevelDBSuite.java | 36 +++++++++++++++++++ .../apache/spark/status/AppStatusStore.scala | 8 ++++- 4 files changed, 75 insertions(+), 2 deletions(-) diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java index 2ca4b0b2cb..98f33b70fe 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java @@ -19,8 +19,10 @@ package org.apache.spark.util.kvstore; import java.io.File; import java.io.IOException; +import java.lang.ref.SoftReference; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import static java.nio.charset.StandardCharsets.UTF_8; @@ -64,6 +66,13 @@ public class LevelDB implements KVStore { private final ConcurrentMap typeAliases; private final ConcurrentMap, LevelDBTypeInfo> types; + /** + * Trying to close a JNI LevelDB handle with a closed DB causes JVM crashes. This is used to + * ensure that all iterators are correctly closed before LevelDB is closed. Use soft reference + * to ensure that the iterator can be GCed, when it is only referenced here. 
+ */ + private final ConcurrentLinkedQueue>> iteratorTracker; + public LevelDB(File path) throws Exception { this(path, new KVStoreSerializer()); } @@ -94,6 +103,8 @@ public class LevelDB implements KVStore { aliases = new HashMap<>(); } typeAliases = new ConcurrentHashMap<>(aliases); + + iteratorTracker = new ConcurrentLinkedQueue<>(); } @Override @@ -189,7 +200,9 @@ public class LevelDB implements KVStore { @Override public Iterator iterator() { try { - return new LevelDBIterator<>(type, LevelDB.this, this); + LevelDBIterator it = new LevelDBIterator<>(type, LevelDB.this, this); + iteratorTracker.add(new SoftReference<>(it)); + return it; } catch (Exception e) { throw Throwables.propagate(e); } @@ -238,6 +251,14 @@ public class LevelDB implements KVStore { } try { + if (iteratorTracker != null) { + for (SoftReference> ref: iteratorTracker) { + LevelDBIterator it = ref.get(); + if (it != null) { + it.close(); + } + } + } _db.close(); } catch (IOException ioe) { throw ioe; @@ -252,6 +273,7 @@ public class LevelDB implements KVStore { * with a closed DB can cause JVM crashes, so this ensures that situation does not happen. */ void closeIterator(LevelDBIterator it) throws IOException { + notifyIteratorClosed(it); synchronized (this._db) { DB _db = this._db.get(); if (_db != null) { @@ -260,6 +282,14 @@ public class LevelDB implements KVStore { } } + /** + * Remove iterator from iterator tracker. `LevelDBIterator` calls it to notify + * iterator is closed. + */ + void notifyIteratorClosed(LevelDBIterator it) { + iteratorTracker.removeIf(ref -> it.equals(ref.get())); + } + /** Returns metadata about indices for the given type. 
*/ LevelDBTypeInfo getTypeInfo(Class type) throws Exception { LevelDBTypeInfo ti = types.get(type); diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java index 94e8c9fc57..e8fb4fac5b 100644 --- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java +++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBIterator.java @@ -185,6 +185,7 @@ class LevelDBIterator implements KVStoreIterator { @Override public synchronized void close() throws IOException { + db.notifyIteratorClosed(this); if (!closed) { it.close(); closed = true; diff --git a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java index 0b755ba0e8..f656661776 100644 --- a/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java +++ b/common/kvstore/src/test/java/org/apache/spark/util/kvstore/LevelDBSuite.java @@ -19,6 +19,7 @@ package org.apache.spark.util.kvstore; import java.io.File; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.stream.Collectors; @@ -276,6 +277,41 @@ public class LevelDBSuite { assertEquals(expected, results); } + @Test + public void testCloseLevelDBIterator() throws Exception { + // SPARK-31929: test when LevelDB.close() is called, related LevelDBIterators + // are closed. And files opened by iterators are also closed. 
+ File dbPathForCloseTest = File + .createTempFile( + "test_db_close.", + ".ldb"); + dbPathForCloseTest.delete(); + LevelDB dbForCloseTest = new LevelDB(dbPathForCloseTest); + for (int i = 0; i < 8192; i++) { + dbForCloseTest.write(createCustomType1(i)); + } + String key = dbForCloseTest + .view(CustomType1.class).iterator().next().key; + assertEquals("key0", key); + Iterator it0 = dbForCloseTest + .view(CustomType1.class).max(1).iterator(); + while (it0.hasNext()) { + it0.next(); + } + System.gc(); + Iterator it1 = dbForCloseTest + .view(CustomType1.class).iterator(); + assertEquals("key0", it1.next().key); + try (KVStoreIterator it2 = dbForCloseTest + .view(CustomType1.class).closeableIterator()) { + assertEquals("key0", it2.next().key); + } + dbForCloseTest.close(); + assertTrue(dbPathForCloseTest.exists()); + FileUtils.deleteQuietly(dbPathForCloseTest); + assertTrue(!dbPathForCloseTest.exists()); + } + private CustomType1 createCustomType1(int i) { CustomType1 t = new CustomType1(); t.key = "key" + i; diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 31a6f7d901..106d272948 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -39,7 +39,13 @@ private[spark] class AppStatusStore( def applicationInfo(): v1.ApplicationInfo = { try { // The ApplicationInfo may not be available when Spark is starting up. - store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info + Utils.tryWithResource( + store.view(classOf[ApplicationInfoWrapper]) + .max(1) + .closeableIterator() + ) { it => + it.next().info + } } catch { case _: NoSuchElementException => throw new NoSuchElementException("Failed to get the application information. " +