[SPARK-36667][SS][TEST] Close resources properly in StateStoreSuite/RocksDBStateStoreSuite

### What changes were proposed in this pull request?

This PR proposes to ensure StateStoreProvider instances are properly closed for each test in StateStoreSuite/RocksDBStateStoreSuite.
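Concretely, this adds a `tryWithProviderResource` helper to `StateStoreSuiteBase` and wraps each test's provider usage in it, so the provider is closed even when the test body throws. A minimal sketch of the pattern (the helper body is taken from this change; the test body is an illustrative example, not a specific test from the suite):

```scala
protected def tryWithProviderResource[T](provider: ProviderClass)(f: ProviderClass => T): T = {
  try {
    f(provider)
  } finally {
    // Always close the provider, even if the test body fails.
    provider.close()
  }
}

// Usage in a test: the provider no longer leaks when an assertion fails.
tryWithProviderResource(newStoreProvider()) { provider =>
  val store = provider.getStore(0)
  put(store, "a", 0, 1)
  assert(store.commit() === 1)
}
```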

### Why are the changes needed?

While this doesn't break the tests today, leaving provider instances unclosed is a bad practice and may cause subtle problems in the future.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing UTs

Closes #33916 from HeartSaVioR/SPARK-36667.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
Jungtaek Lim authored on 2021-09-06 17:40:03 -07:00, committed by Liang-Chi Hsieh
parent 0ab0cb108d
commit 093c2080fe
2 changed files with 521 additions and 465 deletions

RocksDBStateStoreSuite.scala

@@ -36,12 +36,22 @@ import org.apache.spark.util.Utils
class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvider]
with BeforeAndAfter {
before {
StateStore.stop()
require(!StateStore.isMaintenanceRunning)
}
after {
StateStore.stop()
require(!StateStore.isMaintenanceRunning)
}
import StateStoreTestsHelper._
test("version encoding") {
import RocksDBStateStoreProvider._
val provider = newStoreProvider()
tryWithProviderResource(newStoreProvider()) { provider =>
val store = provider.getStore(0)
val keyRow = dataToKeyRow("a", 0)
val valueRow = dataToValueRow(1)
@@ -54,6 +64,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
assert(Platform.getByte(kv.key, Platform.BYTE_ARRAY_OFFSET) === STATE_ENCODING_VERSION)
assert(Platform.getByte(kv.value, Platform.BYTE_ARRAY_OFFSET) === STATE_ENCODING_VERSION)
}
}
test("RocksDB confs are passed correctly from SparkSession to db instance") {
val sparkConf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
@@ -100,7 +111,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
metricPair.get._2
}
val provider = newStoreProvider()
tryWithProviderResource(newStoreProvider()) { provider =>
val store = provider.getStore(0)
// Verify state after updating
put(store, "a", 0, 1)
@@ -114,6 +125,7 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_BYTES_COPIED) > 0L)
assert(getCustomMetric(storeMetrics, CUSTOM_METRIC_ZIP_FILE_BYTES_UNCOMPRESSED) > 0L)
}
}
override def newStoreProvider(): RocksDBStateStoreProvider = {
newStoreProvider(StateStoreId(newDir(), Random.nextInt(), 0))
@@ -145,10 +157,11 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
override def getData(
provider: RocksDBStateStoreProvider,
version: Int = -1): Set[((String, Int), Int)] = {
val reloadedProvider = newStoreProvider(provider.stateStoreId)
tryWithProviderResource(newStoreProvider(provider.stateStoreId)) { reloadedProvider =>
val versionToRead = if (version < 0) reloadedProvider.latestVersion else version
reloadedProvider.getStore(versionToRead).iterator().map(rowPairToDataPair).toSet
}
}
override def newStoreProvider(
minDeltasForSnapshot: Int,

StateStoreSuite.scala

@@ -62,7 +62,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
test("retaining only two latest versions when MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 2") {
val provider = newStoreProvider(minDeltasForSnapshot = 10, numOfVersToRetainInMemory = 2)
tryWithProviderResource(
newStoreProvider(minDeltasForSnapshot = 10, numOfVersToRetainInMemory = 2)) { provider =>
var currentVersion = 0
@@ -90,10 +91,11 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
checkVersion(loadedMaps, 3, Map(("a", 0) -> 3))
checkVersion(loadedMaps, 2, Map(("a", 0) -> 2))
}
}
test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") {
val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
numOfVersToRetainInMemory = 1)
tryWithProviderResource(newStoreProvider(opId = Random.nextInt, partition = 0,
numOfVersToRetainInMemory = 1)) { provider =>
var currentVersion = 0
@@ -130,10 +132,11 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
checkLoadedVersions(loadedMaps, count = 1, earliestKey = 2, latestKey = 2)
checkVersion(loadedMaps, 2, Map(("a", 0) -> -2))
}
}
test("no cache data with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 0") {
val provider = newStoreProvider(opId = Random.nextInt, partition = 0,
numOfVersToRetainInMemory = 0)
tryWithProviderResource(newStoreProvider(opId = Random.nextInt, partition = 0,
numOfVersToRetainInMemory = 0)) { provider =>
var currentVersion = 0
@@ -149,9 +152,11 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
loadedMaps = provider.getLoadedMaps()
assert(loadedMaps.size() === 0)
}
}
test("cleaning") {
val provider = newStoreProvider(opId = Random.nextInt, partition = 0, minDeltasForSnapshot = 5)
tryWithProviderResource(newStoreProvider(opId = Random.nextInt, partition = 0,
minDeltasForSnapshot = 5)) { provider =>
for (i <- 1 to 20) {
val store = provider.getStore(i - 1)
@@ -169,13 +174,16 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
assert(getData(provider, 20) === Set(("a", 0) -> 20))
assert(getData(provider, 19) === Set(("a", 0) -> 19))
}
}
testQuietly("SPARK-19677: Committing a delta file atop an existing one should not fail on HDFS") {
val conf = new Configuration()
conf.set("fs.fake.impl", classOf[RenameLikeHDFSFileSystem].getName)
conf.set("fs.defaultFS", "fake:///")
val provider = newStoreProvider(opId = Random.nextInt, partition = 0, hadoopConf = conf)
tryWithProviderResource(
newStoreProvider(opId = Random.nextInt, partition = 0, hadoopConf = conf)) { provider =>
provider.getStore(0).commit()
provider.getStore(0).commit()
@@ -184,9 +192,12 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
null, true).asScala.filter(_.getName.startsWith("temp-"))
assert(tempFiles.isEmpty)
}
}
test("corrupted file handling") {
val provider = newStoreProvider(opId = Random.nextInt, partition = 0, minDeltasForSnapshot = 5)
tryWithProviderResource(newStoreProvider(opId = Random.nextInt, partition = 0,
minDeltasForSnapshot = 5)) { provider =>
for (i <- 1 to 6) {
val store = provider.getStore(i - 1)
put(store, "a", 0, i)
@@ -216,6 +227,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
getData(provider, snapshotVersion - 1)
}
}
}
test("reports memory usage on current version") {
def getSizeOfStateForCurrentVersion(metrics: StateStoreMetrics): Long = {
@@ -224,7 +236,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
metricPair.get._2
}
val provider = newStoreProvider()
tryWithProviderResource(newStoreProvider()) { provider =>
val store = provider.getStore(0)
val noDataMemoryUsed = getSizeOfStateForCurrentVersion(store.metrics)
@@ -232,6 +244,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
store.commit()
assert(getSizeOfStateForCurrentVersion(store.metrics) > noDataMemoryUsed)
}
}
test("maintenance") {
val conf = new SparkConf()
@@ -252,7 +265,6 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 10L)
val storeConf = StateStoreConf(sqlConf)
val hadoopConf = new Configuration()
val provider = newStoreProvider(storeProviderId1.storeId)
var latestStoreVersion = 0
@@ -285,20 +297,24 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")
// Some snapshots should have been generated
tryWithProviderResource(newStoreProvider(storeProviderId1.storeId)) { provider =>
val snapshotVersions = (1 to latestStoreVersion).filter { version =>
fileExists(provider, version, isSnapshot = true)
}
assert(snapshotVersions.nonEmpty, "no snapshot file found")
}
}
// Generate more versions such that there is another snapshot and
// the earliest delta file will be cleaned up
generateStoreVersions()
// Earliest delta file should get cleaned up
tryWithProviderResource(newStoreProvider(storeProviderId1.storeId)) { provider =>
eventually(timeout(timeoutDuration)) {
assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
}
}
// If driver decides to deactivate all stores related to a query run,
// then this instance should be unloaded
@@ -346,7 +362,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
}
test("snapshotting") {
val provider = newStoreProvider(minDeltasForSnapshot = 5, numOfVersToRetainInMemory = 2)
tryWithProviderResource(
newStoreProvider(minDeltasForSnapshot = 5, numOfVersToRetainInMemory = 2)) { provider =>
var currentVersion = 0
@@ -365,7 +382,9 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
require(getLatestData(provider) === Set(("a", 0) -> 6), "store not updated correctly")
provider.doMaintenance() // should generate snapshot files
val snapshotVersion = (0 to 6).find(version => fileExists(provider, version, isSnapshot = true))
val snapshotVersion = (0 to 6).find { version =>
fileExists(provider, version, isSnapshot = true)
}
assert(snapshotVersion.nonEmpty, "snapshot file not generated")
deleteFilesEarlierThanVersion(provider, snapshotVersion.get)
assert(
@@ -388,19 +407,22 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
deleteFilesEarlierThanVersion(provider, latestSnapshotVersion.get)
assert(getLatestData(provider) === Set(("a", 0) -> 20), "snapshotting messed up the data")
}
}
testQuietly("SPARK-18342: commit fails when rename fails") {
import RenameReturnsFalseFileSystem._
val dir = scheme + "://" + newDir()
val conf = new Configuration()
conf.set(s"fs.$scheme.impl", classOf[RenameReturnsFalseFileSystem].getName)
val provider = newStoreProvider(
opId = Random.nextInt, partition = 0, dir = dir, hadoopConf = conf)
tryWithProviderResource(newStoreProvider(
opId = Random.nextInt, partition = 0, dir = dir, hadoopConf = conf)) { provider =>
val store = provider.getStore(0)
put(store, "a", 0, 0)
val e = intercept[IllegalStateException](store.commit())
assert(e.getCause.getMessage.contains("Failed to rename"))
}
}
test("SPARK-18416: do not create temp delta file until the store is updated") {
val dir = newDir()
@@ -528,8 +550,8 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
classOf[CreateAtomicTestManager].getName)
val remoteDir = Utils.createTempDir().getAbsolutePath
val provider = newStoreProvider(
opId = Random.nextInt, partition = 0, dir = remoteDir, hadoopConf = hadoopConf)
tryWithProviderResource(newStoreProvider(opId = Random.nextInt, partition = 0,
dir = remoteDir, hadoopConf = hadoopConf)) { provider =>
// Disable failure of output stream and generate versions
CreateAtomicTestManager.shouldFailInCreateAtomic = false
@@ -556,6 +578,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
store2.abort()
assert(CreateAtomicTestManager.cancelCalledInCreateAtomic)
}
}
test("expose metrics with custom metrics to StateStoreMetrics") {
def getCustomMetric(metrics: StateStoreMetrics, name: String): Long = {
@@ -578,12 +601,13 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
assert(cacheMissCount === expectedCacheMissCount)
}
val provider = newStoreProvider()
var store: StateStore = null
var loadedMapSizeForVersion1: Long = -1L
tryWithProviderResource(newStoreProvider()) { provider =>
// Verify state before starting a new set of updates
assert(getLatestData(provider).isEmpty)
val store = provider.getStore(0)
store = provider.getStore(0)
assert(!store.hasCommitted)
assert(store.metrics.numKeys === 0)
@@ -604,7 +628,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
assert(store.hasCommitted)
val loadedMapSizeForVersion1 = getLoadedMapSizeMetric(store.metrics)
loadedMapSizeForVersion1 = getLoadedMapSizeMetric(store.metrics)
assert(loadedMapSizeForVersion1 > initialLoadedMapSize)
assertCacheHitAndMiss(store.metrics, expectedCacheHitCount = 0, expectedCacheMissCount = 0)
@@ -621,8 +645,9 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
val loadedMapSizeForVersion1And2 = getLoadedMapSizeMetric(storeV2.metrics)
assert(loadedMapSizeForVersion1And2 > loadedMapSizeForVersion1)
assertCacheHitAndMiss(storeV2.metrics, expectedCacheHitCount = 1, expectedCacheMissCount = 0)
}
val reloadedProvider = newStoreProvider(store.id)
tryWithProviderResource(newStoreProvider(store.id)) { reloadedProvider =>
// intended to load version 2 instead of 1
// version 2 will not be loaded to the cache in provider
val reloadedStore = reloadedProvider.getStore(1)
@ -639,6 +664,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
assertCacheHitAndMiss(reloadedStoreV2.metrics, expectedCacheHitCount = 0,
expectedCacheMissCount = 2)
}
}
override def newStoreProvider(): HDFSBackedStateStoreProvider = {
newStoreProvider(opId = Random.nextInt(), partition = 0)
@@ -664,13 +690,14 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
override def getData(
provider: HDFSBackedStateStoreProvider,
version: Int): Set[((String, Int), Int)] = {
val reloadedProvider = newStoreProvider(provider.stateStoreId)
tryWithProviderResource(newStoreProvider(provider.stateStoreId)) { reloadedProvider =>
if (version < 0) {
reloadedProvider.latestIterator().map(rowPairToDataPair).toSet
} else {
reloadedProvider.getStore(version).iterator().map(rowPairToDataPair).toSet
}
}
}
override def getDefaultSQLConf(
minDeltasForSnapshot: Int,
@@ -751,8 +778,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
protected val valueSchema: StructType = StateStoreTestsHelper.valueSchema
testWithAllCodec("get, put, remove, commit, and all data iterator") {
val provider = newStoreProvider()
tryWithProviderResource(newStoreProvider()) { provider =>
// Verify state before starting a new set of updates
assert(getLatestData(provider).isEmpty)
@@ -788,7 +814,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
}
// New updates to the reloaded store with new version, and does not change old version
val reloadedProvider = newStoreProvider(store.id)
tryWithProviderResource(newStoreProvider(store.id)) { reloadedProvider =>
val reloadedStore = reloadedProvider.getStore(1)
put(reloadedStore, "c", 0, 4)
assert(reloadedStore.commit() === 2)
@@ -796,10 +822,11 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
assert(getLatestData(provider) === Set(("b", 0) -> 2, ("c", 0) -> 4))
assert(getData(provider, version = 1) === Set(("b", 0) -> 2))
}
}
}
testWithAllCodec("prefix scan") {
val provider = newStoreProvider(numPrefixCols = 1)
tryWithProviderResource(newStoreProvider(numPrefixCols = 1)) { provider =>
// Verify state before starting a new set of updates
assert(getLatestData(provider).isEmpty)
@@ -855,10 +882,10 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
verifyScan(key1AtVersion0, key2AtVersion0)
verifyScan(Seq("d"), Seq.empty)
}
}
testWithAllCodec("numKeys metrics") {
val provider = newStoreProvider()
tryWithProviderResource(newStoreProvider()) { provider =>
// Verify state before starting a new set of updates
assert(getLatestData(provider).isEmpty)
@@ -881,10 +908,10 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
assert(rowPairsToDataSet(reloadedStore.iterator()) ===
Set(("a", 0) -> 1, ("c", 0) -> 3, ("d", 0) -> 4, ("e", 0) -> 5))
}
}
testWithAllCodec("removing while iterating") {
val provider = newStoreProvider()
tryWithProviderResource(newStoreProvider()) { provider =>
// Verify state before starting a new set of updates
assert(getLatestData(provider).isEmpty)
val store = provider.getStore(0)
@@ -903,9 +930,10 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
filtered2.foreach { tuple => store.remove(tuple.key) }
assert(get(store, "b", 0) === None)
}
}
testWithAllCodec("abort") {
val provider = newStoreProvider()
tryWithProviderResource(newStoreProvider()) { provider =>
val store = provider.getStore(0)
put(store, "a", 0, 1)
store.commit()
@@ -916,10 +944,10 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
put(store1, "b", 0, 1)
store1.abort()
}
}
testWithAllCodec("getStore with invalid versions") {
val provider = newStoreProvider()
tryWithProviderResource(newStoreProvider()) { provider =>
def checkInvalidVersion(version: Int): Unit = {
intercept[Exception] {
provider.getStore(version)
@@ -950,6 +978,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
checkInvalidVersion(-1)
checkInvalidVersion(3)
}
}
testWithAllCodec("two concurrent StateStores - one for read-only and one for read-write") {
// During Streaming Aggregation, we have two StateStores per task, one used as read-only in
@@ -959,29 +988,34 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
// accidentally lead to the deletion of state.
val dir = newDir()
val storeId = StateStoreId(dir, 0L, 1)
val provider0 = newStoreProvider(storeId)
// prime state
val store = provider0.getStore(0)
val key1 = "a"
val key2 = 0
tryWithProviderResource(newStoreProvider(storeId)) { provider0 =>
// prime state
val store = provider0.getStore(0)
put(store, key1, key2, 1)
store.commit()
assert(rowPairsToDataSet(store.iterator()) === Set((key1, key2) -> 1))
}
// two state stores
val provider1 = newStoreProvider(storeId)
tryWithProviderResource(newStoreProvider(storeId)) { provider1 =>
val restoreStore = provider1.getReadStore(1)
val saveStore = provider1.getStore(1)
put(saveStore, key1, key2, get(restoreStore, key1, key2).get + 1)
saveStore.commit()
restoreStore.abort()
}
// check that state is correct for next batch
val provider2 = newStoreProvider(storeId)
tryWithProviderResource(newStoreProvider(storeId)) { provider2 =>
val finalStore = provider2.getStore(2)
assert(rowPairsToDataSet(finalStore.iterator()) === Set((key1, key2) -> 2))
}
}
test("StateStore.get") {
quietly {
@@ -1036,13 +1070,14 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
}
test("reports memory usage") {
val provider = newStoreProvider()
tryWithProviderResource(newStoreProvider()) { provider =>
val store = provider.getStore(0)
val noDataMemoryUsed = store.metrics.memoryUsedBytes
put(store, "a", 0, 1)
store.commit()
assert(store.metrics.memoryUsedBytes > noDataMemoryUsed)
}
}
test("SPARK-34270: StateStoreMetrics.combine should not override individual metrics") {
val customSumMetric = StateStoreCustomSumMetric("metric1", "custom metric 1")
@@ -1067,8 +1102,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
}
test("SPARK-35659: StateStore.put cannot put null value") {
val provider = newStoreProvider()
tryWithProviderResource(newStoreProvider()) { provider =>
// Verify state before starting a new set of updates
assert(getLatestData(provider).isEmpty)
@@ -1078,6 +1112,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
}
assert(err.getMessage.contains("Cannot put a null value"))
}
}
test("SPARK-35763: StateStoreCustomMetric withNewDesc and createSQLMetric") {
val metric = StateStoreCustomSizeMetric(name = "m1", desc = "desc1")
@@ -1122,6 +1157,14 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
}
}
protected def tryWithProviderResource[T](provider: ProviderClass)(f: ProviderClass => T): T = {
try {
f(provider)
} finally {
provider.close()
}
}
/** Get the `SQLConf` by the given minimum delta and version to retain in memory */
def getDefaultSQLConf(minDeltasForSnapshot: Int, numOfVersToRetainInMemory: Int): SQLConf