Added a hashmap to cache file mod times.

Author: Tathagata Das
Date:   2014-01-05 23:42:53 -08:00
Parent: 2394794591
Commit: ac1f4b06c1

2 changed files with 30 additions and 8 deletions

core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala

@@ -30,12 +30,16 @@ import org.apache.spark.Logging
  * threshold time can them be removed using the clearOldValues method. This is intended to be a drop-in
  * replacement of scala.collection.mutable.HashMap.
  */
-class TimeStampedHashMap[A, B] extends Map[A, B]() with Logging {
+class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = false)
+  extends Map[A, B]() with Logging {
   val internalMap = new ConcurrentHashMap[A, (B, Long)]()
 
   def get(key: A): Option[B] = {
     val value = internalMap.get(key)
-    if (value != null) Some(value._1) else None
+    if (value != null && updateTimeStampOnGet) {
+      internalMap.replace(key, value, (value._1, currentTime))
+    }
+    Option(value).map(_._1)
   }
 
   def iterator: Iterator[(A, B)] = {
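A minimal usage sketch of the semantics this change enables, assuming only the get/clearOldValues behavior shown in the diff and class comment above (the keys and the one-minute threshold are illustrative, not part of the commit):

import org.apache.spark.util.TimeStampedHashMap

val cache = new TimeStampedHashMap[String, Long](updateTimeStampOnGet = true)
cache("hdfs://logs/file1") = 1388991773000L   // a write stamps the entry with the current time
cache.get("hdfs://logs/file1")                // with updateTimeStampOnGet = true, a read refreshes the stamp

// Entries whose timestamp is older than the threshold are dropped,
// so anything read or written in the last minute survives.
cache.clearOldValues(System.currentTimeMillis() - 60000)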

streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala

@@ -23,10 +23,10 @@ import scala.reflect.ClassTag
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
 import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
+import org.apache.spark.util.TimeStampedHashMap
 
 private[streaming]
@@ -46,6 +46,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   @transient private var path_ : Path = null
   @transient private var fs_ : FileSystem = null
   @transient private[streaming] var files = new HashMap[Time, Array[String]]
+  @transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
+  @transient private var lastNewFileFindingTime = 0L
 
   override def start() {
     if (newFilesOnly) {
@@ -96,6 +98,8 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
       (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
     logDebug("Cleared files are:\n" +
       oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
+    // Delete file times that weren't accessed in the last round of getting new files
+    fileModTimes.clearOldValues(lastNewFileFindingTime - 1)
   }
 
   /**
@@ -104,8 +108,18 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
    */
   private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
     logDebug("Trying to get new files for time " + currentTime)
+    lastNewFileFindingTime = System.currentTimeMillis
     val filter = new CustomPathFilter(currentTime)
-    val newFiles = fs.listStatus(path, filter).map(_.getPath.toString)
+    val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
+    val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
+    logInfo("Finding new files took " + timeTaken + " ms")
+    if (timeTaken > slideDuration.milliseconds) {
+      logWarning(
+        "Time taken to find new files exceeds the batch size. " +
+        "Consider increasing the batch size or reducing the number of " +
+        "files in the monitored directory."
+      )
+    }
     (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
   }
 
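The warning above fires when a single directory scan takes longer than one batch interval, i.e. file discovery can no longer keep up with the stream. The same measure-and-warn pattern in isolation (a sketch; the names are illustrative, not part of the commit):

def timedScan[T](batchIntervalMs: Long)(scan: => T): T = {
  val start = System.currentTimeMillis
  val result = scan                                  // run the expensive operation
  val elapsed = System.currentTimeMillis - start
  if (elapsed > batchIntervalMs) {
    // Falling behind: the next batch is due before this scan finished.
    println("WARN: scan took " + elapsed + " ms, batch interval is " + batchIntervalMs + " ms")
  }
  result
}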
@@ -122,16 +136,20 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 
-  private def path: Path = {
+  private def directoryPath: Path = {
     if (path_ == null) path_ = new Path(directory)
     path_
   }
 
   private def fs: FileSystem = {
-    if (fs_ == null) fs_ = path.getFileSystem(new Configuration())
+    if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration())
     fs_
   }
 
+  private def getFileModTime(path: Path) = {
+    fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
+  }
+
   private def reset() {
     fs_ = null
   }
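getFileModTime above memoizes the result of one getFileStatus call (a round trip to the file system, e.g. a NameNode RPC on HDFS) per file. The caching pattern in isolation, with a plain mutable.HashMap standing in for the TimeStampedHashMap (a sketch, not the commit's code):

import scala.collection.mutable

val modTimeCache = new mutable.HashMap[String, Long]()

// Look up the mod time at most once per path; later calls hit the map.
def cachedModTime(path: String, lookup: String => Long): Long =
  modTimeCache.getOrElseUpdate(path, lookup(path))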
@@ -142,6 +160,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     ois.defaultReadObject()
     generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
     files = new HashMap[Time, Array[String]]
+    fileModTimes = new TimeStampedHashMap[String, Long](true)
   }
 
   /**
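Re-creating the map inside readObject is necessary because @transient fields come back null after Java deserialization (e.g. when the DStream is restored from a checkpoint). The pattern in isolation, as a self-contained sketch with hypothetical names:

import java.io.ObjectInputStream
import scala.collection.mutable.HashMap

class ModTimeCacheHolder extends Serializable {
  // Not serialized: must be rebuilt whenever an instance is deserialized.
  @transient private var cache = new HashMap[String, Long]()

  private def readObject(ois: ObjectInputStream): Unit = {
    ois.defaultReadObject()                // restore the regular fields
    cache = new HashMap[String, Long]()    // re-create the transient cache
  }
}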
@@ -187,14 +206,13 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
     // Latest file mod time seen in this round of fetching files and its corresponding files
     var latestModTime = 0L
     val latestModTimeFiles = new HashSet[String]()
-
     def accept(path: Path): Boolean = {
       try {
         if (!filter(path)) { // Reject file if it does not satisfy filter
           logDebug("Rejected by filter " + path)
           return false
         }
-        val modTime = fs.getFileStatus(path).getModificationTime()
+        val modTime = getFileModTime(path)
         logDebug("Mod time for " + path + " is " + modTime)
         if (modTime < prevModTime) {
           logDebug("Mod time less than last mod time")