SPARK-3276 Added a new configuration spark.streaming.minRememberDuration

SPARK-3276 Added a new configuration parameter ``spark.streaming.minRememberDuration``, with a default value of 1 minute, so that when a Spark Streaming application is started, an arbitrary number of minutes can be taken as the threshold for remembering.

Author: emres <emre.sevinc@gmail.com>

Closes #5438 from emres/SPARK-3276 and squashes the following commits:

766f938 [emres] SPARK-3276 Switched to using newly added getTimeAsSeconds method.
affee1d [emres] SPARK-3276 Changed the property name and variable name for minRememberDuration
c9d58ca [emres] SPARK-3276 Minor code re-formatting.
1c53ba9 [emres] SPARK-3276 Started to use ssc.conf rather than ssc.sparkContext.getConf, and also getLong method directly.
bfe0acb [emres] SPARK-3276 Moved the minRememberDurationMin to the class
daccc82 [emres] SPARK-3276 Changed the property name to reflect the unit of value and reduced number of fields.
43cc1ce [emres] SPARK-3276 Added a new configuration parameter spark.streaming.minRemember duration, with a default value of 1 minute.
parent c035c0f2d7
commit c25ca7c5a1

@@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}

-import org.apache.spark.SerializableWritable
+import org.apache.spark.{SparkConf, SerializableWritable}
 import org.apache.spark.rdd.{RDD, UnionRDD}
 import org.apache.spark.streaming._
 import org.apache.spark.util.{TimeStampedHashMap, Utils}
@@ -63,7 +63,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
  *   the streaming app.
  * - If a file is to be visible in the directory listings, it must be visible within a certain
  *   duration of the mod time of the file. This duration is the "remember window", which is set to
- *   1 minute (see `FileInputDStream.MIN_REMEMBER_DURATION`). Otherwise, the file will never be
+ *   1 minute (see `FileInputDStream.minRememberDuration`). Otherwise, the file will never be
  *   selected as the mod time will be less than the ignore threshold when it becomes visible.
  * - Once a file is visible, the mod time cannot change. If it does due to appends, then the
  *   processing semantics are undefined.
@@ -80,6 +80,15 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](

   private val serializableConfOpt = conf.map(new SerializableWritable(_))

+  /**
+   * Minimum duration of remembering the information of selected files. Defaults to 60 seconds.
+   *
+   * Files with mod times older than this "window" of remembering will be ignored. So if new
+   * files are visible within this window, then the file will get selected in the next batch.
+   */
+  private val minRememberDurationS =
+    Seconds(ssc.conf.getTimeAsSeconds("spark.streaming.minRememberDuration", "60s"))
+
   // This is a def so that it works during checkpoint recovery:
   private def clock = ssc.scheduler.clock

@@ -95,7 +104,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
    * This would allow us to filter away not-too-old files which have already been recently
    * selected and processed.
    */
-  private val numBatchesToRemember = FileInputDStream.calculateNumBatchesToRemember(slideDuration)
+  private val numBatchesToRemember = FileInputDStream
+    .calculateNumBatchesToRemember(slideDuration, minRememberDurationS)
   private val durationToRemember = slideDuration * numBatchesToRemember
   remember(durationToRemember)

@@ -330,20 +340,14 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
 private[streaming]
 object FileInputDStream {

-  /**
-   * Minimum duration of remembering the information of selected files. Files with mod times
-   * older than this "window" of remembering will be ignored. So if new files are visible
-   * within this window, then the file will get selected in the next batch.
-   */
-  private val MIN_REMEMBER_DURATION = Minutes(1)
-
   def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")

   /**
    * Calculate the number of last batches to remember, such that all the files selected in
-   * at least last MIN_REMEMBER_DURATION duration can be remembered.
+   * at least last minRememberDurationS duration can be remembered.
    */
-  def calculateNumBatchesToRemember(batchDuration: Duration): Int = {
-    math.ceil(MIN_REMEMBER_DURATION.milliseconds.toDouble / batchDuration.milliseconds).toInt
+  def calculateNumBatchesToRemember(batchDuration: Duration,
+      minRememberDurationS: Duration): Int = {
+    math.ceil(minRememberDurationS.milliseconds.toDouble / batchDuration.milliseconds).toInt
   }
 }
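
Note: the effect of the new setting on the remember window can be checked with a small standalone sketch of the `ceil(minRemember / batch)` arithmetic used by `calculateNumBatchesToRemember`. This is only an illustration, not code from the commit: the object name `RememberWindowSketch`, the use of plain millisecond `Long` values in place of Spark's `Duration`, and the 7-second batch interval are all assumptions made so the snippet runs without Spark on the classpath.

```scala
// Standalone sketch (hypothetical names, no Spark dependency).
// Plain millisecond Longs stand in for org.apache.spark.streaming.Duration.
object RememberWindowSketch {

  /** Same arithmetic as the diff: round the ratio up to a whole number of batches. */
  def calculateNumBatchesToRemember(batchMs: Long, minRememberMs: Long): Int =
    math.ceil(minRememberMs.toDouble / batchMs).toInt

  def main(args: Array[String]): Unit = {
    val batchMs = 7000L // assumed 7-second batch interval, for illustration only
    // 60 s is the default from the diff; 300 s is an arbitrary larger value.
    for (minRememberS <- Seq(60L, 300L)) {
      val batches = calculateNumBatchesToRemember(batchMs, minRememberS * 1000L)
      val windowMs = batches * batchMs // corresponds to durationToRemember
      println(s"minRememberDuration=${minRememberS}s: $batches batches, window $windowMs ms")
    }
  }
}
```

Because the ratio is rounded up, the resulting remember window is never shorter than the configured minimum. With the assumed 7-second batches, the 60-second default gives 9 batches (63 seconds). In a real application the value would presumably be supplied as a Spark property, in the same time-string form as the `"60s"` default shown in the diff.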