[SPARK-35785][SS][FOLLOWUP] Remove ignored test from RocksDBSuite

### What changes were proposed in this pull request?

This patch removes an ignored test from `RocksDBSuite`.

### Why are the changes needed?

The removed test is now ignored. The test itself doesn't look making sense. For example, the condition for capturing exception is never matched. The test runs updates to RocksDB instances at same remote dir with same versions. This doesn't look like a case it will run through in practice.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes #33401 from viirya/remove-ignore-test.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 8009f0dd92)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
Liang-Chi Hsieh 2021-07-17 02:04:55 -07:00 committed by Dongjoon Hyun
parent 595e8251d1
commit 3d423b94a1

View file

@ -337,69 +337,6 @@ class RocksDBSuite extends SparkFunSuite {
}
}
ignore("ensure that concurrent update and cleanup consistent versions") {
quietly {
val numThreads = 20
val numUpdatesInEachThread = 20
val remoteDir = Utils.createTempDir().toString
@volatile var exception: Exception = null
val updatingThreads = Array.fill(numThreads) {
new Thread() {
override def run(): Unit = {
try {
for (version <- 0 to numUpdatesInEachThread) {
withDB(
remoteDir,
version = version) { db =>
val prevValue = Option(toStr(db.get("a"))).getOrElse("0").toInt
db.put("a", (prevValue + 1).toString)
db.commit()
}
}
} catch {
case e: Exception =>
val newException = new Exception(s"ThreadId ${this.getId} failed", e)
if (exception != null) {
exception = newException
}
throw e
}
}
}
}
val cleaningThread = new Thread() {
override def run(): Unit = {
try {
withDB(remoteDir, conf = RocksDBConf().copy(compactOnCommit = true)) { db =>
while (!this.isInterrupted) {
db.cleanup()
Thread.sleep(1)
}
}
} catch {
case e: Exception =>
val newException = new Exception(s"ThreadId ${this.getId} failed", e)
if (exception != null) {
exception = newException
}
throw e
}
}
}
updatingThreads.foreach(_.start())
cleaningThread.start()
updatingThreads.foreach(_.join())
cleaningThread.interrupt()
cleaningThread.join()
if (exception != null) {
fail(exception)
}
withDB(remoteDir, numUpdatesInEachThread) { db =>
assert(toStr(db.get("a")) === numUpdatesInEachThread.toString)
}
}
}
test("checkpoint metadata serde roundtrip") {
def checkJsonRoundtrip(metadata: RocksDBCheckpointMetadata, json: String): Unit = {
assert(metadata.json == json)