[SPARK-8112] [STREAMING] Fix the negative event count issue

Author: zsxwing <zsxwing@gmail.com>

Closes #6659 from zsxwing/SPARK-8112 and squashes the following commits:

a5d7da6 [zsxwing] Address comments
d255b6e [zsxwing] Fix the negative event count issue

(cherry picked from commit 4f16d3fe2e)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
This commit is contained in:
zsxwing 2015-06-05 12:46:02 -07:00 committed by Tathagata Das
parent 429c658519
commit 200c980a13
5 changed files with 10 additions and 6 deletions

View file

@ -70,7 +70,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
// Register the input blocks information into InputInfoTracker
val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
val inputInfo = InputInfo(id, blockInfos.flatMap(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
if (blockInfos.nonEmpty) {

View file

@ -138,8 +138,8 @@ private[streaming] class ReceiverSupervisorImpl(
) {
val blockId = blockIdOption.getOrElse(nextBlockId)
val numRecords = receivedBlock match {
case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size
case _ => -1
case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong)
case _ => None
}
val time = System.currentTimeMillis

View file

@ -23,7 +23,9 @@ import org.apache.spark.Logging
import org.apache.spark.streaming.{Time, StreamingContext}
/** To track the information of input stream at specified batch time. */
private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long)
private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) {
require(numRecords >= 0, "numRecords must not be negative")
}
/**
* This class manages all the input streams as well as their input data statistics. The information

View file

@ -24,11 +24,13 @@ import org.apache.spark.streaming.util.WriteAheadLogRecordHandle
/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
numRecords: Long,
numRecords: Option[Long],
metadataOption: Option[Any],
blockStoreResult: ReceivedBlockStoreResult
) {
require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative")
@volatile private var _isBlockIdValid = true
def blockId: StreamBlockId = blockStoreResult.blockId

View file

@ -224,7 +224,7 @@ class ReceivedBlockTrackerSuite
/** Generate blocks infos using random ids */
def generateBlockInfos(): Seq[ReceivedBlockInfo] = {
List.fill(5)(ReceivedBlockInfo(streamId, 0, None,
List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None,
BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt)))))
}