[SPARK-20654][CORE] Add config to limit disk usage of the history server.

This change adds a new configuration option and support code that limits
how much disk space the SHS will use. The default value is pretty generous
so that applications will, hopefully, only rarely need to be replayed
because of their disk stores being evicted.

This works by keeping track of how much data each application is using.
Also, because it's not possible to know, before replaying, how much space
will be needed, it's possible that usage will exceed the configured limit
temporarily. The code uses the concept of a "lease" to try to limit how
much the SHS will exceed the limit in those cases.

Active UIs are also tracked, so they're never deleted. This works in
tandem with the existing option of how many active UIs are loaded; because
unused UIs will be unloaded, their disk stores will also become candidates
for deletion. If the data is not deleted, though, re-loading the UI is
pretty quick.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #20011 from vanzin/SPARK-20654.
This commit is contained in:
Marcelo Vanzin 2017-12-29 10:40:09 -06:00 committed by Imran Rashid
parent 11a849b3a7
commit 8b497046c6
5 changed files with 606 additions and 74 deletions

View file

@ -24,6 +24,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.Try
import scala.xml.Node
import com.fasterxml.jackson.annotation.JsonIgnore
@ -39,11 +40,13 @@ import org.fusesource.leveldbjni.internal.NativeDB
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.io.CompressionCodec
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.status._
import org.apache.spark.status.KVUtils._
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
import org.apache.spark.status.config._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
import org.apache.spark.util.kvstore._
@ -149,6 +152,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}.getOrElse(new InMemoryStore())
private val diskManager = storePath.map { path =>
new HistoryServerDiskManager(conf, path, listing, clock)
}
private val activeUIs = new mutable.HashMap[(String, Option[String]), LoadedAppUI]()
/**
@ -219,6 +226,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
private def startPolling(): Unit = {
diskManager.foreach(_.initialize())
// Validate the log directory.
val path = new Path(logDir)
try {
@ -299,63 +308,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
attempt.adminAclsGroups.getOrElse(""))
secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse(""))
val uiStorePath = storePath.map { path => getStorePath(path, appId, attemptId) }
val kvstore = try {
diskManager match {
case Some(sm) =>
loadDiskStore(sm, appId, attempt)
val (kvstore, needReplay) = uiStorePath match {
case Some(path) =>
try {
// The store path is not guaranteed to exist - maybe it hasn't been created, or was
// invalidated because changes to the event log were detected. Need to replay in that
// case.
val _replay = !path.isDirectory()
(createDiskStore(path, conf), _replay)
} catch {
case e: Exception =>
// Get rid of the old data and re-create it. The store is either old or corrupted.
logWarning(s"Failed to load disk store $uiStorePath for $appId.", e)
Utils.deleteRecursively(path)
(createDiskStore(path, conf), true)
}
case _ =>
(new InMemoryStore(), true)
}
val plugins = ServiceLoader.load(
classOf[AppHistoryServerPlugin], Utils.getContextOrSparkClassLoader).asScala
val trackingStore = new ElementTrackingStore(kvstore, conf)
if (needReplay) {
val replayBus = new ReplayListenerBus()
val listener = new AppStatusListener(trackingStore, conf, false,
lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
replayBus.addListener(listener)
for {
plugin <- plugins
listener <- plugin.createListeners(conf, trackingStore)
} replayBus.addListener(listener)
try {
val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
trackingStore.close(false)
} catch {
case e: Exception =>
Utils.tryLogNonFatalError {
trackingStore.close()
}
uiStorePath.foreach(Utils.deleteRecursively)
if (e.isInstanceOf[FileNotFoundException]) {
return None
} else {
throw e
}
case _ =>
createInMemoryStore(attempt)
}
} catch {
case _: FileNotFoundException =>
return None
}
val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, secManager, app.info.name,
HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
attempt.info.startTime.getTime(),
attempt.info.appSparkVersion)
plugins.foreach(_.setupUI(ui))
loadPlugins().foreach(_.setupUI(ui))
val loadedUI = LoadedAppUI(ui)
@ -417,11 +387,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
loadedUI.lock.writeLock().unlock()
}
// If the UI is not valid, delete its files from disk, if any. This relies on the fact that
// ApplicationCache will never call this method concurrently with getAppUI() for the same
// appId / attemptId.
if (!loadedUI.valid && storePath.isDefined) {
Utils.deleteRecursively(getStorePath(storePath.get, appId, attemptId))
diskManager.foreach { dm =>
// If the UI is not valid, delete its files from disk, if any. This relies on the fact that
// ApplicationCache will never call this method concurrently with getAppUI() for the same
// appId / attemptId.
dm.release(appId, attemptId, delete = !loadedUI.valid)
}
}
}
@ -569,12 +539,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
val logPath = fileStatus.getPath()
val bus = new ReplayListenerBus()
val listener = new AppListingListener(fileStatus, clock)
bus.addListener(listener)
replay(fileStatus, bus, eventsFilter = eventsFilter)
replay(fileStatus, isApplicationCompleted(fileStatus), bus, eventsFilter)
listener.applicationInfo.foreach { app =>
// Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
// discussion on the UI lifecycle.
@ -651,10 +620,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
*/
private def replay(
eventLog: FileStatus,
appCompleted: Boolean,
bus: ReplayListenerBus,
eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
val logPath = eventLog.getPath()
val isCompleted = !logPath.getName().endsWith(EventLoggingListener.IN_PROGRESS)
logInfo(s"Replaying log path: $logPath")
// Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
// and when we read the file here. That is OK -- it may result in an unnecessary refresh
@ -664,18 +633,44 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
// after it's created, so we get a file size that is no bigger than what is actually read.
val logInput = EventLoggingListener.openEventLog(logPath, fs)
try {
bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
logInfo(s"Finished replaying $logPath")
bus.replay(logInput, logPath.toString, !isCompleted, eventsFilter)
logInfo(s"Finished parsing $logPath")
} finally {
logInput.close()
}
}
/**
* Return true when the application has completed.
* Rebuilds the application state store from its event log.
*/
private def isApplicationCompleted(entry: FileStatus): Boolean = {
!entry.getPath().getName().endsWith(EventLoggingListener.IN_PROGRESS)
/**
 * Replays an application's event log into the given store.
 *
 * @param store       Destination store for the replayed application state.
 * @param eventLog    Status of the event log file to replay.
 * @param lastUpdated Timestamp handed to the status listener as its last update time.
 */
private def rebuildAppStore(
    store: KVStore,
    eventLog: FileStatus,
    lastUpdated: Long): Unit = {
  // Async tracking is turned off for replays: it uses more memory, and a slower parse is
  // acceptable in the SHS.
  val syncConf = conf.clone().set(ASYNC_TRACKING_ENABLED, false)
  val tracked = new ElementTrackingStore(store, syncConf)
  val bus = new ReplayListenerBus()

  bus.addListener(
    new AppStatusListener(tracked, syncConf, false, lastUpdateTime = Some(lastUpdated)))

  // Plugin listeners are created with the provider's own conf, not the replay-specific copy.
  loadPlugins().foreach { plugin =>
    plugin.createListeners(conf, tracked).foreach { pluginListener =>
      bus.addListener(pluginListener)
    }
  }

  try {
    replay(eventLog, bus)
    tracked.close(false)
  } catch {
    case failure: Exception =>
      // Best-effort close; a failure here is only logged so the original error propagates.
      Utils.tryLogNonFatalError {
        tracked.close()
      }
      throw failure
  }
}
/**
@ -751,14 +746,58 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
listing.write(newAppInfo)
}
private def createDiskStore(path: File, conf: SparkConf): KVStore = {
/**
 * Opens the disk-backed store for an application attempt, rebuilding it from the event log
 * when no usable store exists.
 *
 * An existing store is reused if it can be opened. Otherwise space is leased from the disk
 * manager, the event log is replayed into a temporary store, and the lease is committed to the
 * store's final location. If the rebuild or the commit fails, the lease is rolled back and the
 * exception is rethrown.
 *
 * @param dm      Disk manager that tracks store locations and usage.
 * @param appId   Application ID.
 * @param attempt Attempt whose store should be loaded.
 * @return The opened store.
 */
private def loadDiskStore(
dm: HistoryServerDiskManager,
appId: String,
attempt: AttemptInfoWrapper): KVStore = {
val metadata = new AppStatusStoreMetadata(AppStatusStore.CURRENT_VERSION)
// NOTE(review): `path` is not defined at this point in the method; this line looks like a
// leftover from the previous createDiskStore body -- confirm it should not be here.
KVUtils.open(path, metadata)
// First check if the store already exists and try to open it. If that fails, then get rid of
// the existing data.
dm.openStore(appId, attempt.info.attemptId).foreach { path =>
try {
// Returns from loadDiskStore itself, not just from the closure.
return KVUtils.open(path, metadata)
} catch {
case e: Exception =>
logInfo(s"Failed to open existing store for $appId/${attempt.info.attemptId}.", e)
// openStore() marked the store as in use; release it and ask for the files to be deleted.
dm.release(appId, attempt.info.attemptId, delete = true)
}
}
// At this point the disk data either does not exist or was deleted because it failed to
// load, so the event log needs to be replayed.
val status = fs.getFileStatus(new Path(logDir, attempt.logPath))
// The log counts as compressed only when its file name carries a codec name that
// CompressionCodec.getShortName accepts without throwing.
val isCompressed = EventLoggingListener.codecName(status.getPath()).flatMap { name =>
Try(CompressionCodec.getShortName(name)).toOption
}.isDefined
logInfo(s"Leasing disk manager space for app $appId / ${attempt.info.attemptId}...")
val lease = dm.lease(status.getLen(), isCompressed)
val newStorePath = try {
// Replay into the lease's temporary location, and close that store before committing.
val store = KVUtils.open(lease.tmpPath, metadata)
try {
rebuildAppStore(store, status, attempt.info.lastUpdated.getTime())
} finally {
store.close()
}
lease.commit(appId, attempt.info.attemptId)
} catch {
case e: Exception =>
lease.rollback()
throw e
}
// Re-open the store from the location returned by the commit.
KVUtils.open(newStorePath, metadata)
}
private def getStorePath(path: File, appId: String, attemptId: Option[String]): File = {
val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb"
new File(path, fileName)
/**
 * Builds an in-memory store for the given attempt by replaying its event log.
 *
 * @param attempt Attempt whose event log should be replayed.
 * @return The populated in-memory store.
 */
private def createInMemoryStore(attempt: AttemptInfoWrapper): KVStore = {
  val eventLog = fs.getFileStatus(new Path(logDir, attempt.logPath))
  val memStore = new InMemoryStore()
  rebuildAppStore(memStore, eventLog, attempt.info.lastUpdated.getTime())
  memStore
}
/** Looks up the available [[AppHistoryServerPlugin]] implementations through ServiceLoader. */
private def loadPlugins(): Iterable[AppHistoryServerPlugin] = {
  val pluginLoader = Utils.getContextOrSparkClassLoader
  val discovered = ServiceLoader.load(classOf[AppHistoryServerPlugin], pluginLoader)
  discovered.asScala
}
/** For testing. Returns internal data about a single attempt. */

View file

@ -0,0 +1,327 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.history
import java.io.File
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, ListBuffer}
import org.apache.commons.io.FileUtils
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.status.KVUtils._
import org.apache.spark.util.{Clock, Utils}
import org.apache.spark.util.kvstore.KVStore
/**
* A class used to keep track of disk usage by the SHS, allowing application data to be deleted
* from disk when usage exceeds a configurable threshold.
*
* The goal of the class is not to guarantee that usage will never exceed the threshold; because of
* how application data is written, disk usage may temporarily go higher. But, eventually, it
* should fall back under the threshold.
*
* @param conf Spark configuration.
* @param path Path where to store application data.
* @param listing The listing store, used to persist usage data.
* @param clock Clock instance to use.
*/
private class HistoryServerDiskManager(
    conf: SparkConf,
    path: File,
    listing: KVStore,
    clock: Clock) extends Logging {

  import config._

  // Final location of committed application stores.
  private val appStoreDir = new File(path, "apps")
  if (!appStoreDir.isDirectory() && !appStoreDir.mkdir()) {
    throw new IllegalArgumentException(s"Failed to create app directory ($appStoreDir).")
  }

  // Location of stores that are still being written under a lease.
  private val tmpStoreDir = new File(path, "temp")
  if (!tmpStoreDir.isDirectory() && !tmpStoreDir.mkdir()) {
    throw new IllegalArgumentException(s"Failed to create temp directory ($tmpStoreDir).")
  }

  private val maxUsage = conf.get(MAX_LOCAL_DISK_USAGE)

  // Committed usage plus the space currently leased out.
  private val currentUsage = new AtomicLong(0L)

  // Space accounted to stores that have been committed to their final location.
  private val committedUsage = new AtomicLong(0L)

  // Stores currently in use, mapped to the size recorded when they were opened or committed.
  // Accessed only inside `active.synchronized` blocks.
  private val active = new HashMap[(String, Option[String]), Long]()

  /**
   * Accounts for the stores already on disk, removes leftover temporary stores, and drops
   * listing entries whose store directory no longer exists.
   */
  def initialize(): Unit = {
    updateUsage(sizeOf(appStoreDir), committed = true)

    // Clean up any temporary stores during start up. This assumes that they're leftover from other
    // instances and are not useful. listFiles() returns null when the directory cannot be read,
    // so fall back to an empty listing instead of failing with a NullPointerException.
    Option(tmpStoreDir.listFiles()).getOrElse(Array.empty[File])
      .foreach(FileUtils.deleteQuietly)

    // Go through the recorded store directories and remove any that may have been removed by
    // external code.
    val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info =>
      !new File(info.path).exists()
    }.toSeq

    orphans.foreach { info =>
      listing.delete(info.getClass(), info.path)
    }

    logInfo("Initialized disk manager: " +
      s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
      s"max usage = ${Utils.bytesToString(maxUsage)}")
  }

  /**
   * Lease some space from the store. The leased space is calculated as a fraction of the given
   * event log size; this is an approximation, and doesn't mean the application store cannot
   * outgrow the lease.
   *
   * If there's not enough space for the lease, other applications might be evicted to make room.
   * This method always returns a lease, meaning that it's possible for local disk usage to grow
   * past the configured threshold if there aren't enough idle applications to evict.
   *
   * While the lease is active, the data is written to a temporary location, so `openStore()`
   * will still return `None` for the application.
   *
   * @param eventLogSize Size of the event log that will be replayed into the store.
   * @param isCompressed Whether the event log is compressed.
   * @return A lease backed by a freshly created temporary directory.
   */
  def lease(eventLogSize: Long, isCompressed: Boolean = false): Lease = {
    val needed = approximateSize(eventLogSize, isCompressed)
    makeRoom(needed)

    val perms = PosixFilePermissions.fromString("rwx------")
    val tmp = Files.createTempDirectory(tmpStoreDir.toPath(), "appstore",
      PosixFilePermissions.asFileAttribute(perms)).toFile()

    updateUsage(needed)
    val current = currentUsage.get()
    if (current > maxUsage) {
      logInfo(s"Lease of ${Utils.bytesToString(needed)} may cause usage to exceed max " +
        s"(${Utils.bytesToString(current)} > ${Utils.bytesToString(maxUsage)})")
    }

    new Lease(tmp, needed)
  }

  /**
   * Returns the location of an application store if it's still available. Marks the store as
   * being used so that it's not evicted when running out of designated space.
   *
   * @return The store directory, or `None` if no directory exists for the application attempt.
   */
  def openStore(appId: String, attemptId: Option[String]): Option[File] = {
    val storePath = active.synchronized {
      val path = appStorePath(appId, attemptId)
      if (path.isDirectory()) {
        active(appId -> attemptId) = sizeOf(path)
        Some(path)
      } else {
        None
      }
    }

    // Only record an access when there actually is a store on disk.
    if (storePath.isDefined) {
      updateAccessTime(appId, attemptId)
    }

    storePath
  }

  /**
   * Tell the disk manager that the store for the given application is not being used anymore.
   * Does nothing if the application is not currently marked as active.
   *
   * @param delete Whether to delete the store from disk.
   */
  def release(appId: String, attemptId: Option[String], delete: Boolean = false): Unit = {
    // Because LevelDB may modify the structure of the store files even when just reading, update
    // the accounting for this application when it's closed.
    val oldSizeOpt = active.synchronized {
      active.remove(appId -> attemptId)
    }

    oldSizeOpt.foreach { oldSize =>
      val path = appStorePath(appId, attemptId)
      updateUsage(-oldSize, committed = true)
      if (path.isDirectory()) {
        if (delete) {
          deleteStore(path)
        } else {
          // Re-measure the store and persist the new size alongside the existing listing entry.
          val newSize = sizeOf(path)
          val newInfo = listing.read(classOf[ApplicationStoreInfo], path.getAbsolutePath())
            .copy(size = newSize)
          listing.write(newInfo)
          updateUsage(newSize, committed = true)
        }
      }
    }
  }

  /**
   * A non-scientific approximation of how large an app state store will be given the size of the
   * event log.
   */
  def approximateSize(eventLogSize: Long, isCompressed: Boolean): Long = {
    if (isCompressed) {
      // For compressed logs, assume that compression reduces the log size a lot, and the disk
      // store will actually grow compared to the log size.
      eventLogSize * 2
    } else {
      // For non-compressed logs, assume the disk store will end up at approximately 50% of the
      // size of the logs. This is loosely based on empirical evidence.
      eventLogSize / 2
    }
  }

  /** Current free space. Considers space currently leased out too. */
  def free(): Long = {
    math.max(maxUsage - currentUsage.get(), 0L)
  }

  /** Current committed space. */
  def committed(): Long = committedUsage.get()

  /** Deletes a store directory and its listing entry. Does not touch the usage counters. */
  private def deleteStore(path: File): Unit = {
    FileUtils.deleteDirectory(path)
    listing.delete(classOf[ApplicationStoreInfo], path.getAbsolutePath())
  }

  /**
   * Tries to free at least `size` bytes by deleting stores that are not marked as active.
   * Candidates are visited in the order of the "lastAccess" index. If nothing can be evicted,
   * a warning is logged and the method returns normally.
   */
  private def makeRoom(size: Long): Unit = {
    if (free() < size) {
      logDebug(s"Not enough free space, looking at candidates for deletion...")
      val evicted = new ListBuffer[ApplicationStoreInfo]()
      Utils.tryWithResource(
        listing.view(classOf[ApplicationStoreInfo]).index("lastAccess").closeableIterator()
      ) { iter =>
        var needed = size
        while (needed > 0 && iter.hasNext()) {
          val info = iter.next()
          val isActive = active.synchronized {
            active.contains(info.appId -> info.attemptId)
          }
          if (!isActive) {
            evicted += info
            needed -= info.size
          }
        }
      }

      // Deletion happens after the iterator has been closed.
      if (evicted.nonEmpty) {
        val freed = evicted.map { info =>
          logInfo(s"Deleting store for ${info.appId}/${info.attemptId}.")
          deleteStore(new File(info.path))
          updateUsage(-info.size, committed = true)
          info.size
        }.sum

        logInfo(s"Deleted ${evicted.size} store(s) to free ${Utils.bytesToString(freed)} " +
          s"(target = ${Utils.bytesToString(size)}).")
      } else {
        logWarning(s"Unable to free any space to make room for ${Utils.bytesToString(size)}.")
      }
    }
  }

  /** Final on-disk location of the store for the given application attempt. */
  private def appStorePath(appId: String, attemptId: Option[String]): File = {
    val fileName = appId + attemptId.map("_" + _).getOrElse("") + ".ldb"
    new File(appStoreDir, fileName)
  }

  /** Writes a listing entry for the store with the current clock time and on-disk size. */
  private def updateAccessTime(appId: String, attemptId: Option[String]): Unit = {
    val path = appStorePath(appId, attemptId)
    val info = ApplicationStoreInfo(path.getAbsolutePath(), clock.getTimeMillis(), appId, attemptId,
      sizeOf(path))
    listing.write(info)
  }

  /**
   * Adjusts the usage counters by `delta`.
   *
   * @param delta     Number of bytes to add (or, if negative, to subtract).
   * @param committed Whether the committed counter should be adjusted as well.
   * @throws IllegalStateException if either counter becomes negative.
   */
  private def updateUsage(delta: Long, committed: Boolean = false): Unit = {
    val updated = currentUsage.addAndGet(delta)
    if (updated < 0) {
      throw new IllegalStateException(
        s"Disk usage tracker went negative (now = $updated, delta = $delta)")
    }
    if (committed) {
      val updatedCommitted = committedUsage.addAndGet(delta)
      if (updatedCommitted < 0) {
        throw new IllegalStateException(
          s"Disk usage tracker went negative (now = $updatedCommitted, delta = $delta)")
      }
    }
  }

  /** Visible for testing. Return the size of a directory. */
  private[history] def sizeOf(path: File): Long = FileUtils.sizeOf(path)

  /**
   * A reservation of disk space backed by a temporary directory.
   *
   * @param tmpPath Temporary directory where the store should be written.
   * @param leased  Number of bytes accounted to this lease.
   */
  private[history] class Lease(val tmpPath: File, private val leased: Long) {

    /**
     * Commits a lease to its final location, and update accounting information. This method
     * marks the application as active, so its store is not available for eviction.
     *
     * @return The final location of the store.
     * @throws IllegalArgumentException if the application attempt is currently active.
     * @throws java.io.IOException if the temporary store cannot be moved to its final location.
     */
    def commit(appId: String, attemptId: Option[String]): File = {
      val dst = appStorePath(appId, attemptId)

      active.synchronized {
        require(!active.contains(appId -> attemptId),
          s"Cannot commit lease for active application $appId / $attemptId")

        // Replace any existing (inactive) store for the same application attempt.
        if (dst.isDirectory()) {
          val size = sizeOf(dst)
          deleteStore(dst)
          updateUsage(-size, committed = true)
        }
      }

      // Swap the estimated lease size for the real size of the store.
      updateUsage(-leased)

      val newSize = sizeOf(tmpPath)
      makeRoom(newSize)
      if (!tmpPath.renameTo(dst)) {
        // renameTo() reports failure through its return value. Put the lease back into the
        // accounting so that a later rollback() leaves the counters balanced, and fail loudly
        // instead of recording a store that does not exist at its final location.
        updateUsage(leased)
        throw new java.io.IOException(
          s"Failed to move store for $appId / $attemptId from $tmpPath to $dst.")
      }

      updateUsage(newSize, committed = true)
      if (committedUsage.get() > maxUsage) {
        val current = Utils.bytesToString(committedUsage.get())
        val max = Utils.bytesToString(maxUsage)
        logWarning(s"Commit of application $appId / $attemptId causes maximum disk usage to be " +
          s"exceeded ($current > $max)")
      }

      updateAccessTime(appId, attemptId)

      active.synchronized {
        active(appId -> attemptId) = newSize
      }
      dst
    }

    /** Deletes the temporary directory created for the lease. */
    def rollback(): Unit = {
      updateUsage(-leased)
      FileUtils.deleteDirectory(tmpPath)
    }

  }

}
/**
 * Record the disk manager keeps in the listing store for each application store on disk.
 *
 * @param path Absolute path of the store directory; the disk manager reads and deletes
 *             entries by this value.
 * @param lastAccess Time of the last recorded access, taken from the manager's clock. Indexed
 *                   as "lastAccess", which is the index walked when looking for stores to evict.
 * @param appId Application ID.
 * @param attemptId Attempt ID, if any.
 * @param size Size of the store as last measured by the disk manager.
 */
