Serialize and restore spark.cleaner.ttl to savepoint
This commit is contained in:
parent
a106ed8b97
commit
fbe40c5806
|
@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration
|
|||
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.io.CompressionCodec
|
||||
import org.apache.spark.util.MetadataCleaner
|
||||
|
||||
|
||||
private[streaming]
|
||||
|
@ -40,6 +41,7 @@ class Checkpoint(@transient ssc: StreamingContext, val checkpointTime: Time)
|
|||
val checkpointDir = ssc.checkpointDir
|
||||
val checkpointDuration = ssc.checkpointDuration
|
||||
val pendingTimes = ssc.scheduler.jobManager.getPendingTimes()
|
||||
val delaySeconds = MetadataCleaner.getDelaySeconds
|
||||
|
||||
def validate() {
|
||||
assert(master != null, "Checkpoint.master is null")
|
||||
|
|
|
@ -100,6 +100,10 @@ class StreamingContext private (
|
|||
"both SparkContext and checkpoint as null")
|
||||
}
|
||||
|
||||
if(cp_ != null && cp_.delaySeconds >= 0 && MetadataCleaner.getDelaySeconds < 0) {
|
||||
MetadataCleaner.setDelaySeconds(cp_.delaySeconds)
|
||||
}
|
||||
|
||||
if (MetadataCleaner.getDelaySeconds < 0) {
|
||||
throw new SparkException("Spark Streaming cannot be used without setting spark.cleaner.ttl; "
|
||||
+ "set this property before creating a SparkContext (use SPARK_JAVA_OPTS for the shell)")
|
||||
|
|
Loading…
Reference in a new issue