Deprecate mapPartitionsWithSplit in PySpark.
Also, replace the last reference to it in the docs. This fixes SPARK-1026.
This commit is contained in:
parent
ff44732171
commit
4cebb79c9f
|
@ -168,9 +168,9 @@ The following tables list the transformations and actions currently supported (s
|
|||
Iterator[T] => Iterator[U] when running on an RDD of type T. </td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td> <b>mapPartitionsWithSplit</b>(<i>func</i>) </td>
|
||||
<td> <b>mapPartitionsWithIndex</b>(<i>func</i>) </td>
|
||||
<td> Similar to mapPartitions, but also provides <i>func</i> with an integer value representing the index of
|
||||
the split, so <i>func</i> must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
|
||||
the partition, so <i>func</i> must be of type (Int, Iterator[T]) => Iterator[U] when running on an RDD of type T.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
|
|
|
@ -27,6 +27,7 @@ import traceback
|
|||
from subprocess import Popen, PIPE
|
||||
from tempfile import NamedTemporaryFile
|
||||
from threading import Thread
|
||||
import warnings
|
||||
|
||||
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
|
||||
BatchedSerializer, CloudPickleSerializer, pack_long
|
||||
|
@ -179,7 +180,7 @@ class RDD(object):
|
|||
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
|
||||
"""
|
||||
def func(s, iterator): return chain.from_iterable(imap(f, iterator))
|
||||
return self.mapPartitionsWithSplit(func, preservesPartitioning)
|
||||
return self.mapPartitionsWithIndex(func, preservesPartitioning)
|
||||
|
||||
def mapPartitions(self, f, preservesPartitioning=False):
|
||||
"""
|
||||
|
@ -191,10 +192,24 @@ class RDD(object):
|
|||
[3, 7]
|
||||
"""
|
||||
def func(s, iterator): return f(iterator)
|
||||
return self.mapPartitionsWithSplit(func)
|
||||
return self.mapPartitionsWithIndex(func)
|
||||
|
||||
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
    """
    Return a new RDD by applying a function to each partition of this RDD,
    while tracking the index of the original partition.

    :param f: function of two arguments ``(index, iterator)`` returning an
        iterator over the partition's output elements.
    :param preservesPartitioning: if True, the resulting RDD keeps this
        RDD's partitioner (only safe when `f` does not change keys).

    >>> rdd = sc.parallelize([1, 2, 3, 4], 4)
    >>> def f(splitIndex, iterator): yield splitIndex
    >>> rdd.mapPartitionsWithIndex(f).sum()
    6
    """
    # PipelinedRDD fuses this transformation with any preceding narrow
    # transformations so they run in a single pass over each partition.
    return PipelinedRDD(self, f, preservesPartitioning)
|
||||
|
||||
def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
|
||||
"""
|
||||
Deprecated: use mapPartitionsWithIndex instead.
|
||||
|
||||
Return a new RDD by applying a function to each partition of this RDD,
|
||||
while tracking the index of the original partition.
|
||||
|
||||
|
@ -203,7 +218,9 @@ class RDD(object):
|
|||
>>> rdd.mapPartitionsWithSplit(f).sum()
|
||||
6
|
||||
"""
|
||||
return PipelinedRDD(self, f, preservesPartitioning)
|
||||
warnings.warn("mapPartitionsWithSplit is deprecated; "
|
||||
"use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
|
||||
return self.mapPartitionsWithIndex(f, preservesPartitioning)
|
||||
|
||||
def filter(self, f):
|
||||
"""
|
||||
|
@ -235,7 +252,7 @@ class RDD(object):
|
|||
>>> sc.parallelize(range(0, 100)).sample(False, 0.1, 2).collect() #doctest: +SKIP
|
||||
[2, 3, 20, 21, 24, 41, 42, 66, 67, 89, 90, 98]
|
||||
"""
|
||||
return self.mapPartitionsWithSplit(RDDSampler(withReplacement, fraction, seed).func, True)
|
||||
return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
|
||||
|
||||
# this is ported from scala/spark/RDD.scala
|
||||
def takeSample(self, withReplacement, num, seed):
|
||||
|
|
Loading…
Reference in a new issue