[SPARK-26186][SPARK-26184][CORE] Last updated time is not getting updated for the Inprogress application

## What changes were proposed in this pull request?

When the 'spark.history.fs.inProgressOptimization.enabled' is true, inProgress application's last updated time is not getting updated in the History UI. Also, during the cleaning time, InProgress application is getting removed from the listing, even if the last updated time is within the cleaning threshold time.

In this PR, if the fastInprogressOptimization enabled, we update the `lastUpdateTime` of the application as last scan time. This will update the `lastUpdateTime` in the historyUI and also while cleaning, it won't remove if the updateTime is within the cleaning interval

## How was this patch tested?
Added UT, attached screen shot.
Before patch:
![screenshot from 2018-11-27 23-22-38](https://user-images.githubusercontent.com/23054875/49101600-9b5a3380-f29c-11e8-8efc-3fb594e4279a.png)
![screenshot from 2018-11-27 23-20-11](https://user-images.githubusercontent.com/23054875/49101601-9c8b6080-f29c-11e8-928e-643a8c8f4477.png)

After Patch:
![screenshot from 2018-11-27 23-37-10](https://user-images.githubusercontent.com/23054875/49101911-669aac00-f29d-11e8-8181-663e4a08ab0e.png)
![screenshot from 2018-11-27 23-39-04](https://user-images.githubusercontent.com/23054875/49102010-a5306680-f29d-11e8-947a-e8a2a09a785a.png)

Closes #23158 from shahidki31/HistoryLastUpdateTime.

Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
Shahid 2018-11-29 09:48:18 -08:00 committed by Marcelo Vanzin
parent de42281527
commit 24e78b7f16
2 changed files with 61 additions and 0 deletions

View file

@ -461,6 +461,28 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
if (info.appId.isDefined && fastInProgressParsing) {
// When fast in-progress parsing is on, we don't need to re-parse when the
// size changes, but we do need to invalidate any existing UIs.
// Also, we need to update the `lastUpdated time` to display the updated time in
// the HistoryUI and to avoid cleaning the inprogress app while running.
val appInfo = listing.read(classOf[ApplicationInfoWrapper], info.appId.get)
val attemptList = appInfo.attempts.map { attempt =>
if (attempt.info.attemptId == info.attemptId) {
new AttemptInfoWrapper(
attempt.info.copy(lastUpdated = new Date(newLastScanTime)),
attempt.logPath,
attempt.fileSize,
attempt.adminAcls,
attempt.viewAcls,
attempt.adminAclsGroups,
attempt.viewAclsGroups)
} else {
attempt
}
}
val updatedAppInfo = new ApplicationInfoWrapper(appInfo.info, attemptList)
listing.write(updatedAppInfo)
invalidateUI(info.appId.get, info.attemptId)
false
} else {

View file

@ -334,6 +334,45 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
assert(!log2.exists())
}
test("should not clean inprogress application with lastUpdated time less than maxTime") {
val firstFileModifiedTime = TimeUnit.DAYS.toMillis(1)
val secondFileModifiedTime = TimeUnit.DAYS.toMillis(6)
val maxAge = TimeUnit.DAYS.toMillis(7)
val clock = new ManualClock(0)
val provider = new FsHistoryProvider(
createTestConf().set(MAX_LOG_AGE_S, maxAge / 1000), clock)
val log = newLogFile("inProgressApp1", None, inProgress = true)
writeFile(log, true, None,
SparkListenerApplicationStart(
"inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1"))
)
clock.setTime(firstFileModifiedTime)
log.setLastModified(clock.getTimeMillis())
provider.checkForLogs()
writeFile(log, true, None,
SparkListenerApplicationStart(
"inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")),
SparkListenerJobStart(0, 1L, Nil, null)
)
clock.setTime(secondFileModifiedTime)
log.setLastModified(clock.getTimeMillis())
provider.checkForLogs()
clock.setTime(TimeUnit.DAYS.toMillis(10))
writeFile(log, true, None,
SparkListenerApplicationStart(
"inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")),
SparkListenerJobStart(0, 1L, Nil, null),
SparkListenerJobEnd(0, 1L, JobSucceeded)
)
log.setLastModified(clock.getTimeMillis())
provider.checkForLogs()
// This should not trigger any cleanup
updateAndCheck(provider) { list =>
list.size should be(1)
}
}
test("log cleaner for inProgress files") {
val firstFileModifiedTime = TimeUnit.SECONDS.toMillis(10)
val secondFileModifiedTime = TimeUnit.SECONDS.toMillis(20)