diff --git a/core/src/main/scala/spark/TaskContext.scala b/core/src/main/scala/spark/TaskContext.scala
index d62229f0ce..6b67b17b8a 100644
--- a/core/src/main/scala/spark/TaskContext.scala
+++ b/core/src/main/scala/spark/TaskContext.scala
@@ -7,15 +7,15 @@ class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Long, val t
 
   //by adding Task here, I'm destroying the separation between Task & TaskContext ... not sure why they need to
   // be separate
-  @transient val onCompleteCallbacks = new ArrayBuffer[TaskContext => Unit]
+  @transient val onCompleteCallbacks = new ArrayBuffer[() => Unit]
 
   // Add a callback function to be executed on task completion. An example use
   // is for HadoopRDD to register a callback to close the input stream.
-  def addOnCompleteCallback(f: TaskContext => Unit) {
+  def addOnCompleteCallback(f: () => Unit) {
     onCompleteCallbacks += f
   }
 
   def executeOnCompleteCallbacks() {
-    onCompleteCallbacks.foreach{_.apply(this)}
+    onCompleteCallbacks.foreach{_()}
   }
 }
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index ad08558590..96b593ba7c 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -104,7 +104,7 @@ private[spark] object CheckpointRDD extends Logging {
     val deserializeStream = serializer.deserializeStream(fileInputStream)
 
     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(_ => deserializeStream.close())
+    context.addOnCompleteCallback(() => deserializeStream.close())
 
     deserializeStream.asIterator.asInstanceOf[Iterator[T]]
   }
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index 3f0b4ce23e..f547f53812 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -74,7 +74,7 @@ class HadoopRDD[K, V](
     reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
 
     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(_ => reader.close())
+    context.addOnCompleteCallback(() => reader.close())
 
     val key: K = reader.createKey()
     val value: V = reader.createValue()
diff --git a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
index c80d30e125..c3b155fcbd 100644
--- a/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/NewHadoopRDD.scala
@@ -63,7 +63,7 @@ class NewHadoopRDD[K, V](
     reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
 
     // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(_ => reader.close())
+    context.addOnCompleteCallback(() => reader.close())
 
     var havePair = false
     var finished = false
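
For illustration only (not part of the commit): a minimal, self-contained Scala sketch of the zero-argument callback API this diff introduces. Since callbacks no longer receive the TaskContext, call sites pass closures that capture whatever state they need, as the HadoopRDD hunk above does with reader. The TaskContext class below is a stripped-down stand-in for spark.TaskContext, and CallbackDemo is a hypothetical harness.

    import scala.collection.mutable.ArrayBuffer

    // Stripped-down stand-in for spark.TaskContext, reduced to the
    // callback machinery changed by this diff.
    class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Long) {
      @transient val onCompleteCallbacks = new ArrayBuffer[() => Unit]

      // Callbacks take no arguments; they close over any state they need.
      def addOnCompleteCallback(f: () => Unit) {
        onCompleteCallbacks += f
      }

      def executeOnCompleteCallbacks() {
        onCompleteCallbacks.foreach{_()}
      }
    }

    object CallbackDemo {
      def main(args: Array[String]) {
        val context = new TaskContext(stageId = 0, splitId = 0, attemptId = 0L)
        val reader = new java.io.StringReader("some input")
        // As in HadoopRDD: register a closure that closes the stream
        // when the task completes.
        context.addOnCompleteCallback(() => reader.close())
        context.executeOnCompleteCallbacks()
      }
    }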