Merge pull request #466 from liyinan926/file-overwrite-new

Allow files added through SparkContext.addFile() to be overwritten

This is useful when a file needs to be refreshed and re-downloaded by the executors periodically. One concrete use case: the driver periodically renews a Hadoop delegation token and writes it out to a token file, and the executors need to download the token file again each time it is renewed. The current implementation, however, throws an exception when the target file already exists and its contents do not match those of the new source. This PR adds an option that allows such files to be overwritten, to support use cases like the one above.
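As a rough usage sketch (not taken from this PR; the app name, master, and token file path are illustrative), a driver might enable the new option and add the periodically refreshed file, while tasks resolve their local copy through SparkFiles:

// A minimal sketch, assuming a Spark 0.9-era setup. The app name, master,
// and token file path are hypothetical, not taken from this PR.
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object TokenRefreshSketch {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("TokenRefreshSketch")
      .set("spark.files.overwrite", "true") // the option added by this PR
    val sc = new SparkContext(conf)

    // The driver periodically renews the token and rewrites this file; with
    // the option enabled, executors replace a stale local copy instead of
    // failing with "File ... exists and does not match contents of ...".
    sc.addFile("/var/run/spark/token-file")

    // Tasks resolve their local copy of the added file by name.
    sc.parallelize(1 to 4).foreach { _ =>
      val localPath = SparkFiles.get("token-file")
      // ... read the renewed token from localPath ...
    }

    sc.stop()
  }
}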
Reynold Xin 2014-01-27 17:08:35 -08:00
commit 84670f2715
2 changed files with 41 additions and 15 deletions

core/src/main/scala/org/apache/spark/util/Utils.scala

@@ -247,6 +247,7 @@ private[spark] object Utils extends Logging {
     val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
     val targetFile = new File(targetDir, filename)
     val uri = new URI(url)
+    val fileOverwrite = conf.getBoolean("spark.files.overwrite", false)
     uri.getScheme match {
       case "http" | "https" | "ftp" =>
         logInfo("Fetching " + url + " to " + tempFile)
@@ -254,47 +255,65 @@ private[spark] object Utils extends Logging {
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          tempFile.delete()
-          throw new SparkException(
-            "File " + targetFile + " exists and does not match contents of" + " " + url)
-        } else {
-          Files.move(tempFile, targetFile)
+          if (fileOverwrite) {
+            targetFile.delete()
+            logInfo(("File %s exists and does not match contents of %s, " +
+              "replacing it with %s").format(targetFile, url, url))
+          } else {
+            tempFile.delete()
+            throw new SparkException(
+              "File " + targetFile + " exists and does not match contents of" + " " + url)
+          }
         }
+        Files.move(tempFile, targetFile)
       case "file" | null =>
         // In the case of a local file, copy the local file to the target directory.
         // Note the difference between uri vs url.
         val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
+        var shouldCopy = true
         if (targetFile.exists) {
-          // If the target file already exists, warn the user if
           if (!Files.equal(sourceFile, targetFile)) {
-            throw new SparkException(
-              "File " + targetFile + " exists and does not match contents of" + " " + url)
+            if (fileOverwrite) {
+              targetFile.delete()
+              logInfo(("File %s exists and does not match contents of %s, " +
+                "replacing it with %s").format(targetFile, url, url))
+            } else {
+              throw new SparkException(
+                "File " + targetFile + " exists and does not match contents of" + " " + url)
+            }
           } else {
             // Do nothing if the file contents are the same, i.e. this file has been copied
             // previously.
             logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
               + targetFile.getAbsolutePath)
+            shouldCopy = false
           }
-        } else {
+        }
+        if (shouldCopy) {
           // The file does not exist in the target directory. Copy it there.
           logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
           Files.copy(sourceFile, targetFile)
         }
       case _ =>
         // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
         val uri = new URI(url)
         val conf = SparkHadoopUtil.get.newConfiguration()
         val fs = FileSystem.get(uri, conf)
         val in = fs.open(new Path(uri))
         val out = new FileOutputStream(tempFile)
         Utils.copyStream(in, out, true)
         if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          tempFile.delete()
-          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
-            " " + url)
-        } else {
-          Files.move(tempFile, targetFile)
+          if (fileOverwrite) {
+            targetFile.delete()
+            logInfo(("File %s exists and does not match contents of %s, " +
+              "replacing it with %s").format(targetFile, url, url))
+          } else {
+            tempFile.delete()
+            throw new SparkException(
+              "File " + targetFile + " exists and does not match contents of" + " " + url)
+          }
         }
+        Files.move(tempFile, targetFile)
     }
     // Decompress the file if it's a .tar or .tar.gz
     if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
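Note that the same overwrite-or-fail branch now appears in all three fetch paths. Purely as a hedged sketch (hypothetical, not part of this commit), the repeated logic could be hoisted into a small private helper inside Utils, reusing imports the file already has:

// Hypothetical refactoring sketch, not part of this commit. Assumes the
// imports already present in Utils.scala (java.io.File,
// com.google.common.io.Files, org.apache.spark.SparkException) and that
// this lives inside Utils, so logInfo comes from the Logging trait.
private def moveFetchedFile(
    tempFile: File,
    targetFile: File,
    url: String,
    fileOverwrite: Boolean): Unit = {
  if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
    if (fileOverwrite) {
      // Replace the stale copy and log, mirroring the behavior added here.
      targetFile.delete()
      logInfo(("File %s exists and does not match contents of %s, " +
        "replacing it with %s").format(targetFile, url, url))
    } else {
      tempFile.delete()
      throw new SparkException(
        "File " + targetFile + " exists and does not match contents of " + url)
    }
  }
  Files.move(tempFile, targetFile)
}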

docs/configuration.md

@@ -454,6 +454,13 @@ Apart from these, the following properties are also available, and may be useful
     the whole cluster by default. <br/>
     <b>Note:</b> this setting needs to be configured in the standalone cluster master, not in individual
     applications; you can set it through <code>SPARK_JAVA_OPTS</code> in <code>spark-env.sh</code>.
   </td>
 </tr>
+<tr>
+  <td>spark.files.overwrite</td>
+  <td>false</td>
+  <td>
+    Whether to overwrite files added through SparkContext.addFile() when the target file exists and its contents do not match those of the source.
+  </td>
+</tr>
 </table>