[SPARK-30246][CORE] OneForOneStreamManager might leak memory in connectionTerminated

### What changes were proposed in this pull request?

Ensure that all StreamStates are removed from OneForOneStreamManager memory map even if there's an error trying to release buffers

### Why are the changes needed?

OneForOneStreamManager may not remove all StreamStates from memory map when a connection is terminated. A RuntimeException might be thrown in StreamState$buffers.next() by one of ExternalShuffleBlockResolver$getBlockData... **breaking the loop through streams.entrySet(), keeping StreamStates in memory forever leaking memory.**
That may happen when an application is terminated abruptly and executors removed before the connection is terminated or if shuffleIndexCache fails to get ShuffleIndexInformation

References:
ee050ddbc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java (L319)

ee050ddbc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java (L357)

ee050ddbc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java (L195)

ee050ddbc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java (L208)

ee050ddbc6/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java (L330)

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Unit test added

Closes #27064 from hensg/SPARK-30246.

Lead-authored-by: Henrique Goulart <henriquedsg89@gmail.com>
Co-authored-by: Henrique Goulart <henrique.goulart@trivago.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
Henrique Goulart 2020-01-15 13:27:15 -08:00 committed by Marcelo Vanzin
parent 6c178a5d16
commit d42cf4566a
2 changed files with 58 additions and 5 deletions

View file

@ -117,12 +117,15 @@ public class OneForOneStreamManager extends StreamManager {
@Override
public void connectionTerminated(Channel channel) {
RuntimeException failedToReleaseBufferException = null;
// Close all streams which have been associated with the channel.
for (Map.Entry<Long, StreamState> entry: streams.entrySet()) {
StreamState state = entry.getValue();
if (state.associatedChannel == channel) {
streams.remove(entry.getKey());
try {
// Release all remaining buffers.
while (state.buffers.hasNext()) {
ManagedBuffer buffer = state.buffers.next();
@ -130,9 +133,20 @@ public class OneForOneStreamManager extends StreamManager {
buffer.release();
}
}
} catch (RuntimeException e) {
if (failedToReleaseBufferException == null) {
failedToReleaseBufferException = e;
} else {
logger.error("Exception trying to release remaining StreamState buffers", e);
}
}
}
}
if (failedToReleaseBufferException != null) {
throw failedToReleaseBufferException;
}
}
@Override
public void checkAuthorization(TransportClient client, long streamId) {

View file

@ -18,6 +18,7 @@
package org.apache.spark.network.server;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import io.netty.channel.Channel;
@ -96,4 +97,42 @@ public class OneForOneStreamManagerSuite {
Mockito.verify(buffer2, Mockito.times(1)).release();
Assert.assertEquals(0, manager.numStreamStates());
}
@Test
public void streamStatesAreFreedWhenConnectionIsClosedEvenIfBufferIteratorThrowsException() {
OneForOneStreamManager manager = new OneForOneStreamManager();
Iterator<ManagedBuffer> buffers = Mockito.mock(Iterator.class);
Mockito.when(buffers.hasNext()).thenReturn(true);
Mockito.when(buffers.next()).thenThrow(RuntimeException.class);
ManagedBuffer mockManagedBuffer = Mockito.mock(ManagedBuffer.class);
Iterator<ManagedBuffer> buffers2 = Mockito.mock(Iterator.class);
Mockito.when(buffers2.hasNext()).thenReturn(true).thenReturn(true);
Mockito.when(buffers2.next()).thenReturn(mockManagedBuffer).thenThrow(RuntimeException.class);
Channel dummyChannel = Mockito.mock(Channel.class, Mockito.RETURNS_SMART_NULLS);
manager.registerStream("appId", buffers, dummyChannel);
manager.registerStream("appId", buffers2, dummyChannel);
Assert.assertEquals(2, manager.numStreamStates());
try {
manager.connectionTerminated(dummyChannel);
Assert.fail("connectionTerminated should throw exception when fails to release all buffers");
} catch (RuntimeException e) {
Mockito.verify(buffers, Mockito.times(1)).hasNext();
Mockito.verify(buffers, Mockito.times(1)).next();
Mockito.verify(buffers2, Mockito.times(2)).hasNext();
Mockito.verify(buffers2, Mockito.times(2)).next();
Mockito.verify(mockManagedBuffer, Mockito.times(1)).release();
Assert.assertEquals(0, manager.numStreamStates());
}
}
}