[SPARK-13195][STREAMING] Fix NoSuchElementException when a state is not set but timeoutThreshold is defined
Check the state's existence before calling `get`. Author: Shixiong Zhu <shixiong@databricks.com>. Closes #11081 from zsxwing/SPARK-13195.
This commit is contained in:
parent
bd38dd6f75
commit
8e2f296306
|
@ -57,7 +57,8 @@ private[streaming] object MapWithStateRDDRecord {
|
|||
val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
|
||||
if (wrappedState.isRemoved) {
|
||||
newStateMap.remove(key)
|
||||
} else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) {
|
||||
} else if (wrappedState.isUpdated
|
||||
|| (wrappedState.exists && timeoutThresholdTime.isDefined)) {
|
||||
newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
|
||||
}
|
||||
mappedData ++= returned
|
||||
|
|
|
@ -190,6 +190,11 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
|
|||
timeoutThreshold = Some(initialTime + 1), removeTimedoutData = true,
|
||||
expectedStates = Nil, expectedTimingOutStates = Nil, expectedRemovedStates = Seq(123))
|
||||
|
||||
// If a state is not set but timeoutThreshold is defined, we should ignore this state.
|
||||
// Previously it threw NoSuchElementException (SPARK-13195).
|
||||
assertRecordUpdate(initStates = Seq(), data = Seq("noop"),
|
||||
timeoutThreshold = Some(initialTime + 1), removeTimedoutData = true,
|
||||
expectedStates = Nil, expectedTimingOutStates = Nil)
|
||||
}
|
||||
|
||||
test("states generated by MapWithStateRDD") {
|
||||
|
|
Loading…
Reference in a new issue