[SPARK-17740] Spark tests should mock / interpose HDFS to ensure that streams are closed

## What changes were proposed in this pull request?

As a follow-up to SPARK-17666, this ensures that filesystem connections are not leaked, at least in unit tests. This is done by intercepting filesystem calls, as suggested by JoshRosen. At the end of each test, we assert that no filesystem streams are left open.

This applies to all tests using SharedSQLContext or SharedSparkContext.
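For illustration, here is a rough, self-contained sketch of the pattern these traits wire in. The suite name, test body, and temp file below are made up, and it assumes the spark-core test classes (where `DebugFilesystem` lives) are on the classpath; real suites get this behavior simply by mixing in `SharedSparkContext` or `SharedSQLContext`.

```scala
import java.nio.file.Files

import org.scalatest.{BeforeAndAfterEach, FunSuite}

import org.apache.spark.{DebugFilesystem, SparkConf, SparkContext}

// Hypothetical standalone suite; real tests should just mix in SharedSparkContext
// or SharedSQLContext, which apply the same pattern automatically.
class LeakCheckExampleSuite extends FunSuite with BeforeAndAfterEach {

  override def beforeEach(): Unit = {
    super.beforeEach()
    // Forget streams tracked for previous tests.
    DebugFilesystem.clearOpenStreams()
  }

  override def afterEach(): Unit = {
    try {
      // Throws if any stream opened through DebugFilesystem was never closed.
      DebugFilesystem.assertNoOpenStreams()
    } finally {
      super.afterEach()
    }
  }

  test("reading a local file does not leak streams") {
    // Route the local file:// scheme through DebugFilesystem so opens are tracked.
    val conf = new SparkConf()
      .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
    val sc = new SparkContext("local[4]", "leak-check-example", conf)
    try {
      val tmp = Files.createTempFile("leak-check", ".txt")
      Files.write(tmp, "a\nb\nc\n".getBytes("UTF-8"))
      assert(sc.textFile(tmp.toString).count() == 3L)
    } finally {
      sc.stop()
    }
  }
}
```

The key pieces are the `spark.hadoop.fs.file.impl` setting, which routes the local `file://` scheme through `DebugFilesystem`, and the `clearOpenStreams` / `assertNoOpenStreams` calls around each test.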

## How was this patch tested?

I verified that tests in sql and core are indeed using the filesystem backend, and fixed the detected leaks. I also checked that reverting https://github.com/apache/spark/pull/15245 causes many actual test failures due to connection leaks.

Author: Eric Liang <ekl@databricks.com>
Author: Eric Liang <ekhliang@gmail.com>

Closes #15306 from ericl/sc-4672.
Commit 4bcd9b728b (parent 15e9bbb49e), authored by Eric Liang on 2016-09-30 23:51:36 -07:00 and committed by Reynold Xin. 5 changed files with 147 additions and 7 deletions.

@@ -0,0 +1,114 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.{FileDescriptor, InputStream}
import java.lang
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.hadoop.fs._

import org.apache.spark.internal.Logging

object DebugFilesystem extends Logging {
  // Stores the set of active streams and their creation sites.
  private val openStreams = new ConcurrentHashMap[FSDataInputStream, Throwable]()

  def clearOpenStreams(): Unit = {
    openStreams.clear()
  }

  def assertNoOpenStreams(): Unit = {
    val numOpen = openStreams.size()
    if (numOpen > 0) {
      for (exc <- openStreams.values().asScala) {
        logWarning("Leaked filesystem connection created at:")
        exc.printStackTrace()
      }
      throw new RuntimeException(s"There are $numOpen possibly leaked file streams.")
    }
  }
}

/**
 * DebugFilesystem wraps file open calls to track all open connections. This can be used in tests
 * to check that connections are not leaked.
 */
// TODO(ekl) we should consider always interposing this to expose num open conns as a metric
class DebugFilesystem extends LocalFileSystem {
  import DebugFilesystem._

  override def open(f: Path, bufferSize: Int): FSDataInputStream = {
    val wrapped: FSDataInputStream = super.open(f, bufferSize)
    openStreams.put(wrapped, new Throwable())

    new FSDataInputStream(wrapped.getWrappedStream) {
      override def setDropBehind(dropBehind: lang.Boolean): Unit = wrapped.setDropBehind(dropBehind)

      override def getWrappedStream: InputStream = wrapped.getWrappedStream

      override def getFileDescriptor: FileDescriptor = wrapped.getFileDescriptor

      override def getPos: Long = wrapped.getPos

      override def seekToNewSource(targetPos: Long): Boolean = wrapped.seekToNewSource(targetPos)

      override def seek(desired: Long): Unit = wrapped.seek(desired)

      override def setReadahead(readahead: lang.Long): Unit = wrapped.setReadahead(readahead)

      override def read(position: Long, buffer: Array[Byte], offset: Int, length: Int): Int =
        wrapped.read(position, buffer, offset, length)

      override def read(buf: ByteBuffer): Int = wrapped.read(buf)

      override def readFully(position: Long, buffer: Array[Byte], offset: Int, length: Int): Unit =
        wrapped.readFully(position, buffer, offset, length)

      override def readFully(position: Long, buffer: Array[Byte]): Unit =
        wrapped.readFully(position, buffer)

      override def available(): Int = wrapped.available()

      override def mark(readlimit: Int): Unit = wrapped.mark(readlimit)

      override def skip(n: Long): Long = wrapped.skip(n)

      override def markSupported(): Boolean = wrapped.markSupported()

      override def close(): Unit = {
        wrapped.close()
        openStreams.remove(wrapped)
      }

      override def read(): Int = wrapped.read()

      override def reset(): Unit = wrapped.reset()

      override def toString: String = wrapped.toString

      override def equals(obj: scala.Any): Boolean = wrapped.equals(obj)

      override def hashCode(): Int = wrapped.hashCode()
    }
  }
}

@@ -17,11 +17,11 @@
 package org.apache.spark
 
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 import org.scalatest.Suite
 
 /** Shares a local `SparkContext` between all tests in a suite and closes it at the end */
-trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
+trait SharedSparkContext extends BeforeAndAfterAll with BeforeAndAfterEach { self: Suite =>
 
   @transient private var _sc: SparkContext = _
@@ -31,7 +31,8 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
   override def beforeAll() {
     super.beforeAll()
-    _sc = new SparkContext("local[4]", "test", conf)
+    _sc = new SparkContext(
+      "local[4]", "test", conf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
   }
 
   override def afterAll() {
@@ -42,4 +43,14 @@ trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
       super.afterAll()
     }
   }
+
+  protected override def beforeEach(): Unit = {
+    super.beforeEach()
+    DebugFilesystem.clearOpenStreams()
+  }
+
+  protected override def afterEach(): Unit = {
+    super.afterEach()
+    DebugFilesystem.assertNoOpenStreams()
+  }
 }

@@ -104,6 +104,7 @@ class ParquetEncodingSuite extends ParquetCompatibilityTest with SharedSQLContex
           assert(column.getUTF8String(3 * i + 1).toString == i.toString)
           assert(column.getUTF8String(3 * i + 2).toString == i.toString)
         }
+        reader.close()
       }
     }
   }

@@ -203,13 +203,14 @@ class HDFSMetadataLogSuite extends SparkFunSuite with SharedSQLContext {
     }
 
     // Open and delete
-    fm.open(path)
+    val f1 = fm.open(path)
     fm.delete(path)
     assert(!fm.exists(path))
     intercept[IOException] {
       fm.open(path)
     }
     fm.delete(path) // should not throw exception
+    f1.close()
 
     // Rename
     val path1 = new Path(s"$dir/file1")

@@ -17,14 +17,16 @@
 package org.apache.spark.sql.test
 
-import org.apache.spark.SparkConf
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{DebugFilesystem, SparkConf}
 import org.apache.spark.sql.{SparkSession, SQLContext}
 
 /**
  * Helper trait for SQL test suites where all tests share a single [[TestSparkSession]].
  */
-trait SharedSQLContext extends SQLTestUtils {
+trait SharedSQLContext extends SQLTestUtils with BeforeAndAfterEach {
 
   protected val sparkConf = new SparkConf()
@@ -52,7 +54,8 @@ trait SharedSQLContext extends SQLTestUtils {
   protected override def beforeAll(): Unit = {
     SparkSession.sqlListener.set(null)
     if (_spark == null) {
-      _spark = new TestSparkSession(sparkConf)
+      _spark = new TestSparkSession(
+        sparkConf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName))
     }
     // Ensure we have initialized the context before calling parent code
     super.beforeAll()
@@ -71,4 +74,14 @@ trait SharedSQLContext extends SQLTestUtils {
       super.afterAll()
     }
   }
+
+  protected override def beforeEach(): Unit = {
+    super.beforeEach()
+    DebugFilesystem.clearOpenStreams()
+  }
+
+  protected override def afterEach(): Unit = {
+    super.afterEach()
+    DebugFilesystem.assertNoOpenStreams()
+  }
 }