ee1de66fe4
### What changes were proposed in this pull request?

I add a new API to the PySpark `RDD` class:

```
def collectWithJobGroup(self, groupId, description, interruptOnCancel=False)
```

This API does the same thing as `rdd.collect`, but it lets the caller specify the job group for the collect job. The motivation is that

```
sc.setJobGroup("group-id...")
rdd.collect()
```

does not work correctly in PySpark: the `setJobGroup` call is not reliably applied to the collect job. This relates to a bug discussed in https://issues.apache.org/jira/browse/SPARK-31549

Note: This PR is a rather temporary workaround for `PYSPARK_PIN_THREAD`, and a step toward migrating to `PYSPARK_PIN_THREAD` smoothly. It targets Spark 3.0.

- `PYSPARK_PIN_THREAD` is unstable at the moment and affects whole PySpark applications.
- It cannot be made a runtime configuration, because it has to be set before the JVM is launched.
- There is a thread leak issue between Python and the JVM. We should address it, but it is not a release blocker for Spark 3.0 since the feature is experimental. I plan to handle it after Spark 3.0 for stability reasons.

Once `PYSPARK_PIN_THREAD` is enabled by default, we should ideally remove this API. I plan to deprecate it in Spark 3.1.

### Why are the changes needed?

Fixes a bug.

### Does this PR introduce any user-facing change?

A developer API in PySpark: `pyspark.RDD.collectWithJobGroup`

### How was this patch tested?

Unit test.

Closes #28395 from WeichenXu123/collect_with_job_group.

Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
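A minimal usage sketch of the new API (illustrative only; it assumes a live `SparkContext` named `sc`, and the group id and description strings are placeholders):

```python
rdd = sc.parallelize(range(100))

# Run the collect job under an explicit job group instead of relying on
# sc.setJobGroup(), which may not be applied correctly from Python.
result = rdd.collectWithJobGroup(
    "my-group-id",                   # placeholder group id
    "collect numbers for a report",  # placeholder description
    interruptOnCancel=True)

# The running job can later be cancelled from another thread by its group:
# sc.cancelJobGroup("my-group-id")
```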
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import copy
import sys
import os
import re
import operator
import shlex
import warnings
import heapq
import bisect
import random
from subprocess import Popen, PIPE
from tempfile import NamedTemporaryFile
from threading import Thread
from collections import defaultdict
from itertools import chain
from functools import reduce
from math import sqrt, log, isinf, isnan, pow, ceil

if sys.version > '3':
    basestring = unicode = str
else:
    from itertools import imap as map, ifilter as filter

from pyspark.java_gateway import local_connect_and_auth
from pyspark.serializers import AutoBatchedSerializer, BatchedSerializer, NoOpSerializer, \
    CartesianDeserializer, CloudPickleSerializer, PairDeserializer, PickleSerializer, \
    UTF8Deserializer, pack_long, read_int, write_int
from pyspark.join import python_join, python_left_outer_join, \
    python_right_outer_join, python_full_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resource.executorrequests import ExecutorResourceRequests
from pyspark.resource.resourceprofile import ResourceProfile
from pyspark.resource.taskrequests import TaskResourceRequests
from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, ExternalMerger, \
    get_used_memory, ExternalSorter, ExternalGroupBy
from pyspark.traceback_utils import SCCallSiteSync
from pyspark.util import fail_on_stopiteration, _parse_memory


__all__ = ["RDD"]


class PythonEvalType(object):
|
|
"""
|
|
Evaluation type of python rdd.
|
|
|
|
These values are internal to PySpark.
|
|
|
|
These values should match values in org.apache.spark.api.python.PythonEvalType.
|
|
"""
|
|
NON_UDF = 0
|
|
|
|
SQL_BATCHED_UDF = 100
|
|
|
|
SQL_SCALAR_PANDAS_UDF = 200
|
|
SQL_GROUPED_MAP_PANDAS_UDF = 201
|
|
SQL_GROUPED_AGG_PANDAS_UDF = 202
|
|
SQL_WINDOW_AGG_PANDAS_UDF = 203
|
|
SQL_SCALAR_PANDAS_ITER_UDF = 204
|
|
SQL_MAP_PANDAS_ITER_UDF = 205
|
|
SQL_COGROUPED_MAP_PANDAS_UDF = 206
|
|
|
|
|
|
def portable_hash(x):
|
|
"""
|
|
This function returns consistent hash code for builtin types, especially
|
|
for None and tuple with None.
|
|
|
|
The algorithm is similar to that one used by CPython 2.7
|
|
|
|
>>> portable_hash(None)
|
|
0
|
|
>>> portable_hash((None, 1)) & 0xffffffff
|
|
219750521
|
|
"""
|
|
|
|
if sys.version_info >= (3, 2, 3) and 'PYTHONHASHSEED' not in os.environ:
|
|
raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
|
|
|
|
if x is None:
|
|
return 0
|
|
if isinstance(x, tuple):
|
|
h = 0x345678
|
|
for i in x:
|
|
h ^= portable_hash(i)
|
|
h *= 1000003
|
|
h &= sys.maxsize
|
|
h ^= len(x)
|
|
if h == -1:
|
|
h = -2
|
|
return int(h)
|
|
return hash(x)
|
|
|
|
|
|
class BoundedFloat(float):
|
|
"""
|
|
A bounded float generated by an approximate job, with a confidence,
a low bound, and a high bound.
|
|
|
|
>>> BoundedFloat(100.0, 0.95, 95.0, 105.0)
|
|
100.0
|
|
"""
|
|
def __new__(cls, mean, confidence, low, high):
|
|
obj = float.__new__(cls, mean)
|
|
obj.confidence = confidence
|
|
obj.low = low
|
|
obj.high = high
|
|
return obj
|
|
|
|
|
|
def _create_local_socket(sock_info):
|
|
"""
|
|
Create a local socket that can be used to load deserialized data from the JVM
|
|
|
|
:param sock_info: Tuple containing port number and authentication secret for a local socket.
|
|
:return: sockfile file descriptor of the local socket
|
|
"""
|
|
port = sock_info[0]
|
|
auth_secret = sock_info[1]
|
|
sockfile, sock = local_connect_and_auth(port, auth_secret)
|
|
# The RDD materialization time is unpredictable, if we set a timeout for socket reading
|
|
# operation, it will very possibly fail. See SPARK-18281.
|
|
sock.settimeout(None)
|
|
return sockfile
|
|
|
|
|
|
def _load_from_socket(sock_info, serializer):
|
|
"""
|
|
Connect to a local socket described by sock_info and use the given serializer to yield data
|
|
|
|
:param sock_info: Tuple containing port number and authentication secret for a local socket.
|
|
:param serializer: The PySpark serializer to use
|
|
:return: result of Serializer.load_stream, usually a generator that yields deserialized data
|
|
"""
|
|
sockfile = _create_local_socket(sock_info)
|
|
# The socket will be automatically closed when garbage-collected.
|
|
return serializer.load_stream(sockfile)
|
|
|
|
|
|
def _local_iterator_from_socket(sock_info, serializer):
|
|
|
|
class PyLocalIterable(object):
|
|
""" Create a synchronous local iterable over a socket """
|
|
|
|
def __init__(self, _sock_info, _serializer):
|
|
port, auth_secret, self.jsocket_auth_server = _sock_info
|
|
self._sockfile = _create_local_socket((port, auth_secret))
|
|
self._serializer = _serializer
|
|
self._read_iter = iter([]) # Initialize as empty iterator
|
|
self._read_status = 1
|
|
|
|
def __iter__(self):
|
|
while self._read_status == 1:
|
|
# Request next partition data from Java
|
|
write_int(1, self._sockfile)
|
|
self._sockfile.flush()
|
|
|
|
# If response is 1 then there is a partition to read, if 0 then fully consumed
|
|
self._read_status = read_int(self._sockfile)
|
|
if self._read_status == 1:
|
|
|
|
# Load the partition data as a stream and read each item
|
|
self._read_iter = self._serializer.load_stream(self._sockfile)
|
|
for item in self._read_iter:
|
|
yield item
|
|
|
|
# An error occurred, join serving thread and raise any exceptions from the JVM
|
|
elif self._read_status == -1:
|
|
self.jsocket_auth_server.getResult()
|
|
|
|
def __del__(self):
|
|
# If local iterator is not fully consumed,
|
|
if self._read_status == 1:
|
|
try:
|
|
# Finish consuming partition data stream
|
|
for _ in self._read_iter:
|
|
pass
|
|
# Tell Java to stop sending data and close connection
|
|
write_int(0, self._sockfile)
|
|
self._sockfile.flush()
|
|
except Exception:
|
|
# Ignore any errors, socket is automatically closed when garbage-collected
|
|
pass
|
|
|
|
return iter(PyLocalIterable(sock_info, serializer))
|
|
|
|
|
|
def ignore_unicode_prefix(f):
|
|
"""
|
|
Ignore the 'u' prefix of strings in doctests, so that the tests work
in both Python 2 and 3.
|
|
"""
|
|
if sys.version >= '3':
|
|
# the representation of unicode string in Python 3 does not have prefix 'u',
|
|
# so remove the prefix 'u' for doc tests
|
|
literal_re = re.compile(r"(\W|^)[uU](['])", re.UNICODE)
|
|
f.__doc__ = literal_re.sub(r'\1\2', f.__doc__)
|
|
return f
|
|
|
|
|
|
class Partitioner(object):
|
|
def __init__(self, numPartitions, partitionFunc):
|
|
self.numPartitions = numPartitions
|
|
self.partitionFunc = partitionFunc
|
|
|
|
def __eq__(self, other):
|
|
return (isinstance(other, Partitioner) and self.numPartitions == other.numPartitions
|
|
and self.partitionFunc == other.partitionFunc)
|
|
|
|
def __call__(self, k):
|
|
return self.partitionFunc(k) % self.numPartitions
|
|
|
|
|
|
class RDD(object):
|
|
|
|
"""
|
|
A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
|
|
Represents an immutable, partitioned collection of elements that can be
|
|
operated on in parallel.
|
|
"""
|
|
|
|
def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):
|
|
self._jrdd = jrdd
|
|
self.is_cached = False
|
|
self.is_checkpointed = False
|
|
self.has_resource_profile = False
|
|
self.ctx = ctx
|
|
self._jrdd_deserializer = jrdd_deserializer
|
|
self._id = jrdd.id()
|
|
self.partitioner = None
|
|
|
|
def _pickled(self):
|
|
return self._reserialize(AutoBatchedSerializer(PickleSerializer()))
|
|
|
|
def id(self):
|
|
"""
|
|
A unique ID for this RDD (within its SparkContext).
|
|
"""
|
|
return self._id
|
|
|
|
def __repr__(self):
|
|
return self._jrdd.toString()
|
|
|
|
def __getnewargs__(self):
|
|
# This method is called when attempting to pickle an RDD, which is always an error:
|
|
raise Exception(
|
|
"It appears that you are attempting to broadcast an RDD or reference an RDD from an "
|
|
"action or transformation. RDD transformations and actions can only be invoked by the "
|
|
"driver, not inside of other transformations; for example, "
|
|
"rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values "
|
|
"transformation and count action cannot be performed inside of the rdd1.map "
|
|
"transformation. For more information, see SPARK-5063."
|
|
)
|
|
|
|
@property
|
|
def context(self):
|
|
"""
|
|
The :class:`SparkContext` that this RDD was created on.
|
|
"""
|
|
return self.ctx
|
|
|
|
def cache(self):
|
|
"""
|
|
Persist this RDD with the default storage level (`MEMORY_ONLY`).
|
|
"""
|
|
self.is_cached = True
|
|
self.persist(StorageLevel.MEMORY_ONLY)
|
|
return self
|
|
|
|
def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
|
|
"""
|
|
Set this RDD's storage level to persist its values across operations
|
|
after the first time it is computed. This can only be used to assign
|
|
a new storage level if the RDD does not have a storage level set yet.
|
|
If no storage level is specified defaults to (`MEMORY_ONLY`).
|
|
|
|
>>> rdd = sc.parallelize(["b", "a", "c"])
|
|
>>> rdd.persist().is_cached
|
|
True
|
|
"""
|
|
self.is_cached = True
|
|
javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
|
|
self._jrdd.persist(javaStorageLevel)
|
|
return self
|
|
|
|
def unpersist(self, blocking=False):
|
|
"""
|
|
Mark the RDD as non-persistent, and remove all blocks for it from
|
|
memory and disk.
|
|
|
|
.. versionchanged:: 3.0.0
|
|
Added optional argument `blocking` to specify whether to block until all
|
|
blocks are deleted.
|
|
"""
|
|
self.is_cached = False
|
|
self._jrdd.unpersist(blocking)
|
|
return self
|
|
|
|
def checkpoint(self):
|
|
"""
|
|
Mark this RDD for checkpointing. It will be saved to a file inside the
|
|
checkpoint directory set with :meth:`SparkContext.setCheckpointDir` and
|
|
all references to its parent RDDs will be removed. This function must
|
|
be called before any job has been executed on this RDD. It is strongly
|
|
recommended that this RDD is persisted in memory, otherwise saving it
|
|
on a file will require recomputation.
|
|
"""
|
|
self.is_checkpointed = True
|
|
self._jrdd.rdd().checkpoint()
|
|
|
|
def isCheckpointed(self):
|
|
"""
|
|
Return whether this RDD is checkpointed and materialized, either reliably or locally.
|
|
"""
|
|
return self._jrdd.rdd().isCheckpointed()
|
|
|
|
def localCheckpoint(self):
|
|
"""
|
|
Mark this RDD for local checkpointing using Spark's existing caching layer.
|
|
|
|
This method is for users who wish to truncate RDD lineages while skipping the expensive
|
|
step of replicating the materialized data in a reliable distributed file system. This is
|
|
useful for RDDs with long lineages that need to be truncated periodically (e.g. GraphX).
|
|
|
|
Local checkpointing sacrifices fault-tolerance for performance. In particular, checkpointed
|
|
data is written to ephemeral local storage in the executors instead of to a reliable,
|
|
fault-tolerant storage. The effect is that if an executor fails during the computation,
|
|
the checkpointed data may no longer be accessible, causing an irrecoverable job failure.
|
|
|
|
This is NOT safe to use with dynamic allocation, which removes executors along
|
|
with their cached blocks. If you must use both features, you are advised to set
|
|
`spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
|
|
|
|
The checkpoint directory set through :meth:`SparkContext.setCheckpointDir` is not used.
|
|
"""
|
|
self._jrdd.rdd().localCheckpoint()
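    # Illustrative usage of localCheckpoint (a sketch, not part of the module;
    # assumes a live SparkContext ``sc``):
    #
    #     rdd = sc.parallelize(range(1000)).map(lambda x: x * 2)
    #     rdd.localCheckpoint()   # typically called before the first action
    #     rdd.count()             # the first action materializes the checkpoint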
|
|
|
|
def isLocallyCheckpointed(self):
|
|
"""
|
|
Return whether this RDD is marked for local checkpointing.
|
|
|
|
Exposed for testing.
|
|
"""
|
|
return self._jrdd.rdd().isLocallyCheckpointed()
|
|
|
|
def getCheckpointFile(self):
|
|
"""
|
|
Gets the name of the file to which this RDD was checkpointed
|
|
|
|
Not defined if RDD is checkpointed locally.
|
|
"""
|
|
checkpointFile = self._jrdd.rdd().getCheckpointFile()
|
|
if checkpointFile.isDefined():
|
|
return checkpointFile.get()
|
|
|
|
def map(self, f, preservesPartitioning=False):
|
|
"""
|
|
Return a new RDD by applying a function to each element of this RDD.
|
|
|
|
>>> rdd = sc.parallelize(["b", "a", "c"])
|
|
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
|
|
[('a', 1), ('b', 1), ('c', 1)]
|
|
"""
|
|
def func(_, iterator):
|
|
return map(fail_on_stopiteration(f), iterator)
|
|
return self.mapPartitionsWithIndex(func, preservesPartitioning)
|
|
|
|
def flatMap(self, f, preservesPartitioning=False):
|
|
"""
|
|
Return a new RDD by first applying a function to all elements of this
|
|
RDD, and then flattening the results.
|
|
|
|
>>> rdd = sc.parallelize([2, 3, 4])
|
|
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
|
|
[1, 1, 1, 2, 2, 3]
|
|
>>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
|
|
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
|
|
"""
|
|
def func(s, iterator):
|
|
return chain.from_iterable(map(fail_on_stopiteration(f), iterator))
|
|
return self.mapPartitionsWithIndex(func, preservesPartitioning)
|
|
|
|
def mapPartitions(self, f, preservesPartitioning=False):
|
|
"""
|
|
Return a new RDD by applying a function to each partition of this RDD.
|
|
|
|
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
|
|
>>> def f(iterator): yield sum(iterator)
|
|
>>> rdd.mapPartitions(f).collect()
|
|
[3, 7]
|
|
"""
|
|
def func(s, iterator):
|
|
return f(iterator)
|
|
return self.mapPartitionsWithIndex(func, preservesPartitioning)
|
|
|
|
def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
|
|
"""
|
|
Return a new RDD by applying a function to each partition of this RDD,
|
|
while tracking the index of the original partition.
|
|
|
|
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
|
|
>>> def f(splitIndex, iterator): yield splitIndex
|
|
>>> rdd.mapPartitionsWithIndex(f).sum()
|
|
6
|
|
"""
|
|
return PipelinedRDD(self, f, preservesPartitioning)
|
|
|
|
def mapPartitionsWithSplit(self, f, preservesPartitioning=False):
|
|
"""
|
|
Deprecated: use mapPartitionsWithIndex instead.
|
|
|
|
Return a new RDD by applying a function to each partition of this RDD,
|
|
while tracking the index of the original partition.
|
|
|
|
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
|
|
>>> def f(splitIndex, iterator): yield splitIndex
|
|
>>> rdd.mapPartitionsWithSplit(f).sum()
|
|
6
|
|
"""
|
|
warnings.warn("mapPartitionsWithSplit is deprecated; "
|
|
"use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
|
|
return self.mapPartitionsWithIndex(f, preservesPartitioning)
|
|
|
|
def getNumPartitions(self):
|
|
"""
|
|
Returns the number of partitions in this RDD.
|
|
|
|
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
|
|
>>> rdd.getNumPartitions()
|
|
2
|
|
"""
|
|
return self._jrdd.partitions().size()
|
|
|
|
def filter(self, f):
|
|
"""
|
|
Return a new RDD containing only the elements that satisfy a predicate.
|
|
|
|
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
|
|
>>> rdd.filter(lambda x: x % 2 == 0).collect()
|
|
[2, 4]
|
|
"""
|
|
def func(iterator):
|
|
return filter(fail_on_stopiteration(f), iterator)
|
|
return self.mapPartitions(func, True)
|
|
|
|
def distinct(self, numPartitions=None):
|
|
"""
|
|
Return a new RDD containing the distinct elements in this RDD.
|
|
|
|
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
|
|
[1, 2, 3]
|
|
"""
|
|
return self.map(lambda x: (x, None)) \
|
|
.reduceByKey(lambda x, _: x, numPartitions) \
|
|
.map(lambda x: x[0])
|
|
|
|
def sample(self, withReplacement, fraction, seed=None):
|
|
"""
|
|
Return a sampled subset of this RDD.
|
|
|
|
:param withReplacement: can elements be sampled multiple times (replaced when sampled out)
|
|
:param fraction: expected size of the sample as a fraction of this RDD's size
|
|
without replacement: probability that each element is chosen; fraction must be [0, 1]
|
|
with replacement: expected number of times each element is chosen; fraction must be >= 0
|
|
:param seed: seed for the random number generator
|
|
|
|
.. note:: This is not guaranteed to provide exactly the fraction specified of the total
|
|
count of the given :class:`DataFrame`.
|
|
|
|
>>> rdd = sc.parallelize(range(100), 4)
|
|
>>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
|
|
True
|
|
"""
|
|
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
|
|
return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
|
|
|
|
def randomSplit(self, weights, seed=None):
|
|
"""
|
|
Randomly splits this RDD with the provided weights.
|
|
|
|
:param weights: weights for splits, will be normalized if they don't sum to 1
|
|
:param seed: random seed
|
|
:return: split RDDs in a list
|
|
|
|
>>> rdd = sc.parallelize(range(500), 1)
|
|
>>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
|
|
>>> len(rdd1.collect() + rdd2.collect())
|
|
500
|
|
>>> 150 < rdd1.count() < 250
|
|
True
|
|
>>> 250 < rdd2.count() < 350
|
|
True
|
|
"""
|
|
s = float(sum(weights))
|
|
cweights = [0.0]
|
|
for w in weights:
|
|
cweights.append(cweights[-1] + w / s)
|
|
if seed is None:
|
|
seed = random.randint(0, 2 ** 32 - 1)
|
|
return [self.mapPartitionsWithIndex(RDDRangeSampler(lb, ub, seed).func, True)
|
|
for lb, ub in zip(cweights, cweights[1:])]
|
|
|
|
# this is ported from scala/spark/RDD.scala
|
|
def takeSample(self, withReplacement, num, seed=None):
|
|
"""
|
|
Return a fixed-size sampled subset of this RDD.
|
|
|
|
.. note:: This method should only be used if the resulting array is expected
|
|
to be small, as all the data is loaded into the driver's memory.
|
|
|
|
>>> rdd = sc.parallelize(range(0, 10))
|
|
>>> len(rdd.takeSample(True, 20, 1))
|
|
20
|
|
>>> len(rdd.takeSample(False, 5, 2))
|
|
5
|
|
>>> len(rdd.takeSample(False, 15, 3))
|
|
10
|
|
"""
|
|
numStDev = 10.0
|
|
|
|
if num < 0:
|
|
raise ValueError("Sample size cannot be negative.")
|
|
elif num == 0:
|
|
return []
|
|
|
|
initialCount = self.count()
|
|
if initialCount == 0:
|
|
return []
|
|
|
|
rand = random.Random(seed)
|
|
|
|
if (not withReplacement) and num >= initialCount:
|
|
# shuffle current RDD and return
|
|
samples = self.collect()
|
|
rand.shuffle(samples)
|
|
return samples
|
|
|
|
maxSampleSize = sys.maxsize - int(numStDev * sqrt(sys.maxsize))
|
|
if num > maxSampleSize:
|
|
raise ValueError(
|
|
"Sample size cannot be greater than %d." % maxSampleSize)
|
|
|
|
fraction = RDD._computeFractionForSampleSize(
|
|
num, initialCount, withReplacement)
|
|
samples = self.sample(withReplacement, fraction, seed).collect()
|
|
|
|
# If the first sample didn't turn out large enough, keep trying to take samples;
|
|
# this shouldn't happen often because we use a big multiplier for their initial size.
|
|
# See: scala/spark/RDD.scala
|
|
while len(samples) < num:
|
|
# TODO: add log warning for when more than one iteration was run
|
|
seed = rand.randint(0, sys.maxsize)
|
|
samples = self.sample(withReplacement, fraction, seed).collect()
|
|
|
|
rand.shuffle(samples)
|
|
|
|
return samples[0:num]
|
|
|
|
@staticmethod
|
|
def _computeFractionForSampleSize(sampleSizeLowerBound, total, withReplacement):
|
|
"""
|
|
Returns a sampling rate that guarantees a sample of
|
|
size >= sampleSizeLowerBound 99.99% of the time.
|
|
|
|
How the sampling rate is determined:
|
|
Let p = num / total, where num is the sample size and total is the
|
|
total number of data points in the RDD. We're trying to compute
|
|
q > p such that
|
|
- when sampling with replacement, we're drawing each data point
|
|
with prob_i ~ Pois(q), where we want to guarantee
|
|
Pr[s < num] < 0.0001 for s = sum(prob_i for i from 0 to
|
|
total), i.e. the failure rate of not having a sufficiently large
|
|
sample < 0.0001. Setting q = p + 5 * sqrt(p/total) is sufficient
|
|
to guarantee 0.9999 success rate for num > 12, but we need a
|
|
slightly larger q (9 empirically determined).
|
|
- when sampling without replacement, we're drawing each data point
|
|
with prob_i ~ Binomial(total, fraction) and our choice of q
|
|
guarantees 1-delta, or 0.9999 success rate, where success rate is
|
|
defined the same as in sampling with replacement.
|
|
"""
|
|
fraction = float(sampleSizeLowerBound) / total
|
|
if withReplacement:
|
|
numStDev = 5
|
|
if (sampleSizeLowerBound < 12):
|
|
numStDev = 9
|
|
return fraction + numStDev * sqrt(fraction / total)
|
|
else:
|
|
delta = 0.00005
|
|
gamma = - log(delta) / total
|
|
return min(1, fraction + gamma + sqrt(gamma * gamma + 2 * gamma * fraction))
|
|
|
|
def union(self, other):
|
|
"""
|
|
Return the union of this RDD and another one.
|
|
|
|
>>> rdd = sc.parallelize([1, 1, 2, 3])
|
|
>>> rdd.union(rdd).collect()
|
|
[1, 1, 2, 3, 1, 1, 2, 3]
|
|
"""
|
|
if self._jrdd_deserializer == other._jrdd_deserializer:
|
|
rdd = RDD(self._jrdd.union(other._jrdd), self.ctx,
|
|
self._jrdd_deserializer)
|
|
else:
|
|
# These RDDs contain data in different serialized formats, so we
|
|
# must normalize them to the default serializer.
|
|
self_copy = self._reserialize()
|
|
other_copy = other._reserialize()
|
|
rdd = RDD(self_copy._jrdd.union(other_copy._jrdd), self.ctx,
|
|
self.ctx.serializer)
|
|
if (self.partitioner == other.partitioner and
|
|
self.getNumPartitions() == rdd.getNumPartitions()):
|
|
rdd.partitioner = self.partitioner
|
|
return rdd
|
|
|
|
def intersection(self, other):
|
|
"""
|
|
Return the intersection of this RDD and another one. The output will
|
|
not contain any duplicate elements, even if the input RDDs did.
|
|
|
|
.. note:: This method performs a shuffle internally.
|
|
|
|
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
|
|
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
|
|
>>> rdd1.intersection(rdd2).collect()
|
|
[1, 2, 3]
|
|
"""
|
|
return self.map(lambda v: (v, None)) \
|
|
.cogroup(other.map(lambda v: (v, None))) \
|
|
.filter(lambda k_vs: all(k_vs[1])) \
|
|
.keys()
|
|
|
|
def _reserialize(self, serializer=None):
|
|
serializer = serializer or self.ctx.serializer
|
|
if self._jrdd_deserializer != serializer:
|
|
self = self.map(lambda x: x, preservesPartitioning=True)
|
|
self._jrdd_deserializer = serializer
|
|
return self
|
|
|
|
def __add__(self, other):
|
|
"""
|
|
Return the union of this RDD and another one.
|
|
|
|
>>> rdd = sc.parallelize([1, 1, 2, 3])
|
|
>>> (rdd + rdd).collect()
|
|
[1, 1, 2, 3, 1, 1, 2, 3]
|
|
"""
|
|
if not isinstance(other, RDD):
|
|
raise TypeError
|
|
return self.union(other)
|
|
|
|
def repartitionAndSortWithinPartitions(self, numPartitions=None, partitionFunc=portable_hash,
|
|
ascending=True, keyfunc=lambda x: x):
|
|
"""
|
|
Repartition the RDD according to the given partitioner and, within each resulting partition,
|
|
sort records by their keys.
|
|
|
|
>>> rdd = sc.parallelize([(0, 5), (3, 8), (2, 6), (0, 8), (3, 8), (1, 3)])
|
|
>>> rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: x % 2, True)
|
|
>>> rdd2.glom().collect()
|
|
[[(0, 5), (0, 8), (2, 6)], [(1, 3), (3, 8), (3, 8)]]
|
|
"""
|
|
if numPartitions is None:
|
|
numPartitions = self._defaultReducePartitions()
|
|
|
|
memory = self._memory_limit()
|
|
serializer = self._jrdd_deserializer
|
|
|
|
def sortPartition(iterator):
|
|
sort = ExternalSorter(memory * 0.9, serializer).sorted
|
|
return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending)))
|
|
|
|
return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
|
|
|
|
def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
|
|
"""
|
|
Sorts this RDD, which is assumed to consist of (key, value) pairs.
|
|
|
|
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
|
|
>>> sc.parallelize(tmp).sortByKey().first()
|
|
('1', 3)
|
|
>>> sc.parallelize(tmp).sortByKey(True, 1).collect()
|
|
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
|
|
>>> sc.parallelize(tmp).sortByKey(True, 2).collect()
|
|
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
|
|
>>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
|
|
>>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
|
|
>>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
|
|
[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
|
|
"""
|
|
if numPartitions is None:
|
|
numPartitions = self._defaultReducePartitions()
|
|
|
|
memory = self._memory_limit()
|
|
serializer = self._jrdd_deserializer
|
|
|
|
def sortPartition(iterator):
|
|
sort = ExternalSorter(memory * 0.9, serializer).sorted
|
|
return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))
|
|
|
|
if numPartitions == 1:
|
|
if self.getNumPartitions() > 1:
|
|
self = self.coalesce(1)
|
|
return self.mapPartitions(sortPartition, True)
|
|
|
|
# first compute the boundary of each part via sampling: we want to partition
|
|
# the key-space into bins such that the bins have roughly the same
|
|
# number of (key, value) pairs falling into them
|
|
rddSize = self.count()
|
|
if not rddSize:
|
|
return self # empty RDD
|
|
maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner
|
|
fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
|
|
samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()
|
|
samples = sorted(samples, key=keyfunc)
|
|
|
|
# we have numPartitions many parts but one of them has
|
|
# an implicit boundary
|
|
bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
|
|
for i in range(0, numPartitions - 1)]
|
|
|
|
def rangePartitioner(k):
|
|
p = bisect.bisect_left(bounds, keyfunc(k))
|
|
if ascending:
|
|
return p
|
|
else:
|
|
return numPartitions - 1 - p
|
|
|
|
return self.partitionBy(numPartitions, rangePartitioner).mapPartitions(sortPartition, True)
|
|
|
|
def sortBy(self, keyfunc, ascending=True, numPartitions=None):
|
|
"""
|
|
Sorts this RDD by the given keyfunc
|
|
|
|
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
|
|
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
|
|
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
|
|
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
|
|
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
|
|
"""
|
|
return self.keyBy(keyfunc).sortByKey(ascending, numPartitions).values()
|
|
|
|
def glom(self):
|
|
"""
|
|
Return an RDD created by coalescing all elements within each partition
|
|
into a list.
|
|
|
|
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
|
|
>>> sorted(rdd.glom().collect())
|
|
[[1, 2], [3, 4]]
|
|
"""
|
|
def func(iterator):
|
|
yield list(iterator)
|
|
return self.mapPartitions(func)
|
|
|
|
def cartesian(self, other):
|
|
"""
|
|
Return the Cartesian product of this RDD and another one, that is, the
|
|
RDD of all pairs of elements ``(a, b)`` where ``a`` is in `self` and
|
|
``b`` is in `other`.
|
|
|
|
>>> rdd = sc.parallelize([1, 2])
|
|
>>> sorted(rdd.cartesian(rdd).collect())
|
|
[(1, 1), (1, 2), (2, 1), (2, 2)]
|
|
"""
|
|
# Due to batching, we can't use the Java cartesian method.
|
|
deserializer = CartesianDeserializer(self._jrdd_deserializer,
|
|
other._jrdd_deserializer)
|
|
return RDD(self._jrdd.cartesian(other._jrdd), self.ctx, deserializer)
|
|
|
|
def groupBy(self, f, numPartitions=None, partitionFunc=portable_hash):
|
|
"""
|
|
Return an RDD of grouped items.
|
|
|
|
>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
|
|
>>> result = rdd.groupBy(lambda x: x % 2).collect()
|
|
>>> sorted([(x, sorted(y)) for (x, y) in result])
|
|
[(0, [2, 8]), (1, [1, 1, 3, 5])]
|
|
"""
|
|
return self.map(lambda x: (f(x), x)).groupByKey(numPartitions, partitionFunc)
|
|
|
|
@ignore_unicode_prefix
|
|
def pipe(self, command, env=None, checkCode=False):
|
|
"""
|
|
Return an RDD created by piping elements to a forked external process.
|
|
|
|
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
|
|
[u'1', u'2', u'', u'3']
|
|
|
|
:param checkCode: whether or not to check the return value of the shell command.
|
|
"""
|
|
if env is None:
|
|
env = dict()
|
|
|
|
def func(iterator):
|
|
pipe = Popen(
|
|
shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
|
|
|
|
def pipe_objs(out):
|
|
for obj in iterator:
|
|
s = unicode(obj).rstrip('\n') + '\n'
|
|
out.write(s.encode('utf-8'))
|
|
out.close()
|
|
Thread(target=pipe_objs, args=[pipe.stdin]).start()
|
|
|
|
def check_return_code():
|
|
pipe.wait()
|
|
if checkCode and pipe.returncode:
|
|
raise Exception("Pipe function `%s' exited "
|
|
"with error code %d" % (command, pipe.returncode))
|
|
else:
|
|
for i in range(0):
|
|
yield i
|
|
return (x.rstrip(b'\n').decode('utf-8') for x in
|
|
chain(iter(pipe.stdout.readline, b''), check_return_code()))
|
|
return self.mapPartitions(func)
|
|
|
|
def foreach(self, f):
|
|
"""
|
|
Applies a function to all elements of this RDD.
|
|
|
|
>>> def f(x): print(x)
|
|
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
|
|
"""
|
|
f = fail_on_stopiteration(f)
|
|
|
|
def processPartition(iterator):
|
|
for x in iterator:
|
|
f(x)
|
|
return iter([])
|
|
self.mapPartitions(processPartition).count() # Force evaluation
|
|
|
|
def foreachPartition(self, f):
|
|
"""
|
|
Applies a function to each partition of this RDD.
|
|
|
|
>>> def f(iterator):
|
|
... for x in iterator:
|
|
... print(x)
|
|
>>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
|
|
"""
|
|
def func(it):
|
|
r = f(it)
|
|
try:
|
|
return iter(r)
|
|
except TypeError:
|
|
return iter([])
|
|
self.mapPartitions(func).count() # Force evaluation
|
|
|
|
def collect(self):
|
|
"""
|
|
Return a list that contains all of the elements in this RDD.
|
|
|
|
.. note:: This method should only be used if the resulting array is expected
|
|
to be small, as all the data is loaded into the driver's memory.
|
|
"""
|
|
with SCCallSiteSync(self.context) as css:
|
|
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
|
|
return list(_load_from_socket(sock_info, self._jrdd_deserializer))
|
|
|
|
def collectWithJobGroup(self, groupId, description, interruptOnCancel=False):
|
|
"""
|
|
.. note:: Experimental
|
|
|
|
Collect this RDD in the specified job group, with the given description;
otherwise this behaves like :meth:`collect`.
|
|
|
|
.. versionadded:: 3.0.0
|
|
"""
|
|
with SCCallSiteSync(self.context) as css:
|
|
sock_info = self.ctx._jvm.PythonRDD.collectAndServeWithJobGroup(
|
|
self._jrdd.rdd(), groupId, description, interruptOnCancel)
|
|
return list(_load_from_socket(sock_info, self._jrdd_deserializer))
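    # Illustrative usage of collectWithJobGroup (a sketch, not part of the module;
    # assumes a live SparkContext ``sc``; the group id and description are placeholders):
    #
    #     rdd = sc.parallelize(range(10))
    #     # The collect job runs under "my-group", so it can be cancelled with
    #     # sc.cancelJobGroup("my-group") from another thread.
    #     rdd.collectWithJobGroup("my-group", "example collect", interruptOnCancel=True)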
|
|
|
|
def reduce(self, f):
|
|
"""
|
|
Reduces the elements of this RDD using the specified commutative and
|
|
associative binary operator. Currently reduces partitions locally.
|
|
|
|
>>> from operator import add
|
|
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
|
|
15
|
|
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
|
|
10
|
|
>>> sc.parallelize([]).reduce(add)
|
|
Traceback (most recent call last):
|
|
...
|
|
ValueError: Can not reduce() empty RDD
|
|
"""
|
|
f = fail_on_stopiteration(f)
|
|
|
|
def func(iterator):
|
|
iterator = iter(iterator)
|
|
try:
|
|
initial = next(iterator)
|
|
except StopIteration:
|
|
return
|
|
yield reduce(f, iterator, initial)
|
|
|
|
vals = self.mapPartitions(func).collect()
|
|
if vals:
|
|
return reduce(f, vals)
|
|
raise ValueError("Can not reduce() empty RDD")
|
|
|
|
def treeReduce(self, f, depth=2):
|
|
"""
|
|
Reduces the elements of this RDD in a multi-level tree pattern.
|
|
|
|
:param depth: suggested depth of the tree (default: 2)
|
|
|
|
>>> add = lambda x, y: x + y
|
|
>>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
|
|
>>> rdd.treeReduce(add)
|
|
-5
|
|
>>> rdd.treeReduce(add, 1)
|
|
-5
|
|
>>> rdd.treeReduce(add, 2)
|
|
-5
|
|
>>> rdd.treeReduce(add, 5)
|
|
-5
|
|
>>> rdd.treeReduce(add, 10)
|
|
-5
|
|
"""
|
|
if depth < 1:
|
|
raise ValueError("Depth cannot be smaller than 1 but got %d." % depth)
|
|
|
|
zeroValue = None, True # Use the second entry to indicate whether this is a dummy value.
|
|
|
|
def op(x, y):
|
|
if x[1]:
|
|
return y
|
|
elif y[1]:
|
|
return x
|
|
else:
|
|
return f(x[0], y[0]), False
|
|
|
|
reduced = self.map(lambda x: (x, False)).treeAggregate(zeroValue, op, op, depth)
|
|
if reduced[1]:
|
|
raise ValueError("Cannot reduce empty RDD.")
|
|
return reduced[0]
|
|
|
|
def fold(self, zeroValue, op):
|
|
"""
|
|
Aggregate the elements of each partition, and then the results for all
|
|
the partitions, using a given associative function and a neutral "zero value."
|
|
|
|
The function ``op(t1, t2)`` is allowed to modify ``t1`` and return it
|
|
as its result value to avoid object allocation; however, it should not
|
|
modify ``t2``.
|
|
|
|
This behaves somewhat differently from fold operations implemented
|
|
for non-distributed collections in functional languages like Scala.
|
|
This fold operation may be applied to each partition individually, and then
those partial results folded into the final result, rather than applying the
fold to each element sequentially in some defined ordering. For functions
|
|
that are not commutative, the result may differ from that of a fold
|
|
applied to a non-distributed collection.
|
|
|
|
>>> from operator import add
|
|
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
|
|
15
|
|
"""
|
|
op = fail_on_stopiteration(op)
|
|
|
|
def func(iterator):
|
|
acc = zeroValue
|
|
for obj in iterator:
|
|
acc = op(acc, obj)
|
|
yield acc
|
|
# collecting result of mapPartitions here ensures that the copy of
|
|
# zeroValue provided to each partition is unique from the one provided
|
|
# to the final reduce call
|
|
vals = self.mapPartitions(func).collect()
|
|
return reduce(op, vals, zeroValue)
|
|
|
|
def aggregate(self, zeroValue, seqOp, combOp):
|
|
"""
|
|
Aggregate the elements of each partition, and then the results for all
|
|
the partitions, using a given combine functions and a neutral "zero
|
|
value."
|
|
|
|
The functions ``op(t1, t2)`` are allowed to modify ``t1`` and return it
|
|
as its result value to avoid object allocation; however, it should not
|
|
modify ``t2``.
|
|
|
|
The first function (seqOp) can return a different result type, U, than
|
|
the type of this RDD. Thus, we need one operation for merging a T into
|
|
a U and one operation for merging two U's.
|
|
|
|
>>> seqOp = (lambda x, y: (x[0] + y, x[1] + 1))
|
|
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
|
|
>>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
|
|
(10, 4)
|
|
>>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
|
|
(0, 0)
|
|
"""
|
|
seqOp = fail_on_stopiteration(seqOp)
|
|
combOp = fail_on_stopiteration(combOp)
|
|
|
|
def func(iterator):
|
|
acc = zeroValue
|
|
for obj in iterator:
|
|
acc = seqOp(acc, obj)
|
|
yield acc
|
|
# collecting result of mapPartitions here ensures that the copy of
|
|
# zeroValue provided to each partition is unique from the one provided
|
|
# to the final reduce call
|
|
vals = self.mapPartitions(func).collect()
|
|
return reduce(combOp, vals, zeroValue)
|
|
|
|
def treeAggregate(self, zeroValue, seqOp, combOp, depth=2):
|
|
"""
|
|
Aggregates the elements of this RDD in a multi-level tree
|
|
pattern.
|
|
|
|
:param depth: suggested depth of the tree (default: 2)
|
|
|
|
>>> add = lambda x, y: x + y
|
|
>>> rdd = sc.parallelize([-5, -4, -3, -2, -1, 1, 2, 3, 4], 10)
|
|
>>> rdd.treeAggregate(0, add, add)
|
|
-5
|
|
>>> rdd.treeAggregate(0, add, add, 1)
|
|
-5
|
|
>>> rdd.treeAggregate(0, add, add, 2)
|
|
-5
|
|
>>> rdd.treeAggregate(0, add, add, 5)
|
|
-5
|
|
>>> rdd.treeAggregate(0, add, add, 10)
|
|
-5
|
|
"""
|
|
if depth < 1:
|
|
raise ValueError("Depth cannot be smaller than 1 but got %d." % depth)
|
|
|
|
if self.getNumPartitions() == 0:
|
|
return zeroValue
|
|
|
|
def aggregatePartition(iterator):
|
|
acc = zeroValue
|
|
for obj in iterator:
|
|
acc = seqOp(acc, obj)
|
|
yield acc
|
|
|
|
partiallyAggregated = self.mapPartitions(aggregatePartition)
|
|
numPartitions = partiallyAggregated.getNumPartitions()
|
|
scale = max(int(ceil(pow(numPartitions, 1.0 / depth))), 2)
|
|
# If creating an extra level doesn't help reduce the wall-clock time, we stop the tree
|
|
# aggregation.
|
|
while numPartitions > scale + numPartitions / scale:
|
|
numPartitions /= scale
|
|
curNumPartitions = int(numPartitions)
|
|
|
|
def mapPartition(i, iterator):
|
|
for obj in iterator:
|
|
yield (i % curNumPartitions, obj)
|
|
|
|
partiallyAggregated = partiallyAggregated \
|
|
.mapPartitionsWithIndex(mapPartition) \
|
|
.reduceByKey(combOp, curNumPartitions) \
|
|
.values()
|
|
|
|
return partiallyAggregated.reduce(combOp)
|
|
|
|
def max(self, key=None):
|
|
"""
|
|
Find the maximum item in this RDD.
|
|
|
|
:param key: A function used to generate key for comparing
|
|
|
|
>>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
|
|
>>> rdd.max()
|
|
43.0
|
|
>>> rdd.max(key=str)
|
|
5.0
|
|
"""
|
|
if key is None:
|
|
return self.reduce(max)
|
|
return self.reduce(lambda a, b: max(a, b, key=key))
|
|
|
|
def min(self, key=None):
|
|
"""
|
|
Find the minimum item in this RDD.
|
|
|
|
:param key: A function used to generate key for comparing
|
|
|
|
>>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
|
|
>>> rdd.min()
|
|
2.0
|
|
>>> rdd.min(key=str)
|
|
10.0
|
|
"""
|
|
if key is None:
|
|
return self.reduce(min)
|
|
return self.reduce(lambda a, b: min(a, b, key=key))
|
|
|
|
def sum(self):
|
|
"""
|
|
Add up the elements in this RDD.
|
|
|
|
>>> sc.parallelize([1.0, 2.0, 3.0]).sum()
|
|
6.0
|
|
"""
|
|
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
|
|
|
|
def count(self):
|
|
"""
|
|
Return the number of elements in this RDD.
|
|
|
|
>>> sc.parallelize([2, 3, 4]).count()
|
|
3
|
|
"""
|
|
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
|
|
|
|
def stats(self):
|
|
"""
|
|
Return a :class:`StatCounter` object that captures the mean, variance
|
|
and count of the RDD's elements in one operation.
|
|
"""
|
|
def redFunc(left_counter, right_counter):
|
|
return left_counter.mergeStats(right_counter)
|
|
|
|
return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
|
|
|
|
def histogram(self, buckets):
|
|
"""
|
|
Compute a histogram using the provided buckets. The buckets
|
|
are all open to the right except for the last which is closed.
|
|
e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
|
|
which means 1<=x<10, 10<=x<20, 20<=x<=50. For example, given the
inputs 1 and 50, the resulting histogram would be [1, 0, 1].
|
|
|
|
If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
|
|
this can be switched from an O(log n) insertion to O(1) per
|
|
element (where n is the number of buckets).
|
|
|
|
Buckets must be sorted, not contain any duplicates, and have
|
|
at least two elements.
|
|
|
|
If `buckets` is a number, it will generate buckets which are
|
|
evenly spaced between the minimum and maximum of the RDD. For
|
|
example, if the min value is 0 and the max is 100, given `buckets`
|
|
as 2, the resulting buckets will be [0,50) [50,100]. `buckets` must
|
|
be at least 1. An exception is raised if the RDD contains infinity.
|
|
If the elements in the RDD do not vary (max == min), a single bucket
|
|
will be used.
|
|
|
|
The return value is a tuple of buckets and histogram.
|
|
|
|
>>> rdd = sc.parallelize(range(51))
|
|
>>> rdd.histogram(2)
|
|
([0, 25, 50], [25, 26])
|
|
>>> rdd.histogram([0, 5, 25, 50])
|
|
([0, 5, 25, 50], [5, 20, 26])
|
|
>>> rdd.histogram([0, 15, 30, 45, 60]) # evenly spaced buckets
|
|
([0, 15, 30, 45, 60], [15, 15, 15, 6])
|
|
>>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
|
|
>>> rdd.histogram(("a", "b", "c"))
|
|
(('a', 'b', 'c'), [2, 2])
|
|
"""
|
|
|
|
if isinstance(buckets, int):
|
|
if buckets < 1:
|
|
raise ValueError("number of buckets must be >= 1")
|
|
|
|
# filter out non-comparable elements
|
|
def comparable(x):
|
|
if x is None:
|
|
return False
|
|
if type(x) is float and isnan(x):
|
|
return False
|
|
return True
|
|
|
|
filtered = self.filter(comparable)
|
|
|
|
# faster than stats()
|
|
def minmax(a, b):
|
|
return min(a[0], b[0]), max(a[1], b[1])
|
|
try:
|
|
minv, maxv = filtered.map(lambda x: (x, x)).reduce(minmax)
|
|
except TypeError as e:
|
|
if " empty " in str(e):
|
|
raise ValueError("can not generate buckets from empty RDD")
|
|
raise
|
|
|
|
if minv == maxv or buckets == 1:
|
|
return [minv, maxv], [filtered.count()]
|
|
|
|
try:
|
|
inc = (maxv - minv) / buckets
|
|
except TypeError:
|
|
raise TypeError("Can not generate buckets with non-number in RDD")
|
|
|
|
if isinf(inc):
|
|
raise ValueError("Can not generate buckets with infinite value")
|
|
|
|
# keep them as integer if possible
|
|
inc = int(inc)
|
|
if inc * buckets != maxv - minv:
|
|
inc = (maxv - minv) * 1.0 / buckets
|
|
|
|
buckets = [i * inc + minv for i in range(buckets)]
|
|
buckets.append(maxv) # fix accumulated error
|
|
even = True
|
|
|
|
elif isinstance(buckets, (list, tuple)):
|
|
if len(buckets) < 2:
|
|
raise ValueError("buckets should have more than one value")
|
|
|
|
if any(i is None or isinstance(i, float) and isnan(i) for i in buckets):
|
|
raise ValueError("can not have None or NaN in buckets")
|
|
|
|
if sorted(buckets) != list(buckets):
|
|
raise ValueError("buckets should be sorted")
|
|
|
|
if len(set(buckets)) != len(buckets):
|
|
raise ValueError("buckets should not contain duplicated values")
|
|
|
|
minv = buckets[0]
|
|
maxv = buckets[-1]
|
|
even = False
|
|
inc = None
|
|
try:
|
|
steps = [buckets[i + 1] - buckets[i] for i in range(len(buckets) - 1)]
|
|
except TypeError:
|
|
pass # objects in buckets do not support '-'
|
|
else:
|
|
if max(steps) - min(steps) < 1e-10: # handle precision errors
|
|
even = True
|
|
inc = (maxv - minv) / (len(buckets) - 1)
|
|
|
|
else:
|
|
raise TypeError("buckets should be a list or tuple or number(int or long)")
|
|
|
|
def histogram(iterator):
|
|
counters = [0] * len(buckets)
|
|
for i in iterator:
|
|
if i is None or (type(i) is float and isnan(i)) or i > maxv or i < minv:
|
|
continue
|
|
t = (int((i - minv) / inc) if even
|
|
else bisect.bisect_right(buckets, i) - 1)
|
|
counters[t] += 1
|
|
# add last two together
|
|
last = counters.pop()
|
|
counters[-1] += last
|
|
return [counters]
|
|
|
|
def mergeCounters(a, b):
|
|
return [i + j for i, j in zip(a, b)]
|
|
|
|
return buckets, self.mapPartitions(histogram).reduce(mergeCounters)
|
|
|
|
def mean(self):
|
|
"""
|
|
Compute the mean of this RDD's elements.
|
|
|
|
>>> sc.parallelize([1, 2, 3]).mean()
|
|
2.0
|
|
"""
|
|
return self.stats().mean()
|
|
|
|
def variance(self):
|
|
"""
|
|
Compute the variance of this RDD's elements.
|
|
|
|
>>> sc.parallelize([1, 2, 3]).variance()
|
|
0.666...
|
|
"""
|
|
return self.stats().variance()
|
|
|
|
def stdev(self):
|
|
"""
|
|
Compute the standard deviation of this RDD's elements.
|
|
|
|
>>> sc.parallelize([1, 2, 3]).stdev()
|
|
0.816...
|
|
"""
|
|
return self.stats().stdev()
|
|
|
|
def sampleStdev(self):
|
|
"""
|
|
Compute the sample standard deviation of this RDD's elements (which
|
|
corrects for bias in estimating the standard deviation by dividing by
|
|
N-1 instead of N).
|
|
|
|
>>> sc.parallelize([1, 2, 3]).sampleStdev()
|
|
1.0
|
|
"""
|
|
return self.stats().sampleStdev()
|
|
|
|
def sampleVariance(self):
|
|
"""
|
|
Compute the sample variance of this RDD's elements (which corrects
|
|
for bias in estimating the variance by dividing by N-1 instead of N).
|
|
|
|
>>> sc.parallelize([1, 2, 3]).sampleVariance()
|
|
1.0
|
|
"""
|
|
return self.stats().sampleVariance()
|
|
|
|
def countByValue(self):
|
|
"""
|
|
Return the count of each unique value in this RDD as a dictionary of
|
|
(value, count) pairs.
|
|
|
|
>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
|
|
[(1, 2), (2, 3)]
|
|
"""
|
|
def countPartition(iterator):
|
|
counts = defaultdict(int)
|
|
for obj in iterator:
|
|
counts[obj] += 1
|
|
yield counts
|
|
|
|
def mergeMaps(m1, m2):
|
|
for k, v in m2.items():
|
|
m1[k] += v
|
|
return m1
|
|
return self.mapPartitions(countPartition).reduce(mergeMaps)
|
|
|
|
def top(self, num, key=None):
|
|
"""
|
|
Get the top N elements from an RDD.
|
|
|
|
.. note:: This method should only be used if the resulting array is expected
|
|
to be small, as all the data is loaded into the driver's memory.
|
|
|
|
.. note:: It returns the list sorted in descending order.
|
|
|
|
>>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
|
|
[12]
|
|
>>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
|
|
[6, 5]
|
|
>>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)
|
|
[4, 3, 2]
|
|
"""
|
|
def topIterator(iterator):
|
|
yield heapq.nlargest(num, iterator, key=key)
|
|
|
|
def merge(a, b):
|
|
return heapq.nlargest(num, a + b, key=key)
|
|
|
|
return self.mapPartitions(topIterator).reduce(merge)
|
|
|
|
def takeOrdered(self, num, key=None):
|
|
"""
|
|
Get the N elements from an RDD ordered in ascending order or as
|
|
specified by the optional key function.
|
|
|
|
.. note:: this method should only be used if the resulting array is expected
|
|
to be small, as all the data is loaded into the driver's memory.
|
|
|
|
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7]).takeOrdered(6)
|
|
[1, 2, 3, 4, 5, 6]
|
|
>>> sc.parallelize([10, 1, 2, 9, 3, 4, 5, 6, 7], 2).takeOrdered(6, key=lambda x: -x)
|
|
[10, 9, 7, 6, 5, 4]
|
|
"""
|
|
|
|
def merge(a, b):
|
|
return heapq.nsmallest(num, a + b, key)
|
|
|
|
return self.mapPartitions(lambda it: [heapq.nsmallest(num, it, key)]).reduce(merge)
|
|
|
|
def take(self, num):
|
|
"""
|
|
Take the first num elements of the RDD.
|
|
|
|
It works by first scanning one partition, and using the results from
|
|
that partition to estimate the number of additional partitions needed
|
|
to satisfy the limit.
|
|
|
|
Translated from the Scala implementation in RDD#take().
|
|
|
|
.. note:: this method should only be used if the resulting array is expected
|
|
to be small, as all the data is loaded into the driver's memory.
|
|
|
|
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
|
|
[2, 3]
|
|
>>> sc.parallelize([2, 3, 4, 5, 6]).take(10)
|
|
[2, 3, 4, 5, 6]
|
|
>>> sc.parallelize(range(100), 100).filter(lambda x: x > 90).take(3)
|
|
[91, 92, 93]
|
|
"""
|
|
items = []
|
|
totalParts = self.getNumPartitions()
|
|
partsScanned = 0
|
|
|
|
while len(items) < num and partsScanned < totalParts:
|
|
# The number of partitions to try in this iteration.
|
|
# It is ok for this number to be greater than totalParts because
|
|
# we actually cap it at totalParts in runJob.
|
|
numPartsToTry = 1
|
|
if partsScanned > 0:
|
|
# If we didn't find any rows after the previous iteration,
|
|
# quadruple and retry. Otherwise, interpolate the number of
|
|
# partitions we need to try, but overestimate it by 50%.
|
|
# We also cap the estimation in the end.
|
|
if len(items) == 0:
|
|
numPartsToTry = partsScanned * 4
|
|
else:
|
|
# the first parameter of max is >=1 whenever partsScanned >= 2
|
|
numPartsToTry = int(1.5 * num * partsScanned / len(items)) - partsScanned
|
|
numPartsToTry = min(max(numPartsToTry, 1), partsScanned * 4)
|
|
|
|
left = num - len(items)
|
|
|
|
def takeUpToNumLeft(iterator):
|
|
iterator = iter(iterator)
|
|
taken = 0
|
|
while taken < left:
|
|
try:
|
|
yield next(iterator)
|
|
except StopIteration:
|
|
return
|
|
taken += 1
|
|
|
|
p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
|
|
res = self.context.runJob(self, takeUpToNumLeft, p)
|
|
|
|
items += res
|
|
partsScanned += numPartsToTry
|
|
|
|
return items[:num]
|
|
|
|
def first(self):
|
|
"""
|
|
Return the first element in this RDD.
|
|
|
|
>>> sc.parallelize([2, 3, 4]).first()
|
|
2
|
|
>>> sc.parallelize([]).first()
|
|
Traceback (most recent call last):
|
|
...
|
|
ValueError: RDD is empty
|
|
"""
|
|
rs = self.take(1)
|
|
if rs:
|
|
return rs[0]
|
|
raise ValueError("RDD is empty")
|
|
|
|
def isEmpty(self):
|
|
"""
|
|
Returns true if and only if the RDD contains no elements at all.
|
|
|
|
.. note:: an RDD may be empty even when it has at least 1 partition.
|
|
|
|
>>> sc.parallelize([]).isEmpty()
|
|
True
|
|
>>> sc.parallelize([1]).isEmpty()
|
|
False
|
|
"""
|
|
return self.getNumPartitions() == 0 or len(self.take(1)) == 0
|
|
|
|
def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
|
|
"""
|
|
Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
|
|
system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are
|
|
converted for output using either user specified converters or, by default,
|
|
"org.apache.spark.api.python.JavaToWritableConverter".
|
|
|
|
:param conf: Hadoop job configuration, passed in as a dict
|
|
:param keyConverter: (None by default)
|
|
:param valueConverter: (None by default)
|
|
"""
|
|
jconf = self.ctx._dictToJavaMap(conf)
|
|
pickledRDD = self._pickled()
|
|
self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
|
|
keyConverter, valueConverter, True)
|
|
|
|
def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
|
|
keyConverter=None, valueConverter=None, conf=None):
|
|
"""
|
|
Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
|
|
system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types
|
|
will be inferred if not specified. Keys and values are converted for output using either
|
|
user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The
|
|
`conf` is applied on top of the base Hadoop conf associated with the SparkContext
|
|
of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
|
|
|
|
:param path: path to Hadoop file
|
|
:param outputFormatClass: fully qualified classname of Hadoop OutputFormat
|
|
(e.g. "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat")
|
|
:param keyClass: fully qualified classname of key Writable class
|
|
(e.g. "org.apache.hadoop.io.IntWritable", None by default)
|
|
:param valueClass: fully qualified classname of value Writable class
|
|
(e.g. "org.apache.hadoop.io.Text", None by default)
|
|
:param keyConverter: (None by default)
|
|
:param valueConverter: (None by default)
|
|
:param conf: Hadoop job configuration, passed in as a dict (None by default)
|
|
"""
|
|
jconf = self.ctx._dictToJavaMap(conf)
|
|
pickledRDD = self._pickled()
|
|
self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, True, path,
|
|
outputFormatClass,
|
|
keyClass, valueClass,
|
|
keyConverter, valueConverter, jconf)
|
|
|
|
def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
|
|
"""
|
|
Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
|
|
system, using the old Hadoop OutputFormat API (mapred package). Keys/values are
|
|
converted for output using either user specified converters or, by default,
|
|
"org.apache.spark.api.python.JavaToWritableConverter".
|
|
|
|
:param conf: Hadoop job configuration, passed in as a dict
|
|
:param keyConverter: (None by default)
|
|
:param valueConverter: (None by default)
|
|
"""
|
|
jconf = self.ctx._dictToJavaMap(conf)
|
|
pickledRDD = self._pickled()
|
|
self.ctx._jvm.PythonRDD.saveAsHadoopDataset(pickledRDD._jrdd, True, jconf,
|
|
keyConverter, valueConverter, False)
|
|
|
|
def saveAsHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None,
|
|
keyConverter=None, valueConverter=None, conf=None,
|
|
compressionCodecClass=None):
|
|
"""
|
|
Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
|
|
system, using the old Hadoop OutputFormat API (mapred package). Key and value types
|
|
will be inferred if not specified. Keys and values are converted for output using either
|
|
user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The
|
|
`conf` is applied on top of the base Hadoop conf associated with the SparkContext
|
|
of this RDD to create a merged Hadoop MapReduce job configuration for saving the data.
|
|
|
|
:param path: path to Hadoop file
|
|
:param outputFormatClass: fully qualified classname of Hadoop OutputFormat
|
|
(e.g. "org.apache.hadoop.mapred.SequenceFileOutputFormat")
|
|
:param keyClass: fully qualified classname of key Writable class
|
|
(e.g. "org.apache.hadoop.io.IntWritable", None by default)
|
|
:param valueClass: fully qualified classname of value Writable class
|
|
(e.g. "org.apache.hadoop.io.Text", None by default)
|
|
:param keyConverter: (None by default)
|
|
:param valueConverter: (None by default)
|
|
:param conf: (None by default)
|
|
:param compressionCodecClass: (None by default)
|
|
"""
|
|
jconf = self.ctx._dictToJavaMap(conf)
|
|
pickledRDD = self._pickled()
|
|
self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, True, path,
|
|
outputFormatClass,
|
|
keyClass, valueClass,
|
|
keyConverter, valueConverter,
|
|
jconf, compressionCodecClass)
|
|
|
|
def saveAsSequenceFile(self, path, compressionCodecClass=None):
|
|
"""
|
|
Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file
|
|
system, using the "org.apache.hadoop.io.Writable" types that we convert from the
|
|
RDD's key and value types. The mechanism is as follows:
|
|
|
|
1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects.
|
|
2. Keys and values of this Java RDD are converted to Writables and written out.
|
|
|
|
:param path: path to sequence file
|
|
:param compressionCodecClass: (None by default)
|
|
"""
|
|
pickledRDD = self._pickled()
|
|
self.ctx._jvm.PythonRDD.saveAsSequenceFile(pickledRDD._jrdd, True,
|
|
path, compressionCodecClass)
|
|
|
|
def saveAsPickleFile(self, path, batchSize=10):
|
|
"""
|
|
Save this RDD as a SequenceFile of serialized objects. The serializer
|
|
used is :class:`pyspark.serializers.PickleSerializer`, default batch size
|
|
is 10.
|
|
|
|
>>> tmpFile = NamedTemporaryFile(delete=True)
|
|
>>> tmpFile.close()
|
|
>>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
|
|
>>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect())
|
|
['1', '2', 'rdd', 'spark']
|
|
"""
|
|
if batchSize == 0:
|
|
ser = AutoBatchedSerializer(PickleSerializer())
|
|
else:
|
|
ser = BatchedSerializer(PickleSerializer(), batchSize)
|
|
self._reserialize(ser)._jrdd.saveAsObjectFile(path)
|
|
|
|
    @ignore_unicode_prefix
    def saveAsTextFile(self, path, compressionCodecClass=None):
        """
        Save this RDD as a text file, using string representations of elements.

        :param path: path to text file
        :param compressionCodecClass: (None by default) string i.e.
            "org.apache.hadoop.io.compress.GzipCodec"

        >>> tempFile = NamedTemporaryFile(delete=True)
        >>> tempFile.close()
        >>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
        >>> from fileinput import input
        >>> from glob import glob
        >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
        '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'

        Empty lines are tolerated when saving to text files.

        >>> tempFile2 = NamedTemporaryFile(delete=True)
        >>> tempFile2.close()
        >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
        >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
        '\\n\\n\\nbar\\nfoo\\n'

        Using compressionCodecClass

        >>> tempFile3 = NamedTemporaryFile(delete=True)
        >>> tempFile3.close()
        >>> codec = "org.apache.hadoop.io.compress.GzipCodec"
        >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
        >>> from fileinput import input, hook_compressed
        >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
        >>> b''.join(result).decode('utf-8')
        u'bar\\nfoo\\n'
        """
        def func(split, iterator):
            for x in iterator:
                if not isinstance(x, (unicode, bytes)):
                    x = unicode(x)
                if isinstance(x, unicode):
                    x = x.encode("utf-8")
                yield x
        keyed = self.mapPartitionsWithIndex(func)
        keyed._bypass_serializer = True
        if compressionCodecClass:
            compressionCodec = self.ctx._jvm.java.lang.Class.forName(compressionCodecClass)
            keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
        else:
            keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)

    # Pair functions

    def collectAsMap(self):
        """
        Return the key-value pairs in this RDD to the master as a dictionary.

        .. note:: this method should only be used if the resulting data is expected
            to be small, as all the data is loaded into the driver's memory.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap()
        >>> m[1]
        2
        >>> m[3]
        4
        """
        return dict(self.collect())

    def keys(self):
        """
        Return an RDD with the keys of each tuple.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).keys()
        >>> m.collect()
        [1, 3]
        """
        return self.map(lambda x: x[0])

    def values(self):
        """
        Return an RDD with the values of each tuple.

        >>> m = sc.parallelize([(1, 2), (3, 4)]).values()
        >>> m.collect()
        [2, 4]
        """
        return self.map(lambda x: x[1])

    def reduceByKey(self, func, numPartitions=None, partitionFunc=portable_hash):
        """
        Merge the values for each key using an associative and commutative reduce function.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        Output will be partitioned with `numPartitions` partitions, or
        the default parallelism level if `numPartitions` is not specified.
        Default partitioner is hash-partition.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKey(add).collect())
        [('a', 2), ('b', 1)]
        """
        return self.combineByKey(lambda x: x, func, func, numPartitions, partitionFunc)

    def reduceByKeyLocally(self, func):
        """
        Merge the values for each key using an associative and commutative reduce function, but
        return the results immediately to the master as a dictionary.

        This will also perform the merging locally on each mapper before
        sending results to a reducer, similarly to a "combiner" in MapReduce.

        >>> from operator import add
        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.reduceByKeyLocally(add).items())
        [('a', 2), ('b', 1)]
        """
        func = fail_on_stopiteration(func)

        def reducePartition(iterator):
            m = {}
            for k, v in iterator:
                m[k] = func(m[k], v) if k in m else v
            yield m

        def mergeMaps(m1, m2):
            for k, v in m2.items():
                m1[k] = func(m1[k], v) if k in m1 else v
            return m1
        return self.mapPartitions(reducePartition).reduce(mergeMaps)

    def countByKey(self):
        """
        Count the number of elements for each key, and return the result to the
        master as a dictionary.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.countByKey().items())
        [('a', 2), ('b', 1)]
        """
        return self.map(lambda x: x[0]).countByValue()

    def join(self, other, numPartitions=None):
        """
        Return an RDD containing all pairs of elements with matching keys in
        `self` and `other`.

        Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
        (k, v1) is in `self` and (k, v2) is in `other`.

        Performs a hash join across the cluster.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("a", 3)])
        >>> sorted(x.join(y).collect())
        [('a', (1, 2)), ('a', (1, 3))]
        """
        return python_join(self, other, numPartitions)

    def leftOuterJoin(self, other, numPartitions=None):
        """
        Perform a left outer join of `self` and `other`.

        For each element (k, v) in `self`, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in `other`, or the pair
        (k, (v, None)) if no elements in `other` have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(x.leftOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None))]
        """
        return python_left_outer_join(self, other, numPartitions)

    def rightOuterJoin(self, other, numPartitions=None):
        """
        Perform a right outer join of `self` and `other`.

        For each element (k, w) in `other`, the resulting RDD will either
        contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
        if no elements in `self` have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> sorted(y.rightOuterJoin(x).collect())
        [('a', (2, 1)), ('b', (None, 4))]
        """
        return python_right_outer_join(self, other, numPartitions)

    def fullOuterJoin(self, other, numPartitions=None):
        """
        Perform a full outer join of `self` and `other`.

        For each element (k, v) in `self`, the resulting RDD will either
        contain all pairs (k, (v, w)) for w in `other`, or the pair
        (k, (v, None)) if no elements in `other` have key k.

        Similarly, for each element (k, w) in `other`, the resulting RDD will
        either contain all pairs (k, (v, w)) for v in `self`, or the pair
        (k, (None, w)) if no elements in `self` have key k.

        Hash-partitions the resulting RDD into the given number of partitions.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2), ("c", 8)])
        >>> sorted(x.fullOuterJoin(y).collect())
        [('a', (1, 2)), ('b', (4, None)), ('c', (None, 8))]
        """
        return python_full_outer_join(self, other, numPartitions)

    # TODO: add option to control map-side combining
    # portable_hash is used as default, because builtin hash of None is different
    # cross machines.
    def partitionBy(self, numPartitions, partitionFunc=portable_hash):
        """
        Return a copy of the RDD partitioned using the specified partitioner.

        >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
        >>> sets = pairs.partitionBy(2).glom().collect()
        >>> len(set(sets[0]).intersection(set(sets[1])))
        0
        """
        if numPartitions is None:
            numPartitions = self._defaultReducePartitions()
        partitioner = Partitioner(numPartitions, partitionFunc)
        if self.partitioner == partitioner:
            return self

        # Transferring O(n) objects to Java is too expensive.
        # Instead, we'll form the hash buckets in Python,
        # transferring O(numPartitions) objects to Java.
        # Each object is a (splitNumber, [objects]) pair.
        # In order to avoid too huge objects, the objects are
        # grouped into chunks.
        outputSerializer = self.ctx._unbatched_serializer

        limit = (self._memory_limit() / 2)

        def add_shuffle_key(split, iterator):

            buckets = defaultdict(list)
            c, batch = 0, min(10 * numPartitions, 1000)

            for k, v in iterator:
                buckets[partitionFunc(k) % numPartitions].append((k, v))
                c += 1

                # check used memory and avg size of chunk of objects
                if (c % 1000 == 0 and get_used_memory() > limit
                        or c > batch):
                    n, size = len(buckets), 0
                    for split in list(buckets.keys()):
                        yield pack_long(split)
                        d = outputSerializer.dumps(buckets[split])
                        del buckets[split]
                        yield d
                        size += len(d)

                    avg = int(size / n) >> 20
                    # let 1M < avg < 10M
                    if avg < 1:
                        batch *= 1.5
                    elif avg > 10:
                        batch = max(int(batch / 1.5), 1)
                    c = 0

            for split, items in buckets.items():
                yield pack_long(split)
                yield outputSerializer.dumps(items)

        keyed = self.mapPartitionsWithIndex(add_shuffle_key, preservesPartitioning=True)
        keyed._bypass_serializer = True
        with SCCallSiteSync(self.context) as css:
            pairRDD = self.ctx._jvm.PairwiseRDD(
                keyed._jrdd.rdd()).asJavaPairRDD()
            jpartitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                           id(partitionFunc))
            jrdd = self.ctx._jvm.PythonRDD.valueOfPair(pairRDD.partitionBy(jpartitioner))
        rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
        rdd.partitioner = partitioner
        return rdd

    # TODO: add control over map-side aggregation
    def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
                     numPartitions=None, partitionFunc=portable_hash):
        """
        Generic function to combine the elements for each key using a custom
        set of aggregation functions.

        Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined
        type" C.

        Users provide three functions:

        - `createCombiner`, which turns a V into a C (e.g., creates
          a one-element list)
        - `mergeValue`, to merge a V into a C (e.g., adds it to the end of
          a list)
        - `mergeCombiners`, to combine two C's into a single one (e.g., merges
          the lists)

        To avoid memory allocation, both mergeValue and mergeCombiners are allowed to
        modify and return their first argument instead of creating a new C.

        In addition, users can control the partitioning of the output RDD.

        .. note:: V and C can be different -- for example, one might group an RDD of type
            (Int, Int) into an RDD of type (Int, List[Int]).

        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
        >>> def to_list(a):
        ...     return [a]
        ...
        >>> def append(a, b):
        ...     a.append(b)
        ...     return a
        ...
        >>> def extend(a, b):
        ...     a.extend(b)
        ...     return a
        ...
        >>> sorted(x.combineByKey(to_list, append, extend).collect())
        [('a', [1, 2]), ('b', [1])]
        """
        if numPartitions is None:
            numPartitions = self._defaultReducePartitions()

        serializer = self.ctx.serializer
        memory = self._memory_limit()
        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

        def combineLocally(iterator):
            merger = ExternalMerger(agg, memory * 0.9, serializer)
            merger.mergeValues(iterator)
            return merger.items()

        locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)
        shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)

        def _mergeCombiners(iterator):
            merger = ExternalMerger(agg, memory, serializer)
            merger.mergeCombiners(iterator)
            return merger.items()

        return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)

    def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None,
                       partitionFunc=portable_hash):
        """
        Aggregate the values of each key, using given combine functions and a neutral
        "zero value". This function can return a different result type, U, than the type
        of the values in this RDD, V. Thus, we need one operation for merging a V into
        a U and one operation for merging two U's. The former operation is used for merging
        values within a partition, and the latter is used for merging values between
        partitions. To avoid memory allocation, both of these functions are
        allowed to modify and return their first argument instead of creating a new U.
        """
        def createZero():
            return copy.deepcopy(zeroValue)

        return self.combineByKey(
            lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions, partitionFunc)

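    # A minimal usage sketch for aggregateByKey above (illustrative, not from the original
    # docs): computing a per-key (sum, count) pair with a (0, 0) zero value.
    #
    #   >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 3)])
    #   >>> seqFunc = lambda acc, v: (acc[0] + v, acc[1] + 1)
    #   >>> combFunc = lambda a, b: (a[0] + b[0], a[1] + b[1])
    #   >>> sorted(rdd.aggregateByKey((0, 0), seqFunc, combFunc).collect())
    #   [('a', (4, 2)), ('b', (1, 1))]
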
    def foldByKey(self, zeroValue, func, numPartitions=None, partitionFunc=portable_hash):
        """
        Merge the values for each key using an associative function "func"
        and a neutral "zeroValue" which may be added to the result an
        arbitrary number of times, and must not change the result
        (e.g., 0 for addition, or 1 for multiplication.).

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> from operator import add
        >>> sorted(rdd.foldByKey(0, add).collect())
        [('a', 2), ('b', 1)]
        """
        def createZero():
            return copy.deepcopy(zeroValue)

        return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions,
                                 partitionFunc)

    def _memory_limit(self):
        return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))

    # TODO: support variant with custom partitioner
    def groupByKey(self, numPartitions=None, partitionFunc=portable_hash):
        """
        Group the values for each key in the RDD into a single sequence.
        Hash-partitions the resulting RDD with numPartitions partitions.

        .. note:: If you are grouping in order to perform an aggregation (such as a
            sum or average) over each key, using reduceByKey or aggregateByKey will
            provide much better performance.

        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
        >>> sorted(rdd.groupByKey().mapValues(len).collect())
        [('a', 2), ('b', 1)]
        >>> sorted(rdd.groupByKey().mapValues(list).collect())
        [('a', [1, 1]), ('b', [1])]
        """
        def createCombiner(x):
            return [x]

        def mergeValue(xs, x):
            xs.append(x)
            return xs

        def mergeCombiners(a, b):
            a.extend(b)
            return a

        memory = self._memory_limit()
        serializer = self._jrdd_deserializer
        agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

        def combine(iterator):
            merger = ExternalMerger(agg, memory * 0.9, serializer)
            merger.mergeValues(iterator)
            return merger.items()

        locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
        shuffled = locally_combined.partitionBy(numPartitions, partitionFunc)

        def groupByKey(it):
            merger = ExternalGroupBy(agg, memory, serializer)
            merger.mergeCombiners(it)
            return merger.items()

        return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)

    def flatMapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a flatMap function
        without changing the keys; this also retains the original RDD's
        partitioning.

        >>> x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
        >>> def f(x): return x
        >>> x.flatMapValues(f).collect()
        [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
        """
        flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
        return self.flatMap(flat_map_fn, preservesPartitioning=True)

    def mapValues(self, f):
        """
        Pass each value in the key-value pair RDD through a map function
        without changing the keys; this also retains the original RDD's
        partitioning.

        >>> x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
        >>> def f(x): return len(x)
        >>> x.mapValues(f).collect()
        [('a', 3), ('b', 1)]
        """
        map_values_fn = lambda kv: (kv[0], f(kv[1]))
        return self.map(map_values_fn, preservesPartitioning=True)

    def groupWith(self, other, *others):
        """
        Alias for cogroup but with support for multiple RDDs.

        >>> w = sc.parallelize([("a", 5), ("b", 6)])
        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> z = sc.parallelize([("b", 42)])
        >>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]
        [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]

        """
        return python_cogroup((self, other) + others, numPartitions=None)

    # TODO: add variant with custom partitioner
    def cogroup(self, other, numPartitions=None):
        """
        For each key k in `self` or `other`, return a resulting RDD that
        contains a tuple with the list of values for that key in `self` as
        well as `other`.

        >>> x = sc.parallelize([("a", 1), ("b", 4)])
        >>> y = sc.parallelize([("a", 2)])
        >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
        [('a', ([1], [2])), ('b', ([4], []))]
        """
        return python_cogroup((self, other), numPartitions)

    def sampleByKey(self, withReplacement, fractions, seed=None):
        """
        Return a subset of this RDD sampled by key (via stratified sampling).
        Create a sample of this RDD using variable sampling rates for
        different keys as specified by fractions, a key to sampling rate map.

        >>> fractions = {"a": 0.2, "b": 0.1}
        >>> rdd = sc.parallelize(fractions.keys()).cartesian(sc.parallelize(range(0, 1000)))
        >>> sample = dict(rdd.sampleByKey(False, fractions, 2).groupByKey().collect())
        >>> 100 < len(sample["a"]) < 300 and 50 < len(sample["b"]) < 150
        True
        >>> max(sample["a"]) <= 999 and min(sample["a"]) >= 0
        True
        >>> max(sample["b"]) <= 999 and min(sample["b"]) >= 0
        True
        """
        for fraction in fractions.values():
            assert fraction >= 0.0, "Negative fraction value: %s" % fraction
        return self.mapPartitionsWithIndex(
            RDDStratifiedSampler(withReplacement, fractions, seed).func, True)

    def subtractByKey(self, other, numPartitions=None):
        """
        Return each (key, value) pair in `self` that has no pair with matching
        key in `other`.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtractByKey(y).collect())
        [('b', 4), ('b', 5)]
        """
        def filter_func(pair):
            key, (val1, val2) = pair
            return val1 and not val2
        return self.cogroup(other, numPartitions).filter(filter_func).flatMapValues(lambda x: x[0])

    def subtract(self, other, numPartitions=None):
        """
        Return each value in `self` that is not contained in `other`.

        >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
        >>> y = sc.parallelize([("a", 3), ("c", None)])
        >>> sorted(x.subtract(y).collect())
        [('a', 1), ('b', 4), ('b', 5)]
        """
        # note: here 'True' is just a placeholder
        rdd = other.map(lambda x: (x, True))
        return self.map(lambda x: (x, True)).subtractByKey(rdd, numPartitions).keys()

    def keyBy(self, f):
        """
        Creates tuples of the elements in this RDD by applying `f`.

        >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
        >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
        >>> [(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
        [(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])]
        """
        return self.map(lambda x: (f(x), x))

    def repartition(self, numPartitions):
        """
        Return a new RDD that has exactly numPartitions partitions.

        Can increase or decrease the level of parallelism in this RDD.
        Internally, this uses a shuffle to redistribute data.
        If you are decreasing the number of partitions in this RDD, consider
        using `coalesce`, which can avoid performing a shuffle.

        >>> rdd = sc.parallelize([1,2,3,4,5,6,7], 4)
        >>> sorted(rdd.glom().collect())
        [[1], [2, 3], [4, 5], [6, 7]]
        >>> len(rdd.repartition(2).glom().collect())
        2
        >>> len(rdd.repartition(10).glom().collect())
        10
        """
        return self.coalesce(numPartitions, shuffle=True)

    def coalesce(self, numPartitions, shuffle=False):
        """
        Return a new RDD that is reduced into `numPartitions` partitions.

        >>> sc.parallelize([1, 2, 3, 4, 5], 3).glom().collect()
        [[1], [2, 3], [4, 5]]
        >>> sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect()
        [[1, 2, 3, 4, 5]]
        """
        if shuffle:
            # Decrease the batch size in order to distribute evenly the elements across output
            # partitions. Otherwise, repartition will possibly produce highly skewed partitions.
            batchSize = min(10, self.ctx._batchSize or 1024)
            ser = BatchedSerializer(PickleSerializer(), batchSize)
            selfCopy = self._reserialize(ser)
            jrdd_deserializer = selfCopy._jrdd_deserializer
            jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
        else:
            jrdd_deserializer = self._jrdd_deserializer
            jrdd = self._jrdd.coalesce(numPartitions, shuffle)
        return RDD(jrdd, self.ctx, jrdd_deserializer)

    def zip(self, other):
        """
        Zips this RDD with another one, returning key-value pairs with the
        first element in each RDD, second element in each RDD, etc. Assumes
        that the two RDDs have the same number of partitions and the same
        number of elements in each partition (e.g. one was made through
        a map on the other).

        >>> x = sc.parallelize(range(0,5))
        >>> y = sc.parallelize(range(1000, 1005))
        >>> x.zip(y).collect()
        [(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
        """
        def get_batch_size(ser):
            if isinstance(ser, BatchedSerializer):
                return ser.batchSize
            return 1  # not batched

        def batch_as(rdd, batchSize):
            return rdd._reserialize(BatchedSerializer(PickleSerializer(), batchSize))

        my_batch = get_batch_size(self._jrdd_deserializer)
        other_batch = get_batch_size(other._jrdd_deserializer)
        if my_batch != other_batch or not my_batch:
            # use the smallest batchSize for both of them
            batchSize = min(my_batch, other_batch)
            if batchSize <= 0:
                # auto batched or unlimited
                batchSize = 100
            other = batch_as(other, batchSize)
            self = batch_as(self, batchSize)

        if self.getNumPartitions() != other.getNumPartitions():
            raise ValueError("Can only zip with RDD which has the same number of partitions")

        # There will be an Exception in JVM if there are different number
        # of items in each partition.
        pairRDD = self._jrdd.zip(other._jrdd)
        deserializer = PairDeserializer(self._jrdd_deserializer,
                                        other._jrdd_deserializer)
        return RDD(pairRDD, self.ctx, deserializer)

    def zipWithIndex(self):
        """
        Zips this RDD with its element indices.

        The ordering is first based on the partition index and then the
        ordering of items within each partition. So the first item in
        the first partition gets index 0, and the last item in the last
        partition receives the largest index.

        This method needs to trigger a spark job when this RDD contains
        more than one partition.

        >>> sc.parallelize(["a", "b", "c", "d"], 3).zipWithIndex().collect()
        [('a', 0), ('b', 1), ('c', 2), ('d', 3)]
        """
        starts = [0]
        if self.getNumPartitions() > 1:
            nums = self.mapPartitions(lambda it: [sum(1 for i in it)]).collect()
            for i in range(len(nums) - 1):
                starts.append(starts[-1] + nums[i])

        def func(k, it):
            for i, v in enumerate(it, starts[k]):
                yield v, i

        return self.mapPartitionsWithIndex(func)

    def zipWithUniqueId(self):
        """
        Zips this RDD with generated unique Long ids.

        Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
        n is the number of partitions. So there may exist gaps, but this
        method won't trigger a spark job, which is different from
        :meth:`zipWithIndex`.

        >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect()
        [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
        """
        n = self.getNumPartitions()

        def func(k, it):
            for i, v in enumerate(it):
                yield v, i * n + k

        return self.mapPartitionsWithIndex(func)

    def name(self):
        """
        Return the name of this RDD.
        """
        n = self._jrdd.name()
        if n:
            return n

    @ignore_unicode_prefix
    def setName(self, name):
        """
        Assign a name to this RDD.

        >>> rdd1 = sc.parallelize([1, 2])
        >>> rdd1.setName('RDD1').name()
        u'RDD1'
        """
        self._jrdd.setName(name)
        return self

    def toDebugString(self):
        """
        A description of this RDD and its recursive dependencies for debugging.
        """
        debug_string = self._jrdd.toDebugString()
        if debug_string:
            return debug_string.encode('utf-8')

    def getStorageLevel(self):
        """
        Get the RDD's current storage level.

        >>> rdd1 = sc.parallelize([1,2])
        >>> rdd1.getStorageLevel()
        StorageLevel(False, False, False, False, 1)
        >>> print(rdd1.getStorageLevel())
        Serialized 1x Replicated
        """
        java_storage_level = self._jrdd.getStorageLevel()
        storage_level = StorageLevel(java_storage_level.useDisk(),
                                     java_storage_level.useMemory(),
                                     java_storage_level.useOffHeap(),
                                     java_storage_level.deserialized(),
                                     java_storage_level.replication())
        return storage_level

    def _defaultReducePartitions(self):
        """
        Returns the default number of partitions to use during reduce tasks (e.g., groupBy).
        If spark.default.parallelism is set, then we'll use the value from SparkContext
        defaultParallelism, otherwise we'll use the number of partitions in this RDD.

        This mirrors the behavior of the Scala Partitioner#defaultPartitioner, intended to reduce
        the likelihood of OOMs. Once PySpark adopts Partitioner-based APIs, this behavior will
        be inherent.
        """
        if self.ctx._conf.contains("spark.default.parallelism"):
            return self.ctx.defaultParallelism
        else:
            return self.getNumPartitions()

    def lookup(self, key):
        """
        Return the list of values in the RDD for key `key`. This operation
        is done efficiently if the RDD has a known partitioner by only
        searching the partition that the key maps to.

        >>> l = range(1000)
        >>> rdd = sc.parallelize(zip(l, l), 10)
        >>> rdd.lookup(42)  # slow
        [42]
        >>> sorted = rdd.sortByKey()
        >>> sorted.lookup(42)  # fast
        [42]
        >>> sorted.lookup(1024)
        []
        >>> rdd2 = sc.parallelize([(('a', 'b'), 'c')]).groupByKey()
        >>> list(rdd2.lookup(('a', 'b'))[0])
        ['c']
        """
        values = self.filter(lambda kv: kv[0] == key).values()

        if self.partitioner is not None:
            return self.ctx.runJob(values, lambda x: x, [self.partitioner(key)])

        return values.collect()

    def _to_java_object_rdd(self):
        """ Return a JavaRDD of Object by unpickling

        It will convert each Python object into Java object by Pyrolite, whenever the
        RDD is serialized in batch or not.
        """
        rdd = self._pickled()
        return self.ctx._jvm.SerDeUtil.pythonToJava(rdd._jrdd, True)

    def countApprox(self, timeout, confidence=0.95):
        """
        Approximate version of count() that returns a potentially incomplete
        result within a timeout, even if not all tasks have finished.

        >>> rdd = sc.parallelize(range(1000), 10)
        >>> rdd.countApprox(1000, 1.0)
        1000
        """
        drdd = self.mapPartitions(lambda it: [float(sum(1 for i in it))])
        return int(drdd.sumApprox(timeout, confidence))

    def sumApprox(self, timeout, confidence=0.95):
        """
        Approximate operation to return the sum within a timeout
        or meet the confidence.

        >>> rdd = sc.parallelize(range(1000), 10)
        >>> r = sum(range(1000))
        >>> abs(rdd.sumApprox(1000) - r) / r < 0.05
        True
        """
        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_java_object_rdd()
        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
        r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())

    def meanApprox(self, timeout, confidence=0.95):
        """
        Approximate operation to return the mean within a timeout
        or meet the confidence.

        >>> rdd = sc.parallelize(range(1000), 10)
        >>> r = sum(range(1000)) / 1000.0
        >>> abs(rdd.meanApprox(1000) - r) / r < 0.05
        True
        """
        jrdd = self.map(float)._to_java_object_rdd()
        jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
        r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
        return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())

    def countApproxDistinct(self, relativeSD=0.05):
        """
        Return approximate number of distinct elements in the RDD.

        The algorithm used is based on streamlib's implementation of
        `"HyperLogLog in Practice: Algorithmic Engineering of a State
        of The Art Cardinality Estimation Algorithm", available here
        <https://doi.org/10.1145/2452376.2452456>`_.

        :param relativeSD: Relative accuracy. Smaller values create
                           counters that require more space.
                           It must be greater than 0.000017.

        >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
        >>> 900 < n < 1100
        True
        >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
        >>> 16 < n < 24
        True
        """
        if relativeSD < 0.000017:
            raise ValueError("relativeSD should be greater than 0.000017")
        # the hash space in Java is 2^32
        hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF)
        return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)

    def toLocalIterator(self, prefetchPartitions=False):
        """
        Return an iterator that contains all of the elements in this RDD.
        The iterator will consume as much memory as the largest partition in this RDD.
        With prefetch it may consume up to the memory of the 2 largest partitions.

        :param prefetchPartitions: If Spark should pre-fetch the next partition
                                   before it is needed.

        >>> rdd = sc.parallelize(range(10))
        >>> [x for x in rdd.toLocalIterator()]
        [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
        """
        with SCCallSiteSync(self.context) as css:
            sock_info = self.ctx._jvm.PythonRDD.toLocalIteratorAndServe(
                self._jrdd.rdd(),
                prefetchPartitions)
        return _local_iterator_from_socket(sock_info, self._jrdd_deserializer)

    def barrier(self):
        """
        .. note:: Experimental

        Marks the current stage as a barrier stage, where Spark must launch all tasks together.
        In case of a task failure, instead of only restarting the failed task, Spark will abort the
        entire stage and relaunch all tasks for this stage.
        The barrier execution mode feature is experimental and it only handles limited scenarios.
        Please read the linked SPIP and design docs to understand the limitations and future plans.

        :return: an :class:`RDDBarrier` instance that provides actions within a barrier stage.

        .. seealso:: :class:`BarrierTaskContext`
        .. seealso:: `SPIP: Barrier Execution Mode
            <http://jira.apache.org/jira/browse/SPARK-24374>`_
        .. seealso:: `Design Doc <https://jira.apache.org/jira/browse/SPARK-24582>`_

        .. versionadded:: 2.4.0
        """
        return RDDBarrier(self)

    def _is_barrier(self):
        """
        Whether this RDD is in a barrier stage.
        """
        return self._jrdd.rdd().isBarrier()

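    # A minimal usage sketch for barrier() above (illustrative, not from the original docs):
    # the returned RDDBarrier only exposes mapPartitions-style transformations.
    #
    #   >>> rdd = sc.parallelize(range(4), 2)
    #   >>> rdd.barrier().mapPartitions(lambda it: [sum(it)]).collect()
    #   [1, 5]
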
    def withResources(self, profile):
        """
        .. note:: Experimental

        Specify a :class:`pyspark.resource.ResourceProfile` to use when calculating this RDD.
        This is only supported on certain cluster managers and currently requires dynamic
        allocation to be enabled. It will result in new executors with the resources specified
        being acquired to calculate the RDD.

        .. versionadded:: 3.1.0
        """
        self.has_resource_profile = True
        if profile._java_resource_profile is not None:
            jrp = profile._java_resource_profile
        else:
            builder = self.ctx._jvm.org.apache.spark.resource.ResourceProfileBuilder()
            ereqs = ExecutorResourceRequests(self.ctx._jvm, profile._executor_resource_requests)
            treqs = TaskResourceRequests(self.ctx._jvm, profile._task_resource_requests)
            builder.require(ereqs._java_executor_resource_requests)
            builder.require(treqs._java_task_resource_requests)
            jrp = builder.build()

        self._jrdd.withResources(jrp)
        return self

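    # A hedged sketch for withResources above (illustrative; the builder API lives in
    # pyspark.resource as of Spark 3.1 and may differ across versions).
    #
    #   >>> from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests
    #   >>> from pyspark.resource import ResourceProfileBuilder
    #   >>> ereqs = ExecutorResourceRequests().cores(2).memory("4g")
    #   >>> treqs = TaskResourceRequests().cpus(1)
    #   >>> rdd.withResources(ResourceProfileBuilder().require(ereqs).require(treqs).build)
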
    def getResourceProfile(self):
        """
        .. note:: Experimental

        Get the :class:`pyspark.resource.ResourceProfile` specified with this RDD or None
        if it wasn't specified.
        :return: the user specified ResourceProfile or None if none were specified

        .. versionadded:: 3.1.0
        """
        rp = self._jrdd.getResourceProfile()
        if rp is not None:
            return ResourceProfile(_java_resource_profile=rp)
        else:
            return None


def _prepare_for_python_RDD(sc, command):
    # the serialized command will be compressed by broadcast
    ser = CloudPickleSerializer()
    pickled_command = ser.dumps(command)
    if len(pickled_command) > sc._jvm.PythonUtils.getBroadcastThreshold(sc._jsc):  # Default 1M
        # The broadcast will have same life cycle as created PythonRDD
        broadcast = sc.broadcast(pickled_command)
        pickled_command = ser.dumps(broadcast)
    broadcast_vars = [x._jbroadcast for x in sc._pickled_broadcast_vars]
    sc._pickled_broadcast_vars.clear()
    return pickled_command, broadcast_vars, sc.environment, sc._python_includes


def _wrap_function(sc, func, deserializer, serializer, profiler=None):
    assert deserializer, "deserializer should not be empty"
    assert serializer, "serializer should not be empty"
    command = (func, profiler, deserializer, serializer)
    pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
    return sc._jvm.PythonFunction(bytearray(pickled_command), env, includes, sc.pythonExec,
                                  sc.pythonVer, broadcast_vars, sc._javaAccumulator)


class RDDBarrier(object):

    """
    .. note:: Experimental

    Wraps an RDD in a barrier stage, which forces Spark to launch tasks of this stage together.
    :class:`RDDBarrier` instances are created by :func:`RDD.barrier`.

    .. versionadded:: 2.4.0
    """

    def __init__(self, rdd):
        self.rdd = rdd

    def mapPartitions(self, f, preservesPartitioning=False):
        """
        .. note:: Experimental

        Returns a new RDD by applying a function to each partition of the wrapped RDD,
        where tasks are launched together in a barrier stage.
        The interface is the same as :func:`RDD.mapPartitions`.
        Please see the API doc there.

        .. versionadded:: 2.4.0
        """
        def func(s, iterator):
            return f(iterator)
        return PipelinedRDD(self.rdd, func, preservesPartitioning, isFromBarrier=True)

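    # A minimal usage sketch for the barrier mapPartitions above (illustrative, not from the
    # original docs): every task blocks at BarrierTaskContext.barrier() until all tasks in
    # the stage reach it.
    #
    #   >>> from pyspark import BarrierTaskContext
    #   >>> def f(iterator):
    #   ...     BarrierTaskContext.get().barrier()
    #   ...     return iterator
    #   >>> rdd.barrier().mapPartitions(f)
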
    def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
        """
        .. note:: Experimental

        Returns a new RDD by applying a function to each partition of the wrapped RDD, while
        tracking the index of the original partition, and all tasks are launched together
        in a barrier stage.
        The interface is the same as :func:`RDD.mapPartitionsWithIndex`.
        Please see the API doc there.

        .. versionadded:: 3.0.0
        """
        return PipelinedRDD(self.rdd, f, preservesPartitioning, isFromBarrier=True)


class PipelinedRDD(RDD):

    """
    Pipelined maps:

    >>> rdd = sc.parallelize([1, 2, 3, 4])
    >>> rdd.map(lambda x: 2 * x).cache().map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]
    >>> rdd.map(lambda x: 2 * x).map(lambda x: 2 * x).collect()
    [4, 8, 12, 16]

    Pipelined reduces:
    >>> from operator import add
    >>> rdd.map(lambda x: 2 * x).reduce(add)
    20
    >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
    20
    """

    def __init__(self, prev, func, preservesPartitioning=False, isFromBarrier=False):
        if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
            # This transformation is the first in its stage:
            self.func = func
            self.preservesPartitioning = preservesPartitioning
            self._prev_jrdd = prev._jrdd
            self._prev_jrdd_deserializer = prev._jrdd_deserializer
        else:
            prev_func = prev.func

            def pipeline_func(split, iterator):
                return func(split, prev_func(split, iterator))
            self.func = pipeline_func
            self.preservesPartitioning = \
                prev.preservesPartitioning and preservesPartitioning
            self._prev_jrdd = prev._prev_jrdd  # maintain the pipeline
            self._prev_jrdd_deserializer = prev._prev_jrdd_deserializer
        self.is_cached = False
        self.has_resource_profile = False
        self.is_checkpointed = False
        self.ctx = prev.ctx
        self.prev = prev
        self._jrdd_val = None
        self._id = None
        self._jrdd_deserializer = self.ctx.serializer
        self._bypass_serializer = False
        self.partitioner = prev.partitioner if self.preservesPartitioning else None
        self.is_barrier = isFromBarrier or prev._is_barrier()

    def getNumPartitions(self):
        return self._prev_jrdd.partitions().size()

    @property
    def _jrdd(self):
        if self._jrdd_val:
            return self._jrdd_val
        if self._bypass_serializer:
            self._jrdd_deserializer = NoOpSerializer()

        if self.ctx.profiler_collector:
            profiler = self.ctx.profiler_collector.new_profiler(self.ctx)
        else:
            profiler = None

        wrapped_func = _wrap_function(self.ctx, self.func, self._prev_jrdd_deserializer,
                                      self._jrdd_deserializer, profiler)
        python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(), wrapped_func,
                                             self.preservesPartitioning, self.is_barrier)
        self._jrdd_val = python_rdd.asJavaRDD()

        if profiler:
            self._id = self._jrdd_val.id()
            self.ctx.profiler_collector.add_profiler(self._id, profiler)
        return self._jrdd_val

    def id(self):
        if self._id is None:
            self._id = self._jrdd.id()
        return self._id

    def _is_pipelinable(self):
        return not (self.is_cached or self.is_checkpointed or self.has_resource_profile)

    def _is_barrier(self):
        return self.is_barrier


def _test():
    import doctest
    from pyspark.context import SparkContext
    globs = globals().copy()
    # The small batch size here ensures that we see multiple batches,
    # even in these small test examples:
    globs['sc'] = SparkContext('local[4]', 'PythonTest')
    (failure_count, test_count) = doctest.testmod(
        globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        sys.exit(-1)


if __name__ == "__main__":
    _test()