[SPARK-7116] [SQL] [PYSPARK] Remove cache() causing memory leak

This patch simply removes a `cache()` on an intermediate RDD when evaluating Python UDFs.

Author: ksonj <kson@siberie.de>

Closes #5973 from ksonj/udf and squashes the following commits:

db5b564 [ksonj] removed TODO about cleaning up
fe70c54 [ksonj] Remove cache() causing memory leak
This commit is contained in:
ksonj 2015-05-07 12:04:19 -07:00 committed by Michael Armbrust
parent 5784c8d955
commit dec8f53719

View file

@@ -219,8 +219,8 @@ case class EvaluatePython(
/**
* :: DeveloperApi ::
* Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time. The input
* data is cached and zipped with the result of the udf evaluation.
* Uses PythonRDD to evaluate a [[PythonUDF]], one partition of tuples at a time.
* The input data is zipped with the result of the udf evaluation.
*/
@DeveloperApi
case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: SparkPlan)
@@ -229,8 +229,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
def children: Seq[SparkPlan] = child :: Nil
def execute(): RDD[Row] = {
// TODO: Clean up after ourselves?
val childResults = child.execute().map(_.copy()).cache()
val childResults = child.execute().map(_.copy())
val parent = childResults.mapPartitions { iter =>
val pickle = new Pickler