spark-instrumented-optimizer/python/pyspark/rddsampler.py
Arun Ramakrishnan 35e3d199f0 SPARK-1438 RDD.sample() make seed param optional
copying form previous pull request https://github.com/apache/spark/pull/462

Its probably better to let the underlying language implementation take care of the default . This was easier to do with python as the default value for seed in random and numpy random is None.

In Scala/Java side it might mean propagating an Option or null(oh no!) down the chain until where the Random is constructed. But, looks like the convention in some other methods was to use System.nanoTime. So, followed that convention.

Conflict with overloaded method in sql.SchemaRDD.sample which also defines default params.
sample(fraction, withReplacement=false, seed=math.random)
Scala does not allow more than one overloaded to have default params. I believe the author intended to override the RDD.sample method and not overload it. So, changed it.

If backward compatible is important, 3 new method can be introduced (without default params) like this
sample(fraction)
sample(fraction, withReplacement)
sample(fraction, withReplacement, seed)

Added some tests for the scala RDD takeSample method.

Author: Arun Ramakrishnan <smartnut007@gmail.com>

This patch had conflicts when merged, resolved by
Committer: Matei Zaharia <matei@databricks.com>

Closes #477 from smartnut007/master and squashes the following commits:

07bb06e [Arun Ramakrishnan] SPARK-1438 fixing more space formatting issues
b9ebfe2 [Arun Ramakrishnan] SPARK-1438 removing redundant import of random in python rddsampler
8d05b1a [Arun Ramakrishnan] SPARK-1438 RDD . Replace System.nanoTime with a Random generated number. python: use a separate instance of Random instead of seeding language api global Random instance.
69619c6 [Arun Ramakrishnan] SPARK-1438 fix spacing issue
0c247db [Arun Ramakrishnan] SPARK-1438 RDD language apis to support optional seed in RDD methods sample/takeSample
2014-04-24 17:27:16 -07:00

110 lines
3.9 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import sys
import random
class RDDSampler(object):
def __init__(self, withReplacement, fraction, seed=None):
try:
import numpy
self._use_numpy = True
except ImportError:
print >> sys.stderr, "NumPy does not appear to be installed. Falling back to default random generator for sampling."
self._use_numpy = False
self._seed = seed if seed is not None else random.randint(0, sys.maxint)
self._withReplacement = withReplacement
self._fraction = fraction
self._random = None
self._split = None
self._rand_initialized = False
def initRandomGenerator(self, split):
if self._use_numpy:
import numpy
self._random = numpy.random.RandomState(self._seed)
else:
self._random = random.Random(self._seed)
for _ in range(0, split):
# discard the next few values in the sequence to have a
# different seed for the different splits
self._random.randint(0, sys.maxint)
self._split = split
self._rand_initialized = True
def getUniformSample(self, split):
if not self._rand_initialized or split != self._split:
self.initRandomGenerator(split)
if self._use_numpy:
return self._random.random_sample()
else:
return self._random.uniform(0.0, 1.0)
def getPoissonSample(self, split, mean):
if not self._rand_initialized or split != self._split:
self.initRandomGenerator(split)
if self._use_numpy:
return self._random.poisson(mean)
else:
# here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by
# drawing a sequence of numbers delta_j ~ Exp(mean)
num_arrivals = 1
cur_time = 0.0
cur_time += self._random.expovariate(mean)
if cur_time > 1.0:
return 0
while(cur_time <= 1.0):
cur_time += self._random.expovariate(mean)
num_arrivals += 1
return (num_arrivals - 1)
def shuffle(self, vals):
if self._random == None:
self.initRandomGenerator(0) # this should only ever called on the master so
# the split does not matter
if self._use_numpy:
self._random.shuffle(vals)
else:
self._random.shuffle(vals, self._random.random)
def func(self, split, iterator):
if self._withReplacement:
for obj in iterator:
# For large datasets, the expected number of occurrences of each element in a sample with
# replacement is Poisson(frac). We use that to get a count for each element.
count = self.getPoissonSample(split, mean = self._fraction)
for _ in range(0, count):
yield obj
else:
for obj in iterator:
if self.getUniformSample(split) <= self._fraction:
yield obj