[SPARK-14304][SQL][TESTS] Fix tests that don't create temp files in the java.io.tmpdir folder

## What changes were proposed in this pull request?

If I press `CTRL-C` when running these tests, the temp files will be left in `sql/core` folder and I need to delete them manually. It's annoying. This PR just moves the temp files to the `java.io.tmpdir` folder and add a name prefix for them.

## How was this patch tested?

Existing Jenkins tests

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #12093 from zsxwing/temp-file.
This commit is contained in:
Shixiong Zhu 2016-03-31 12:17:25 -07:00 committed by Andrew Or
parent 3cfbeb70b1
commit e785402826
6 changed files with 24 additions and 22 deletions

View file

@ -265,7 +265,7 @@ trait StreamTest extends QueryTest with Timeouts {
}
val testThread = Thread.currentThread()
val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
try {
startedTest.foreach { action =>

View file

@ -236,7 +236,8 @@ class ContinuousQueryManagerSuite extends StreamTest with SharedSQLContext with
@volatile var query: StreamExecution = null
try {
val df = ds.toDF
val metadataRoot = Utils.createTempDir("streaming.metadata").getCanonicalPath
val metadataRoot =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
query = sqlContext
.streams
.startQuery(

View file

@ -69,7 +69,8 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
class DataFrameReaderWriterSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
import testImplicits._
private def newMetadataDir = Utils.createTempDir("streaming.metadata").getCanonicalPath
private def newMetadataDir =
Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
after {
sqlContext.streams.active.foreach(_.stop())

View file

@ -29,8 +29,8 @@ class FileStreamSinkSuite extends StreamTest with SharedSQLContext {
val inputData = MemoryStream[Int]
val df = inputData.toDF()
val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir("stream.checkpoint").getCanonicalPath
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
val query =
df.write

View file

@ -202,8 +202,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from text files") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val filtered = textSource.toDF().filter($"value" contains "keep")
@ -224,8 +224,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from json files") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("json", src.getCanonicalPath, Some(valueSchema))
val filtered = textSource.toDF().filter($"value" contains "keep")
@ -258,8 +258,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from json files with inferring schema") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
// Add a file so that we can infer its schema
stringToFile(new File(src, "existing"), "{'c': 'drop1'}\n{'c': 'keep2'}\n{'c': 'keep3'}")
@ -279,8 +279,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("read from parquet files") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val fileSource = createFileStreamSource("parquet", src.getCanonicalPath, Some(valueSchema))
val filtered = fileSource.toDF().filter($"value" contains "keep")
@ -301,7 +301,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("file stream source without schema") {
val src = Utils.createTempDir("streaming.src")
val src = Utils.createTempDir(namePrefix = "streaming.src")
// Only "text" doesn't need a schema
createFileStreamSource("text", src.getCanonicalPath)
@ -318,8 +318,8 @@ class FileStreamSourceSuite extends FileStreamSourceTest with SharedSQLContext {
}
test("fault tolerance") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val filtered = textSource.toDF().filter($"value" contains "keep")
@ -346,8 +346,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest with SharedSQ
import testImplicits._
test("file source stress test") {
val src = Utils.createTempDir("streaming.src")
val tmp = Utils.createTempDir("streaming.tmp")
val src = Utils.createTempDir(namePrefix = "streaming.src")
val tmp = Utils.createTempDir(namePrefix = "streaming.tmp")
val textSource = createFileStreamSource("text", src.getCanonicalPath)
val ds = textSource.toDS[String]().map(_.toInt + 1)

View file

@ -43,10 +43,10 @@ class FileStressSuite extends StreamTest with SharedSQLContext {
test("fault tolerance stress test") {
val numRecords = 10000
val inputDir = Utils.createTempDir("stream.input").getCanonicalPath
val stagingDir = Utils.createTempDir("stream.staging").getCanonicalPath
val outputDir = Utils.createTempDir("stream.output").getCanonicalPath
val checkpoint = Utils.createTempDir("stream.checkpoint").getCanonicalPath
val inputDir = Utils.createTempDir(namePrefix = "stream.input").getCanonicalPath
val stagingDir = Utils.createTempDir(namePrefix = "stream.staging").getCanonicalPath
val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpoint = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
@volatile
var continue = true