private case class ApplicationStoreInfo(
@KVIndexParam path: String,
@KVIndexParam("lastAccess") lastAccess: Long,
appId: String,
attemptId: Option[String],
size: Long)

View file

@ -20,6 +20,7 @@ package org.apache.spark.deploy.history
import java.util.concurrent.TimeUnit
import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
private[spark] object config {
@ -39,4 +40,8 @@ private[spark] object config {
.stringConf
.createOptional
// Limit on the local disk space used by the history server's application stores, expressed in
// bytes; defaults to "10g". HistoryServerDiskManager reads it as its usage threshold, which
// may be exceeded temporarily while stores are being written.
val MAX_LOCAL_DISK_USAGE = ConfigBuilder("spark.history.store.maxDiskUsage")
.bytesConf(ByteUnit.BYTE)
.createWithDefaultString("10g")
}

View file

@ -352,14 +352,8 @@ private[spark] object EventLoggingListener extends Logging {
*/
def openEventLog(log: Path, fs: FileSystem): InputStream = {
val in = new BufferedInputStream(fs.open(log))
// Compression codec is encoded as an extension, e.g. app_123.lzf
// Since we sanitize the app ID to not include periods, it is safe to split on it
val logName = log.getName.stripSuffix(IN_PROGRESS)
val codecName: Option[String] = logName.split("\\.").tail.lastOption
try {
val codec = codecName.map { c =>
val codec = codecName(log).map { c =>
codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
}
codec.map(_.compressedInputStream(in)).getOrElse(in)
@ -370,4 +364,11 @@ private[spark] object EventLoggingListener extends Logging {
}
}
/**
 * Returns the compression codec name encoded in an event log's file name, if any.
 *
 * @param log Path of the event log; only its final name component is inspected.
 * @return The last period-separated extension of the name (after removing the in-progress
 *         suffix), or `None` when the name has no extension.
 */
def codecName(log: Path): Option[String] = {
  // Compression codec is encoded as an extension, e.g. app_123.lzf
  // Since we sanitize the app ID to not include periods, it is safe to split on it
  val logName = log.getName.stripSuffix(IN_PROGRESS)
  // drop(1) instead of tail: split() returns an empty array for a name made only of periods,
  // and tail on an empty array throws UnsupportedOperationException.
  logName.split("\\.").drop(1).lastOption
}
}

View file

@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.history
import java.io.File
import org.mockito.AdditionalAnswers
import org.mockito.Matchers.{any, anyBoolean, anyLong, eq => meq}
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.status.KVUtils
import org.apache.spark.util.{ManualClock, Utils}
import org.apache.spark.util.kvstore.KVStore
/**
 * Tests for the disk usage accounting done by [[HistoryServerDiskManager]].
 *
 * Each test runs against a spied manager whose size estimates and directory sizes are stubbed,
 * so the expected usage numbers do not depend on what is actually written to disk.
 */
class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {

  import config._

  // Maximum disk usage configured for the managers created by mockManager().
  private val MAX_USAGE = 3L

  private var testDir: File = _
  private var store: KVStore = _

  before {
    testDir = Utils.createTempDir()
    store = KVUtils.open(new File(testDir, "listing"), "test")
  }

  after {
    // Guard both resources and reset them, so that a failure in `before` is not masked by a
    // NullPointerException here and a stale store is never closed twice.
    try {
      if (store != null) {
        store.close()
      }
    } finally {
      store = null
      if (testDir != null) {
        Utils.deleteRecursively(testDir)
        testDir = null
      }
    }
  }

  /**
   * Creates a spied manager limited to MAX_USAGE bytes whose `approximateSize()` returns the
   * event log size unchanged, so a lease of `n` accounts for exactly `n` bytes.
   */
  private def mockManager(): HistoryServerDiskManager = {
    val conf = new SparkConf().set(MAX_LOCAL_DISK_USAGE, MAX_USAGE)
    val manager = spy(new HistoryServerDiskManager(conf, testDir, store, new ManualClock()))
    doAnswer(AdditionalAnswers.returnsFirstArg[Long]()).when(manager)
      .approximateSize(anyLong(), anyBoolean())
    manager
  }

  test("leasing space") {
    val manager = mockManager()

    // Lease all available space.
    val leaseA = manager.lease(1)
    val leaseB = manager.lease(1)
    val leaseC = manager.lease(1)
    assert(manager.free() === 0)

    // Revert one lease, get another one.
    leaseA.rollback()
    assert(manager.free() > 0)
    assert(!leaseA.tmpPath.exists())

    val leaseD = manager.lease(1)
    assert(manager.free() === 0)

    // Committing B should bring the "used" space up to 4, so there shouldn't be space left yet.
    doReturn(2L).when(manager).sizeOf(meq(leaseB.tmpPath))
    val dstB = leaseB.commit("app2", None)
    assert(manager.free() === 0)
    assert(manager.committed() === 2)

    // Rollback C and D, now there should be 1 left.
    leaseC.rollback()
    leaseD.rollback()
    assert(manager.free() === 1)

    // Release app 2 to make it available for eviction.
    doReturn(2L).when(manager).sizeOf(meq(dstB))
    manager.release("app2", None)
    assert(manager.committed() === 2)

    // Emulate an updated event log by replacing the store for lease B. Lease 1, and commit with
    // size 3.
    val leaseE = manager.lease(1)
    doReturn(3L).when(manager).sizeOf(meq(leaseE.tmpPath))
    val dstE = leaseE.commit("app2", None)
    assert(dstE === dstB)
    assert(dstE.exists())
    doReturn(3L).when(manager).sizeOf(meq(dstE))
    assert(!leaseE.tmpPath.exists())
    assert(manager.free() === 0)
    manager.release("app2", None)
    assert(manager.committed() === 3)

    // Try a big lease that should cause the released app to be evicted.
    val leaseF = manager.lease(6)
    assert(!dstB.exists())
    assert(manager.free() === 0)
    assert(manager.committed() === 0)

    // Leasing when no free space is available should still be allowed.
    manager.lease(1)
    assert(manager.free() === 0)
  }

  test("tracking active stores") {
    val manager = mockManager()

    // Lease and commit space for appA, making it active.
    val leaseA = manager.lease(2)
    assert(manager.free() === 1)
    doReturn(2L).when(manager).sizeOf(leaseA.tmpPath)
    assert(manager.openStore("appA", None).isEmpty)
    val dstA = leaseA.commit("appA", None)

    // Create a new lease. Leases are always granted, but this shouldn't cause appA's store
    // to be deleted.
    val leaseB = manager.lease(2)
    assert(dstA.exists())

    // Trying to commit on top of an active application should fail.
    intercept[IllegalArgumentException] {
      leaseB.commit("appA", None)
    }

    leaseB.rollback()

    // Close appA with an updated size, then create a new lease. Now the app's directory should be
    // deleted.
    doReturn(3L).when(manager).sizeOf(dstA)
    manager.release("appA", None)
    assert(manager.free() === 0)

    val leaseC = manager.lease(1)
    assert(!dstA.exists())
    leaseC.rollback()

    assert(manager.openStore("appA", None).isEmpty)
  }

  test("approximate size heuristic") {
    // Uses a real (unstubbed) manager so the actual heuristic is exercised.
    val manager = new HistoryServerDiskManager(new SparkConf(false), testDir, store,
      new ManualClock())
    assert(manager.approximateSize(50L, false) < 50L)
    assert(manager.approximateSize(50L, true) > 50L)
  }

}