Skip deletion of files in clearFiles().
This fixes an issue where Spark could delete original files in the current working directory that were added to the job using addFile(). There was also the potential for addFile() to overwrite local files, which is addressed by changing Utils.fetchFile() to log a warning instead of overwriting a file with new contents. This is a short-term fix; a better long-term solution would be to remove the dependence on storing files in the current working directory, since we can't change the cwd from Java.
This commit is contained in:
parent
84587a9bf3
commit
f1bf4f0385
|
@@ -419,8 +419,9 @@ class SparkContext(
|
|||
}
|
||||
addedFiles(key) = System.currentTimeMillis
|
||||
|
||||
// Fetch the file locally in case the task is executed locally
|
||||
val filename = new File(path.split("/").last)
|
||||
// Fetch the file locally in case a job is executed locally.
|
||||
// Jobs that run through LocalScheduler will already fetch the required dependencies,
|
||||
// but jobs run in DAGScheduler.runLocally() will not so we must fetch the files here.
|
||||
Utils.fetchFile(path, new File("."))
|
||||
|
||||
logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
|
||||
|
@@ -437,11 +438,10 @@ class SparkContext(
|
|||
}
|
||||
|
||||
/**
|
||||
* Clear the job's list of files added by `addFile` so that they do not get donwloaded to
|
||||
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
|
||||
* any new nodes.
|
||||
*/
|
||||
def clearFiles() {
|
||||
addedFiles.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
|
||||
addedFiles.clear()
|
||||
}
|
||||
|
||||
|
@@ -465,7 +465,6 @@ class SparkContext(
|
|||
* any new nodes.
|
||||
*/
|
||||
def clearJars() {
|
||||
addedJars.keySet.map(_.split("/").last).foreach { k => new File(k).delete() }
|
||||
addedJars.clear()
|
||||
}
|
||||
|
||||
|
|
|
@@ -9,6 +9,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
|
|||
import scala.collection.mutable.ArrayBuffer
|
||||
import scala.collection.JavaConversions._
|
||||
import scala.io.Source
|
||||
import com.google.common.io.Files
|
||||
|
||||
/**
|
||||
* Various utility methods used by Spark.
|
||||
|
@@ -130,28 +131,47 @@ private object Utils extends Logging {
|
|||
*/
|
||||
def fetchFile(url: String, targetDir: File) {
|
||||
val filename = url.split("/").last
|
||||
val tempDir = System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))
|
||||
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
|
||||
val targetFile = new File(targetDir, filename)
|
||||
val uri = new URI(url)
|
||||
uri.getScheme match {
|
||||
case "http" | "https" | "ftp" =>
|
||||
logInfo("Fetching " + url + " to " + targetFile)
|
||||
logInfo("Fetching " + url + " to " + tempFile)
|
||||
val in = new URL(url).openStream()
|
||||
val out = new FileOutputStream(targetFile)
|
||||
val out = new FileOutputStream(tempFile)
|
||||
Utils.copyStream(in, out, true)
|
||||
case "file" | null =>
|
||||
// Remove the file if it already exists
|
||||
targetFile.delete()
|
||||
// Symlink the file locally.
|
||||
if (uri.isAbsolute) {
|
||||
// url is absolute, i.e. it starts with "file:///". Extract the source
|
||||
// file's absolute path from the url.
|
||||
val sourceFile = new File(uri)
|
||||
logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
|
||||
FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
|
||||
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
|
||||
logWarning("File " + targetFile + " exists and does not match contents of " + url +
|
||||
"; using existing version")
|
||||
tempFile.delete()
|
||||
} else {
|
||||
// url is not absolute, i.e. itself is the path to the source file.
|
||||
logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
|
||||
FileUtil.symLink(url, targetFile.getAbsolutePath)
|
||||
Files.move(tempFile, targetFile)
|
||||
}
|
||||
case "file" | null =>
|
||||
val sourceFile = if (uri.isAbsolute) {
|
||||
new File(uri)
|
||||
} else {
|
||||
new File(url)
|
||||
}
|
||||
if (targetFile.exists && !Files.equal(sourceFile, targetFile)) {
|
||||
logWarning("File " + targetFile + " exists and does not match contents of " + url +
|
||||
"; using existing version")
|
||||
} else {
|
||||
// Remove the file if it already exists
|
||||
targetFile.delete()
|
||||
// Symlink the file locally.
|
||||
if (uri.isAbsolute) {
|
||||
// url is absolute, i.e. it starts with "file:///". Extract the source
|
||||
// file's absolute path from the url.
|
||||
val sourceFile = new File(uri)
|
||||
logInfo("Symlinking " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
|
||||
FileUtil.symLink(sourceFile.getAbsolutePath, targetFile.getAbsolutePath)
|
||||
} else {
|
||||
// url is not absolute, i.e. itself is the path to the source file.
|
||||
logInfo("Symlinking " + url + " to " + targetFile.getAbsolutePath)
|
||||
FileUtil.symLink(url, targetFile.getAbsolutePath)
|
||||
}
|
||||
}
|
||||
case _ =>
|
||||
// Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
|
||||
|
@@ -159,8 +179,15 @@ private object Utils extends Logging {
|
|||
val conf = new Configuration()
|
||||
val fs = FileSystem.get(uri, conf)
|
||||
val in = fs.open(new Path(uri))
|
||||
val out = new FileOutputStream(targetFile)
|
||||
val out = new FileOutputStream(tempFile)
|
||||
Utils.copyStream(in, out, true)
|
||||
if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
|
||||
logWarning("File " + targetFile + " exists and does not match contents of " + url +
|
||||
"; using existing version")
|
||||
tempFile.delete()
|
||||
} else {
|
||||
Files.move(tempFile, targetFile)
|
||||
}
|
||||
}
|
||||
// Decompress the file if it's a .tar or .tar.gz
|
||||
if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
|
||||
|
|
Loading…
Reference in a new issue