[SPARK-25218][CORE] Fix potential resource leaks in TransportServer and SocketAuthHelper

## What changes were proposed in this pull request?

Make sure TransportServer and SocketAuthHelper close the resources for all types of errors.

## How was this patch tested?

Jenkins

Closes #22210 from zsxwing/SPARK-25218.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
This commit is contained in:
Shixiong Zhu 2018-08-28 08:36:06 -07:00
parent 8198ea5019
commit 592e3a42c2
No known key found for this signature in database
GPG key ID: ECB1BBEA55295E39
3 changed files with 54 additions and 37 deletions

View file

@ -77,16 +77,16 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
}
} catch (IOException e) {
String errorMessage = "Error in reading " + this;
try {
if (channel != null) {
long size = channel.size();
throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
e);
errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
}
} catch (IOException ignored) {
// ignore
}
throw new IOException("Error in opening " + this, e);
throw new IOException(errorMessage, e);
} finally {
JavaUtils.closeQuietly(channel);
}
@ -95,26 +95,24 @@ public final class FileSegmentManagedBuffer extends ManagedBuffer {
@Override
public InputStream createInputStream() throws IOException {
FileInputStream is = null;
boolean shouldClose = true;
try {
is = new FileInputStream(file);
ByteStreams.skipFully(is, offset);
return new LimitedInputStream(is, length);
InputStream r = new LimitedInputStream(is, length);
shouldClose = false;
return r;
} catch (IOException e) {
try {
if (is != null) {
long size = file.length();
throw new IOException("Error in reading " + this + " (actual file length " + size + ")",
e);
}
} catch (IOException ignored) {
// ignore
} finally {
String errorMessage = "Error in reading " + this;
if (is != null) {
long size = file.length();
errorMessage = "Error in reading " + this + " (actual file length " + size + ")";
}
throw new IOException(errorMessage, e);
} finally {
if (shouldClose) {
JavaUtils.closeQuietly(is);
}
throw new IOException("Error in opening " + this, e);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(is);
throw e;
}
}

View file

@ -70,11 +70,14 @@ public class TransportServer implements Closeable {
this.appRpcHandler = appRpcHandler;
this.bootstraps = Lists.newArrayList(Preconditions.checkNotNull(bootstraps));
boolean shouldClose = true;
try {
init(hostToBind, portToBind);
} catch (RuntimeException e) {
JavaUtils.closeQuietly(this);
throw e;
shouldClose = false;
} finally {
if (shouldClose) {
JavaUtils.closeQuietly(this);
}
}
}

View file

@ -42,43 +42,59 @@ private[spark] class SocketAuthHelper(conf: SparkConf) {
* Read the auth secret from the socket and compare to the expected value. Write the reply back
* to the socket.
*
* If authentication fails, this method will close the socket.
* If authentication fails or error is thrown, this method will close the socket.
*
* @param s The client socket.
* @throws IllegalArgumentException If authentication fails.
*/
def authClient(s: Socket): Unit = {
// Set the socket timeout while checking the auth secret. Reset it before returning.
val currentTimeout = s.getSoTimeout()
var shouldClose = true
try {
s.setSoTimeout(10000)
val clientSecret = readUtf8(s)
if (secret == clientSecret) {
writeUtf8("ok", s)
} else {
writeUtf8("err", s)
JavaUtils.closeQuietly(s)
// Set the socket timeout while checking the auth secret. Reset it before returning.
val currentTimeout = s.getSoTimeout()
try {
s.setSoTimeout(10000)
val clientSecret = readUtf8(s)
if (secret == clientSecret) {
writeUtf8("ok", s)
shouldClose = false
} else {
writeUtf8("err", s)
throw new IllegalArgumentException("Authentication failed.")
}
} finally {
s.setSoTimeout(currentTimeout)
}
} finally {
s.setSoTimeout(currentTimeout)
if (shouldClose) {
JavaUtils.closeQuietly(s)
}
}
}
/**
* Authenticate with a server by writing the auth secret and checking the server's reply.
*
* If authentication fails, this method will close the socket.
* If authentication fails or error is thrown, this method will close the socket.
*
* @param s The socket connected to the server.
* @throws IllegalArgumentException If authentication fails.
*/
def authToServer(s: Socket): Unit = {
writeUtf8(secret, s)
var shouldClose = true
try {
writeUtf8(secret, s)
val reply = readUtf8(s)
if (reply != "ok") {
JavaUtils.closeQuietly(s)
throw new IllegalArgumentException("Authentication failed.")
val reply = readUtf8(s)
if (reply != "ok") {
throw new IllegalArgumentException("Authentication failed.")
} else {
shouldClose = false
}
} finally {
if (shouldClose) {
JavaUtils.closeQuietly(s)
}
}
}