[SPARK-28931][CORE][TESTS] Fix couple of bugs in FsHistoryProviderSuite

### What changes were proposed in this pull request?

This patch fixes the bugs in test code itself, FsHistoryProviderSuite.

1. When creating a log file via `newLogFile`, the codec is ignored, leading to a wrong file name. (No one tends to write tests for test code, and the bug doesn't affect existing tests, so it was not easy to catch.)
2. When writing events to a log file via `writeFile`, the metadata (in the case of the new format) gets written to the file regardless of its codec, and the content is then overwritten by another stream, so no information about the Spark version is available. This affects an existing test, which is why we had a wrong expected value to work around the bug.

This patch also removes the redundant parameter `isNewFormat` in `writeFile`, since, according to a review comment, Spark no longer supports the old format.

### Why are the changes needed?

The section above explains why these are bugs, though they reside only in test code. (Please note that the bugs did not come from the non-test side of the code.)

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Modified existing UTs, and also read the event log file in the console to verify that the metadata is not overwritten by other contents.

Closes #25629 from HeartSaVioR/MINOR-FIX-FsHistoryProviderSuite.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
Jungtaek Lim (HeartSaVioR) 2019-09-04 09:41:15 -07:00 committed by Marcelo Vanzin
parent 9f478a6832
commit 712874fa09

View file

@ -37,7 +37,7 @@ import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
import org.scalatest.Matchers
import org.scalatest.concurrent.Eventually._
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.{SecurityManager, SPARK_VERSION, SparkConf, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR
import org.apache.spark.internal.config.History._
@ -75,7 +75,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
inProgress: Boolean,
codec: Option[String] = None): File = {
val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId, codec)
val logPath = new Path(logUri).toUri.getPath + ip
new File(logPath)
}
@ -88,11 +88,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
private def testAppLogParsing(inMemory: Boolean) {
val clock = new ManualClock(12345678)
val provider = new FsHistoryProvider(createTestConf(inMemory = inMemory), clock)
val conf = createTestConf(inMemory = inMemory)
val provider = new FsHistoryProvider(conf, clock)
// Write a new-style application log.
val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
writeFile(newAppComplete, None,
SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test",
None),
SparkListenerApplicationEnd(5L)
@ -101,14 +102,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
// Write a new-style application log.
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
writeFile(newAppCompressedComplete, Some(CompressionCodec.createCodec(conf, "lzf")),
SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"),
1L, "test", None),
SparkListenerApplicationEnd(4L))
// Write an unfinished app, new-style.
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
writeFile(newAppIncomplete, None,
SparkListenerApplicationStart(newAppIncomplete.getName(), Some("new-incomplete"), 1L, "test",
None)
)
@ -131,7 +132,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val duration = if (end > 0) end - start else 0
new ApplicationInfo(id, name, None, None, None, None,
List(ApplicationAttemptInfo(None, new Date(start),
new Date(end), new Date(lastMod), duration, user, completed, "")))
new Date(end), new Date(lastMod), duration, user, completed, SPARK_VERSION)))
}
// For completed files, lastUpdated would be lastModified time.
@ -170,12 +171,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val provider = new TestFsHistoryProvider
val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
writeFile(logFile1, None,
SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
writeFile(logFile2, None,
SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
@ -192,7 +193,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val provider = new FsHistoryProvider(createTestConf())
val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
writeFile(logFile1, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
@ -212,7 +213,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val provider = new FsHistoryProvider(createTestConf())
val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
writeFile(logFile1, None,
SparkListenerLogStart("1.4")
)
updateAndCheck(provider) { list =>
@ -224,7 +225,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val provider = new FsHistoryProvider(createTestConf())
val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
writeFile(logFile1, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
SparkListenerApplicationEnd(2L))
@ -240,7 +241,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val provider = new FsHistoryProvider(createTestConf())
val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
writeFile(attempt1, true, None,
writeFile(attempt1, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
)
@ -250,7 +251,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
writeFile(attempt2, true, None,
writeFile(attempt2, None,
SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
)
@ -261,7 +262,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
writeFile(attempt3, true, None,
writeFile(attempt3, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
SparkListenerApplicationEnd(4L)
)
@ -273,7 +274,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
writeFile(app2Attempt1, true, None,
writeFile(app2Attempt1, None,
SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
SparkListenerApplicationEnd(6L)
)
@ -448,7 +449,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
"test", Some("attempt1"))) ++ executorAddedEvents ++
(if (isCompletedApp) List(SparkListenerApplicationEnd(1000L)) else Seq())
writeFile(attempt1, true, None, allEvents: _*)
writeFile(attempt1, None, allEvents: _*)
updateAndCheck(provider) { list =>
list.size should be (1)
@ -487,14 +488,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log1, true, None,
writeFile(log1, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
SparkListenerApplicationEnd(2L)
)
log1.setLastModified(0L)
val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
writeFile(log2, true, None,
writeFile(log2, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
SparkListenerApplicationEnd(4L)
)
@ -532,14 +533,14 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val provider = new FsHistoryProvider(
createTestConf().set(MAX_LOG_AGE_S, maxAge / 1000), clock)
val log = newLogFile("inProgressApp1", None, inProgress = true)
writeFile(log, true, None,
writeFile(log, None,
SparkListenerApplicationStart(
"inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1"))
)
clock.setTime(firstFileModifiedTime)
log.setLastModified(clock.getTimeMillis())
provider.checkForLogs()
writeFile(log, true, None,
writeFile(log, None,
SparkListenerApplicationStart(
"inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")),
SparkListenerJobStart(0, 1L, Nil, null)
@ -549,7 +550,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
log.setLastModified(clock.getTimeMillis())
provider.checkForLogs()
clock.setTime(TimeUnit.DAYS.toMillis(10))
writeFile(log, true, None,
writeFile(log, None,
SparkListenerApplicationStart(
"inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1")),
SparkListenerJobStart(0, 1L, Nil, null),
@ -572,7 +573,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
val log1 = newLogFile("inProgressApp1", None, inProgress = true)
writeFile(log1, true, None,
writeFile(log1, None,
SparkListenerApplicationStart(
"inProgressApp1", Some("inProgressApp1"), 3L, "test", Some("attempt1"))
)
@ -581,7 +582,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
provider.checkForLogs()
val log2 = newLogFile("inProgressApp2", None, inProgress = true)
writeFile(log2, true, None,
writeFile(log2, None,
SparkListenerApplicationStart(
"inProgressApp2", Some("inProgressApp2"), 23L, "test2", Some("attempt2"))
)
@ -615,7 +616,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val provider = new FsHistoryProvider(createTestConf())
val logs = (1 to 2).map { i =>
val log = newLogFile("downloadApp1", Some(s"attempt$i"), inProgress = false)
writeFile(log, true, None,
writeFile(log, None,
SparkListenerApplicationStart(
"downloadApp1", Some("downloadApp1"), 5000L * i, "test", Some(s"attempt$i")),
SparkListenerApplicationEnd(5001L * i)
@ -707,7 +708,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
// Write a new log file without an app id, to make sure it's ignored.
val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
writeFile(logFile1, None,
SparkListenerLogStart("1.4")
)
@ -783,7 +784,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
// and write one real file, which should still get picked up just fine
val newAppComplete = newLogFile("real-app", None, inProgress = false)
writeFile(newAppComplete, true, None,
writeFile(newAppComplete, None,
SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test",
None),
SparkListenerApplicationEnd(5L)
@ -808,7 +809,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
try {
provider = new FsHistoryProvider(conf)
val log = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log, true, None,
writeFile(log, None,
SparkListenerApplicationStart("app1", Some("app1"), System.currentTimeMillis(),
"test", Some("attempt1")),
SparkListenerEnvironmentUpdate(Map(
@ -898,7 +899,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val oldProvider = new FsHistoryProvider(conf)
val logFile1 = newLogFile("app1", None, inProgress = false)
writeFile(logFile1, true, None,
writeFile(logFile1, None,
SparkListenerLogStart("2.3"),
SparkListenerApplicationStart("test", Some("test"), 1L, "test", None),
SparkListenerApplicationEnd(5L)
@ -926,7 +927,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
// Write an incomplete app log.
val appLog = newLogFile(appId, None, inProgress = true)
writeFile(appLog, true, None,
writeFile(appLog, None,
SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None)
)
provider.checkForLogs()
@ -939,7 +940,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
}
// Add more info to the app log, and trigger the provider to update things.
writeFile(appLog, true, None,
writeFile(appLog, None,
SparkListenerApplicationStart(appId, Some(appId), 1L, "test", None),
SparkListenerJobStart(0, 1L, Nil, null)
)
@ -966,13 +967,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
// Write logs for two app attempts.
clock.advance(1)
val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
writeFile(attempt1, true, None,
writeFile(attempt1, None,
SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")),
SparkListenerJobStart(0, 1L, Nil, null),
SparkListenerApplicationEnd(5L)
)
val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
writeFile(attempt2, true, None,
writeFile(attempt2, None,
SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("2")),
SparkListenerJobStart(0, 1L, Nil, null),
SparkListenerApplicationEnd(5L)
@ -1034,7 +1035,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
// Create an incomplete log file, has an end record but no start record.
val corrupt = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false)
writeFile(corrupt, true, None, SparkListenerApplicationEnd(0))
writeFile(corrupt, None, SparkListenerApplicationEnd(0))
corrupt.setLastModified(clock.getTimeMillis())
logCount += 1
@ -1050,7 +1051,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
// Update the slow app to contain valid info. Code should detect the change and not clean
// it up.
writeFile(slowApp, true, None,
writeFile(slowApp, None,
SparkListenerApplicationStart(slowApp.getName(), Some(slowApp.getName()), 1L, "test", None))
slowApp.setLastModified(clock.getTimeMillis())
validLogCount += 1
@ -1067,7 +1068,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
// Create a log file where the end event is before the configure chunk to be reparsed at
// the end of the file. The correct listing should still be generated.
val log = newLogFile("end-event-test", None, inProgress = false)
writeFile(log, true, None,
writeFile(log, None,
Seq(
SparkListenerApplicationStart("end-event-test", Some("end-event-test"), 1L, "test", None),
SparkListenerEnvironmentUpdate(Map(
@ -1096,13 +1097,13 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val provider = new FsHistoryProvider(conf)
val complete = newLogFile("complete", None, inProgress = false)
writeFile(complete, true, None,
writeFile(complete, None,
SparkListenerApplicationStart("complete", Some("complete"), 1L, "test", None),
SparkListenerApplicationEnd(5L)
)
val incomplete = newLogFile("incomplete", None, inProgress = true)
writeFile(incomplete, true, None,
writeFile(incomplete, None,
SparkListenerApplicationStart("incomplete", Some("incomplete"), 1L, "test", None)
)
@ -1116,10 +1117,10 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val clock = new ManualClock(1533132471)
val provider = new FsHistoryProvider(createTestConf(), clock)
val accessDenied = newLogFile("accessDenied", None, inProgress = false)
writeFile(accessDenied, true, None,
writeFile(accessDenied, None,
SparkListenerApplicationStart("accessDenied", Some("accessDenied"), 1L, "test", None))
val accessGranted = newLogFile("accessGranted", None, inProgress = false)
writeFile(accessGranted, true, None,
writeFile(accessGranted, None,
SparkListenerApplicationStart("accessGranted", Some("accessGranted"), 1L, "test", None),
SparkListenerApplicationEnd(5L))
var isReadable = false
@ -1189,34 +1190,34 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
val clock = new ManualClock(0)
(5 to 0 by -1).foreach { num =>
val log1_1 = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log1_1, true, None,
writeFile(log1_1, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
SparkListenerApplicationEnd(2L)
)
log1_1.setLastModified(2L)
val log2_1 = newLogFile("app2", Some("attempt1"), inProgress = false)
writeFile(log2_1, true, None,
writeFile(log2_1, None,
SparkListenerApplicationStart("app2", Some("app2"), 3L, "test", Some("attempt1")),
SparkListenerApplicationEnd(4L)
)
log2_1.setLastModified(4L)
val log3_1 = newLogFile("app3", Some("attempt1"), inProgress = false)
writeFile(log3_1, true, None,
writeFile(log3_1, None,
SparkListenerApplicationStart("app3", Some("app3"), 5L, "test", Some("attempt1")),
SparkListenerApplicationEnd(6L)
)
log3_1.setLastModified(6L)
val log1_2_incomplete = newLogFile("app1", Some("attempt2"), inProgress = false)
writeFile(log1_2_incomplete, true, None,
writeFile(log1_2_incomplete, None,
SparkListenerApplicationStart("app1", Some("app1"), 7L, "test", Some("attempt2"))
)
log1_2_incomplete.setLastModified(8L)
val log3_2 = newLogFile("app3", Some("attempt2"), inProgress = false)
writeFile(log3_2, true, None,
writeFile(log3_2, None,
SparkListenerApplicationStart("app3", Some("app3"), 9L, "test", Some("attempt2")),
SparkListenerApplicationEnd(10L)
)
@ -1250,19 +1251,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
checkFn(provider.getListing().toSeq)
}
private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
private def writeFile(file: File, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
val fstream = new FileOutputStream(file)
val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
val bstream = new BufferedOutputStream(cstream)
if (isNewFormat) {
val newFormatStream = new FileOutputStream(file)
Utils.tryWithSafeFinally {
EventLoggingListener.initEventLog(newFormatStream, false, null)
} {
newFormatStream.close()
}
}
EventLoggingListener.initEventLog(bstream, false, null)
val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8)
Utils.tryWithSafeFinally {