[SPARK-31608][CORE][WEBUI][TEST] Add test suites for HybridStore and HistoryServerMemoryManager

### What changes were proposed in this pull request?
This pull request adds 2 test suites for 2 new classes HybridStore and HistoryServerMemoryManager, which were created in https://github.com/apache/spark/pull/28412. This pull request also did some minor changes in these 2 classes to expose some variables for testing. Besides 2 suites, this pull request adds a unit test in FsHistoryProviderSuite to test parsing logs with HybridStore.

### Why are the changes needed?
Unit tests are needed for new features.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Unit tests.

Closes #29509 from baohe-zhang/SPARK-31608-UT.

Authored-by: Baohe Zhang <baohe.zhang@verizonmedia.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
This commit is contained in:
Baohe Zhang 2020-08-25 12:07:49 +09:00 committed by Jungtaek Lim (HeartSaVioR)
parent 3eee915b47
commit 9151a589a7
5 changed files with 304 additions and 7 deletions

View file

@ -33,8 +33,9 @@ private class HistoryServerMemoryManager(
conf: SparkConf) extends Logging {
private val maxUsage = conf.get(MAX_IN_MEMORY_STORE_USAGE)
private val currentUsage = new AtomicLong(0L)
private val active = new HashMap[(String, Option[String]), Long]()
// Visible for testing.
private[history] val currentUsage = new AtomicLong(0L)
private[history] val active = new HashMap[(String, Option[String]), Long]()
def initialize(): Unit = {
logInfo("Initialized memory manager: " +

View file

@ -54,7 +54,8 @@ private[history] class HybridStore extends KVStore {
private var backgroundThread: Thread = null
// A hash map that stores all classes that had been writen to inMemoryStore
private val klassMap = new ConcurrentHashMap[Class[_], Boolean]
// Visible for testing
private[history] val klassMap = new ConcurrentHashMap[Class[_], Boolean]
override def getMetadata[T](klass: Class[T]): T = {
getStore().getMetadata(klass)
@ -165,8 +166,9 @@ private[history] class HybridStore extends KVStore {
/**
* This method return the store that we should use.
* Visible for testing.
*/
private def getStore(): KVStore = {
private[history] def getStore(): KVStore = {
if (shouldUseInMemoryStore.get) {
inMemoryStore
} else {

View file

@ -90,9 +90,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
}
private def testAppLogParsing(inMemory: Boolean): Unit = {
test("SPARK-31608: parse application logs with HybridStore") {
testAppLogParsing(false, true)
}
private def testAppLogParsing(inMemory: Boolean, useHybridStore: Boolean = false): Unit = {
val clock = new ManualClock(12345678)
val conf = createTestConf(inMemory = inMemory)
val conf = createTestConf(inMemory = inMemory, useHybridStore = useHybridStore)
val provider = new FsHistoryProvider(conf, clock)
// Write a new-style application log.
@ -1509,7 +1513,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
new FileOutputStream(file).close()
}
private def createTestConf(inMemory: Boolean = false): SparkConf = {
private def createTestConf(
inMemory: Boolean = false,
useHybridStore: Boolean = false): SparkConf = {
val conf = new SparkConf()
.set(HISTORY_LOG_DIR, testDir.getAbsolutePath())
.set(FAST_IN_PROGRESS_PARSING, true)
@ -1517,6 +1523,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
if (!inMemory) {
conf.set(LOCAL_STORE_DIR, Utils.createTempDir().getAbsolutePath())
}
conf.set(HYBRID_STORE_ENABLED, useHybridStore)
conf
}

View file

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.history
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.History._
class HistoryServerMemoryManagerSuite extends SparkFunSuite {
private val MAX_USAGE = 3L
test("lease and release memory") {
val conf = new SparkConf().set(MAX_IN_MEMORY_STORE_USAGE, MAX_USAGE)
val manager = new HistoryServerMemoryManager(conf)
// Memory usage estimation for non-compressed log file is filesize / 2
manager.lease("app1", None, 2L, None)
manager.lease("app2", None, 2L, None)
manager.lease("app3", None, 2L, None)
assert(manager.currentUsage.get === 3L)
assert(manager.active.size === 3)
assert(manager.active.get(("app1", None)) === Some(1L))
intercept[RuntimeException] {
manager.lease("app4", None, 2L, None)
}
// Releasing a non-existent app is a no-op
manager.release("app4", None)
assert(manager.currentUsage.get === 3L)
manager.release("app1", None)
assert(manager.currentUsage.get === 2L)
assert(manager.active.size === 2)
manager.lease("app4", None, 2L, None)
assert(manager.currentUsage.get === 3L)
assert(manager.active.size === 3)
}
}

View file

@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.history
import java.io.File
import java.util.NoSuchElementException
import java.util.concurrent.LinkedBlockingQueue
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.TimeLimits
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
import org.apache.spark.status.KVUtils._
import org.apache.spark.util.kvstore._
class HybridStoreSuite extends SparkFunSuite with BeforeAndAfter with TimeLimits {
private var db: LevelDB = _
private var dbpath: File = _
before {
dbpath = File.createTempFile("test.", ".ldb")
dbpath.delete()
db = new LevelDB(dbpath, new KVStoreScalaSerializer())
}
after {
if (db != null) {
db.close()
}
if (dbpath != null) {
FileUtils.deleteQuietly(dbpath)
}
}
test("test multiple objects write read delete") {
val store = createHybridStore()
val t1 = createCustomType1(1)
val t2 = createCustomType1(2)
intercept[NoSuchElementException] {
store.read(t1.getClass(), t1.key)
}
store.write(t1)
store.write(t2)
store.delete(t2.getClass(), t2.key)
Seq(false, true).foreach { switch =>
if (switch) switchHybridStore(store)
intercept[NoSuchElementException] {
store.read(t2.getClass(), t2.key)
}
assert(store.read(t1.getClass(), t1.key) === t1)
assert(store.count(t1.getClass()) === 1L)
}
}
test("test metadata") {
val store = createHybridStore()
assert(store.getMetadata(classOf[CustomType1]) === null)
val t1 = createCustomType1(1)
store.setMetadata(t1)
assert(store.getMetadata(classOf[CustomType1]) === t1)
// Switch to LevelDB and set a new metadata
switchHybridStore(store)
val t2 = createCustomType1(2)
store.setMetadata(t2)
assert(store.getMetadata(classOf[CustomType1]) === t2)
}
test("test update") {
val store = createHybridStore()
val t = createCustomType1(1)
store.write(t)
t.name = "name2"
store.write(t)
Seq(false, true).foreach { switch =>
if (switch) switchHybridStore(store)
assert(store.count(t.getClass()) === 1L)
assert(store.read(t.getClass(), t.key) === t)
}
}
test("test basic iteration") {
val store = createHybridStore()
val t1 = createCustomType1(1)
store.write(t1)
val t2 = createCustomType1(2)
store.write(t2)
Seq(false, true).foreach { switch =>
if (switch) switchHybridStore(store)
assert(store.view(t1.getClass()).iterator().next().id === t1.id)
assert(store.view(t1.getClass()).skip(1).iterator().next().id === t2.id)
assert(store.view(t1.getClass()).skip(1).max(1).iterator().next().id === t2.id)
assert(store.view(t1.getClass()).first(t1.key).max(1).iterator().next().id === t1.id)
assert(store.view(t1.getClass()).first(t2.key).max(1).iterator().next().id === t2.id)
}
}
test("test delete after switch") {
val store = createHybridStore()
val t = createCustomType1(1)
store.write(t)
switchHybridStore(store)
intercept[IllegalStateException] {
store.delete(t.getClass(), t.key)
}
}
test("test klassMap") {
val store = createHybridStore()
val t1 = createCustomType1(1)
store.write(t1)
assert(store.klassMap.size === 1)
val t2 = new CustomType2("key2")
store.write(t2)
assert(store.klassMap.size === 2)
switchHybridStore(store)
val t3 = new CustomType3("key3")
store.write(t3)
// Cannot put new klass to klassMap after the switching starts
assert(store.klassMap.size === 2)
}
private def createHybridStore(): HybridStore = {
val store = new HybridStore()
store.setLevelDB(db)
store
}
private def createCustomType1(i: Int): CustomType1 = {
new CustomType1("key" + i, "id" + i, "name" + i, i, "child" + i)
}
private def switchHybridStore(store: HybridStore): Unit = {
assert(store.getStore().isInstanceOf[InMemoryStore])
val listener = new SwitchListener()
store.switchToLevelDB(listener, "test", None)
failAfter(2.seconds) {
assert(listener.waitUntilDone())
}
while (!store.getStore().isInstanceOf[LevelDB]) {
Thread.sleep(10)
}
}
private class SwitchListener extends HybridStore.SwitchToLevelDBListener {
// Put true to the queue when switch succeeds, and false when fails.
private val results = new LinkedBlockingQueue[Boolean]()
override def onSwitchToLevelDBSuccess(): Unit = {
try {
results.put(true)
} catch {
case _: InterruptedException =>
// no-op
}
}
override def onSwitchToLevelDBFail(e: Exception): Unit = {
try {
results.put(false)
} catch {
case _: InterruptedException =>
// no-op
}
}
def waitUntilDone(): Boolean = {
results.take()
}
}
}
class CustomType1(
@KVIndexParam var key: String,
@KVIndexParam("id") var id: String,
@KVIndexParam(value = "name", copy = true) var name: String,
@KVIndexParam("int") var num: Int,
@KVIndexParam(value = "child", parent = "id") var child: String) {
override def equals(o: Any): Boolean = {
o match {
case t: CustomType1 =>
id.equals(t.id) && name.equals(t.name)
case _ => false
}
}
override def hashCode: Int = {
id.hashCode
}
override def toString: String = {
"CustomType1[key=" + key + ",id=" + id + ",name=" + name + ",num=" + num;
}
}
class CustomType2(@KVIndexParam var key: String) {}
class CustomType3(@KVIndexParam var key: String) {}