[SPARK-6860][Streaming][WebUI] Fix the possible inconsistency of StreamingPage

Because `StreamingPage.render` doesn't hold the `listener` lock when generating the content, the different parts of content may have some inconsistent values if `listener` updates its status at the same time. And it will confuse people.

This PR added `listener.synchronized` to make sure we have a consistent view of StreamingJobProgressListener when creating the content.

Author: zsxwing <zsxwing@gmail.com>

Closes #5470 from zsxwing/SPARK-6860 and squashes the following commits:

cec6f92 [zsxwing] Add missing 'synchronized' in StreamingJobProgressListener
7182498 [zsxwing] Add synchronized to make sure we have a consistent view of StreamingJobProgressListener when creating the content
This commit is contained in:
zsxwing 2015-04-13 12:21:29 +01:00 committed by Sean Owen
parent cadd7d72c5
commit 14ce3ea2c9
2 changed files with 7 additions and 6 deletions

View file

@ -149,7 +149,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}.toMap }.toMap
} }
def lastReceivedBatchRecords: Map[Int, Long] = { def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo) val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo => lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
(0 until numReceivers).map { receiverId => (0 until numReceivers).map { receiverId =>
@ -160,19 +160,19 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
} }
} }
def receiverInfo(receiverId: Int): Option[ReceiverInfo] = { def receiverInfo(receiverId: Int): Option[ReceiverInfo] = synchronized {
receiverInfos.get(receiverId) receiverInfos.get(receiverId)
} }
def lastCompletedBatch: Option[BatchInfo] = { def lastCompletedBatch: Option[BatchInfo] = synchronized {
completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
} }
def lastReceivedBatch: Option[BatchInfo] = { def lastReceivedBatch: Option[BatchInfo] = synchronized {
retainedBatches.lastOption retainedBatches.lastOption
} }
private def retainedBatches: Seq[BatchInfo] = synchronized { private def retainedBatches: Seq[BatchInfo] = {
(waitingBatchInfos.values.toSeq ++ (waitingBatchInfos.values.toSeq ++
runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering) runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
} }

View file

@ -37,11 +37,12 @@ private[ui] class StreamingPage(parent: StreamingTab)
/** Render the page */ /** Render the page */
def render(request: HttpServletRequest): Seq[Node] = { def render(request: HttpServletRequest): Seq[Node] = {
val content = val content = listener.synchronized {
generateBasicStats() ++ <br></br> ++ generateBasicStats() ++ <br></br> ++
<h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++ <h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
generateReceiverStats() ++ generateReceiverStats() ++
generateBatchStatsTable() generateBatchStatsTable()
}
UIUtils.headerSparkPage("Streaming", content, parent, Some(5000)) UIUtils.headerSparkPage("Streaming", content, parent, Some(5000))
} }