[SPARK-6879] [HISTORYSERVER] check if app is completed before clean it up

https://issues.apache.org/jira/browse/SPARK-6879

Use `applications` instead of `FileStatus`, and check whether an app is completed before cleaning it up.
If an exception is thrown during deletion, the app is added back to `appsToClean` to wait for the next loop.
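
As a minimal sketch of the retention rule this patch introduces (the `FsApplicationHistoryInfo` below is a simplified stand-in for the real class, which carries more fields):

```scala
import scala.collection.mutable

// Simplified stand-in for the real FsApplicationHistoryInfo.
case class FsApplicationHistoryInfo(id: String, lastUpdated: Long, completed: Boolean)

object RetentionSketch {
  // Expired, completed apps are queued here for the event log cleaner.
  private val appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]

  def retain(
      apps: Iterable[FsApplicationHistoryInfo],
      maxAge: Long,
      now: Long): mutable.LinkedHashMap[String, FsApplicationHistoryInfo] = {
    val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
    apps.foreach { info =>
      // Keep the app if it is recent enough, and never delete one that is
      // still running, no matter how old its event log is.
      if (now - info.lastUpdated <= maxAge || !info.completed) {
        appsToRetain += (info.id -> info)
      } else {
        appsToClean += info
      }
    }
    appsToRetain
  }
}
```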

Author: WangTaoTheTonic <wangtao111@huawei.com>

Closes #5491 from WangTaoTheTonic/SPARK-6879 and squashes the following commits:

4a533eb [WangTaoTheTonic] treat ACE specially
cb45105 [WangTaoTheTonic] rebase
d4d5251 [WangTaoTheTonic] per Marcelo's comments
d7455d8 [WangTaoTheTonic] slightly change when delete file
b0abca5 [WangTaoTheTonic] use global var to store apps to clean
94adfe1 [WangTaoTheTonic] leave expired apps alone to be deleted
9872a9d [WangTaoTheTonic] use the right path
fdef4d6 [WangTaoTheTonic] check if app is completed before clean it up
WangTaoTheTonic authored on 2015-04-23 17:20:17 -04:00; committed by Sean Owen
parent 3e91cc273d
commit baa83a9a67


@@ -35,7 +35,6 @@ import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
-
 /**
  * A class that provides application history from event logs stored in the file system.
  * This provider checks for new finished applications in the background periodically and
@@ -76,6 +75,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
+  // List of applications to be deleted by event log cleaner.
+  private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
   private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
@@ -266,34 +268,40 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def cleanLogs(): Unit = {
     try {
-      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
-        .getOrElse(Seq[FileStatus]())
       val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
 
       val now = System.currentTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
 
+      // Scan all logs from the log directory.
+      // Only completed applications older than the specified max age will be deleted.
       applications.values.foreach { info =>
-        if (now - info.lastUpdated <= maxAge) {
+        if (now - info.lastUpdated <= maxAge || !info.completed) {
           appsToRetain += (info.id -> info)
+        } else {
+          appsToClean += info
         }
       }
 
       applications = appsToRetain
 
-      // Scan all logs from the log directory.
-      // Only directories older than the specified max age will be deleted
-      statusList.foreach { dir =>
+      val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+      appsToClean.foreach { info =>
         try {
-          if (now - dir.getModificationTime() > maxAge) {
-            // if path is a directory and set to true,
-            // the directory is deleted else throws an exception
-            fs.delete(dir.getPath, true)
+          val path = new Path(logDir, info.logPath)
+          if (fs.exists(path)) {
+            fs.delete(path, true)
           }
         } catch {
-          case t: IOException => logError(s"IOException in cleaning logs of $dir", t)
+          case e: AccessControlException =>
+            logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+          case t: IOException =>
+            logError(s"IOException in cleaning logs of ${info.logPath}", t)
+            leftToClean += info
         }
       }
+
+      appsToClean = leftToClean
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }
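
The "treat ACE specially" commit in the squash list above is visible in the new catch block: a permission failure (AccessControlException) is logged at INFO and the entry is dropped, since retrying could never succeed, while an IOException is treated as possibly transient and the entry is re-queued through `leftToClean` for the next cleaner pass. Below is a minimal, dependency-free sketch of that retry policy; `deleteLog` is a hypothetical stand-in for the `fs.exists`/`fs.delete` calls, and a plain SecurityException stands in for Hadoop's AccessControlException:

```scala
import java.io.IOException
import scala.collection.mutable

object CleanerRetrySketch {
  // Hypothetical stand-in for fs.delete(new Path(logDir, info.logPath), true).
  def deleteLog(logPath: String): Unit = ()

  // Returns the entries that should be retried on the next cleaner pass.
  def sweep(appsToClean: mutable.ListBuffer[String]): mutable.ListBuffer[String] = {
    val leftToClean = new mutable.ListBuffer[String]
    appsToClean.foreach { logPath =>
      try {
        deleteLog(logPath)
      } catch {
        // Permission denied: a retry would fail the same way, so log and drop.
        case _: SecurityException =>
          println(s"No permission to delete $logPath, ignoring.")
        // Possibly transient I/O error: keep the entry for the next run.
        case t: IOException =>
          println(s"IOException in cleaning logs of $logPath: $t")
          leftToClean += logPath
      }
    }
    leftToClean
  }
}
```

Note that the age threshold itself comes from `spark.history.fs.cleaner.maxAge` (default `7d`), resolved to milliseconds via `conf.getTimeAsSeconds(...) * 1000` as shown in the last hunk.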