[SPARK-8372] History server shows incorrect information for application not started
For an incomplete application, the history server may show an incorrect App ID of the form <App ID>.inprogress. This app entry never disappears, even after the application has completed.
![incorrectappinfo](https://cloud.githubusercontent.com/assets/9278199/8156147/2a10fdbe-137d-11e5-9620-c5b61d93e3c1.png)
The cause of the issue is that the log path name is used as the app ID when the app ID cannot be obtained during replay.
Author: Carson Wang <carson.wang@intel.com>
Closes #6827 from carsonwang/SPARK-8372 and squashes the following commits:
cdbb089 [Carson Wang] Fix code style
3e46b35 [Carson Wang] Update code style
90f5dde [Carson Wang] Add a unit test
d8c9cd0 [Carson Wang] Replaying events only return information when app is started
(cherry picked from commit 2837e06709)
Signed-off-by: Andrew Or <andrew@databricks.com>
This commit is contained in:
parent
d75c53d88d
commit
f0513733d4
|
@ -157,7 +157,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
|
||||||
replayBus.addListener(appListener)
|
replayBus.addListener(appListener)
|
||||||
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
|
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
|
||||||
|
|
||||||
ui.setAppName(s"${appInfo.name} ($appId)")
|
appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }
|
||||||
|
|
||||||
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
|
val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
|
||||||
ui.getSecurityManager.setAcls(uiAclsEnabled)
|
ui.getSecurityManager.setAcls(uiAclsEnabled)
|
||||||
|
@ -227,8 +227,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
|
||||||
val newAttempts = logs.flatMap { fileStatus =>
|
val newAttempts = logs.flatMap { fileStatus =>
|
||||||
try {
|
try {
|
||||||
val res = replay(fileStatus, bus)
|
val res = replay(fileStatus, bus)
|
||||||
logInfo(s"Application log ${res.logPath} loaded successfully.")
|
res match {
|
||||||
Some(res)
|
case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
|
||||||
|
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
|
||||||
|
"The application may have not started.")
|
||||||
|
}
|
||||||
|
res
|
||||||
} catch {
|
} catch {
|
||||||
case e: Exception =>
|
case e: Exception =>
|
||||||
logError(
|
logError(
|
||||||
|
@ -374,9 +378,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Replays the events in the specified log file and returns information about the associated
|
* Replays the events in the specified log file and returns information about the associated
|
||||||
* application.
|
* application. Return `None` if the application ID cannot be located.
|
||||||
*/
|
*/
|
||||||
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
|
private def replay(
|
||||||
|
eventLog: FileStatus,
|
||||||
|
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
|
||||||
val logPath = eventLog.getPath()
|
val logPath = eventLog.getPath()
|
||||||
logInfo(s"Replaying log path: $logPath")
|
logInfo(s"Replaying log path: $logPath")
|
||||||
val logInput =
|
val logInput =
|
||||||
|
@ -390,16 +396,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
|
||||||
val appCompleted = isApplicationCompleted(eventLog)
|
val appCompleted = isApplicationCompleted(eventLog)
|
||||||
bus.addListener(appListener)
|
bus.addListener(appListener)
|
||||||
bus.replay(logInput, logPath.toString, !appCompleted)
|
bus.replay(logInput, logPath.toString, !appCompleted)
|
||||||
new FsApplicationAttemptInfo(
|
appListener.appId.map { appId =>
|
||||||
logPath.getName(),
|
new FsApplicationAttemptInfo(
|
||||||
appListener.appName.getOrElse(NOT_STARTED),
|
logPath.getName(),
|
||||||
appListener.appId.getOrElse(logPath.getName()),
|
appListener.appName.getOrElse(NOT_STARTED),
|
||||||
appListener.appAttemptId,
|
appId,
|
||||||
appListener.startTime.getOrElse(-1L),
|
appListener.appAttemptId,
|
||||||
appListener.endTime.getOrElse(-1L),
|
appListener.startTime.getOrElse(-1L),
|
||||||
getModificationTime(eventLog).get,
|
appListener.endTime.getOrElse(-1L),
|
||||||
appListener.sparkUser.getOrElse(NOT_STARTED),
|
getModificationTime(eventLog).get,
|
||||||
appCompleted)
|
appListener.sparkUser.getOrElse(NOT_STARTED),
|
||||||
|
appCompleted)
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
logInput.close()
|
logInput.close()
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
||||||
// Write a new-style application log.
|
// Write a new-style application log.
|
||||||
val newAppComplete = newLogFile("new1", None, inProgress = false)
|
val newAppComplete = newLogFile("new1", None, inProgress = false)
|
||||||
writeFile(newAppComplete, true, None,
|
writeFile(newAppComplete, true, None,
|
||||||
SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
|
SparkListenerApplicationStart(
|
||||||
|
"new-app-complete", Some("new-app-complete"), 1L, "test", None),
|
||||||
SparkListenerApplicationEnd(5L)
|
SparkListenerApplicationEnd(5L)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -71,13 +72,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
||||||
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
|
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
|
||||||
Some("lzf"))
|
Some("lzf"))
|
||||||
writeFile(newAppCompressedComplete, true, None,
|
writeFile(newAppCompressedComplete, true, None,
|
||||||
SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
|
SparkListenerApplicationStart(
|
||||||
|
"new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None),
|
||||||
SparkListenerApplicationEnd(4L))
|
SparkListenerApplicationEnd(4L))
|
||||||
|
|
||||||
// Write an unfinished app, new-style.
|
// Write an unfinished app, new-style.
|
||||||
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
|
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
|
||||||
writeFile(newAppIncomplete, true, None,
|
writeFile(newAppIncomplete, true, None,
|
||||||
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
|
SparkListenerApplicationStart(
|
||||||
|
"new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Write an old-style application log.
|
// Write an old-style application log.
|
||||||
|
@ -85,7 +88,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
||||||
oldAppComplete.mkdir()
|
oldAppComplete.mkdir()
|
||||||
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
|
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
|
||||||
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
|
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
|
||||||
SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
|
SparkListenerApplicationStart(
|
||||||
|
"old-app-complete", Some("old-app-complete"), 2L, "test", None),
|
||||||
SparkListenerApplicationEnd(3L)
|
SparkListenerApplicationEnd(3L)
|
||||||
)
|
)
|
||||||
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
|
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
|
||||||
|
@ -99,7 +103,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
||||||
oldAppIncomplete.mkdir()
|
oldAppIncomplete.mkdir()
|
||||||
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
|
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
|
||||||
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
|
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
|
||||||
SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
|
SparkListenerApplicationStart(
|
||||||
|
"old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Force a reload of data from the log directory, and check that both logs are loaded.
|
// Force a reload of data from the log directory, and check that both logs are loaded.
|
||||||
|
@ -120,16 +125,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
||||||
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
|
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
|
||||||
}
|
}
|
||||||
|
|
||||||
list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
|
list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L,
|
||||||
newAppComplete.lastModified(), "test", true))
|
newAppComplete.lastModified(), "test", true))
|
||||||
list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
|
list(1) should be (makeAppInfo("new-app-compressed-complete",
|
||||||
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
|
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
|
||||||
true))
|
true))
|
||||||
list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
|
list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L,
|
||||||
oldAppComplete.lastModified(), "test", true))
|
oldAppComplete.lastModified(), "test", true))
|
||||||
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
|
list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L,
|
||||||
oldAppIncomplete.lastModified(), "test", false))
|
oldAppIncomplete.lastModified(), "test", false))
|
||||||
list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
|
list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L,
|
||||||
newAppIncomplete.lastModified(), "test", false))
|
newAppIncomplete.lastModified(), "test", false))
|
||||||
|
|
||||||
// Make sure the UI can be rendered.
|
// Make sure the UI can be rendered.
|
||||||
|
@ -153,7 +158,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
||||||
logDir.mkdir()
|
logDir.mkdir()
|
||||||
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
|
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
|
||||||
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
|
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
|
||||||
SparkListenerApplicationStart("app2", None, 2L, "test", None),
|
SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None),
|
||||||
SparkListenerApplicationEnd(3L)
|
SparkListenerApplicationEnd(3L)
|
||||||
)
|
)
|
||||||
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
|
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
|
||||||
|
@ -176,12 +181,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
||||||
test("SPARK-3697: ignore directories that cannot be read.") {
|
test("SPARK-3697: ignore directories that cannot be read.") {
|
||||||
val logFile1 = newLogFile("new1", None, inProgress = false)
|
val logFile1 = newLogFile("new1", None, inProgress = false)
|
||||||
writeFile(logFile1, true, None,
|
writeFile(logFile1, true, None,
|
||||||
SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
|
SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
|
||||||
SparkListenerApplicationEnd(2L)
|
SparkListenerApplicationEnd(2L)
|
||||||
)
|
)
|
||||||
val logFile2 = newLogFile("new2", None, inProgress = false)
|
val logFile2 = newLogFile("new2", None, inProgress = false)
|
||||||
writeFile(logFile2, true, None,
|
writeFile(logFile2, true, None,
|
||||||
SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
|
SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
|
||||||
SparkListenerApplicationEnd(2L)
|
SparkListenerApplicationEnd(2L)
|
||||||
)
|
)
|
||||||
logFile2.setReadable(false, false)
|
logFile2.setReadable(false, false)
|
||||||
|
@ -214,6 +219,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("Parse logs that application is not started") {
|
||||||
|
val provider = new FsHistoryProvider((createTestConf()))
|
||||||
|
|
||||||
|
val logFile1 = newLogFile("app1", None, inProgress = true)
|
||||||
|
writeFile(logFile1, true, None,
|
||||||
|
SparkListenerLogStart("1.4")
|
||||||
|
)
|
||||||
|
updateAndCheck(provider) { list =>
|
||||||
|
list.size should be (0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
test("SPARK-5582: empty log directory") {
|
test("SPARK-5582: empty log directory") {
|
||||||
val provider = new FsHistoryProvider(createTestConf())
|
val provider = new FsHistoryProvider(createTestConf())
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue