SPARK-3811 [CORE] More robust / standard Utils.deleteRecursively, Utils.createTempDir
I noticed a few issues with how temp directories are created and deleted:

*Minor*

* Guava's `Files.createTempDir()` plus `File.deleteOnExit()` is used in many tests to make a temp dir, but `Utils.createTempDir()` seems to be the standard Spark mechanism
* The call to `File.deleteOnExit()` could be pushed into `Utils.createTempDir()` as well, along with this replacement
* _I messed up the message in an exception in `Utils` in SPARK-3794; fixed here_

*Bit Less Minor*

* `Utils.deleteRecursively()` fails immediately if any `IOException` occurs, instead of trying to delete any remaining files and subdirectories. I've observed this leave temp dirs around. I suggest changing it to continue in the face of an exception and, at the end, throw one of the possibly several exceptions that occurred.
* `Utils.createTempDir()` adds a JVM shutdown hook every time the method is called, even when the new dir is inside a dir already registered for deletion, since that check only happens inside the hook. However, `Utils` already manages a set of all dirs to delete on shutdown, called `shutdownDeletePaths`, so a single hook can be registered to delete all of these on exit. This is how Tachyon temp paths are cleaned up in `TachyonBlockManager`.

I noticed a few other things that might be changed but wanted to ask first:

* Shouldn't the set of dirs to delete be `File`, not just `String` paths?
* `Utils` manages the set of `TachyonFile` that have been registered for deletion, but the shutdown hook is managed in `TachyonBlockManager`. Should this logic not live together, and not in `Utils`? It's more specific to Tachyon, and it looks slightly odd to import it in such a generic place.

Author: Sean Owen <sowen@cloudera.com>

Closes #2670 from srowen/SPARK-3811 and squashes the following commits:

071ae60 [Sean Owen] Update per @vanzin's review
da0146d [Sean Owen] Make Utils.deleteRecursively try to delete all paths even when an exception occurs; use one shutdown hook instead of one per method call to delete temp dirs
3a0faa4 [Sean Owen] Standardize on Utils.createTempDir instead of Files.createTempDir
parent 2837bf8548
commit 363baacade
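Before the diff itself, here is a minimal, self-contained sketch of the two mechanisms proposed above: a single JVM-wide shutdown hook that deletes every registered temp dir, and a `deleteRecursively` that keeps deleting siblings after an `IOException` and rethrows the last failure at the end. It is illustrative only, under assumed names (`TempDirCleanup`, `registeredPaths`, `createTempDir`); it is not Spark's actual `Utils` API, and it omits the symlink guard the real code keeps. The actual changes are in `Utils.scala` in the patch below.

```scala
import java.io.{File, IOException}

// Illustrative sketch only, not Spark's Utils; names are assumptions for this example.
object TempDirCleanup {

  // Every temp dir registered for deletion at JVM exit
  // (shutdownDeletePaths plays this role in the patch below).
  private val registeredPaths = scala.collection.mutable.HashSet[String]()

  // One hook for the whole JVM, installed when this object is first used,
  // instead of one hook per createTempDir() call.
  Runtime.getRuntime.addShutdownHook(new Thread("delete temp dirs") {
    override def run(): Unit = registeredPaths.synchronized {
      registeredPaths.foreach { path =>
        try deleteRecursively(new File(path))
        catch { case e: IOException => System.err.println(s"Failed to delete $path: $e") }
      }
    }
  })

  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
    val dir = new File(root, "tmp-" + java.util.UUID.randomUUID())
    if (!dir.mkdirs()) {
      throw new IOException(s"Failed to create temp dir $dir")
    }
    // Registration is just a set insert; no extra hook is added here.
    registeredPaths.synchronized { registeredPaths += dir.getAbsolutePath }
    dir
  }

  // Keep deleting children even if one of them fails; rethrow the last failure at
  // the end, and always attempt to delete the directory (or file) itself.
  // The real implementation also skips directories that are symlinks.
  def deleteRecursively(file: File): Unit = {
    if (file != null) {
      try {
        if (file.isDirectory) {
          var saved: IOException = null
          for (child <- Option(file.listFiles()).getOrElse(Array.empty[File])) {
            try deleteRecursively(child)
            catch { case ioe: IOException => saved = ioe }
          }
          if (saved != null) {
            throw saved
          }
        }
      } finally {
        if (!file.delete() && file.exists()) {
          throw new IOException("Failed to delete: " + file.getAbsolutePath)
        }
      }
    }
  }
}
```

With this shape, a failed child no longer aborts cleanup of its siblings, and the per-JVM cost of a shutdown hook is paid once rather than once per temp dir.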
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
 import com.google.common.io.Files
 
+import org.apache.spark.util.Utils
+
 /**
  * Utilities for tests. Included in main codebase since it's used by multiple
  * projects.
@@ -42,8 +44,7 @@ private[spark] object TestUtils {
    * in order to avoid interference between tests.
    */
   def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
-    val tempDir = Files.createTempDir()
-    tempDir.deleteOnExit()
+    val tempDir = Utils.createTempDir()
     val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
     val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
     createJar(files, jarFile)
@@ -168,6 +168,20 @@ private[spark] object Utils extends Logging {
   private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
   private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
 
+  // Add a shutdown hook to delete the temp dirs when the JVM exits
+  Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
+    override def run(): Unit = Utils.logUncaughtExceptions {
+      logDebug("Shutdown hook called")
+      shutdownDeletePaths.foreach { dirPath =>
+        try {
+          Utils.deleteRecursively(new File(dirPath))
+        } catch {
+          case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
+        }
+      }
+    }
+  })
+
   // Register the path to be deleted via shutdown hook
   def registerShutdownDeleteDir(file: File) {
     val absolutePath = file.getAbsolutePath()
@@ -252,14 +266,6 @@ private[spark] object Utils extends Logging {
     }
 
     registerShutdownDeleteDir(dir)
-
-    // Add a shutdown hook to delete the temp dir when the JVM exits
-    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
-      override def run() {
-        // Attempt to delete if some patch which is parent of this is not already registered.
-        if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
-      }
-    })
     dir
   }
 
@@ -666,15 +672,30 @@ private[spark] object Utils extends Logging {
    */
   def deleteRecursively(file: File) {
     if (file != null) {
-      if (file.isDirectory() && !isSymlink(file)) {
-        for (child <- listFilesSafely(file)) {
-          deleteRecursively(child)
+      try {
+        if (file.isDirectory && !isSymlink(file)) {
+          var savedIOException: IOException = null
+          for (child <- listFilesSafely(file)) {
+            try {
+              deleteRecursively(child)
+            } catch {
+              // In case of multiple exceptions, only last one will be thrown
+              case ioe: IOException => savedIOException = ioe
+            }
+          }
+          if (savedIOException != null) {
+            throw savedIOException
+          }
+          shutdownDeletePaths.synchronized {
+            shutdownDeletePaths.remove(file.getAbsolutePath)
+          }
         }
-      }
-      if (!file.delete()) {
-        // Delete can also fail if the file simply did not exist
-        if (file.exists()) {
-          throw new IOException("Failed to delete: " + file.getAbsolutePath)
+      } finally {
+        if (!file.delete()) {
+          // Delete can also fail if the file simply did not exist
+          if (file.exists()) {
+            throw new IOException("Failed to delete: " + file.getAbsolutePath)
+          }
         }
       }
     }
@@ -713,7 +734,7 @@ private[spark] object Utils extends Logging {
    */
   def doesDirectoryContainAnyNewFiles(dir: File, cutoff: Long): Boolean = {
     if (!dir.isDirectory) {
-      throw new IllegalArgumentException("$dir is not a directory!")
+      throw new IllegalArgumentException(s"$dir is not a directory!")
     }
     val filesAndDirs = dir.listFiles()
     val cutoffTimeInMillis = System.currentTimeMillis - (cutoff * 1000)
@@ -20,7 +20,6 @@ package org.apache.spark
 import java.io._
 import java.util.jar.{JarEntry, JarOutputStream}
 
-import com.google.common.io.Files
 import org.scalatest.FunSuite
 
 import org.apache.spark.SparkContext._
@@ -41,8 +40,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
   override def beforeAll() {
     super.beforeAll()
 
-    tmpDir = Files.createTempDir()
-    tmpDir.deleteOnExit()
+    tmpDir = Utils.createTempDir()
     val testTempDir = new File(tmpDir, "test")
     testTempDir.mkdir()
 
@@ -21,7 +21,6 @@ import java.io.{File, FileWriter}
 
 import scala.io.Source
 
-import com.google.common.io.Files
 import org.apache.hadoop.io._
 import org.apache.hadoop.io.compress.DefaultCodec
 import org.apache.hadoop.mapred.{JobConf, FileAlreadyExistsException, FileSplit, TextInputFormat, TextOutputFormat}
@@ -39,8 +38,7 @@ class FileSuite extends FunSuite with LocalSparkContext {
 
   override def beforeEach() {
     super.beforeEach()
-    tempDir = Files.createTempDir()
-    tempDir.deleteOnExit()
+    tempDir = Utils.createTempDir()
   }
 
   override def afterEach() {
@@ -26,7 +26,6 @@ import org.apache.spark.deploy.SparkSubmit._
 import org.apache.spark.util.Utils
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
-import com.google.common.io.Files
 
 class SparkSubmitSuite extends FunSuite with Matchers {
   def beforeAll() {
@@ -332,7 +331,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {
   }
 
   def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
-    val tmpDir = Files.createTempDir()
+    val tmpDir = Utils.createTempDir()
 
     val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
     val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf))
@@ -23,8 +23,6 @@ import java.io.FileOutputStream
 
 import scala.collection.immutable.IndexedSeq
 
-import com.google.common.io.Files
-
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
@@ -66,9 +64,7 @@ class WholeTextFileRecordReaderSuite extends FunSuite with BeforeAndAfterAll {
    * 3) Does the contents be the same.
    */
   test("Correctness of WholeTextFileRecordReader.") {
-    val dir = Files.createTempDir()
-    dir.deleteOnExit()
+    val dir = Utils.createTempDir()
     println(s"Local disk address is ${dir.toString}.")
 
     WholeTextFileRecordReaderSuite.files.foreach { case (filename, contents) =>
@@ -24,13 +24,14 @@ import org.apache.hadoop.util.Progressable
 import scala.collection.mutable.{ArrayBuffer, HashSet}
 import scala.util.Random
 
-import com.google.common.io.Files
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapreduce.{JobContext => NewJobContext, OutputCommitter => NewOutputCommitter,
   OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter,
   TaskAttemptContext => NewTaskAttempContext}
 import org.apache.spark.{Partitioner, SharedSparkContext}
 import org.apache.spark.SparkContext._
+import org.apache.spark.util.Utils
+
 import org.scalatest.FunSuite
 
 class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
@@ -381,14 +382,16 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
   }
 
   test("zero-partition RDD") {
-    val emptyDir = Files.createTempDir()
-    emptyDir.deleteOnExit()
-    val file = sc.textFile(emptyDir.getAbsolutePath)
-    assert(file.partitions.size == 0)
-    assert(file.collect().toList === Nil)
-    // Test that a shuffle on the file works, because this used to be a bug
-    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
-    emptyDir.delete()
+    val emptyDir = Utils.createTempDir()
+    try {
+      val file = sc.textFile(emptyDir.getAbsolutePath)
+      assert(file.partitions.isEmpty)
+      assert(file.collect().toList === Nil)
+      // Test that a shuffle on the file works, because this used to be a bug
+      assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
+    } finally {
+      Utils.deleteRecursively(emptyDir)
+    }
   }
 
   test("keys and values") {
@@ -20,7 +20,6 @@ package org.apache.spark.scheduler
 import scala.collection.mutable
 import scala.io.Source
 
-import com.google.common.io.Files
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -51,8 +50,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
   private var logDirPath: Path = _
 
   before {
-    testDir = Files.createTempDir()
-    testDir.deleteOnExit()
+    testDir = Utils.createTempDir()
     logDirPath = Utils.getFilePath(testDir, "spark-events")
   }
 
@@ -19,7 +19,6 @@ package org.apache.spark.scheduler
 
 import java.io.{File, PrintWriter}
 
-import com.google.common.io.Files
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
@@ -39,8 +38,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
   private var testDir: File = _
 
   before {
-    testDir = Files.createTempDir()
-    testDir.deleteOnExit()
+    testDir = Utils.createTempDir()
   }
 
   after {
@@ -19,22 +19,13 @@ package org.apache.spark.storage
 
 import java.io.{File, FileWriter}
 
-import org.apache.spark.network.nio.NioBlockTransferService
-import org.apache.spark.shuffle.hash.HashShuffleManager
-
-import scala.collection.mutable
 import scala.language.reflectiveCalls
 
-import akka.actor.Props
-import com.google.common.io.Files
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.LiveListenerBus
-import org.apache.spark.serializer.JavaSerializer
-import org.apache.spark.util.{AkkaUtils, Utils}
-import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.util.Utils
 
 class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with BeforeAndAfterAll {
   private val testConf = new SparkConf(false)
@@ -48,10 +39,8 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach with Before
 
   override def beforeAll() {
     super.beforeAll()
-    rootDir0 = Files.createTempDir()
-    rootDir0.deleteOnExit()
-    rootDir1 = Files.createTempDir()
-    rootDir1.deleteOnExit()
+    rootDir0 = Utils.createTempDir()
+    rootDir1 = Utils.createTempDir()
     rootDirs = rootDir0.getAbsolutePath + "," + rootDir1.getAbsolutePath
   }
 
@@ -21,7 +21,6 @@ import java.io.{File, IOException}
 
 import scala.io.Source
 
-import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
@@ -44,7 +43,7 @@ class FileLoggerSuite extends FunSuite with BeforeAndAfter {
   private var logDirPathString: String = _
 
   before {
-    testDir = Files.createTempDir()
+    testDir = Utils.createTempDir()
     logDirPath = Utils.getFilePath(testDir, "test-file-logger")
     logDirPathString = logDirPath.toString
   }
@@ -112,7 +112,7 @@ class UtilsSuite extends FunSuite {
   }
 
   test("reading offset bytes of a file") {
-    val tmpDir2 = Files.createTempDir()
+    val tmpDir2 = Utils.createTempDir()
     tmpDir2.deleteOnExit()
     val f1Path = tmpDir2 + "/f1"
     val f1 = new FileOutputStream(f1Path)
@@ -141,7 +141,7 @@ class UtilsSuite extends FunSuite {
   }
 
   test("reading offset bytes across multiple files") {
-    val tmpDir = Files.createTempDir()
+    val tmpDir = Utils.createTempDir()
     tmpDir.deleteOnExit()
     val files = (1 to 3).map(i => new File(tmpDir, i.toString))
     Files.write("0123456789", files(0), Charsets.UTF_8)
@@ -308,4 +308,28 @@ class UtilsSuite extends FunSuite {
     }
   }
 
+  test("deleteRecursively") {
+    val tempDir1 = Utils.createTempDir()
+    assert(tempDir1.exists())
+    Utils.deleteRecursively(tempDir1)
+    assert(!tempDir1.exists())
+
+    val tempDir2 = Utils.createTempDir()
+    val tempFile1 = new File(tempDir2, "foo.txt")
+    Files.touch(tempFile1)
+    assert(tempFile1.exists())
+    Utils.deleteRecursively(tempFile1)
+    assert(!tempFile1.exists())
+
+    val tempDir3 = new File(tempDir2, "subdir")
+    assert(tempDir3.mkdir())
+    val tempFile2 = new File(tempDir3, "bar.txt")
+    Files.touch(tempFile2)
+    assert(tempFile2.exists())
+    Utils.deleteRecursively(tempDir2)
+    assert(!tempDir2.exists())
+    assert(!tempDir3.exists())
+    assert(!tempFile2.exists())
+  }
+
 }
@@ -67,8 +67,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
         |0
         |0 2:4.0 4:5.0 6:6.0
       """.stripMargin
-    val tempDir = Files.createTempDir()
-    tempDir.deleteOnExit()
+    val tempDir = Utils.createTempDir()
     val file = new File(tempDir.getPath, "part-00000")
     Files.write(lines, file, Charsets.US_ASCII)
     val path = tempDir.toURI.toString
@@ -100,7 +99,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
       LabeledPoint(1.1, Vectors.sparse(3, Seq((0, 1.23), (2, 4.56)))),
       LabeledPoint(0.0, Vectors.dense(1.01, 2.02, 3.03))
     ), 2)
-    val tempDir = Files.createTempDir()
+    val tempDir = Utils.createTempDir()
     val outputDir = new File(tempDir, "output")
     MLUtils.saveAsLibSVMFile(examples, outputDir.toURI.toString)
     val lines = outputDir.listFiles()
@@ -166,7 +165,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
       Vectors.sparse(2, Array(1), Array(-1.0)),
       Vectors.dense(0.0, 1.0)
     ), 2)
-    val tempDir = Files.createTempDir()
+    val tempDir = Utils.createTempDir()
     val outputDir = new File(tempDir, "vectors")
     val path = outputDir.toURI.toString
     vectors.saveAsTextFile(path)
@@ -181,7 +180,7 @@ class MLUtilsSuite extends FunSuite with LocalSparkContext {
       LabeledPoint(0.0, Vectors.sparse(2, Array(1), Array(-1.0))),
      LabeledPoint(1.0, Vectors.dense(0.0, 1.0))
     ), 2)
-    val tempDir = Files.createTempDir()
+    val tempDir = Utils.createTempDir()
     val outputDir = new File(tempDir, "points")
     val path = outputDir.toURI.toString
     points.saveAsTextFile(path)
@@ -23,8 +23,6 @@ import java.net.{URL, URLClassLoader}
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
-import com.google.common.io.Files
-
 import org.apache.spark.{SparkConf, TestUtils}
 import org.apache.spark.util.Utils
 
@@ -39,10 +37,8 @@ class ExecutorClassLoaderSuite extends FunSuite with BeforeAndAfterAll {
 
   override def beforeAll() {
     super.beforeAll()
-    tempDir1 = Files.createTempDir()
-    tempDir1.deleteOnExit()
-    tempDir2 = Files.createTempDir()
-    tempDir2.deleteOnExit()
+    tempDir1 = Utils.createTempDir()
+    tempDir2 = Utils.createTempDir()
     url1 = "file://" + tempDir1
     urls2 = List(tempDir2.toURI.toURL).toArray
     childClassNames.foreach(TestUtils.createCompiledClass(_, tempDir1, "1"))
@@ -22,7 +22,6 @@ import java.net.URLClassLoader
 
 import scala.collection.mutable.ArrayBuffer
 
-import com.google.common.io.Files
 import org.scalatest.FunSuite
 import org.apache.spark.SparkContext
 import org.apache.commons.lang3.StringEscapeUtils
@@ -190,8 +189,7 @@ class ReplSuite extends FunSuite {
   }
 
   test("interacting with files") {
-    val tempDir = Files.createTempDir()
-    tempDir.deleteOnExit()
+    val tempDir = Utils.createTempDir()
     val out = new FileWriter(tempDir + "/input")
     out.write("Hello world!\n")
     out.write("What's up?\n")
@@ -231,8 +231,7 @@ class CheckpointSuite extends TestSuiteBase {
   // failure, are re-processed or not.
   test("recovery with file input stream") {
     // Set up the streaming context and input streams
-    val testDir = Files.createTempDir()
-    testDir.deleteOnExit()
+    val testDir = Utils.createTempDir()
     var ssc = new StreamingContext(master, framework, Seconds(1))
     ssc.checkpoint(checkpointDir)
     val fileStream = ssc.textFileStream(testDir.toString)
@@ -98,8 +98,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock")
 
     // Set up the streaming context and input streams
-    val testDir = Files.createTempDir()
-    testDir.deleteOnExit()
+    val testDir = Utils.createTempDir()
     val ssc = new StreamingContext(conf, batchDuration)
     val fileStream = ssc.textFileStream(testDir.toString)
     val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
@@ -352,8 +352,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
   extends Thread with Logging {
 
   override def run() {
-    val localTestDir = Files.createTempDir()
-    localTestDir.deleteOnExit()
+    val localTestDir = Utils.createTempDir()
     var fs = testDir.getFileSystem(new Configuration())
     val maxTries = 3
     try {
@@ -24,12 +24,12 @@ import scala.collection.mutable.SynchronizedBuffer
 import scala.reflect.ClassTag
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
-import com.google.common.io.Files
 
 import org.apache.spark.streaming.dstream.{DStream, InputDStream, ForEachDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 
 /**
  * This is a input stream just for the testsuites. This is equivalent to a checkpointable,
@@ -120,9 +120,8 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
 
   // Directory where the checkpoint data will be saved
   lazy val checkpointDir = {
-    val dir = Files.createTempDir()
+    val dir = Utils.createTempDir()
     logDebug(s"checkpointDir: $dir")
-    dir.deleteOnExit()
     dir.toString
   }
 
@@ -20,13 +20,10 @@ package org.apache.spark.deploy.yarn
 import java.io.File
 import java.net.URI
 
-import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
-import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Matchers._
@@ -117,7 +114,7 @@ class ClientBaseSuite extends FunSuite with Matchers {
     doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
       any(classOf[Path]), anyShort(), anyBoolean())
 
-    val tempDir = Files.createTempDir()
+    val tempDir = Utils.createTempDir()
     try {
       client.prepareLocalResources(tempDir.getAbsolutePath())
       sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER))