diff --git a/src/scala/spark/CustomBlockedInMemoryShuffle.scala b/src/scala/spark/CustomBlockedInMemoryShuffle.scala index 483d79a00a..aae940dc8e 100644 --- a/src/scala/spark/CustomBlockedInMemoryShuffle.scala +++ b/src/scala/spark/CustomBlockedInMemoryShuffle.scala @@ -361,11 +361,6 @@ extends Shuffle[K, V, C] with Logging { hasSplits += 1 } - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } - // Consistent state in accounting variables receptionSucceeded = true @@ -384,11 +379,8 @@ extends Shuffle[K, V, C] with Logging { logInfo("ShuffleClient had a " + e) } } finally { - // If reception failed, unset for future retry - if (!receptionSucceeded) { - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) } cleanUpConnections() } diff --git a/src/scala/spark/CustomBlockedLocalFileShuffle.scala b/src/scala/spark/CustomBlockedLocalFileShuffle.scala index 7f03d4a525..148a0dc306 100644 --- a/src/scala/spark/CustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/CustomBlockedLocalFileShuffle.scala @@ -348,11 +348,6 @@ extends Shuffle[K, V, C] with Logging { hasSplits += 1 } - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } - // Consistent state in accounting variables receptionSucceeded = true @@ -371,11 +366,8 @@ extends Shuffle[K, V, C] with Logging { logInfo("ShuffleClient had a " + e) } } finally { - // If reception failed, unset for future retry - if (!receptionSucceeded) { - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) } cleanUpConnections() } diff --git a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala index ae15b61a47..39a7df7f6a 100644 --- a/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala +++ b/src/scala/spark/TrackedCustomBlockedLocalFileShuffle.scala @@ -548,11 +548,6 @@ extends Shuffle[K, V, C] with Logging { hasSplits += 1 } - // We have received splitIndex - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } - receptionSucceeded = true logInfo("END READ: " + requestPath) @@ -574,11 +569,8 @@ extends Shuffle[K, V, C] with Logging { logInfo("ShuffleClient had a " + e) } } finally { - // If reception failed, unset for future retry - if (!receptionSucceeded) { - splitsInRequestBitVector.synchronized { - splitsInRequestBitVector.set(splitIndex, false) - } + splitsInRequestBitVector.synchronized { + splitsInRequestBitVector.set(splitIndex, false) } cleanUp() }