[SPARK-7318] [STREAMING] DStream cleans objects that are not closures
I added a check in `ClosureCleaner#clean` to fail fast if this is detected in the future. tdas Author: Andrew Or <andrew@databricks.com> Closes #5860 from andrewor14/streaming-closure-cleaner and squashes the following commits: 8e971d7 [Andrew Or] Do not throw exception if object to clean is not closure 5ee4e25 [Andrew Or] Fix tests eed3390 [Andrew Or] Merge branch 'master' of github.com:apache/spark into streaming-closure-cleaner 67eeff4 [Andrew Or] Add tests a4fa768 [Andrew Or] Clean the closure, not the RDD
This commit is contained in:
parent
1fdabf8dcd
commit
57e9f29e17
|
@ -179,6 +179,11 @@ private[spark] object ClosureCleaner extends Logging {
|
|||
cleanTransitively: Boolean,
|
||||
accessedFields: Map[Class[_], Set[String]]): Unit = {
|
||||
|
||||
if (!isClosure(func.getClass)) {
|
||||
logWarning("Expected a closure; got " + func.getClass.getName)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: clean all inner closures first. This requires us to find the inner objects.
|
||||
// TODO: cache outerClasses / innerClasses / accessedFields
|
||||
|
||||
|
|
|
@ -553,7 +553,8 @@ abstract class DStream[T: ClassTag] (
|
|||
// because the DStream is reachable from the outer object here, and because
|
||||
// DStreams can't be serialized with closures, we can't proactively check
|
||||
// it for serializability and so we pass the optional false to SparkContext.clean
|
||||
transform((r: RDD[T], t: Time) => context.sparkContext.clean(transformFunc(r), false))
|
||||
val cleanedF = context.sparkContext.clean(transformFunc, false)
|
||||
transform((r: RDD[T], t: Time) => cleanedF(r))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -256,8 +256,8 @@ class ReceiverSuite extends TestSuiteBase with Timeouts with Serializable {
|
|||
}
|
||||
|
||||
withStreamingContext(new StreamingContext(sparkConf, batchDuration)) { ssc =>
|
||||
val receiver1 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
|
||||
val receiver2 = ssc.sparkContext.clean(new FakeReceiver(sendData = true))
|
||||
val receiver1 = new FakeReceiver(sendData = true)
|
||||
val receiver2 = new FakeReceiver(sendData = true)
|
||||
val receiverStream1 = ssc.receiverStream(receiver1)
|
||||
val receiverStream2 = ssc.receiverStream(receiver2)
|
||||
receiverStream1.register()
|
||||
|
|
Loading…
Reference in a new issue