[SPARK-5783] Better eventlog-parsing error messages
Author: Ryan Williams <ryan.blake.williams@gmail.com> Closes #4573 from ryan-williams/history and squashes the following commits: a8647ec [Ryan Williams] fix test calls to .replay() 98aa3fe [Ryan Williams] include filename in history-parsing error message 8deecf0 [Ryan Williams] add line number to history-parsing error message b668b52 [Ryan Williams] add log info line to history-eventlog parsing
This commit is contained in:
parent
e1a1ff8108
commit
fc6d3e796a
|
@ -247,6 +247,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
|
|||
*/
|
||||
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
|
||||
val logPath = eventLog.getPath()
|
||||
logInfo(s"Replaying log path: $logPath")
|
||||
val (logInput, sparkVersion) =
|
||||
if (isLegacyLogDirectory(eventLog)) {
|
||||
openLegacyEventLog(logPath)
|
||||
|
@ -256,7 +257,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
|
|||
try {
|
||||
val appListener = new ApplicationEventListener
|
||||
bus.addListener(appListener)
|
||||
bus.replay(logInput, sparkVersion)
|
||||
bus.replay(logInput, sparkVersion, logPath.toString)
|
||||
new FsApplicationHistoryInfo(
|
||||
logPath.getName(),
|
||||
appListener.appId.getOrElse(logPath.getName()),
|
||||
|
|
|
@ -761,7 +761,7 @@ private[spark] class Master(
|
|||
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
|
||||
appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
|
||||
try {
|
||||
replayBus.replay(logInput, sparkVersion)
|
||||
replayBus.replay(logInput, sparkVersion, eventLogFile)
|
||||
} finally {
|
||||
logInput.close()
|
||||
}
|
||||
|
|
|
@ -40,21 +40,24 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
|
|||
*
|
||||
* @param logData Stream containing event log data.
|
||||
* @param version Spark version that generated the events.
|
||||
* @param sourceName Filename (or other source identifier) from whence @logData is being read
|
||||
*/
|
||||
def replay(logData: InputStream, version: String) {
|
||||
def replay(logData: InputStream, version: String, sourceName: String) {
|
||||
var currentLine: String = null
|
||||
var lineNumber: Int = 1
|
||||
try {
|
||||
val lines = Source.fromInputStream(logData).getLines()
|
||||
lines.foreach { line =>
|
||||
currentLine = line
|
||||
postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
|
||||
lineNumber += 1
|
||||
}
|
||||
} catch {
|
||||
case ioe: IOException =>
|
||||
throw ioe
|
||||
case e: Exception =>
|
||||
logError("Exception in parsing Spark event log.", e)
|
||||
logError("Malformed line: %s\n".format(currentLine))
|
||||
logError(s"Exception parsing Spark event log: $sourceName", e)
|
||||
logError(s"Malformed line #$lineNumber: $currentLine\n")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -61,7 +61,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
|
|||
try {
|
||||
val replayer = new ReplayListenerBus()
|
||||
replayer.addListener(eventMonster)
|
||||
replayer.replay(logData, SPARK_VERSION)
|
||||
replayer.replay(logData, SPARK_VERSION, logFilePath.toString)
|
||||
} finally {
|
||||
logData.close()
|
||||
}
|
||||
|
@ -120,7 +120,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
|
|||
try {
|
||||
val replayer = new ReplayListenerBus()
|
||||
replayer.addListener(eventMonster)
|
||||
replayer.replay(logData, version)
|
||||
replayer.replay(logData, version, eventLog.getPath().toString)
|
||||
} finally {
|
||||
logData.close()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue