Made RDD checkpoint not create a new thread. Fixed bug in detecting when spark.cleaner.delay is insufficient.
This commit is contained in:
parent
477de94894
commit
b4dba55f78
|
@ -211,14 +211,6 @@ abstract class RDD[T: ClassManifest](
|
|||
|
||||
if (startCheckpoint) {
|
||||
val rdd = this
|
||||
val env = SparkEnv.get
|
||||
|
||||
// Spawn a new thread to do the checkpoint as it takes some time to write the RDD to file
|
||||
val th = new Thread() {
|
||||
override def run() {
|
||||
// Save the RDD to a file, create a new HadoopRDD from it,
|
||||
// and change the dependencies from the original parents to the new RDD
|
||||
SparkEnv.set(env)
|
||||
rdd.checkpointFile = new Path(context.checkpointDir, "rdd-" + id).toString
|
||||
rdd.saveAsObjectFile(checkpointFile)
|
||||
rdd.synchronized {
|
||||
|
@ -228,11 +220,8 @@ abstract class RDD[T: ClassManifest](
|
|||
rdd.shouldCheckpoint = false
|
||||
rdd.isCheckpointInProgress = false
|
||||
rdd.isCheckpointed = true
|
||||
println("Done checkpointing RDD " + rdd.id + ", " + rdd)
|
||||
println("Done checkpointing RDD " + rdd.id + ", " + rdd + ", created RDD " + rdd.checkpointRDD.id + ", " + rdd.checkpointRDD)
|
||||
}
|
||||
}
|
||||
}
|
||||
th.start()
|
||||
} else {
|
||||
// Recursively call doCheckpoint() to perform checkpointing on parent RDD if they are marked
|
||||
dependencies.foreach(_.rdd.doCheckpoint())
|
||||
|
|
|
@ -10,7 +10,7 @@ import java.util.concurrent.ConcurrentHashMap
|
|||
* threshold time can then be removed using the cleanup method. This is intended to be a drop-in
|
||||
* replacement of scala.collection.mutable.HashMap.
|
||||
*/
|
||||
class TimeStampedHashMap[A, B] extends Map[A, B]() {
|
||||
class TimeStampedHashMap[A, B] extends Map[A, B]() with spark.Logging {
|
||||
val internalMap = new ConcurrentHashMap[A, (B, Long)]()
|
||||
|
||||
def get(key: A): Option[B] = {
|
||||
|
@ -79,6 +79,7 @@ class TimeStampedHashMap[A, B] extends Map[A, B]() {
|
|||
while(iterator.hasNext) {
|
||||
val entry = iterator.next()
|
||||
if (entry.getValue._2 < threshTime) {
|
||||
logDebug("Removing key " + entry.getKey)
|
||||
iterator.remove()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -182,14 +182,15 @@ extends Serializable with Logging {
|
|||
checkpointInterval + "). Please set it to higher than " + checkpointInterval + "."
|
||||
)
|
||||
|
||||
val metadataCleanupDelay = System.getProperty("spark.cleanup.delay", "-1").toDouble
|
||||
val metadataCleanerDelay = spark.util.MetadataCleaner.getDelaySeconds
|
||||
logInfo("metadataCleanupDelay = " + metadataCleanerDelay)
|
||||
assert(
|
||||
metadataCleanupDelay < 0 || rememberDuration < metadataCleanupDelay * 60 * 1000,
|
||||
metadataCleanerDelay < 0 || rememberDuration < metadataCleanerDelay * 1000,
|
||||
"It seems you are doing some DStream window operation or setting a checkpoint interval " +
|
||||
"which requires " + this.getClass.getSimpleName + " to remember generated RDDs for more " +
|
||||
"than " + rememberDuration.milliseconds + " milliseconds. But the Spark's metadata cleanup" +
|
||||
"delay is set to " + metadataCleanupDelay + " minutes, which is not sufficient. Please set " +
|
||||
"the Java property 'spark.cleanup.delay' to more than " +
|
||||
"delay is set to " + (metadataCleanerDelay / 60.0) + " minutes, which is not sufficient. Please set " +
|
||||
"the Java property 'spark.cleaner.delay' to more than " +
|
||||
math.ceil(rememberDuration.millis.toDouble / 60000.0).toInt + " minutes."
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in a new issue