Bug fix: splitsInRequestBitVector(splitIndex) was wrongly set to false after receiving just one block in Blocked implementations that receive multiple blocks at a time.
This commit is contained in:
parent
07e778d7fa
commit
7eb334d97c
|
@ -361,11 +361,6 @@ extends Shuffle[K, V, C] with Logging {
|
|||
hasSplits += 1
|
||||
}
|
||||
|
||||
// We have received splitIndex
|
||||
splitsInRequestBitVector.synchronized {
|
||||
splitsInRequestBitVector.set(splitIndex, false)
|
||||
}
|
||||
|
||||
// Consistent state in accounting variables
|
||||
receptionSucceeded = true
|
||||
|
||||
|
@ -384,11 +379,8 @@ extends Shuffle[K, V, C] with Logging {
|
|||
logInfo("ShuffleClient had a " + e)
|
||||
}
|
||||
} finally {
|
||||
// If reception failed, unset for future retry
|
||||
if (!receptionSucceeded) {
|
||||
splitsInRequestBitVector.synchronized {
|
||||
splitsInRequestBitVector.set(splitIndex, false)
|
||||
}
|
||||
splitsInRequestBitVector.synchronized {
|
||||
splitsInRequestBitVector.set(splitIndex, false)
|
||||
}
|
||||
cleanUpConnections()
|
||||
}
|
||||
|
|
|
@ -348,11 +348,6 @@ extends Shuffle[K, V, C] with Logging {
|
|||
hasSplits += 1
|
||||
}
|
||||
|
||||
// We have received splitIndex
|
||||
splitsInRequestBitVector.synchronized {
|
||||
splitsInRequestBitVector.set(splitIndex, false)
|
||||
}
|
||||
|
||||
// Consistent state in accounting variables
|
||||
receptionSucceeded = true
|
||||
|
||||
|
@ -371,11 +366,8 @@ extends Shuffle[K, V, C] with Logging {
|
|||
logInfo("ShuffleClient had a " + e)
|
||||
}
|
||||
} finally {
|
||||
// If reception failed, unset for future retry
|
||||
if (!receptionSucceeded) {
|
||||
splitsInRequestBitVector.synchronized {
|
||||
splitsInRequestBitVector.set(splitIndex, false)
|
||||
}
|
||||
splitsInRequestBitVector.synchronized {
|
||||
splitsInRequestBitVector.set(splitIndex, false)
|
||||
}
|
||||
cleanUpConnections()
|
||||
}
|
||||
|
|
|
@ -548,11 +548,6 @@ extends Shuffle[K, V, C] with Logging {
|
|||
hasSplits += 1
|
||||
}
|
||||
|
||||
// We have received splitIndex
|
||||
splitsInRequestBitVector.synchronized {
|
||||
splitsInRequestBitVector.set(splitIndex, false)
|
||||
}
|
||||
|
||||
receptionSucceeded = true
|
||||
|
||||
logInfo("END READ: " + requestPath)
|
||||
|
@ -574,11 +569,8 @@ extends Shuffle[K, V, C] with Logging {
|
|||
logInfo("ShuffleClient had a " + e)
|
||||
}
|
||||
} finally {
|
||||
// If reception failed, unset for future retry
|
||||
if (!receptionSucceeded) {
|
||||
splitsInRequestBitVector.synchronized {
|
||||
splitsInRequestBitVector.set(splitIndex, false)
|
||||
}
|
||||
splitsInRequestBitVector.synchronized {
|
||||
splitsInRequestBitVector.set(splitIndex, false)
|
||||
}
|
||||
cleanUp()
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue