Fixed conf/slaves and updated docs.

Tathagata Das 2014-01-10 05:06:15 -08:00
parent 38d75e18fa
commit 740730a179
4 changed files with 23 additions and 9 deletions

@@ -1 +1,2 @@
localhost
# A Spark Worker will be started on each of the machines listed below.
localhost
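For a real cluster deployment, conf/slaves simply lists one worker hostname per line, with # marking comment lines (as the new header comment shows). For example, with placeholder hostnames that are not part of this commit:

# A Spark Worker will be started on each of the machines listed below.
spark-worker-1.example.com
spark-worker-2.example.com
spark-worker-3.example.com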

@@ -26,9 +26,12 @@ import org.apache.spark.Logging
/**
* This is a custom implementation of scala.collection.mutable.Map which stores the insertion
* time stamp along with each key-value pair. Key-value pairs that are older than a particular
* threshold time can them be removed using the clearOldValues method. This is intended to be a drop-in
* replacement of scala.collection.mutable.HashMap.
* timestamp along with each key-value pair. If specified, the timestamp of each pair can be
* updated every time it is accessed. Key-value pairs whose timestamps are older than a particular
* threshold time can then be removed using the clearOldValues method. This is intended to
* be a drop-in replacement of scala.collection.mutable.HashMap.
* @param updateTimeStampOnGet When enabled, the timestamp of a pair will be
* updated when it is accessed
*/
class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
extends Map[A, B]() with Logging {
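The behaviour described by the updated scaladoc can be illustrated with a small self-contained sketch. This is not Spark's TimeStampedHashMap, only an outline of the same idea in plain Scala: each entry carries a timestamp, reads optionally refresh it, and clearOldValues drops entries older than a threshold.

import scala.collection.mutable

// Illustrative sketch only, not Spark's implementation
class SimpleTimeStampedMap[A, B](updateTimeStampOnGet: Boolean = false) {
  // key -> (value, insertion/access timestamp in ms)
  private val internal = new mutable.HashMap[A, (B, Long)]

  def put(key: A, value: B): Unit =
    internal(key) = (value, System.currentTimeMillis())

  def get(key: A): Option[B] = internal.get(key).map { case (value, _) =>
    if (updateTimeStampOnGet) {
      // Refresh the timestamp so recently accessed entries survive clearOldValues
      internal(key) = (value, System.currentTimeMillis())
    }
    value
  }

  def getOrElseUpdate(key: A, default: => B): B =
    get(key).getOrElse { val value = default; put(key, value); value }

  // Remove every entry whose timestamp is older than threshTime (in ms)
  def clearOldValues(threshTime: Long): Unit = {
    val old = internal.collect { case (k, (_, t)) if t < threshTime => k }
    internal --= old
  }
}

For example, clearOldValues(System.currentTimeMillis() - 3600 * 1000) drops every entry not written (or, with updateTimeStampOnGet = true, not read) in the last hour.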

@@ -34,8 +34,10 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
// Mapping of the batch time to the checkpointed RDD file of that time
@transient private var timeToCheckpointFile = new HashMap[Time, String]
// Mapping of the batch time to the time of the oldest checkpointed RDD in that batch's checkpoint data
// Mapping of the batch time to the time of the oldest checkpointed RDD
// in that batch's checkpoint data
@transient private var timeToOldestCheckpointFileTime = new HashMap[Time, Time]
@transient private var fileSystem : FileSystem = null
protected[streaming] def currentCheckpointFiles = data.asInstanceOf[HashMap[Time, String]]
@@ -55,19 +57,26 @@ class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
if (!checkpointFiles.isEmpty) {
currentCheckpointFiles.clear()
currentCheckpointFiles ++= checkpointFiles
// Add the current checkpoint files to the map of all checkpoint files
// This will be used to delete old checkpoint files
timeToCheckpointFile ++= currentCheckpointFiles
// Remember the time of the oldest checkpoint RDD in current state
timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min(Time.ordering)
}
}
/**
* Cleanup old checkpoint data. This gets called every time the graph
* checkpoint is initiated, but after `update` is called. Default
* implementation, cleans up old checkpoint files.
* Cleanup old checkpoint data. This gets called after a checkpoint of `time` has been
* written to the checkpoint directory.
*/
def cleanup(time: Time) {
// Get the time of the oldest checkpointed RDD that was written as part of the
// checkpoint of `time`
timeToOldestCheckpointFileTime.remove(time) match {
case Some(lastCheckpointFileTime) =>
// Find all the checkpointed RDDs (i.e. files) that are older than `lastCheckpointFileTime`
// Checkpointed RDDs older than this will not be needed, even if the master fails,
// because the checkpoint data of `time` does not refer to those files
val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
logDebug("Files to delete:\n" + filesToDelete.mkString(","))
filesToDelete.foreach {

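The bookkeeping that update and cleanup perform above can be summarised in a compact sketch, with batch times reduced to Longs and checkpoint files to Strings. This is an outline of the logic, not the actual DStreamCheckpointData code.

import scala.collection.mutable

// Outline of the logic only: batch times as Longs, checkpoint files as Strings
object CheckpointCleanupSketch {
  // batch time -> checkpoint file written for that batch time
  private val timeToCheckpointFile = new mutable.HashMap[Long, String]
  // checkpoint time -> time of the oldest checkpointed RDD it refers to
  private val timeToOldestCheckpointFileTime = new mutable.HashMap[Long, Long]

  // Record the files of the latest checkpoint and the oldest RDD time they cover
  def update(time: Long, currentCheckpointFiles: Map[Long, String]): Unit = {
    if (currentCheckpointFiles.nonEmpty) {
      timeToCheckpointFile ++= currentCheckpointFiles
      timeToOldestCheckpointFileTime(time) = currentCheckpointFiles.keys.min
    }
  }

  // After the checkpoint of `time` is written, return the files that may be deleted
  def cleanup(time: Long): Seq[String] = {
    timeToOldestCheckpointFileTime.remove(time) match {
      case Some(lastCheckpointFileTime) =>
        // Files older than the oldest RDD referenced by this checkpoint are no
        // longer needed for recovery
        val filesToDelete = timeToCheckpointFile.filter(_._1 < lastCheckpointFileTime)
        timeToCheckpointFile --= filesToDelete.keys
        filesToDelete.values.toSeq
      case None =>
        Seq.empty
    }
  }
}

Deleting only files older than the oldest RDD referenced by the checkpoint of `time` is safe because recovery from that checkpoint can never need them.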
@@ -98,7 +98,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
(time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
logDebug("Cleared files are:\n" +
oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
// Delete file times that weren't accessed in the last round of getting new files
// Delete file mod times that weren't accessed in the last round of getting new files
fileModTimes.clearOldValues(lastNewFileFindingTime - 1)
}
@@ -147,6 +147,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
}
private def getFileModTime(path: Path) = {
// Get file mod time from cache or fetch it from the file system
fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
}
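The mod-time handling above follows a simple caching pattern: serve the modification time from a cache keyed by the path string, fall back to the file system on a miss, and periodically drop entries that were not touched during the latest scan for new files. A standalone sketch of that pattern, using a plain HashMap and a hypothetical class name in place of Spark's TimeStampedHashMap:

import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable

// Illustrative sketch only: path string -> (mod time, last access time in ms)
class FileModTimeCache(fs: FileSystem) {
  private val cache = new mutable.HashMap[String, (Long, Long)]

  // Get file mod time from the cache, or fetch it from the file system on a miss
  def getFileModTime(path: Path): Long = {
    val key = path.toString
    val modTime = cache.get(key) match {
      case Some((t, _)) => t
      case None         => fs.getFileStatus(path).getModificationTime
    }
    cache(key) = (modTime, System.currentTimeMillis()) // refresh the access time
    modTime
  }

  // Drop entries that were not accessed since threshTime, mirroring
  // fileModTimes.clearOldValues(lastNewFileFindingTime - 1) above
  def clearOldValues(threshTime: Long): Unit = {
    val stale = cache.collect { case (k, (_, accessed)) if accessed < threshTime => k }
    cache --= stale
  }
}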