Fixed multiple file stream and checkpointing bugs.
- Made file stream more robust to transient failures. - Changed the SparkContext.setCheckpointDir API to no longer take the second 'useExisting' parameter; Spark will always create a unique directory for checkpointing underneath the directory provided to the function. - Fixed a bug with local relative paths used as the checkpoint directory. - Made DStream and RDD checkpointing use SparkContext.hadoopConfiguration, so that more HDFS-compatible filesystems are supported for checkpointing.
This commit is contained in:
parent
6169fe14a1
commit
5e9ce83d68
|
@ -19,7 +19,7 @@ package org.apache.spark
|
|||
|
||||
import java.io._
|
||||
import java.net.URI
|
||||
import java.util.Properties
|
||||
import java.util.{UUID, Properties}
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
import scala.collection.Map
|
||||
|
@ -857,22 +857,15 @@ class SparkContext(
|
|||
|
||||
/**
|
||||
* Set the directory under which RDDs are going to be checkpointed. The directory must
|
||||
* be a HDFS path if running on a cluster. If the directory does not exist, it will
|
||||
* be created. If the directory exists and useExisting is set to true, then the
|
||||
* exisiting directory will be used. Otherwise an exception will be thrown to
|
||||
* prevent accidental overriding of checkpoint files in the existing directory.
|
||||
* be a HDFS path if running on a cluster.
|
||||
*/
|
||||
def setCheckpointDir(dir: String, useExisting: Boolean = false) {
|
||||
val path = new Path(dir)
|
||||
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
|
||||
if (!useExisting) {
|
||||
if (fs.exists(path)) {
|
||||
throw new Exception("Checkpoint directory '" + path + "' already exists.")
|
||||
} else {
|
||||
fs.mkdirs(path)
|
||||
}
|
||||
}
|
||||
checkpointDir = Some(dir)
|
||||
def setCheckpointDir(directory: String) {
|
||||
checkpointDir = Option(directory).map(dir => {
|
||||
val path = new Path(dir, UUID.randomUUID().toString)
|
||||
val fs = path.getFileSystem(hadoopConfiguration)
|
||||
fs.mkdirs(path)
|
||||
fs.getFileStatus(path).getPath().toString
|
||||
})
|
||||
}
|
||||
|
||||
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
|
||||
|
|
|
@ -385,20 +385,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
|
|||
|
||||
/**
|
||||
* Set the directory under which RDDs are going to be checkpointed. The directory must
|
||||
* be a HDFS path if running on a cluster. If the directory does not exist, it will
|
||||
* be created. If the directory exists and useExisting is set to true, then the
|
||||
* exisiting directory will be used. Otherwise an exception will be thrown to
|
||||
* prevent accidental overriding of checkpoint files in the existing directory.
|
||||
*/
|
||||
def setCheckpointDir(dir: String, useExisting: Boolean) {
|
||||
sc.setCheckpointDir(dir, useExisting)
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the directory under which RDDs are going to be checkpointed. The directory must
|
||||
* be a HDFS path if running on a cluster. If the directory does not exist, it will
|
||||
* be created. If the directory exists, an exception will be thrown to prevent accidental
|
||||
* overriding of checkpoint files.
|
||||
* be a HDFS path if running on a cluster.
|
||||
*/
|
||||
def setCheckpointDir(dir: String) {
|
||||
sc.setCheckpointDir(dir)
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.util.ReflectionUtils
|
|||
import org.apache.hadoop.fs.Path
|
||||
import java.io.{File, IOException, EOFException}
|
||||
import java.text.NumberFormat
|
||||
import org.apache.spark.broadcast.Broadcast
|
||||
|
||||
private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
|
||||
|
||||
|
@ -36,6 +37,8 @@ private[spark]
|
|||
class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
|
||||
extends RDD[T](sc, Nil) {
|
||||
|
||||
val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
|
||||
|
||||
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
|
||||
|
||||
override def getPartitions: Array[Partition] = {
|
||||
|
@ -67,7 +70,7 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
|
|||
|
||||
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
|
||||
val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
|
||||
CheckpointRDD.readFromFile(file, context)
|
||||
CheckpointRDD.readFromFile(file, broadcastedConf, context)
|
||||
}
|
||||
|
||||
override def checkpoint() {
|
||||
|
@ -81,10 +84,14 @@ private[spark] object CheckpointRDD extends Logging {
|
|||
"part-%05d".format(splitId)
|
||||
}
|
||||
|
||||
def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
|
||||
def writeToFile[T](
|
||||
path: String,
|
||||
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
|
||||
blockSize: Int = -1
|
||||
)(ctx: TaskContext, iterator: Iterator[T]) {
|
||||
val env = SparkEnv.get
|
||||
val outputDir = new Path(path)
|
||||
val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
|
||||
val fs = outputDir.getFileSystem(broadcastedConf.value.value)
|
||||
|
||||
val finalOutputName = splitIdToFile(ctx.partitionId)
|
||||
val finalOutputPath = new Path(outputDir, finalOutputName)
|
||||
|
@ -121,9 +128,13 @@ private[spark] object CheckpointRDD extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
|
||||
def readFromFile[T](
|
||||
path: Path,
|
||||
broadcastedConf: Broadcast[SerializableWritable[Configuration]],
|
||||
context: TaskContext
|
||||
): Iterator[T] = {
|
||||
val env = SparkEnv.get
|
||||
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
|
||||
val fs = path.getFileSystem(broadcastedConf.value.value)
|
||||
val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
|
||||
val fileInputStream = fs.open(path, bufferSize)
|
||||
val serializer = env.serializer.newInstance()
|
||||
|
@ -146,8 +157,10 @@ private[spark] object CheckpointRDD extends Logging {
|
|||
val sc = new SparkContext(cluster, "CheckpointRDD Test")
|
||||
val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
|
||||
val path = new Path(hdfsPath, "temp")
|
||||
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
|
||||
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
|
||||
val conf = SparkHadoopUtil.get.newConfiguration()
|
||||
val fs = path.getFileSystem(conf)
|
||||
val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
|
||||
sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf, 1024) _)
|
||||
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
|
||||
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
|
||||
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.spark.rdd
|
|||
import org.apache.hadoop.fs.Path
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
|
||||
import org.apache.spark.{Partition, SparkException, Logging}
|
||||
import org.apache.spark.{SerializableWritable, Partition, SparkException, Logging}
|
||||
import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
|
||||
|
||||
/**
|
||||
|
@ -83,14 +83,20 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
|
|||
|
||||
// Create the output path for the checkpoint
|
||||
val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
|
||||
val fs = path.getFileSystem(new Configuration())
|
||||
val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
|
||||
if (!fs.mkdirs(path)) {
|
||||
throw new SparkException("Failed to create checkpoint path " + path)
|
||||
}
|
||||
|
||||
// Save to file, and reload it as an RDD
|
||||
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
|
||||
val broadcastedConf = rdd.context.broadcast(new SerializableWritable(rdd.context.hadoopConfiguration))
|
||||
rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
|
||||
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
|
||||
if (newRDD.partitions.size != rdd.partitions.size) {
|
||||
throw new Exception(
|
||||
"Checkpoint RDD " + newRDD + "("+ newRDD.partitions.size + ") has different " +
|
||||
"number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
|
||||
}
|
||||
|
||||
// Change the dependencies and partitions of the RDD
|
||||
RDDCheckpointData.synchronized {
|
||||
|
@ -99,8 +105,8 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
|
|||
rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
|
||||
cpState = Checkpointed
|
||||
RDDCheckpointData.clearTaskCaches()
|
||||
logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
|
||||
}
|
||||
logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
|
||||
}
|
||||
|
||||
// Get preferred location of a split after checkpointing
|
||||
|
|
|
@ -851,7 +851,7 @@ public class JavaAPISuite implements Serializable {
|
|||
public void checkpointAndComputation() {
|
||||
File tempDir = Files.createTempDir();
|
||||
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
|
||||
sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
|
||||
sc.setCheckpointDir(tempDir.getAbsolutePath());
|
||||
Assert.assertEquals(false, rdd.isCheckpointed());
|
||||
rdd.checkpoint();
|
||||
rdd.count(); // Forces the DAG to cause a checkpoint
|
||||
|
@ -863,7 +863,7 @@ public class JavaAPISuite implements Serializable {
|
|||
public void checkpointAndRestore() {
|
||||
File tempDir = Files.createTempDir();
|
||||
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
|
||||
sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
|
||||
sc.setCheckpointDir(tempDir.getAbsolutePath());
|
||||
Assert.assertEquals(false, rdd.isCheckpointed());
|
||||
rdd.checkpoint();
|
||||
rdd.count(); // Forces the DAG to cause a checkpoint
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration
|
|||
import org.apache.spark.Logging
|
||||
import org.apache.spark.io.CompressionCodec
|
||||
import org.apache.spark.util.MetadataCleaner
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
|
||||
|
||||
private[streaming]
|
||||
|
@ -57,7 +58,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
|
|||
* Convenience class to speed up the writing of graph checkpoint to file
|
||||
*/
|
||||
private[streaming]
|
||||
class CheckpointWriter(checkpointDir: String) extends Logging {
|
||||
class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging {
|
||||
val file = new Path(checkpointDir, "graph")
|
||||
// The file to which we actually write - and then "move" to file.
|
||||
private val writeFile = new Path(file.getParent, file.getName + ".next")
|
||||
|
@ -65,8 +66,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
|
|||
|
||||
private var stopped = false
|
||||
|
||||
val conf = new Configuration()
|
||||
var fs = file.getFileSystem(conf)
|
||||
var fs = file.getFileSystem(hadoopConf)
|
||||
val maxAttempts = 3
|
||||
val executor = Executors.newFixedThreadPool(1)
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
|
|||
val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
|
||||
val jobManager = new JobManager(ssc, concurrentJobs)
|
||||
val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
|
||||
new CheckpointWriter(ssc.checkpointDir)
|
||||
new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
|
||||
} else {
|
||||
null
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
|
|||
import org.apache.hadoop.fs.Path
|
||||
import twitter4j.Status
|
||||
import twitter4j.auth.Authorization
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
|
||||
|
||||
/**
|
||||
|
@ -85,7 +86,6 @@ class StreamingContext private (
|
|||
null, batchDuration)
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Re-create a StreamingContext from a checkpoint file.
|
||||
* @param path Path either to the directory that was specified as the checkpoint directory, or
|
||||
|
@ -139,7 +139,7 @@ class StreamingContext private (
|
|||
|
||||
protected[streaming] var checkpointDir: String = {
|
||||
if (isCheckpointPresent) {
|
||||
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
|
||||
sc.setCheckpointDir(cp_.checkpointDir)
|
||||
cp_.checkpointDir
|
||||
} else {
|
||||
null
|
||||
|
@ -173,8 +173,12 @@ class StreamingContext private (
|
|||
*/
|
||||
def checkpoint(directory: String) {
|
||||
if (directory != null) {
|
||||
sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
|
||||
checkpointDir = directory
|
||||
val path = new Path(directory)
|
||||
val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
|
||||
fs.mkdirs(path)
|
||||
val fullPath = fs.getFileStatus(path).getPath().toString
|
||||
sc.setCheckpointDir(fullPath)
|
||||
checkpointDir = fullPath
|
||||
} else {
|
||||
checkpointDir = null
|
||||
}
|
||||
|
@ -595,8 +599,9 @@ object StreamingContext {
|
|||
prefix + "-" + time.milliseconds + "." + suffix
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
|
||||
new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
|
|
@ -39,8 +39,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
|
|||
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
|
||||
|
||||
// Latest file mod time seen till any point of time
|
||||
private val lastModTimeFiles = new HashSet[String]()
|
||||
private var lastModTime = 0L
|
||||
private val prevModTimeFiles = new HashSet[String]()
|
||||
private var prevModTime = 0L
|
||||
|
||||
@transient private var path_ : Path = null
|
||||
@transient private var fs_ : FileSystem = null
|
||||
|
@ -48,11 +48,11 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
|
|||
|
||||
override def start() {
|
||||
if (newFilesOnly) {
|
||||
lastModTime = graph.zeroTime.milliseconds
|
||||
prevModTime = graph.zeroTime.milliseconds
|
||||
} else {
|
||||
lastModTime = 0
|
||||
prevModTime = 0
|
||||
}
|
||||
logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
|
||||
logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
|
||||
}
|
||||
|
||||
override def stop() { }
|
||||
|
@ -67,55 +67,22 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
|
|||
* the previous call.
|
||||
*/
|
||||
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
|
||||
assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
|
||||
assert(validTime.milliseconds >= prevModTime,
|
||||
"Trying to get new files for really old time [" + validTime + " < " + prevModTime)
|
||||
|
||||
// Create the filter for selecting new files
|
||||
val newFilter = new PathFilter() {
|
||||
// Latest file mod time seen in this round of fetching files and its corresponding files
|
||||
var latestModTime = 0L
|
||||
val latestModTimeFiles = new HashSet[String]()
|
||||
|
||||
def accept(path: Path): Boolean = {
|
||||
if (!filter(path)) { // Reject file if it does not satisfy filter
|
||||
logDebug("Rejected by filter " + path)
|
||||
return false
|
||||
} else { // Accept file only if
|
||||
val modTime = fs.getFileStatus(path).getModificationTime()
|
||||
logDebug("Mod time for " + path + " is " + modTime)
|
||||
if (modTime < lastModTime) {
|
||||
logDebug("Mod time less than last mod time")
|
||||
return false // If the file was created before the last time it was called
|
||||
} else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
|
||||
logDebug("Mod time equal to last mod time, but file considered already")
|
||||
return false // If the file was created exactly as lastModTime but not reported yet
|
||||
} else if (modTime > validTime.milliseconds) {
|
||||
logDebug("Mod time more than valid time")
|
||||
return false // If the file was created after the time this function call requires
|
||||
}
|
||||
if (modTime > latestModTime) {
|
||||
latestModTime = modTime
|
||||
latestModTimeFiles.clear()
|
||||
logDebug("Latest mod time updated to " + latestModTime)
|
||||
}
|
||||
latestModTimeFiles += path.toString
|
||||
logDebug("Accepted " + path)
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
|
||||
val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
|
||||
// Find new files
|
||||
val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
|
||||
logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
|
||||
if (newFiles.length > 0) {
|
||||
// Update the modification time and the files processed for that modification time
|
||||
if (lastModTime != newFilter.latestModTime) {
|
||||
lastModTime = newFilter.latestModTime
|
||||
lastModTimeFiles.clear()
|
||||
if (prevModTime < latestModTime) {
|
||||
prevModTime = latestModTime
|
||||
prevModTimeFiles.clear()
|
||||
}
|
||||
lastModTimeFiles ++= newFilter.latestModTimeFiles
|
||||
logDebug("Last mod time updated to " + lastModTime)
|
||||
prevModTimeFiles ++= latestModTimeFiles
|
||||
logDebug("Last mod time updated to " + prevModTime)
|
||||
}
|
||||
files += ((validTime, newFiles))
|
||||
files += ((validTime, newFiles.toArray))
|
||||
Some(filesToRDD(newFiles))
|
||||
}
|
||||
|
||||
|
@ -130,8 +97,30 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
|
|||
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds files which have modification timestamp <= current time. If some files are being
|
||||
* deleted in the directory, then it can generate transient exceptions. Hence, multiple
|
||||
* attempts are made to handle these transient exceptions. Returns 3-tuple
|
||||
* (new files found, latest modification time among them, files with latest modification time)
|
||||
*/
|
||||
private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
|
||||
logDebug("Trying to get new files for time " + currentTime)
|
||||
var attempts = 0
|
||||
while (attempts < FileInputDStream.MAX_ATTEMPTS) {
|
||||
attempts += 1
|
||||
try {
|
||||
val filter = new CustomPathFilter(currentTime)
|
||||
val newFiles = fs.listStatus(path, filter)
|
||||
return (newFiles.map(_.getPath.toString), filter.latestModTime, filter.latestModTimeFiles.toSeq)
|
||||
} catch {
|
||||
case ioe: IOException => logWarning("Attempt " + attempts + " to get new files failed", ioe)
|
||||
}
|
||||
}
|
||||
(Seq(), -1, Seq())
|
||||
}
|
||||
|
||||
/** Generate one RDD from an array of files */
|
||||
protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
|
||||
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
|
||||
new UnionRDD(
|
||||
context.sparkContext,
|
||||
files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
|
||||
|
@ -189,10 +178,51 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
|
|||
hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* PathFilter to find new files that have modification timestamps <= current time, but have not
|
||||
* been seen before (i.e. the file should not be in lastModTimeFiles)
|
||||
* @param currentTime
|
||||
*/
|
||||
private[streaming]
|
||||
class CustomPathFilter(currentTime: Long) extends PathFilter() {
|
||||
// Latest file mod time seen in this round of fetching files and its corresponding files
|
||||
var latestModTime = 0L
|
||||
val latestModTimeFiles = new HashSet[String]()
|
||||
|
||||
def accept(path: Path): Boolean = {
|
||||
if (!filter(path)) { // Reject file if it does not satisfy filter
|
||||
logDebug("Rejected by filter " + path)
|
||||
return false
|
||||
} else { // Accept file only if
|
||||
val modTime = fs.getFileStatus(path).getModificationTime()
|
||||
logDebug("Mod time for " + path + " is " + modTime)
|
||||
if (modTime < prevModTime) {
|
||||
logDebug("Mod time less than last mod time")
|
||||
return false // If the file was created before the last time it was called
|
||||
} else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
|
||||
logDebug("Mod time equal to last mod time, but file considered already")
|
||||
return false // If the file was created exactly as lastModTime but not reported yet
|
||||
} else if (modTime > currentTime) {
|
||||
logDebug("Mod time more than valid time")
|
||||
return false // If the file was created after the time this function call requires
|
||||
}
|
||||
if (modTime > latestModTime) {
|
||||
latestModTime = modTime
|
||||
latestModTimeFiles.clear()
|
||||
logDebug("Latest mod time updated to " + latestModTime)
|
||||
}
|
||||
latestModTimeFiles += path.toString
|
||||
logDebug("Accepted " + path)
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[streaming]
|
||||
object FileInputDStream {
|
||||
val MAX_ATTEMPTS = 10
|
||||
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
|
||||
}
|
||||
|
||||
|
|
|
@ -25,8 +25,10 @@ import org.scalatest.BeforeAndAfter
|
|||
import org.apache.commons.io.FileUtils
|
||||
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
|
||||
import util.{Clock, ManualClock}
|
||||
import scala.util.Random
|
||||
import com.google.common.io.Files
|
||||
import org.apache.hadoop.fs.{Path, FileSystem}
|
||||
import org.apache.hadoop.conf.Configuration
|
||||
|
||||
|
||||
|
||||
/**
|
||||
|
@ -44,7 +46,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
|
||||
after {
|
||||
if (ssc != null) ssc.stop()
|
||||
FileUtils.deleteDirectory(new File(checkpointDir))
|
||||
//FileUtils.deleteDirectory(new File(checkpointDir))
|
||||
|
||||
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
|
||||
System.clearProperty("spark.driver.port")
|
||||
|
@ -66,7 +68,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
|
||||
|
||||
val stateStreamCheckpointInterval = Seconds(1)
|
||||
|
||||
val fs = FileSystem.getLocal(new Configuration())
|
||||
// this ensure checkpointing occurs at least once
|
||||
val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
|
||||
val secondNumBatches = firstNumBatches
|
||||
|
@ -90,11 +92,12 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
ssc.start()
|
||||
advanceTimeWithRealDelay(ssc, firstNumBatches)
|
||||
logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
|
||||
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
|
||||
assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
|
||||
"No checkpointed RDDs in state stream before first failure")
|
||||
stateStream.checkpointData.checkpointFiles.foreach {
|
||||
case (time, data) => {
|
||||
val file = new File(data.toString)
|
||||
assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
|
||||
case (time, file) => {
|
||||
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
|
||||
" for state stream before first failure does not exist")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -102,7 +105,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
// and check whether the earlier checkpoint files are deleted
|
||||
val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
|
||||
advanceTimeWithRealDelay(ssc, secondNumBatches)
|
||||
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
|
||||
checkpointFiles.foreach(file =>
|
||||
assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
|
||||
ssc.stop()
|
||||
|
||||
// Restart stream computation using the checkpoint file and check whether
|
||||
|
@ -110,19 +114,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
ssc = new StreamingContext(checkpointDir)
|
||||
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
|
||||
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
|
||||
assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure")
|
||||
assert(!stateStream.generatedRDDs.isEmpty,
|
||||
"No restored RDDs in state stream after recovery from first failure")
|
||||
|
||||
|
||||
// Run one batch to generate a new checkpoint file and check whether some RDD
|
||||
// is present in the checkpoint data or not
|
||||
ssc.start()
|
||||
advanceTimeWithRealDelay(ssc, 1)
|
||||
assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
|
||||
assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
|
||||
"No checkpointed RDDs in state stream before second failure")
|
||||
stateStream.checkpointData.checkpointFiles.foreach {
|
||||
case (time, data) => {
|
||||
val file = new File(data.toString)
|
||||
assert(file.exists(),
|
||||
"Checkpoint file '" + file +"' for time " + time + " for state stream before seconds failure does not exist")
|
||||
case (time, file) => {
|
||||
assert(fs.exists(new Path(file)), "Checkpoint file '" + file +"' for time " + time +
|
||||
" for state stream before seconds failure does not exist")
|
||||
}
|
||||
}
|
||||
ssc.stop()
|
||||
|
@ -132,7 +137,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
ssc = new StreamingContext(checkpointDir)
|
||||
stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
|
||||
logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
|
||||
assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
|
||||
assert(!stateStream.generatedRDDs.isEmpty,
|
||||
"No restored RDDs in state stream after recovery from second failure")
|
||||
|
||||
// Adjust manual clock time as if it is being restarted after a delay
|
||||
System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
|
||||
|
@ -143,6 +149,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
ssc = null
|
||||
}
|
||||
|
||||
|
||||
// This tests whether the systm can recover from a master failure with simple
|
||||
// non-stateful operations. This assumes as reliable, replayable input
|
||||
// source - TestInputDStream.
|
||||
|
@ -191,6 +198,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
|
|||
testCheckpointedOperation(input, operation, output, 7)
|
||||
}
|
||||
|
||||
|
||||
// This tests whether file input stream remembers what files were seen before
|
||||
// the master failure and uses them again to process a large window operation.
|
||||
// It also tests whether batches, whose processing was incomplete due to the
|
||||
|
|
Loading…
Reference in a new issue