[SPARK-8701] [STREAMING] [WEBUI] Add input metadata in the batch page
This PR adds `metadata` to `InputInfo`. `InputDStream` can report its metadata for a batch and it will be shown in the batch page. For example, ![screen shot](https://cloud.githubusercontent.com/assets/1000778/8403741/d6ffc7e2-1e79-11e5-9888-c78c1575123a.png) FileInputDStream will display the new files for a batch, and DirectKafkaInputDStream will display its offset ranges. Author: zsxwing <zsxwing@gmail.com> Closes #7081 from zsxwing/input-metadata and squashes the following commits: f7abd9b [zsxwing] Revert the space changes in project/MimaExcludes.scala d906209 [zsxwing] Merge branch 'master' into input-metadata 74762da [zsxwing] Fix MiMa tests 7903e33 [zsxwing] Merge branch 'master' into input-metadata 450a46c [zsxwing] Address comments 1d94582 [zsxwing] Rename InputInfo to StreamInputInfo and change "metadata" to Map[String, Any] d496ae9 [zsxwing] Add input metadata in the batch page
This commit is contained in:
parent
c4830598b2
commit
1f6b0b1234
|
@ -19,7 +19,7 @@ package org.apache.spark.streaming.kafka
|
|||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.mutable
|
||||
import scala.reflect.{classTag, ClassTag}
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import kafka.common.TopicAndPartition
|
||||
import kafka.message.MessageAndMetadata
|
||||
|
@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkException}
|
|||
import org.apache.spark.streaming.{StreamingContext, Time}
|
||||
import org.apache.spark.streaming.dstream._
|
||||
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
|
||||
import org.apache.spark.streaming.scheduler.InputInfo
|
||||
import org.apache.spark.streaming.scheduler.StreamInputInfo
|
||||
|
||||
/**
|
||||
* A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
|
||||
|
@ -119,8 +119,23 @@ class DirectKafkaInputDStream[
|
|||
val rdd = KafkaRDD[K, V, U, T, R](
|
||||
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
|
||||
|
||||
// Report the record number of this batch interval to InputInfoTracker.
|
||||
val inputInfo = InputInfo(id, rdd.count)
|
||||
// Report the record number and metadata of this batch interval to InputInfoTracker.
|
||||
val offsetRanges = currentOffsets.map { case (tp, fo) =>
|
||||
val uo = untilOffsets(tp)
|
||||
OffsetRange(tp.topic, tp.partition, fo, uo.offset)
|
||||
}
|
||||
val description = offsetRanges.filter { offsetRange =>
|
||||
// Don't display empty ranges.
|
||||
offsetRange.fromOffset != offsetRange.untilOffset
|
||||
}.map { offsetRange =>
|
||||
s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
|
||||
s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
|
||||
}.mkString("\n")
|
||||
// Copy offsetRanges to immutable.List to prevent it from being modified by the user
|
||||
val metadata = Map(
|
||||
"offsets" -> offsetRanges.toList,
|
||||
StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
|
||||
val inputInfo = StreamInputInfo(id, rdd.count, metadata)
|
||||
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
|
||||
|
||||
currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
|
||||
|
|
|
@ -75,7 +75,7 @@ final class OffsetRange private(
|
|||
}
|
||||
|
||||
override def toString(): String = {
|
||||
s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset]"
|
||||
s"OffsetRange(topic: '$topic', partition: $partition, range: [$fromOffset -> $untilOffset])"
|
||||
}
|
||||
|
||||
/** this is to avoid ClassNotFoundException during checkpoint restore */
|
||||
|
|
|
@ -77,6 +77,12 @@ object MimaExcludes {
|
|||
// SPARK-8914 Remove RDDApi
|
||||
ProblemFilters.exclude[MissingClassProblem](
|
||||
"org.apache.spark.sql.RDDApi")
|
||||
) ++ Seq(
|
||||
// SPARK-8701 Add input metadata in the batch page.
|
||||
ProblemFilters.exclude[MissingClassProblem](
|
||||
"org.apache.spark.streaming.scheduler.InputInfo$"),
|
||||
ProblemFilters.exclude[MissingClassProblem](
|
||||
"org.apache.spark.streaming.scheduler.InputInfo")
|
||||
)
|
||||
|
||||
case v if v.startsWith("1.4") =>
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
|
|||
|
||||
import org.apache.spark.rdd.{RDD, UnionRDD}
|
||||
import org.apache.spark.streaming._
|
||||
import org.apache.spark.streaming.scheduler.StreamInputInfo
|
||||
import org.apache.spark.util.{SerializableConfiguration, TimeStampedHashMap, Utils}
|
||||
|
||||
/**
|
||||
|
@ -144,7 +145,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
|
|||
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
|
||||
batchTimeToSelectedFiles += ((validTime, newFiles))
|
||||
recentlySelectedFiles ++= newFiles
|
||||
Some(filesToRDD(newFiles))
|
||||
val rdds = Some(filesToRDD(newFiles))
|
||||
// Copy newFiles to immutable.List to prevent it from being modified by the user
|
||||
val metadata = Map(
|
||||
"files" -> newFiles.toList,
|
||||
StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
|
||||
val inputInfo = StreamInputInfo(id, 0, metadata)
|
||||
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
|
||||
rdds
|
||||
}
|
||||
|
||||
/** Clear the old time-to-files mappings along with old RDDs */
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.spark.storage.BlockId
|
|||
import org.apache.spark.streaming._
|
||||
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
|
||||
import org.apache.spark.streaming.receiver.Receiver
|
||||
import org.apache.spark.streaming.scheduler.InputInfo
|
||||
import org.apache.spark.streaming.scheduler.StreamInputInfo
|
||||
import org.apache.spark.streaming.util.WriteAheadLogUtils
|
||||
|
||||
/**
|
||||
|
@ -70,7 +70,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
|
|||
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
|
||||
|
||||
// Register the input blocks information into InputInfoTracker
|
||||
val inputInfo = InputInfo(id, blockInfos.flatMap(_.numRecords).sum)
|
||||
val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
|
||||
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
|
||||
|
||||
if (blockInfos.nonEmpty) {
|
||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.spark.streaming.Time
|
|||
* :: DeveloperApi ::
|
||||
* Class having information on completed batches.
|
||||
* @param batchTime Time of the batch
|
||||
* @param streamIdToNumRecords A map of input stream id to record number
|
||||
* @param streamIdToInputInfo A map of input stream id to its input info
|
||||
* @param submissionTime Clock time of when jobs of this batch were submitted to
|
||||
* the streaming scheduler queue
|
||||
* @param processingStartTime Clock time of when the first job of this batch started processing
|
||||
|
@ -33,12 +33,15 @@ import org.apache.spark.streaming.Time
|
|||
@DeveloperApi
|
||||
case class BatchInfo(
|
||||
batchTime: Time,
|
||||
streamIdToNumRecords: Map[Int, Long],
|
||||
streamIdToInputInfo: Map[Int, StreamInputInfo],
|
||||
submissionTime: Long,
|
||||
processingStartTime: Option[Long],
|
||||
processingEndTime: Option[Long]
|
||||
) {
|
||||
|
||||
@deprecated("Use streamIdToInputInfo instead", "1.5.0")
|
||||
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
|
||||
|
||||
/**
|
||||
* Time taken for the first job of this batch to start processing from the time this batch
|
||||
* was submitted to the streaming scheduler. Essentially, it is
|
||||
|
@ -63,5 +66,5 @@ case class BatchInfo(
|
|||
/**
|
||||
* The number of records received by the receivers in this batch.
|
||||
*/
|
||||
def numRecords: Long = streamIdToNumRecords.values.sum
|
||||
def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
|
||||
}
|
||||
|
|
|
@ -20,11 +20,34 @@ package org.apache.spark.streaming.scheduler
|
|||
import scala.collection.mutable
|
||||
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.annotation.DeveloperApi
|
||||
import org.apache.spark.streaming.{Time, StreamingContext}
|
||||
|
||||
/** To track the information of input stream at specified batch time. */
|
||||
private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) {
|
||||
/**
|
||||
* :: DeveloperApi ::
|
||||
* Track the information of input stream at specified batch time.
|
||||
*
|
||||
* @param inputStreamId the input stream id
|
||||
* @param numRecords the number of records in a batch
|
||||
* @param metadata metadata for this batch. It should contain at least one standard field named
|
||||
* "Description" which maps to the content that will be shown in the UI.
|
||||
*/
|
||||
@DeveloperApi
|
||||
case class StreamInputInfo(
|
||||
inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty) {
|
||||
require(numRecords >= 0, "numRecords must not be negative")
|
||||
|
||||
def metadataDescription: Option[String] =
|
||||
metadata.get(StreamInputInfo.METADATA_KEY_DESCRIPTION).map(_.toString)
|
||||
}
|
||||
|
||||
@DeveloperApi
|
||||
object StreamInputInfo {
|
||||
|
||||
/**
|
||||
* The key for description in `StreamInputInfo.metadata`.
|
||||
*/
|
||||
val METADATA_KEY_DESCRIPTION: String = "Description"
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -34,12 +57,13 @@ private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) {
|
|||
private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging {
|
||||
|
||||
// Map to track all the InputInfo related to specific batch time and input stream.
|
||||
private val batchTimeToInputInfos = new mutable.HashMap[Time, mutable.HashMap[Int, InputInfo]]
|
||||
private val batchTimeToInputInfos =
|
||||
new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]
|
||||
|
||||
/** Report the input information with batch time to the tracker */
|
||||
def reportInfo(batchTime: Time, inputInfo: InputInfo): Unit = synchronized {
|
||||
def reportInfo(batchTime: Time, inputInfo: StreamInputInfo): Unit = synchronized {
|
||||
val inputInfos = batchTimeToInputInfos.getOrElseUpdate(batchTime,
|
||||
new mutable.HashMap[Int, InputInfo]())
|
||||
new mutable.HashMap[Int, StreamInputInfo]())
|
||||
|
||||
if (inputInfos.contains(inputInfo.inputStreamId)) {
|
||||
throw new IllegalStateException(s"Input stream ${inputInfo.inputStreamId}} for batch" +
|
||||
|
@ -49,10 +73,10 @@ private[streaming] class InputInfoTracker(ssc: StreamingContext) extends Logging
|
|||
}
|
||||
|
||||
/** Get all the input streams' information for the specified batch time */
|
||||
def getInfo(batchTime: Time): Map[Int, InputInfo] = synchronized {
|
||||
def getInfo(batchTime: Time): Map[Int, StreamInputInfo] = synchronized {
|
||||
val inputInfos = batchTimeToInputInfos.get(batchTime)
|
||||
// Convert mutable HashMap to immutable Map for the caller
|
||||
inputInfos.map(_.toMap).getOrElse(Map[Int, InputInfo]())
|
||||
inputInfos.map(_.toMap).getOrElse(Map[Int, StreamInputInfo]())
|
||||
}
|
||||
|
||||
/** Cleanup the tracked input information older than threshold batch time */
|
||||
|
|
|
@ -244,8 +244,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
|
|||
} match {
|
||||
case Success(jobs) =>
|
||||
val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
|
||||
val streamIdToNumRecords = streamIdToInputInfos.mapValues(_.numRecords)
|
||||
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToNumRecords))
|
||||
jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
|
||||
case Failure(e) =>
|
||||
jobScheduler.reportError("Error generating jobs for time " + time, e)
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ private[streaming]
|
|||
case class JobSet(
|
||||
time: Time,
|
||||
jobs: Seq[Job],
|
||||
streamIdToNumRecords: Map[Int, Long] = Map.empty) {
|
||||
streamIdToInputInfo: Map[Int, StreamInputInfo] = Map.empty) {
|
||||
|
||||
private val incompleteJobs = new HashSet[Job]()
|
||||
private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
|
||||
|
@ -64,7 +64,7 @@ case class JobSet(
|
|||
def toBatchInfo: BatchInfo = {
|
||||
new BatchInfo(
|
||||
time,
|
||||
streamIdToNumRecords,
|
||||
streamIdToInputInfo,
|
||||
submissionTime,
|
||||
if (processingStartTime >= 0 ) Some(processingStartTime) else None,
|
||||
if (processingEndTime >= 0 ) Some(processingEndTime) else None
|
||||
|
|
|
@ -17,11 +17,9 @@
|
|||
|
||||
package org.apache.spark.streaming.ui
|
||||
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.Date
|
||||
import javax.servlet.http.HttpServletRequest
|
||||
|
||||
import scala.xml.{NodeSeq, Node, Text}
|
||||
import scala.xml.{NodeSeq, Node, Text, Unparsed}
|
||||
|
||||
import org.apache.commons.lang3.StringEscapeUtils
|
||||
|
||||
|
@ -303,6 +301,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
|
|||
batchUIData.processingDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
|
||||
val formattedTotalDelay = batchUIData.totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
|
||||
|
||||
val inputMetadatas = batchUIData.streamIdToInputInfo.values.flatMap { inputInfo =>
|
||||
inputInfo.metadataDescription.map(desc => inputInfo.inputStreamId -> desc)
|
||||
}.toSeq
|
||||
val summary: NodeSeq =
|
||||
<div>
|
||||
<ul class="unstyled">
|
||||
|
@ -326,6 +327,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
|
|||
<strong>Total delay: </strong>
|
||||
{formattedTotalDelay}
|
||||
</li>
|
||||
{
|
||||
if (inputMetadatas.nonEmpty) {
|
||||
<li>
|
||||
<strong>Input Metadata:</strong>{generateInputMetadataTable(inputMetadatas)}
|
||||
</li>
|
||||
}
|
||||
}
|
||||
</ul>
|
||||
</div>
|
||||
|
||||
|
@ -340,4 +348,33 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
|
|||
|
||||
SparkUIUtils.headerSparkPage(s"Details of batch at $formattedBatchTime", content, parent)
|
||||
}
|
||||
|
||||
def generateInputMetadataTable(inputMetadatas: Seq[(Int, String)]): Seq[Node] = {
|
||||
<table class={SparkUIUtils.TABLE_CLASS_STRIPED}>
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Input</th>
|
||||
<th>Metadata</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{inputMetadatas.flatMap(generateInputMetadataRow)}
|
||||
</tbody>
|
||||
</table>
|
||||
}
|
||||
|
||||
def generateInputMetadataRow(inputMetadata: (Int, String)): Seq[Node] = {
|
||||
val streamId = inputMetadata._1
|
||||
|
||||
<tr>
|
||||
<td>{streamingListener.streamName(streamId).getOrElse(s"Stream-$streamId")}</td>
|
||||
<td>{metadataDescriptionToHTML(inputMetadata._2)}</td>
|
||||
</tr>
|
||||
}
|
||||
|
||||
private def metadataDescriptionToHTML(metadataDescription: String): Seq[Node] = {
|
||||
// tab to 4 spaces and "\n" to "<br/>"
|
||||
Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
|
||||
replaceAllLiterally("\t", " ").replaceAllLiterally("\n", "<br/>"))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,14 +19,14 @@
|
|||
package org.apache.spark.streaming.ui
|
||||
|
||||
import org.apache.spark.streaming.Time
|
||||
import org.apache.spark.streaming.scheduler.BatchInfo
|
||||
import org.apache.spark.streaming.scheduler.{BatchInfo, StreamInputInfo}
|
||||
import org.apache.spark.streaming.ui.StreamingJobProgressListener._
|
||||
|
||||
private[ui] case class OutputOpIdAndSparkJobId(outputOpId: OutputOpId, sparkJobId: SparkJobId)
|
||||
|
||||
private[ui] case class BatchUIData(
|
||||
val batchTime: Time,
|
||||
val streamIdToNumRecords: Map[Int, Long],
|
||||
val streamIdToInputInfo: Map[Int, StreamInputInfo],
|
||||
val submissionTime: Long,
|
||||
val processingStartTime: Option[Long],
|
||||
val processingEndTime: Option[Long],
|
||||
|
@ -58,7 +58,7 @@ private[ui] case class BatchUIData(
|
|||
/**
|
||||
* The number of records received by the receivers in this batch.
|
||||
*/
|
||||
def numRecords: Long = streamIdToNumRecords.values.sum
|
||||
def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
|
||||
}
|
||||
|
||||
private[ui] object BatchUIData {
|
||||
|
@ -66,7 +66,7 @@ private[ui] object BatchUIData {
|
|||
def apply(batchInfo: BatchInfo): BatchUIData = {
|
||||
new BatchUIData(
|
||||
batchInfo.batchTime,
|
||||
batchInfo.streamIdToNumRecords,
|
||||
batchInfo.streamIdToInputInfo,
|
||||
batchInfo.submissionTime,
|
||||
batchInfo.processingStartTime,
|
||||
batchInfo.processingEndTime
|
||||
|
|
|
@ -192,7 +192,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
|
|||
def receivedEventRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
|
||||
val _retainedBatches = retainedBatches
|
||||
val latestBatches = _retainedBatches.map { batchUIData =>
|
||||
(batchUIData.batchTime.milliseconds, batchUIData.streamIdToNumRecords)
|
||||
(batchUIData.batchTime.milliseconds, batchUIData.streamIdToInputInfo.mapValues(_.numRecords))
|
||||
}
|
||||
streamIds.map { streamId =>
|
||||
val eventRates = latestBatches.map {
|
||||
|
@ -205,7 +205,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
|
|||
}
|
||||
|
||||
def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
|
||||
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.streamIdToNumRecords)
|
||||
val lastReceivedBlockInfoOption =
|
||||
lastReceivedBatch.map(_.streamIdToInputInfo.mapValues(_.numRecords))
|
||||
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
|
||||
streamIds.map { streamId =>
|
||||
(streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))
|
||||
|
|
|
@ -59,7 +59,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
|
|||
|
||||
batchInfosSubmitted.foreach { info =>
|
||||
info.numRecords should be (1L)
|
||||
info.streamIdToNumRecords should be (Map(0 -> 1L))
|
||||
info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
|
||||
}
|
||||
|
||||
isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true)
|
||||
|
@ -77,7 +77,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
|
|||
|
||||
batchInfosStarted.foreach { info =>
|
||||
info.numRecords should be (1L)
|
||||
info.streamIdToNumRecords should be (Map(0 -> 1L))
|
||||
info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
|
||||
}
|
||||
|
||||
isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true)
|
||||
|
@ -98,7 +98,7 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
|
|||
|
||||
batchInfosCompleted.foreach { info =>
|
||||
info.numRecords should be (1L)
|
||||
info.streamIdToNumRecords should be (Map(0 -> 1L))
|
||||
info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
|
||||
}
|
||||
|
||||
isInIncreasingOrder(batchInfosCompleted.map(_.submissionTime)) should be (true)
|
||||
|
|
|
@ -76,7 +76,7 @@ class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]],
|
|||
}
|
||||
|
||||
// Report the input data's information to InputInfoTracker for testing
|
||||
val inputInfo = InputInfo(id, selectedInput.length.toLong)
|
||||
val inputInfo = StreamInputInfo(id, selectedInput.length.toLong)
|
||||
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
|
||||
|
||||
val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
|
||||
|
|
|
@ -46,8 +46,8 @@ class InputInfoTrackerSuite extends SparkFunSuite with BeforeAndAfter {
|
|||
val streamId1 = 0
|
||||
val streamId2 = 1
|
||||
val time = Time(0L)
|
||||
val inputInfo1 = InputInfo(streamId1, 100L)
|
||||
val inputInfo2 = InputInfo(streamId2, 300L)
|
||||
val inputInfo1 = StreamInputInfo(streamId1, 100L)
|
||||
val inputInfo2 = StreamInputInfo(streamId2, 300L)
|
||||
inputInfoTracker.reportInfo(time, inputInfo1)
|
||||
inputInfoTracker.reportInfo(time, inputInfo2)
|
||||
|
||||
|
@ -63,8 +63,8 @@ class InputInfoTrackerSuite extends SparkFunSuite with BeforeAndAfter {
|
|||
val inputInfoTracker = new InputInfoTracker(ssc)
|
||||
|
||||
val streamId1 = 0
|
||||
val inputInfo1 = InputInfo(streamId1, 100L)
|
||||
val inputInfo2 = InputInfo(streamId1, 300L)
|
||||
val inputInfo1 = StreamInputInfo(streamId1, 100L)
|
||||
val inputInfo2 = StreamInputInfo(streamId1, 300L)
|
||||
inputInfoTracker.reportInfo(Time(0), inputInfo1)
|
||||
inputInfoTracker.reportInfo(Time(1), inputInfo2)
|
||||
|
||||
|
|
|
@ -49,10 +49,12 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
|
|||
val ssc = setupStreams(input, operation)
|
||||
val listener = new StreamingJobProgressListener(ssc)
|
||||
|
||||
val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
|
||||
val streamIdToInputInfo = Map(
|
||||
0 -> StreamInputInfo(0, 300L),
|
||||
1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test")))
|
||||
|
||||
// onBatchSubmitted
|
||||
val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, None, None)
|
||||
val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None)
|
||||
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
|
||||
listener.waitingBatches should be (List(BatchUIData(batchInfoSubmitted)))
|
||||
listener.runningBatches should be (Nil)
|
||||
|
@ -64,7 +66,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
|
|||
listener.numTotalReceivedRecords should be (0)
|
||||
|
||||
// onBatchStarted
|
||||
val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
|
||||
val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
|
||||
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
|
||||
listener.waitingBatches should be (Nil)
|
||||
listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))
|
||||
|
@ -94,7 +96,9 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
|
|||
batchUIData.get.schedulingDelay should be (batchInfoStarted.schedulingDelay)
|
||||
batchUIData.get.processingDelay should be (batchInfoStarted.processingDelay)
|
||||
batchUIData.get.totalDelay should be (batchInfoStarted.totalDelay)
|
||||
batchUIData.get.streamIdToNumRecords should be (Map(0 -> 300L, 1 -> 300L))
|
||||
batchUIData.get.streamIdToInputInfo should be (Map(
|
||||
0 -> StreamInputInfo(0, 300L),
|
||||
1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test"))))
|
||||
batchUIData.get.numRecords should be(600)
|
||||
batchUIData.get.outputOpIdSparkJobIdPairs should be
|
||||
Seq(OutputOpIdAndSparkJobId(0, 0),
|
||||
|
@ -103,7 +107,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
|
|||
OutputOpIdAndSparkJobId(1, 1))
|
||||
|
||||
// onBatchCompleted
|
||||
val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
|
||||
val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
|
||||
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
|
||||
listener.waitingBatches should be (Nil)
|
||||
listener.runningBatches should be (Nil)
|
||||
|
@ -141,9 +145,9 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
|
|||
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
|
||||
val listener = new StreamingJobProgressListener(ssc)
|
||||
|
||||
val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
|
||||
val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L))
|
||||
|
||||
val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
|
||||
val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
|
||||
|
||||
for(_ <- 0 until (limit + 10)) {
|
||||
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
|
||||
|
@ -182,7 +186,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
|
|||
batchUIData.get.schedulingDelay should be (batchInfoSubmitted.schedulingDelay)
|
||||
batchUIData.get.processingDelay should be (batchInfoSubmitted.processingDelay)
|
||||
batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
|
||||
batchUIData.get.streamIdToNumRecords should be (Map.empty)
|
||||
batchUIData.get.streamIdToInputInfo should be (Map.empty)
|
||||
batchUIData.get.numRecords should be (0)
|
||||
batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
|
||||
|
||||
|
@ -211,14 +215,14 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
|
|||
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)
|
||||
|
||||
for (_ <- 0 until 2 * limit) {
|
||||
val streamIdToNumRecords = Map(0 -> 300L, 1 -> 300L)
|
||||
val streamIdToInputInfo = Map(0 -> StreamInputInfo(0, 300L), 1 -> StreamInputInfo(1, 300L))
|
||||
|
||||
// onBatchSubmitted
|
||||
val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, None, None)
|
||||
val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None)
|
||||
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
|
||||
|
||||
// onBatchStarted
|
||||
val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
|
||||
val batchInfoStarted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
|
||||
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
|
||||
|
||||
// onJobStart
|
||||
|
@ -235,7 +239,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
|
|||
listener.onJobStart(jobStart4)
|
||||
|
||||
// onBatchCompleted
|
||||
val batchInfoCompleted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
|
||||
val batchInfoCompleted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, Some(2000), None)
|
||||
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue