From 200c980a13ce2c090df7b51925da1027cbe1bc7f Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 5 Jun 2015 12:46:02 -0700 Subject: [PATCH] [SPARK-8112] [STREAMING] Fix the negative event count issue Author: zsxwing Closes #6659 from zsxwing/SPARK-8112 and squashes the following commits: a5d7da6 [zsxwing] Address comments d255b6e [zsxwing] Fix the negative event count issue (cherry picked from commit 4f16d3fe2e260a716b5b4e4005cb6229386440ed) Signed-off-by: Tathagata Das --- .../apache/spark/streaming/dstream/ReceiverInputDStream.scala | 2 +- .../spark/streaming/receiver/ReceiverSupervisorImpl.scala | 4 ++-- .../apache/spark/streaming/scheduler/InputInfoTracker.scala | 4 +++- .../apache/spark/streaming/scheduler/ReceivedBlockInfo.scala | 4 +++- .../apache/spark/streaming/ReceivedBlockTrackerSuite.scala | 2 +- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala index e4ff05e12f..e76e7eb0de 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala @@ -70,7 +70,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray // Register the input blocks information into InputInfoTracker - val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum) + val inputInfo = InputInfo(id, blockInfos.flatMap(_.numRecords).sum) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) if (blockInfos.nonEmpty) { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 92938379b9..8be732b64e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -138,8 +138,8 @@ private[streaming] class ReceiverSupervisorImpl( ) { val blockId = blockIdOption.getOrElse(nextBlockId) val numRecords = receivedBlock match { - case ArrayBufferBlock(arrayBuffer) => arrayBuffer.size - case _ => -1 + case ArrayBufferBlock(arrayBuffer) => Some(arrayBuffer.size.toLong) + case _ => None } val time = System.currentTimeMillis diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala index a72efccf2f..7c0db8a863 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/InputInfoTracker.scala @@ -23,7 +23,9 @@ import org.apache.spark.Logging import org.apache.spark.streaming.{Time, StreamingContext} /** To track the information of input stream at specified batch time. */ -private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) +private[streaming] case class InputInfo(inputStreamId: Int, numRecords: Long) { + require(numRecords >= 0, "numRecords must not be negative") +} /** * This class manages all the input streams as well as their input data statistics. The information diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala index dc11e84f29..656ac80df8 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala @@ -24,11 +24,13 @@ import org.apache.spark.streaming.util.WriteAheadLogRecordHandle /** Information about blocks received by the receiver */ private[streaming] case class ReceivedBlockInfo( streamId: Int, - numRecords: Long, + numRecords: Option[Long], metadataOption: Option[Any], blockStoreResult: ReceivedBlockStoreResult ) { + require(numRecords.isEmpty || numRecords.get >= 0, "numRecords must not be negative") + @volatile private var _isBlockIdValid = true def blockId: StreamBlockId = blockStoreResult.blockId diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala index 6f0ee774cb..be305b5e0d 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala @@ -224,7 +224,7 @@ class ReceivedBlockTrackerSuite /** Generate blocks infos using random ids */ def generateBlockInfos(): Seq[ReceivedBlockInfo] = { - List.fill(5)(ReceivedBlockInfo(streamId, 0, None, + List.fill(5)(ReceivedBlockInfo(streamId, Some(0L), None, BlockManagerBasedStoreResult(StreamBlockId(streamId, math.abs(Random.nextInt))))) }