From 5e9ce83d682d6198cda4631faf11cb53fcccf07f Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Wed, 11 Dec 2013 14:01:36 -0800
Subject: [PATCH] Fixed multiple file stream and checkpointing bugs.

- Made file stream more robust to transient failures.
- Changed the SparkContext.setCheckpointDir API to remove the second
  'useExisting' parameter. Spark now always creates a unique directory
  for checkpointing underneath the directory provided to the function.
- Fixed a bug in the handling of local relative paths as the checkpoint
  directory.
- Made DStream and RDD checkpointing use SparkContext.hadoopConfiguration,
  so that more HDFS-compatible filesystems are supported for checkpointing.
---
 .../scala/org/apache/spark/SparkContext.scala |  25 ++--
 .../spark/api/java/JavaSparkContext.scala     |  15 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala  |  27 +++-
 .../apache/spark/rdd/RDDCheckpointData.scala  |  14 +-
 .../scala/org/apache/spark/JavaAPISuite.java  |   4 +-
 .../apache/spark/streaming/Checkpoint.scala   |   6 +-
 .../apache/spark/streaming/Scheduler.scala    |   2 +-
 .../spark/streaming/StreamingContext.scala    |  15 +-
 .../streaming/dstream/FileInputDStream.scala  | 128 +++++++++++-------
 .../spark/streaming/CheckpointSuite.scala     |  38 ++++--
 10 files changed, 158 insertions(+), 116 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 66006bf212..1811bfa1e5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import java.io._
 import java.net.URI
-import java.util.Properties
+import java.util.{UUID, Properties}
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.Map
@@ -857,22 +857,15 @@
 
   /**
    * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be a HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists and useExisting is set to true, then the
-   * exisiting directory will be used. Otherwise an exception will be thrown to
-   * prevent accidental overriding of checkpoint files in the existing directory.
+   * be an HDFS path if running on a cluster.
    */
-  def setCheckpointDir(dir: String, useExisting: Boolean = false) {
-    val path = new Path(dir)
-    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
-    if (!useExisting) {
-      if (fs.exists(path)) {
-        throw new Exception("Checkpoint directory '" + path + "' already exists.")
-      } else {
-        fs.mkdirs(path)
-      }
-    }
-    checkpointDir = Some(dir)
-  }
+  def setCheckpointDir(directory: String) {
+    checkpointDir = Option(directory).map(dir => {
+      val path = new Path(dir, UUID.randomUUID().toString)
+      val fs = path.getFileSystem(hadoopConfiguration)
+      fs.mkdirs(path)
+      fs.getFileStatus(path).getPath().toString
+    })
+  }
 
   /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
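For reference, a minimal sketch of the revised driver-side usage (the HDFS URI below is illustrative, not part of this patch): each call to setCheckpointDir now provisions a fresh UUID-named subdirectory under the supplied path, so applications sharing a parent directory no longer collide and no "already exists" exception can be thrown.

    // Hedged sketch; assumes a live SparkContext named sc and an illustrative URI.
    sc.setCheckpointDir("hdfs://namenode:8020/tmp/spark-checkpoints") // no more useExisting flag
    val data = sc.parallelize(1 to 1000, 10)
    data.checkpoint() // marks the RDD; files are written when a job first materializes it
    data.count()      // forces computation; data lands under .../spark-checkpoints/<uuid>/rdd-<id>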
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 8869e072bf..c63db4970b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -385,20 +385,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   /**
    * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be a HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists and useExisting is set to true, then the
-   * exisiting directory will be used. Otherwise an exception will be thrown to
-   * prevent accidental overriding of checkpoint files in the existing directory.
-   */
-  def setCheckpointDir(dir: String, useExisting: Boolean) {
-    sc.setCheckpointDir(dir, useExisting)
-  }
-
-  /**
-   * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be a HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists, an exception will be thrown to prevent accidental
-   * overriding of checkpoint files.
+   * be an HDFS path if running on a cluster.
    */
   def setCheckpointDir(dir: String) {
     sc.setCheckpointDir(dir)
   }
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index d3033ea4a6..ef4057e2a2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.util.ReflectionUtils
 import org.apache.hadoop.fs.Path
 import java.io.{File, IOException, EOFException}
 import java.text.NumberFormat
+import org.apache.spark.broadcast.Broadcast
 
 private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
 
@@ -36,6 +37,8 @@ private[spark] class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
   extends RDD[T](sc, Nil) {
 
+  val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
+
   @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
 
   override def getPartitions: Array[Partition] = {
@@ -67,7 +70,7 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri
 
   override def compute(split: Partition, context: TaskContext): Iterator[T] = {
     val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
-    CheckpointRDD.readFromFile(file, context)
+    CheckpointRDD.readFromFile(file, broadcastedConf, context)
   }
 
   override def checkpoint() {
@@ -81,10 +84,14 @@ private[spark] object CheckpointRDD extends Logging {
     "part-%05d".format(splitId)
   }
 
-  def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
+  def writeToFile[T](
+      path: String,
+      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      blockSize: Int = -1
+    )(ctx: TaskContext, iterator: Iterator[T]) {
     val env = SparkEnv.get
     val outputDir = new Path(path)
-    val fs = outputDir.getFileSystem(SparkHadoopUtil.get.newConfiguration())
+    val fs = outputDir.getFileSystem(broadcastedConf.value.value)
 
     val finalOutputName = splitIdToFile(ctx.partitionId)
     val finalOutputPath = new Path(outputDir, finalOutputName)
@@ -121,9 +128,13 @@ private[spark] object CheckpointRDD extends Logging {
     }
   }
 
-  def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
+  def readFromFile[T](
+      path: Path,
+      broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+      context: TaskContext
+    ): Iterator[T] = {
     val env = SparkEnv.get
-    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
+    val fs = path.getFileSystem(broadcastedConf.value.value)
     val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
     val fileInputStream = fs.open(path, bufferSize)
     val serializer = env.serializer.newInstance()
@@ -146,8 +157,10 @@ private[spark] object CheckpointRDD extends Logging {
     val sc = new SparkContext(cluster, "CheckpointRDD Test")
     val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
     val path = new Path(hdfsPath, "temp")
-    val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
-    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
+    val conf = SparkHadoopUtil.get.newConfiguration()
+    val fs = path.getFileSystem(conf)
+    val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
+    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf, 1024) _)
     val cpRDD = new CheckpointRDD[Int](sc, path.toString)
     assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
     assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 6009a41570..3160ab95c4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -20,7 +20,7 @@ package org.apache.spark.rdd
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.{Partition, SparkException, Logging}
+import org.apache.spark.{SerializableWritable, Partition, SparkException, Logging}
 import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
 
 /**
@@ -83,14 +83,20 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
 
     // Create the output path for the checkpoint
     val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
-    val fs = path.getFileSystem(new Configuration())
+    val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
     if (!fs.mkdirs(path)) {
       throw new SparkException("Failed to create checkpoint path " + path)
     }
 
     // Save to file, and reload it as an RDD
-    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
+    val broadcastedConf = rdd.context.broadcast(new SerializableWritable(rdd.context.hadoopConfiguration))
+    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
     val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
+    if (newRDD.partitions.size != rdd.partitions.size) {
+      throw new Exception(
+        "Checkpoint RDD " + newRDD + " (" + newRDD.partitions.size + ") has different " +
+        "number of partitions than original RDD " + rdd + " (" + rdd.partitions.size + ")")
+    }
 
     // Change the dependencies and partitions of the RDD
     RDDCheckpointData.synchronized {
@@ -99,8 +105,8 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
       rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
       cpState = Checkpointed
       RDDCheckpointData.clearTaskCaches()
-      logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
     }
+    logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
   }
 
   // Get preferred location of a split after checkpointing
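The CheckpointRDD and RDDCheckpointData hunks above share one pattern worth calling out: Hadoop's Configuration is not Java-serializable, so it is wrapped in SerializableWritable and shipped to executors as a broadcast variable instead of being rebuilt per task with SparkHadoopUtil.get.newConfiguration(). A condensed restatement, using only identifiers from the patch:

    // Driver side: wrap the non-serializable Hadoop configuration and broadcast it once.
    val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, broadcastedConf) _)
    // Task side (inside writeToFile/readFromFile): unwrap it to resolve the filesystem.
    //   val fs = outputDir.getFileSystem(broadcastedConf.value.value)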
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 4234f6eac7..ee5d8c9f13 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -851,7 +851,7 @@ public class JavaAPISuite implements Serializable {
   public void checkpointAndComputation() {
     File tempDir = Files.createTempDir();
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+    sc.setCheckpointDir(tempDir.getAbsolutePath());
     Assert.assertEquals(false, rdd.isCheckpointed());
     rdd.checkpoint();
     rdd.count(); // Forces the DAG to cause a checkpoint
@@ -863,7 +863,7 @@ public class JavaAPISuite implements Serializable {
   public void checkpointAndRestore() {
     File tempDir = Files.createTempDir();
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
-    sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
+    sc.setCheckpointDir(tempDir.getAbsolutePath());
     Assert.assertEquals(false, rdd.isCheckpointed());
     rdd.checkpoint();
     rdd.count(); // Forces the DAG to cause a checkpoint
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 9271914eb5..bcf5e6b1e6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.Logging
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.util.MetadataCleaner
+import org.apache.spark.deploy.SparkHadoopUtil
 
 
 private[streaming]
@@ -57,7 +58,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
  * Convenience class to speed up the writing of graph checkpoint to file
  */
 private[streaming]
-class CheckpointWriter(checkpointDir: String) extends Logging {
+class CheckpointWriter(checkpointDir: String, hadoopConf: Configuration) extends Logging {
   val file = new Path(checkpointDir, "graph")
   // The file to which we actually write - and then "move" to file.
   private val writeFile = new Path(file.getParent, file.getName + ".next")
@@ -65,8 +66,7 @@ class CheckpointWriter(checkpointDir: String) extends Logging {
 
   private var stopped = false
 
-  val conf = new Configuration()
-  var fs = file.getFileSystem(conf)
+  var fs = file.getFileSystem(hadoopConf)
   val maxAttempts = 3
   val executor = Executors.newFixedThreadPool(1)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
index ed892e33e6..4cd8695df5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Scheduler.scala
@@ -29,7 +29,7 @@ class Scheduler(ssc: StreamingContext) extends Logging {
   val concurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
   val jobManager = new JobManager(ssc, concurrentJobs)
   val checkpointWriter = if (ssc.checkpointDuration != null && ssc.checkpointDir != null) {
-    new CheckpointWriter(ssc.checkpointDir)
+    new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
   } else {
     null
   }
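The Checkpoint and Scheduler changes above amount to simple dependency injection: CheckpointWriter no longer builds its own Configuration, so the streaming graph checkpoint is written with the same filesystem settings as the rest of the job. From streaming-internal code (the class is private[streaming]), the wiring is just:

    // Identifiers as in the patch; only reachable from org.apache.spark.streaming code.
    val writer = new CheckpointWriter(ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)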
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 70bf902143..d6fc2a19f4 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -46,6 +46,7 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
 import org.apache.hadoop.fs.Path
 import twitter4j.Status
 import twitter4j.auth.Authorization
+import org.apache.spark.deploy.SparkHadoopUtil
 
 
 /**
@@ -85,7 +86,6 @@ class StreamingContext private (
       null, batchDuration)
   }
 
-
   /**
    * Re-create a StreamingContext from a checkpoint file.
   * @param path Path either to the directory that was specified as the checkpoint directory, or
@@ -139,7 +139,7 @@ class StreamingContext private (
 
   protected[streaming] var checkpointDir: String = {
     if (isCheckpointPresent) {
-      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(cp_.checkpointDir), true)
+      sc.setCheckpointDir(cp_.checkpointDir)
       cp_.checkpointDir
     } else {
       null
@@ -173,8 +173,12 @@ class StreamingContext private (
   */
  def checkpoint(directory: String) {
    if (directory != null) {
-      sc.setCheckpointDir(StreamingContext.getSparkCheckpointDir(directory))
-      checkpointDir = directory
+      val path = new Path(directory)
+      val fs = path.getFileSystem(sparkContext.hadoopConfiguration)
+      fs.mkdirs(path)
+      val fullPath = fs.getFileStatus(path).getPath().toString
+      sc.setCheckpointDir(fullPath)
+      checkpointDir = fullPath
     } else {
       checkpointDir = null
     }
@@ -595,8 +599,9 @@ object StreamingContext {
       prefix + "-" + time.milliseconds + "." + suffix
     }
   }
-
+  /*
   protected[streaming] def getSparkCheckpointDir(sscCheckpointDir: String): String = {
     new Path(sscCheckpointDir, UUID.randomUUID.toString).toString
   }
+  */
 }
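End to end, the streaming side now resolves the user-supplied directory to a fully qualified path through the context's hadoopConfiguration before handing it to the SparkContext, which is what fixes the earlier trouble with local relative paths. A hedged sketch of the resulting user-facing flow (master, app name, and path are illustrative):

    val ssc = new StreamingContext("local[2]", "CheckpointExample", Seconds(1))
    ssc.checkpoint("streaming-checkpoints") // relative path, now resolved to a full URI
    // ... define and start the streams ...
    // After a failure, the same directory recreates the context, as CheckpointSuite
    // below does with: new StreamingContext(checkpointDir)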
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index fea0573b77..1a8db3ab59 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -39,8 +39,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
   protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
 
   // Latest file mod time seen till any point of time
-  private val lastModTimeFiles = new HashSet[String]()
-  private var lastModTime = 0L
+  private val prevModTimeFiles = new HashSet[String]()
+  private var prevModTime = 0L
 
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
@@ -48,11 +48,11 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
 
   override def start() {
     if (newFilesOnly) {
-      lastModTime = graph.zeroTime.milliseconds
+      prevModTime = graph.zeroTime.milliseconds
     } else {
-      lastModTime = 0
+      prevModTime = 0
     }
-    logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
+    logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
   }
 
   override def stop() { }
@@ -67,55 +67,22 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
    * the previous call.
    */
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
-    assert(validTime.milliseconds >= lastModTime, "Trying to get new files for really old time [" + validTime + " < " + lastModTime)
+    assert(validTime.milliseconds >= prevModTime,
+      "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")
 
-    // Create the filter for selecting new files
-    val newFilter = new PathFilter() {
-      // Latest file mod time seen in this round of fetching files and its corresponding files
-      var latestModTime = 0L
-      val latestModTimeFiles = new HashSet[String]()
-
-      def accept(path: Path): Boolean = {
-        if (!filter(path)) {  // Reject file if it does not satisfy filter
-          logDebug("Rejected by filter " + path)
-          return false
-        } else {              // Accept file only if
-          val modTime = fs.getFileStatus(path).getModificationTime()
-          logDebug("Mod time for " + path + " is " + modTime)
-          if (modTime < lastModTime) {
-            logDebug("Mod time less than last mod time")
-            return false  // If the file was created before the last time it was called
-          } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
-            logDebug("Mod time equal to last mod time, but file considered already")
-            return false  // If the file was created exactly as lastModTime but not reported yet
-          } else if (modTime > validTime.milliseconds) {
-            logDebug("Mod time more than valid time")
-            return false  // If the file was created after the time this function call requires
-          }
-          if (modTime > latestModTime) {
-            latestModTime = modTime
-            latestModTimeFiles.clear()
-            logDebug("Latest mod time updated to " + latestModTime)
-          }
-          latestModTimeFiles += path.toString
-          logDebug("Accepted " + path)
-          return true
-        }
-      }
-    }
-    logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
-    val newFiles = fs.listStatus(path, newFilter).map(_.getPath.toString)
+    // Find new files
+    val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
     logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
     if (newFiles.length > 0) {
       // Update the modification time and the files processed for that modification time
-      if (lastModTime != newFilter.latestModTime) {
-        lastModTime = newFilter.latestModTime
-        lastModTimeFiles.clear()
+      if (prevModTime < latestModTime) {
+        prevModTime = latestModTime
+        prevModTimeFiles.clear()
       }
-      lastModTimeFiles ++= newFilter.latestModTimeFiles
-      logDebug("Last mod time updated to " + lastModTime)
+      prevModTimeFiles ++= latestModTimeFiles
+      logDebug("Last mod time updated to " + prevModTime)
     }
-    files += ((validTime, newFiles))
+    files += ((validTime, newFiles.toArray))
     Some(filesToRDD(newFiles))
   }
@@ -130,8 +97,30 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
       oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
   }
 
+  /**
+   * Finds files which have modification timestamp <= current time. If some files are being
+   * deleted in the directory, then it can generate transient exceptions. Hence, multiple
+   * attempts are made to handle these transient exceptions. Returns a 3-tuple of
+   * (new files found, latest modification time among them, files with that latest modification time).
+   */
+  private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
+    logDebug("Trying to get new files for time " + currentTime)
+    var attempts = 0
+    while (attempts < FileInputDStream.MAX_ATTEMPTS) {
+      attempts += 1
+      try {
+        val filter = new CustomPathFilter(currentTime)
+        val newFiles = fs.listStatus(path, filter)
+        return (newFiles.map(_.getPath.toString), filter.latestModTime, filter.latestModTimeFiles.toSeq)
+      } catch {
+        case ioe: IOException => logWarning("Attempt " + attempts + " to get new files failed", ioe)
+      }
+    }
+    (Seq(), -1, Seq())
+  }
+
   /** Generate one RDD from an array of files */
-  protected[streaming] def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
+  private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
     new UnionRDD(
       context.sparkContext,
       files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
@@ -189,10 +178,51 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
       hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
     }
   }
+
+  /**
+   * PathFilter to find new files that have modification timestamps <= current time, but have
+   * not been seen before (i.e. the file should not be in prevModTimeFiles).
+   * @param currentTime the upper bound on accepted modification times
+   */
+  private[streaming]
+  class CustomPathFilter(currentTime: Long) extends PathFilter() {
+    // Latest file mod time seen in this round of fetching files and its corresponding files
+    var latestModTime = 0L
+    val latestModTimeFiles = new HashSet[String]()
+
+    def accept(path: Path): Boolean = {
+      if (!filter(path)) {  // Reject file if it does not satisfy filter
+        logDebug("Rejected by filter " + path)
+        return false
+      } else {              // Accept file only if
+        val modTime = fs.getFileStatus(path).getModificationTime()
+        logDebug("Mod time for " + path + " is " + modTime)
+        if (modTime < prevModTime) {
+          logDebug("Mod time less than last mod time")
+          return false  // If the file was created before the last time this was called
+        } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
+          logDebug("Mod time equal to last mod time, but file considered already")
+          return false  // If the file has mod time equal to prevModTime but was already reported
+        } else if (modTime > currentTime) {
+          logDebug("Mod time more than valid time")
+          return false  // If the file was created after the time this function call requires
+        }
+        if (modTime > latestModTime) {
+          latestModTime = modTime
+          latestModTimeFiles.clear()
+          logDebug("Latest mod time updated to " + latestModTime)
+        }
+        latestModTimeFiles += path.toString
+        logDebug("Accepted " + path)
+        return true
+      }
+    }
+  }
 }
 
 private[streaming]
 object FileInputDStream {
+  val MAX_ATTEMPTS = 10
   def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
 }
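findNewFiles above is an instance of a bounded-retry idiom. A standalone sketch of the same shape (the helper name withRetries is hypothetical, not part of the patch):

    import java.io.IOException

    // Hypothetical helper mirroring findNewFiles: evaluate body, retrying on the
    // transient IOExceptions that a concurrent delete can cause, and fall back
    // to a safe default once maxAttempts is exhausted.
    def withRetries[T](maxAttempts: Int)(body: => T)(fallback: => T): T = {
      var attempts = 0
      while (attempts < maxAttempts) {
        attempts += 1
        try {
          return body
        } catch {
          case ioe: IOException =>
            println("Attempt " + attempts + " failed: " + ioe.getMessage)
        }
      }
      fallback
    }

Note the fallback the patch uses, (Seq(), -1, Seq()): an empty file list with a sentinel mod time of -1 means a failed listing never advances prevModTime, so no file can be silently skipped.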
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index beb20831bd..e51754977c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -25,8 +25,10 @@ import org.scalatest.BeforeAndAfter
 import org.apache.commons.io.FileUtils
 import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import util.{Clock, ManualClock}
-import scala.util.Random
 import com.google.common.io.Files
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.conf.Configuration
+
 
 /**
@@ -44,7 +46,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
 
   after {
     if (ssc != null) ssc.stop()
-    FileUtils.deleteDirectory(new File(checkpointDir))
+    //FileUtils.deleteDirectory(new File(checkpointDir))
 
     // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
     System.clearProperty("spark.driver.port")
@@ -66,7 +68,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
 
     val stateStreamCheckpointInterval = Seconds(1)
-
+    val fs = FileSystem.getLocal(new Configuration())
     // this ensure checkpointing occurs at least once
     val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
     val secondNumBatches = firstNumBatches
@@ -90,11 +92,12 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     ssc.start()
     advanceTimeWithRealDelay(ssc, firstNumBatches)
     logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
-    assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
+    assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
+      "No checkpointed RDDs in state stream before first failure")
     stateStream.checkpointData.checkpointFiles.foreach {
-      case (time, data) => {
-        val file = new File(data.toString)
-        assert(file.exists(), "Checkpoint file '" + file + "' for time " + time + " for state stream before first failure does not exist")
+      case (time, file) => {
+        assert(fs.exists(new Path(file)), "Checkpoint file '" + file + "' for time " + time +
+          " for state stream before first failure does not exist")
       }
     }
 
@@ -102,7 +105,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     // and check whether the earlier checkpoint files are deleted
     val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
     advanceTimeWithRealDelay(ssc, secondNumBatches)
-    checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
+    checkpointFiles.foreach(file =>
+      assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
     ssc.stop()
 
     // Restart stream computation using the checkpoint file and check whether
@@ -110,19 +114,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     ssc = new StreamingContext(checkpointDir)
     stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
     logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
-    assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from first failure")
+    assert(!stateStream.generatedRDDs.isEmpty,
+      "No restored RDDs in state stream after recovery from first failure")
 
     // Run one batch to generate a new checkpoint file and check whether some RDD
     // is present in the checkpoint data or not
     ssc.start()
     advanceTimeWithRealDelay(ssc, 1)
-    assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
+    assert(!stateStream.checkpointData.checkpointFiles.isEmpty,
+      "No checkpointed RDDs in state stream before second failure")
     stateStream.checkpointData.checkpointFiles.foreach {
-      case (time, data) => {
-        val file = new File(data.toString)
-        assert(file.exists(),
-          "Checkpoint file '" + file + "' for time " + time + " for state stream before seconds failure does not exist")
+      case (time, file) => {
+        assert(fs.exists(new Path(file)), "Checkpoint file '" + file + "' for time " + time +
+          " for state stream before second failure does not exist")
       }
     }
     ssc.stop()
 
@@ -132,7 +137,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     ssc = new StreamingContext(checkpointDir)
     stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
     logInfo("Restored data of state stream = \n[" + stateStream.generatedRDDs.mkString("\n") + "]")
-    assert(!stateStream.generatedRDDs.isEmpty, "No restored RDDs in state stream after recovery from second failure")
+    assert(!stateStream.generatedRDDs.isEmpty,
+      "No restored RDDs in state stream after recovery from second failure")
 
     // Adjust manual clock time as if it is being restarted after a delay
     System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
@@ -143,6 +149,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     ssc = null
   }
 
+
   // This tests whether the systm can recover from a master failure with simple
   // non-stateful operations. This assumes as reliable, replayable input
   // source - TestInputDStream.
@@ -191,6 +198,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
     testCheckpointedOperation(input, operation, output, 7)
   }
 
+
   // This tests whether file input stream remembers what files were seen before
   // the master failure and uses them again to process a large window operation.
   // It also tests whether batches, whose processing was incomplete due to the