[SPARK-36619][SS] Fix bugs around prefix-scan for HDFS backed state store and RocksDB state store

### What changes were proposed in this pull request?

This PR proposes to fix bugs around prefix-scan for both HDFS backed state store and RocksDB state store.

> HDFS backed state store

We did "shallow-copy" on copying prefix map, which leads the values of prefix map (mutable Set) to be "same instances" across multiple versions. This PR fixes it via creating a new mutable Set and copying elements.

> RocksDB state store

Prefix-scan iterators are only closed in `RocksDB.rollback()`, which is only called from `RocksDBStateStore.abort()`.

While `RocksDBStateStore.abort()` is called for streaming session window (since it has separate physical plans for read and write), other stateful operators, which have a single read-write physical plan, call either commit or abort and do not close the iterators on commit. These unclosed iterators can be "reused" against the next loaded version and produce incorrect outputs.

This PR ensures that prefix-scan iterators are reset when RocksDB loads a version, whereas previously this was only done on rollback.
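
The lifecycle looks roughly like the following sketch (simplified, illustrative class and method names, not the actual Spark `RocksDB` class; `prefixScanReuseIter` mirrors the cache of reusable iterators): the close-and-clear step is factored into one helper that can be invoked from load, rollback, and close, so a commit-only path can no longer leak iterators into the next loaded version.

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Simplified sketch of the iterator-cache lifecycle; not the actual Spark code.
class PrefixScanIteratorCache[K, Iter <: AutoCloseable] {
  private val prefixScanReuseIter = new ConcurrentHashMap[K, Iter]()

  // Reuse one iterator per key, creating it lazily on first use.
  def getOrCreate(key: K, create: => Iter): Iter =
    prefixScanReuseIter.computeIfAbsent(key, _ => create)

  // Close and forget every cached iterator. Calling this on every load (not
  // only on rollback/close) guarantees a fresh start for each loaded version.
  def closeAll(): Unit = {
    prefixScanReuseIter.entrySet().asScala.foreach(_.getValue.close())
    prefixScanReuseIter.clear()
  }
}
```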

### Why are the changes needed?

Please refer to the above section for an explanation of the bugs and their fixes.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Modified an existing UT so that it fails without this PR and passes with it.

Closes #33870 from HeartSaVioR/SPARK-36619.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 60a72c938a)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
3 changed files with 61 additions and 19 deletions

```diff
@@ -129,7 +129,14 @@ class PrefixScannableHDFSBackedStateStoreMap(
     other match {
       case o: PrefixScannableHDFSBackedStateStoreMap =>
         map.putAll(o.map)
-        prefixKeyToKeysMap.putAll(o.prefixKeyToKeysMap)
+        o.prefixKeyToKeysMap.asScala.foreach { case (prefixKey, keySet) =>
+          // Here we create a copy version of Set. Shallow-copying the prefix key map will lead
+          // two maps having the same Set "instances" for values, meaning modifying the prefix map
+          // on newer version will also affect on the prefix map on older version.
+          val newSet = new mutable.HashSet[UnsafeRow]()
+          newSet ++= keySet
+          prefixKeyToKeysMap.put(prefixKey, newSet)
+        }
       case _ => other.iterator().foreach { pair => put(pair.key, pair.value) }
     }
```

```diff
@@ -120,6 +120,8 @@ class RocksDB(
       if (conf.resetStatsOnLoad) {
         nativeStats.reset
       }
+      // reset resources to prevent side-effects from previous loaded version
+      closePrefixScanIterators()
       writeBatch.clear()
       logInfo(s"Loaded $version")
     } catch {
@@ -290,8 +292,7 @@ class RocksDB(
    * Drop uncommitted changes, and roll back to previous version.
    */
   def rollback(): Unit = {
-    prefixScanReuseIter.entrySet().asScala.foreach(_.getValue.close())
-    prefixScanReuseIter.clear()
+    closePrefixScanIterators()
     writeBatch.clear()
     numKeysOnWritingVersion = numKeysOnLoadedVersion
     release()
@@ -307,8 +308,7 @@ class RocksDB(
   /** Release all resources */
   def close(): Unit = {
-    prefixScanReuseIter.entrySet().asScala.foreach(_.getValue.close())
-    prefixScanReuseIter.clear()
+    closePrefixScanIterators()
     try {
       closeDB()
@@ -411,6 +411,11 @@ class RocksDB(
     acquireLock.notifyAll()
   }

+  private def closePrefixScanIterators(): Unit = {
+    prefixScanReuseIter.entrySet().asScala.foreach(_.getValue.close())
+    prefixScanReuseIter.clear()
+  }
+
   private def getDBProperty(property: String): Long = {
     db.getProperty(property).toLong
   }
```

```diff
@@ -803,18 +803,16 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     // Verify state before starting a new set of updates
     assert(getLatestData(provider).isEmpty)

-    val store = provider.getStore(0)
+    var store = provider.getStore(0)

-    val key1 = Seq("a", "b", "c")
-    val key2 = Seq(1, 2, 3)
-    val keys = for (k1 <- key1; k2 <- key2) yield (k1, k2)
-
-    val randomizedKeys = scala.util.Random.shuffle(keys.toList)
-    randomizedKeys.foreach { case (key1, key2) =>
-      put(store, key1, key2, key2)
+    def putCompositeKeys(keys: Seq[(String, Int)]): Unit = {
+      val randomizedKeys = scala.util.Random.shuffle(keys.toList)
+      randomizedKeys.foreach { case (key1, key2) =>
+        put(store, key1, key2, key2)
+      }
     }

-    key1.foreach { k1 =>
-      val keyValueSet = store.prefixScan(dataToPrefixKeyRow(k1)).map { pair =>
-        rowPairToDataPair(pair.withRows(pair.key.copy(), pair.value.copy()))
+    def verifyScan(key1: Seq[String], key2: Seq[Int]): Unit = {
+      key1.foreach { k1 =>
+        val keyValueSet = store.prefixScan(dataToPrefixKeyRow(k1)).map { pair =>
+          rowPairToDataPair(pair.withRows(pair.key.copy(), pair.value.copy()))
@@ -822,8 +820,40 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
         assert(keyValueSet === key2.map(k2 => ((k1, k2), k2)).toSet)
       }
+    }
+
+    val key1AtVersion0 = Seq("a", "b", "c")
+    val key2AtVersion0 = Seq(1, 2, 3)
+    val keysAtVersion0 = for (k1 <- key1AtVersion0; k2 <- key2AtVersion0) yield (k1, k2)
+
+    putCompositeKeys(keysAtVersion0)
+    verifyScan(key1AtVersion0, key2AtVersion0)

     assert(store.prefixScan(dataToPrefixKeyRow("non-exist")).isEmpty)
+
+    // committing and loading the version 1 (the version being committed)
+    store.commit()
+    store = provider.getStore(1)
+
+    // before putting the new key-value pairs, verify prefix scan works for existing keys
+    verifyScan(key1AtVersion0, key2AtVersion0)
+
+    val key1AtVersion1 = Seq("c", "d")
+    val key2AtVersion1 = Seq(4, 5, 6)
+    val keysAtVersion1 = for (k1 <- key1AtVersion1; k2 <- key2AtVersion1) yield (k1, k2)
+
+    // put a new key-value pairs, and verify that prefix scan reflects the changes
+    putCompositeKeys(keysAtVersion1)
+    verifyScan(Seq("c"), Seq(1, 2, 3, 4, 5, 6))
+    verifyScan(Seq("d"), Seq(4, 5, 6))
+
+    // aborting and loading the version 1 again (keysAtVersion1 should be rolled back)
+    store.abort()
+    store = provider.getStore(1)
+
+    // prefix scan should not reflect the uncommitted changes
+    verifyScan(key1AtVersion0, key2AtVersion0)
+    verifyScan(Seq("d"), Seq.empty)
   }

   testWithAllCodec("numKeys metrics") {
```