Commit graph

143 commits

Author SHA1 Message Date
Vladimir Vladimirov b3872e00d1 SPARK-5633 pyspark saveAsTextFile support for compression codec
See https://issues.apache.org/jira/browse/SPARK-5633 for details

Author: Vladimir Vladimirov <vladimir.vladimirov@magnetic.com>

Closes #4403 from smartkiwi/master and squashes the following commits:

94c014e [Vladimir Vladimirov] SPARK-5633 pyspark saveAsTextFile support for compression codec
2015-02-06 13:55:02 -08:00
Davies Liu dc101b0e4e [SPARK-5577] Python udf for DataFrame
Author: Davies Liu <davies@databricks.com>

Closes #4351 from davies/python_udf and squashes the following commits:

d250692 [Davies Liu] fix conflict
34234d4 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python_udf
440f769 [Davies Liu] address comments
f0a3121 [Davies Liu] track life cycle of broadcast
f99b2e1 [Davies Liu] address comments
462b334 [Davies Liu] Merge branch 'master' of github.com:apache/spark into python_udf
7bccc3b [Davies Liu] python udf
58dee20 [Davies Liu] clean up
2015-02-04 15:55:09 -08:00
Xiangrui Meng 4ee79c71af [SPARK-5430] move treeReduce and treeAggregate from mllib to core
We have seen many use cases of `treeAggregate`/`treeReduce` outside the ML domain. Maybe it is time to move them to Core. pwendell

Author: Xiangrui Meng <meng@databricks.com>

Closes #4228 from mengxr/SPARK-5430 and squashes the following commits:

20ad40d [Xiangrui Meng] exclude tree* from mima
e89a43e [Xiangrui Meng] fix compile and update java doc
3ae1a4b [Xiangrui Meng] add treeReduce/treeAggregate to Python
6f948c5 [Xiangrui Meng] add treeReduce/treeAggregate to JavaRDDLike
d600b6c [Xiangrui Meng] move treeReduce and treeAggregate to core
2015-01-28 17:26:03 -08:00
Yandu Oppacher 3bead67d59 [SPARK-4387][PySpark] Refactoring python profiling code to make it extensible
This PR is based on #3255 , fix conflicts and code style.

Closes #3255.

Author: Yandu Oppacher <yandu.oppacher@jadedpixel.com>
Author: Davies Liu <davies@databricks.com>

Closes #3901 from davies/refactor-python-profile-code and squashes the following commits:

b4a9306 [Davies Liu] fix tests
4b79ce8 [Davies Liu] add docstring for profiler_cls
2700e47 [Davies Liu] use BasicProfiler as default
349e341 [Davies Liu] more refactor
6a5d4df [Davies Liu] refactor and fix tests
31bf6b6 [Davies Liu] fix code style
0864b5d [Yandu Oppacher] Remove unused method
76a6c37 [Yandu Oppacher] Added a profile collector to accumulate the profilers per stage
9eefc36 [Yandu Oppacher] Fix doc
9ace076 [Yandu Oppacher] Refactor of profiler, and moved tests around
8739aff [Yandu Oppacher] Code review fixes
9bda3ec [Yandu Oppacher] Refactor profiler code
2015-01-28 13:48:06 -08:00
Michael Nazario 456c11f15a [SPARK-5440][pyspark] Add toLocalIterator to pyspark rdd
Since Java and Scala both have access to iterate over partitions via the "toLocalIterator" function, python should also have that same ability.

Author: Michael Nazario <mnazario@palantir.com>

Closes #4237 from mnazario/feature/toLocalIterator and squashes the following commits:

1c58526 [Michael Nazario] Fix documentation off by one error
0cdc8f8 [Michael Nazario] Add toLocalIterator to PySpark
2015-01-28 12:47:12 -08:00
Sandy Ryza 406f6d3070 SPARK-5458. Refer to aggregateByKey instead of combineByKey in docs
Author: Sandy Ryza <sandy@cloudera.com>

Closes #4251 from sryza/sandy-spark-5458 and squashes the following commits:

460827a [Sandy Ryza] Python too
d2dc160 [Sandy Ryza] SPARK-5458. Refer to aggregateByKey instead of combineByKey in docs
2015-01-28 12:41:23 -08:00
Josh Rosen cef1f092a6 [SPARK-5063] More helpful error messages for several invalid operations
This patch adds more helpful error messages for invalid programs that define nested RDDs, broadcast RDDs, perform actions inside of transformations (e.g. calling `count()` from inside of `map()`), and call certain methods on stopped SparkContexts.  Currently, these invalid programs lead to confusing NullPointerExceptions at runtime and have been a major source of questions on the mailing list and StackOverflow.

In a few cases, I chose to log warnings instead of throwing exceptions in order to avoid any chance that this patch breaks programs that worked "by accident" in earlier Spark releases (e.g. programs that define nested RDDs but never run any jobs with them).

In SparkContext, the new `assertNotStopped()` method is used to check whether methods are being invoked on a stopped SparkContext.  In some cases, user programs will not crash in spite of calling methods on stopped SparkContexts, so I've only added `assertNotStopped()` calls to methods that always throw exceptions when called on stopped contexts (e.g. by dereferencing a null `dagScheduler` pointer).

Author: Josh Rosen <joshrosen@databricks.com>

Closes #3884 from JoshRosen/SPARK-5063 and squashes the following commits:

a38774b [Josh Rosen] Fix spelling typo
a943e00 [Josh Rosen] Convert two exceptions into warnings in order to avoid breaking user programs in some edge-cases.
2d0d7f7 [Josh Rosen] Fix test to reflect 1.2.1 compatibility
3f0ea0c [Josh Rosen] Revert two unintentional formatting changes
8e5da69 [Josh Rosen] Remove assertNotStopped() calls for methods that were sometimes safe to call on stopped SC's in Spark 1.2
8cff41a [Josh Rosen] IllegalStateException fix
6ef68d0 [Josh Rosen] Fix Python line length issues.
9f6a0b8 [Josh Rosen] Add improved error messages to PySpark.
13afd0f [Josh Rosen] SparkException -> IllegalStateException
8d404f3 [Josh Rosen] Merge remote-tracking branch 'origin/master' into SPARK-5063
b39e041 [Josh Rosen] Fix BroadcastSuite test which broadcasted an RDD
99cc09f [Josh Rosen] Guard against calling methods on stopped SparkContexts.
34833e8 [Josh Rosen] Add more descriptive error message.
57cc8a1 [Josh Rosen] Add error message when directly broadcasting RDD.
15b2e6b [Josh Rosen] [SPARK-5063] Useful error messages for nested RDDs and actions inside of transformations
2015-01-23 17:53:15 -08:00
Sean Owen 306ff187af SPARK-5270 [CORE] Provide isEmpty() function in RDD API
Pretty minor, but submitted for consideration -- this would at least help people make this check in the most efficient way I know.

Author: Sean Owen <sowen@cloudera.com>

Closes #4074 from srowen/SPARK-5270 and squashes the following commits:

66885b8 [Sean Owen] Add note that JavaRDDLike should not be implemented by user code
2e9b490 [Sean Owen] More tests, and Mima-exclude the new isEmpty method in JavaRDDLike
28395ff [Sean Owen] Add isEmpty to Java, Python
7dd04b7 [Sean Owen] Add efficient RDD.isEmpty()
2015-01-19 22:50:45 -08:00
lewuathe 3cd516191b [SPARK-4822] Use sphinx tags for Python doc annotations
Modify python annotations for sphinx. There is no change to build process from.
https://github.com/apache/spark/blob/master/docs/README.md

Author: lewuathe <lewuathe@me.com>

Closes #3685 from Lewuathe/sphinx-tag-for-pydoc and squashes the following commits:

88a0fd9 [lewuathe] [SPARK-4822] Fix DevelopApi and WARN tags
3d7a398 [lewuathe] [SPARK-4822] Use sphinx tags for Python doc annotations
2014-12-17 17:31:24 -08:00
Davies Liu c246b95dd2 [SPARK-4841] fix zip with textFile()
UTF8Deserializer can not be used in BatchedSerializer, so always use PickleSerializer() when change batchSize in zip().

Also, if two RDD have the same batch size already, they did not need re-serialize any more.

Author: Davies Liu <davies@databricks.com>

Closes #3706 from davies/fix_4841 and squashes the following commits:

20ce3a3 [Davies Liu] fix bug in _reserialize()
e3ebf7c [Davies Liu] add comment
379d2c8 [Davies Liu] fix zip with textFile()
2014-12-15 22:58:26 -08:00
Davies Liu d39f2e9c68 [SPARK-4477] [PySpark] remove numpy from RDDSampler
In RDDSampler, it try use numpy to gain better performance for possion(), but the number of call of random() is only (1+faction) * N in the pure python implementation of possion(), so there is no much performance gain from numpy.

numpy is not a dependent of pyspark, so it maybe introduce some problem, such as there is no numpy installed in slaves, but only installed master, as reported in SPARK-927.

It also complicate the code a lot, so we may should remove numpy from RDDSampler.

I also did some benchmark to verify that:
```
>>> from pyspark.mllib.random import RandomRDDs
>>> rdd = RandomRDDs.uniformRDD(sc, 1 << 20, 1).cache()
>>> rdd.count()  # cache it
>>> rdd.sample(True, 0.9).count()    # measure this line
```
the results:

|withReplacement      |  random  | numpy.random |
 ------- | ------------ |  -------
|True | 1.5 s|  1.4 s|
|False|  0.6 s | 0.8 s|

closes #2313

Note: this patch including some commits that not mirrored to github, it will be OK after it catches up.

Author: Davies Liu <davies@databricks.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #3351 from davies/numpy and squashes the following commits:

5c438d7 [Davies Liu] fix comment
c5b9252 [Davies Liu] Merge pull request #1 from mengxr/SPARK-4477
98eb31b [Xiangrui Meng] make poisson sampling slightly faster
ee17d78 [Davies Liu] remove = for float
13f7b05 [Davies Liu] Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/spark into numpy
f583023 [Davies Liu] fix tests
51649f5 [Davies Liu] remove numpy in RDDSampler
78bf997 [Davies Liu] fix tests, do not use numpy in randomSplit, no performance gain
f5fdf63 [Davies Liu] fix bug with int in weights
4dfa2cd [Davies Liu] refactor
f866bcf [Davies Liu] remove unneeded change
c7a2007 [Davies Liu] switch to python implementation
95a48ac [Davies Liu] Merge branch 'master' of github.com:apache/spark into randomSplit
0d9b256 [Davies Liu] refactor
1715ee3 [Davies Liu] address comments
41fce54 [Davies Liu] randomSplit()
2014-11-20 16:40:25 -08:00
Davies Liu 7f22fa81eb [SPARK-4327] [PySpark] Python API for RDD.randomSplit()
```
pyspark.RDD.randomSplit(self, weights, seed=None)
    Randomly splits this RDD with the provided weights.

    :param weights: weights for splits, will be normalized if they don't sum to 1
    :param seed: random seed
    :return: split RDDs in an list

    >>> rdd = sc.parallelize(range(10), 1)
    >>> rdd1, rdd2, rdd3 = rdd.randomSplit([0.4, 0.6, 1.0], 11)
    >>> rdd1.collect()
    [3, 6]
    >>> rdd2.collect()
    [0, 5, 7]
    >>> rdd3.collect()
    [1, 2, 4, 8, 9]
```

Author: Davies Liu <davies@databricks.com>

Closes #3193 from davies/randomSplit and squashes the following commits:

78bf997 [Davies Liu] fix tests, do not use numpy in randomSplit, no performance gain
f5fdf63 [Davies Liu] fix bug with int in weights
4dfa2cd [Davies Liu] refactor
f866bcf [Davies Liu] remove unneeded change
c7a2007 [Davies Liu] switch to python implementation
95a48ac [Davies Liu] Merge branch 'master' of github.com:apache/spark into randomSplit
0d9b256 [Davies Liu] refactor
1715ee3 [Davies Liu] address comments
41fce54 [Davies Liu] randomSplit()
2014-11-18 16:37:35 -08:00
Davies Liu 7779109796 [SPARK-4304] [PySpark] Fix sort on empty RDD
This PR fix sortBy()/sortByKey() on empty RDD.

This should be back ported into 1.1/1.2

Author: Davies Liu <davies@databricks.com>

Closes #3162 from davies/fix_sort and squashes the following commits:

84f64b7 [Davies Liu] add tests
52995b5 [Davies Liu] fix sortByKey() on empty RDD
2014-11-07 20:53:03 -08:00
Davies Liu e4f42631a6 [SPARK-3886] [PySpark] simplify serializer, use AutoBatchedSerializer by default.
This PR simplify serializer, always use batched serializer (AutoBatchedSerializer as default), even batch size is 1.

Author: Davies Liu <davies@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Josh Rosen <joshrosen@databricks.com>

Closes #2920 from davies/fix_autobatch and squashes the following commits:

e544ef9 [Davies Liu] revert unrelated change
6880b14 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
1d557fc [Davies Liu] fix tests
8180907 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
76abdce [Davies Liu] clean up
53fa60b [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
d7ac751 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
2cc2497 [Davies Liu] Merge branch 'master' of github.com:apache/spark into fix_autobatch
b4292ce [Davies Liu] fix bug in master
d79744c [Davies Liu] recover hive tests
be37ece [Davies Liu] refactor
eb3938d [Davies Liu] refactor serializer in scala
8d77ef2 [Davies Liu] simplify serializer, use AutoBatchedSerializer by default.
2014-11-03 23:56:14 -08:00
Xiangrui Meng 3cca196220 [SPARK-4148][PySpark] fix seed distribution and add some tests for rdd.sample
The current way of seed distribution makes the random sequences from partition i and i+1 offset by 1.

~~~
In [14]: import random

In [15]: r1 = random.Random(10)

In [16]: r1.randint(0, 1)
Out[16]: 1

In [17]: r1.random()
Out[17]: 0.4288890546751146

In [18]: r1.random()
Out[18]: 0.5780913011344704

In [19]: r2 = random.Random(10)

In [20]: r2.randint(0, 1)
Out[20]: 1

In [21]: r2.randint(0, 1)
Out[21]: 0

In [22]: r2.random()
Out[22]: 0.5780913011344704
~~~

Note: The new tests are not for this bug fix.

Author: Xiangrui Meng <meng@databricks.com>

Closes #3010 from mengxr/SPARK-4148 and squashes the following commits:

869ae4b [Xiangrui Meng] move tests tests.py
c1bacd9 [Xiangrui Meng] fix seed distribution and add some tests for rdd.sample
2014-11-03 12:24:24 -08:00
Xiangrui Meng f1e7361f66 [SPARK-4150][PySpark] return self in rdd.setName
Then we can do `rdd.setName('abc').cache().count()`.

Author: Xiangrui Meng <meng@databricks.com>

Closes #3011 from mengxr/rdd-setname and squashes the following commits:

10d0d60 [Xiangrui Meng] update test
4ac3bbd [Xiangrui Meng] return self in rdd.setName
2014-10-31 12:07:48 -07:00
yingjieMiao 49bbdcb660 [Spark] RDD take() method: overestimate too much
In the comment (Line 1083), it says: "Otherwise, interpolate the number of partitions we need to try, but overestimate it by 50%."

`(1.5 * num * partsScanned / buf.size).toInt` is the guess of "num of total partitions needed". In every iteration, we should consider the increment `(1.5 * num * partsScanned / buf.size).toInt - partsScanned`
Existing implementation 'exponentially' grows `partsScanned ` ( roughly: `x_{n+1} >= (1.5 + 1) x_n`)

This could be a performance problem. (unless this is the intended behavior)

Author: yingjieMiao <yingjie@42go.com>

Closes #2648 from yingjieMiao/rdd_take and squashes the following commits:

d758218 [yingjieMiao] scala style fix
a8e74bb [yingjieMiao] python style fix
4b6e777 [yingjieMiao] infix operator style fix
4391d3b [yingjieMiao] typo fix.
692f4e6 [yingjieMiao] cap numPartsToTry
c4483dc [yingjieMiao] style fix
1d2c410 [yingjieMiao] also change in rdd.py and AsyncRDD
d31ff7e [yingjieMiao] handle the edge case after 1 iteration
a2aa36b [yingjieMiao] RDD take method: overestimate too much
2014-10-13 13:11:55 -07:00
cocoatomo 7a3f589ef8 [SPARK-3909][PySpark][Doc] A corrupted format in Sphinx documents and building warnings
Sphinx documents contains a corrupted ReST format and have some warnings.

The purpose of this issue is same as https://issues.apache.org/jira/browse/SPARK-3773.

commit: 0e8203f4fb

output
```
$ cd ./python/docs
$ make clean html
rm -rf _build/*
sphinx-build -b html -d _build/doctrees   . _build/html
Making output directory...
Running Sphinx v1.2.3
loading pickled environment... not yet created
building [html]: targets for 4 source files that are out of date
updating environment: 4 added, 0 changed, 0 removed
reading sources... [100%] pyspark.sql
/Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/feature.py:docstring of pyspark.mllib.feature.Word2VecModel.findSynonyms:4: WARNING: Field list ends without a blank line; unexpected unindent.
/Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/feature.py:docstring of pyspark.mllib.feature.Word2VecModel.transform:3: WARNING: Field list ends without a blank line; unexpected unindent.
/Users/<user>/MyRepos/Scala/spark/python/pyspark/sql.py:docstring of pyspark.sql:4: WARNING: Bullet list ends without a blank line; unexpected unindent.
looking for now-outdated files... none found
pickling environment... done
checking consistency... done
preparing documents... done
writing output... [100%] pyspark.sql
writing additional files... (12 module code pages) _modules/index search
copying static files... WARNING: html_static_path entry u'/Users/<user>/MyRepos/Scala/spark/python/docs/_static' does not exist
done
copying extra files... done
dumping search index... done
dumping object inventory... done
build succeeded, 4 warnings.

Build finished. The HTML pages are in _build/html.
```

Author: cocoatomo <cocoatomo77@gmail.com>

Closes #2766 from cocoatomo/issues/3909-sphinx-build-warnings and squashes the following commits:

2c7faa8 [cocoatomo] [SPARK-3909][PySpark][Doc] A corrupted format in Sphinx documents and building warnings
2014-10-11 11:51:59 -07:00
Davies Liu 798ed22c28 [SPARK-3412] [PySpark] Replace Epydoc with Sphinx to generate Python API docs
Retire Epydoc, use Sphinx to generate API docs.

Refine Sphinx docs, also convert some docstrings into Sphinx style.

It looks like:
![api doc](https://cloud.githubusercontent.com/assets/40902/4538272/9e2d4f10-4dec-11e4-8d96-6e45a8fe51f9.png)

Author: Davies Liu <davies.liu@gmail.com>

Closes #2689 from davies/docs and squashes the following commits:

bf4a0a5 [Davies Liu] fix links
3fb1572 [Davies Liu] fix _static in jekyll
65a287e [Davies Liu] fix scripts and logo
8524042 [Davies Liu] Merge branch 'master' of github.com:apache/spark into docs
d5b874a [Davies Liu] Merge branch 'master' of github.com:apache/spark into docs
4bc1c3c [Davies Liu] refactor
746d0b6 [Davies Liu] @param -> :param
240b393 [Davies Liu] replace epydoc with sphinx doc
2014-10-07 18:09:27 -07:00
cocoatomo 2300eb58ae [SPARK-3773][PySpark][Doc] Sphinx build warning
When building Sphinx documents for PySpark, we have 12 warnings.
Their causes are almost docstrings in broken ReST format.

To reproduce this issue, we should run following commands on the commit: 6e27cb630d.

```bash
$ cd ./python/docs
$ make clean html
...
/Users/<user>/MyRepos/Scala/spark/python/pyspark/__init__.py:docstring of pyspark.SparkContext.sequenceFile:4: ERROR: Unexpected indentation.
/Users/<user>/MyRepos/Scala/spark/python/pyspark/__init__.py:docstring of pyspark.RDD.saveAsSequenceFile:4: ERROR: Unexpected indentation.
/Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.LogisticRegressionWithSGD.train:14: ERROR: Unexpected indentation.
/Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.LogisticRegressionWithSGD.train:16: WARNING: Definition list ends without a blank line; unexpected unindent.
/Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.LogisticRegressionWithSGD.train:17: WARNING: Block quote ends without a blank line; unexpected unindent.
/Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.SVMWithSGD.train:14: ERROR: Unexpected indentation.
/Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.SVMWithSGD.train:16: WARNING: Definition list ends without a blank line; unexpected unindent.
/Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/classification.py:docstring of pyspark.mllib.classification.SVMWithSGD.train:17: WARNING: Block quote ends without a blank line; unexpected unindent.
/Users/<user>/MyRepos/Scala/spark/python/docs/pyspark.mllib.rst:50: WARNING: missing attribute mentioned in :members: or __all__: module pyspark.mllib.regression, attribute RidgeRegressionModelLinearRegressionWithSGD
/Users/<user>/MyRepos/Scala/spark/python/pyspark/mllib/tree.py:docstring of pyspark.mllib.tree.DecisionTreeModel.predict:3: ERROR: Unexpected indentation.
...
checking consistency... /Users/<user>/MyRepos/Scala/spark/python/docs/modules.rst:: WARNING: document isn't included in any toctree
...
copying static files... WARNING: html_static_path entry u'/Users/<user>/MyRepos/Scala/spark/python/docs/_static' does not exist
...
build succeeded, 12 warnings.
```

Author: cocoatomo <cocoatomo77@gmail.com>

Closes #2653 from cocoatomo/issues/3773-sphinx-build-warnings and squashes the following commits:

6f65661 [cocoatomo] [SPARK-3773][PySpark][Doc] Sphinx build warning
2014-10-06 14:08:40 -07:00
Davies Liu abf588f47a [SPARK-3749] [PySpark] fix bugs in broadcast large closure of RDD
1. broadcast is triggle unexpected
2. fd is leaked in JVM (also leak in parallelize())
3. broadcast is not unpersisted in JVM after RDD is not be used any more.

cc JoshRosen , sorry for these stupid bugs.

Author: Davies Liu <davies.liu@gmail.com>

Closes #2603 from davies/fix_broadcast and squashes the following commits:

080a743 [Davies Liu] fix bugs in broadcast large closure of RDD
2014-10-01 11:21:34 -07:00
Davies Liu c5414b6818 [SPARK-3478] [PySpark] Profile the Python tasks
This patch add profiling support for PySpark, it will show the profiling results
before the driver exits, here is one example:

```
============================================================
Profile of RDD<id=3>
============================================================
         5146507 function calls (5146487 primitive calls) in 71.094 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
  5144576   68.331    0.000   68.331    0.000 statcounter.py:44(merge)
       20    2.735    0.137   71.071    3.554 statcounter.py:33(__init__)
       20    0.017    0.001    0.017    0.001 {cPickle.dumps}
     1024    0.003    0.000    0.003    0.000 t.py:16(<lambda>)
       20    0.001    0.000    0.001    0.000 {reduce}
       21    0.001    0.000    0.001    0.000 {cPickle.loads}
       20    0.001    0.000    0.001    0.000 copy_reg.py:95(_slotnames)
       41    0.001    0.000    0.001    0.000 serializers.py:461(read_int)
       40    0.001    0.000    0.002    0.000 serializers.py:179(_batched)
       62    0.000    0.000    0.000    0.000 {method 'read' of 'file' objects}
       20    0.000    0.000   71.072    3.554 rdd.py:863(<lambda>)
       20    0.000    0.000    0.001    0.000 serializers.py:198(load_stream)
    40/20    0.000    0.000   71.072    3.554 rdd.py:2093(pipeline_func)
       41    0.000    0.000    0.002    0.000 serializers.py:130(load_stream)
       40    0.000    0.000   71.072    1.777 rdd.py:304(func)
       20    0.000    0.000   71.094    3.555 worker.py:82(process)
```

Also, use can show profile result manually by `sc.show_profiles()` or dump it into disk
by `sc.dump_profiles(path)`, such as

```python
>>> sc._conf.set("spark.python.profile", "true")
>>> rdd = sc.parallelize(range(100)).map(str)
>>> rdd.count()
100
>>> sc.show_profiles()
============================================================
Profile of RDD<id=1>
============================================================
         284 function calls (276 primitive calls) in 0.001 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        4    0.000    0.000    0.000    0.000 serializers.py:198(load_stream)
        4    0.000    0.000    0.000    0.000 {reduce}
     12/4    0.000    0.000    0.001    0.000 rdd.py:2092(pipeline_func)
        4    0.000    0.000    0.000    0.000 {cPickle.loads}
        4    0.000    0.000    0.000    0.000 {cPickle.dumps}
      104    0.000    0.000    0.000    0.000 rdd.py:852(<genexpr>)
        8    0.000    0.000    0.000    0.000 serializers.py:461(read_int)
       12    0.000    0.000    0.000    0.000 rdd.py:303(func)
```
The profiling is disabled by default, can be enabled by "spark.python.profile=true".

Also, users can dump the results into disks automatically for future analysis, by "spark.python.profile.dump=path_to_dump"

This is bugfix of #2351 cc JoshRosen

Author: Davies Liu <davies.liu@gmail.com>

Closes #2556 from davies/profiler and squashes the following commits:

e68df5a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
858e74c [Davies Liu] compatitable with python 2.6
7ef2aa0 [Davies Liu] bugfix, add tests for show_profiles and dump_profiles()
2b0daf2 [Davies Liu] fix docs
7a56c24 [Davies Liu] bugfix
cba9463 [Davies Liu] move show_profiles and dump_profiles to SparkContext
fb9565b [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
116d52a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
09d02c3 [Davies Liu] Merge branch 'master' into profiler
c23865c [Davies Liu] Merge branch 'master' into profiler
15d6f18 [Davies Liu] add docs for two configs
dadee1a [Davies Liu] add docs string and clear profiles after show or dump
4f8309d [Davies Liu] address comment, add tests
0a5b6eb [Davies Liu] fix Python UDF
4b20494 [Davies Liu] add profile for python
2014-09-30 18:24:57 -07:00
Josh Rosen f872e4fb80 Revert "[SPARK-3478] [PySpark] Profile the Python tasks"
This reverts commit 1aa549ba98.
2014-09-26 14:47:14 -07:00
Davies Liu 1aa549ba98 [SPARK-3478] [PySpark] Profile the Python tasks
This patch add profiling support for PySpark, it will show the profiling results
before the driver exits, here is one example:

```
============================================================
Profile of RDD<id=3>
============================================================
         5146507 function calls (5146487 primitive calls) in 71.094 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
  5144576   68.331    0.000   68.331    0.000 statcounter.py:44(merge)
       20    2.735    0.137   71.071    3.554 statcounter.py:33(__init__)
       20    0.017    0.001    0.017    0.001 {cPickle.dumps}
     1024    0.003    0.000    0.003    0.000 t.py:16(<lambda>)
       20    0.001    0.000    0.001    0.000 {reduce}
       21    0.001    0.000    0.001    0.000 {cPickle.loads}
       20    0.001    0.000    0.001    0.000 copy_reg.py:95(_slotnames)
       41    0.001    0.000    0.001    0.000 serializers.py:461(read_int)
       40    0.001    0.000    0.002    0.000 serializers.py:179(_batched)
       62    0.000    0.000    0.000    0.000 {method 'read' of 'file' objects}
       20    0.000    0.000   71.072    3.554 rdd.py:863(<lambda>)
       20    0.000    0.000    0.001    0.000 serializers.py:198(load_stream)
    40/20    0.000    0.000   71.072    3.554 rdd.py:2093(pipeline_func)
       41    0.000    0.000    0.002    0.000 serializers.py:130(load_stream)
       40    0.000    0.000   71.072    1.777 rdd.py:304(func)
       20    0.000    0.000   71.094    3.555 worker.py:82(process)
```

Also, use can show profile result manually by `sc.show_profiles()` or dump it into disk
by `sc.dump_profiles(path)`, such as

```python
>>> sc._conf.set("spark.python.profile", "true")
>>> rdd = sc.parallelize(range(100)).map(str)
>>> rdd.count()
100
>>> sc.show_profiles()
============================================================
Profile of RDD<id=1>
============================================================
         284 function calls (276 primitive calls) in 0.001 seconds

   Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        4    0.000    0.000    0.000    0.000 serializers.py:198(load_stream)
        4    0.000    0.000    0.000    0.000 {reduce}
     12/4    0.000    0.000    0.001    0.000 rdd.py:2092(pipeline_func)
        4    0.000    0.000    0.000    0.000 {cPickle.loads}
        4    0.000    0.000    0.000    0.000 {cPickle.dumps}
      104    0.000    0.000    0.000    0.000 rdd.py:852(<genexpr>)
        8    0.000    0.000    0.000    0.000 serializers.py:461(read_int)
       12    0.000    0.000    0.000    0.000 rdd.py:303(func)
```
The profiling is disabled by default, can be enabled by "spark.python.profile=true".

Also, users can dump the results into disks automatically for future analysis, by "spark.python.profile.dump=path_to_dump"

Author: Davies Liu <davies.liu@gmail.com>

Closes #2351 from davies/profiler and squashes the following commits:

7ef2aa0 [Davies Liu] bugfix, add tests for show_profiles and dump_profiles()
2b0daf2 [Davies Liu] fix docs
7a56c24 [Davies Liu] bugfix
cba9463 [Davies Liu] move show_profiles and dump_profiles to SparkContext
fb9565b [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
116d52a [Davies Liu] Merge branch 'master' of github.com:apache/spark into profiler
09d02c3 [Davies Liu] Merge branch 'master' into profiler
c23865c [Davies Liu] Merge branch 'master' into profiler
15d6f18 [Davies Liu] add docs for two configs
dadee1a [Davies Liu] add docs string and clear profiles after show or dump
4f8309d [Davies Liu] address comment, add tests
0a5b6eb [Davies Liu] fix Python UDF
4b20494 [Davies Liu] add profile for python
2014-09-26 09:27:42 -07:00
Aaron Staple 8ca4ecb6a5 [SPARK-546] Add full outer join to RDD and DStream.
leftOuterJoin and rightOuterJoin are already implemented.  This patch adds fullOuterJoin.

Author: Aaron Staple <aaron.staple@gmail.com>

Closes #1395 from staple/SPARK-546 and squashes the following commits:

1f5595c [Aaron Staple] Fix python style
7ac0aa9 [Aaron Staple] [SPARK-546] Add full outer join to RDD and DStream.
3b5d137 [Aaron Staple] In JavaPairDStream, make class tag specification in rightOuterJoin consistent with other functions.
31f2956 [Aaron Staple] Fix left outer join documentation comments.
2014-09-24 20:39:09 -07:00
Davies Liu fce5e251d6 [SPARK-3491] [MLlib] [PySpark] use pickle to serialize data in MLlib
Currently, we serialize the data between JVM and Python case by case manually, this cannot scale to support so many APIs in MLlib.

This patch will try to address this problem by serialize the data using pickle protocol, using Pyrolite library to serialize/deserialize in JVM. Pickle protocol can be easily extended to support customized class.

All the modules are refactored to use this protocol.

Known issues: There will be some performance regression (both CPU and memory, the serialized data increased)

Author: Davies Liu <davies.liu@gmail.com>

Closes #2378 from davies/pickle_mllib and squashes the following commits:

dffbba2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into pickle_mllib
810f97f [Davies Liu] fix equal of matrix
032cd62 [Davies Liu] add more type check and conversion for user_product
bd738ab [Davies Liu] address comments
e431377 [Davies Liu] fix cache of rdd, refactor
19d0967 [Davies Liu] refactor Picklers
2511e76 [Davies Liu] cleanup
1fccf1a [Davies Liu] address comments
a2cc855 [Davies Liu] fix tests
9ceff73 [Davies Liu] test size of serialized Rating
44e0551 [Davies Liu] fix cache
a379a81 [Davies Liu] fix pickle array in python2.7
df625c7 [Davies Liu] Merge commit '154d141' into pickle_mllib
154d141 [Davies Liu] fix autobatchedpickler
44736d7 [Davies Liu] speed up pickling array in Python 2.7
e1d1bfc [Davies Liu] refactor
708dc02 [Davies Liu] fix tests
9dcfb63 [Davies Liu] fix style
88034f0 [Davies Liu] rafactor, address comments
46a501e [Davies Liu] choose batch size automatically
df19464 [Davies Liu] memorize the module and class name during pickleing
f3506c5 [Davies Liu] Merge branch 'master' into pickle_mllib
722dd96 [Davies Liu] cleanup _common.py
0ee1525 [Davies Liu] remove outdated tests
b02e34f [Davies Liu] remove _common.py
84c721d [Davies Liu] Merge branch 'master' into pickle_mllib
4d7963e [Davies Liu] remove muanlly serialization
6d26b03 [Davies Liu] fix tests
c383544 [Davies Liu] classification
f2a0856 [Davies Liu] mllib/regression
d9f691f [Davies Liu] mllib/util
cccb8b1 [Davies Liu] mllib/tree
8fe166a [Davies Liu] Merge branch 'pickle' into pickle_mllib
aa2287e [Davies Liu] random
f1544c4 [Davies Liu] refactor clustering
52d1350 [Davies Liu] use new protocol in mllib/stat
b30ef35 [Davies Liu] use pickle to serialize data for mllib/recommendation
f44f771 [Davies Liu] enable tests about array
3908f5c [Davies Liu] Merge branch 'master' into pickle
c77c87b [Davies Liu] cleanup debugging code
60e4e2f [Davies Liu] support unpickle array.array for Python 2.6
2014-09-19 15:01:11 -07:00
Davies Liu e77fa81a61 [SPARK-3554] [PySpark] use broadcast automatically for large closure
Py4j can not handle large string efficiently, so we should use broadcast for large closure automatically. (Broadcast use local filesystem to pass through data).

Author: Davies Liu <davies.liu@gmail.com>

Closes #2417 from davies/command and squashes the following commits:

fbf4e97 [Davies Liu] bugfix
aefd508 [Davies Liu] use broadcast automatically for large closure
2014-09-18 18:11:48 -07:00
Matthew Farrellee 9d5fa763d8 [SPARK-3519] add distinct(n) to PySpark
Added missing rdd.distinct(numPartitions) and associated tests

Author: Matthew Farrellee <matt@redhat.com>

Closes #2383 from mattf/SPARK-3519 and squashes the following commits:

30b837a [Matthew Farrellee] Combine test cases to save on JVM startups
6bc4a2c [Matthew Farrellee] [SPARK-3519] add distinct(n) to SchemaRDD in PySpark
7a17f2b [Matthew Farrellee] [SPARK-3519] add distinct(n) to PySpark
2014-09-16 11:39:57 -07:00
Aaron Staple 60050f4288 [SPARK-1087] Move python traceback utilities into new traceback_utils.py file.
Also made some cosmetic cleanups.

Author: Aaron Staple <aaron.staple@gmail.com>

Closes #2385 from staple/SPARK-1087 and squashes the following commits:

7b3bb13 [Aaron Staple] Address review comments, cosmetic cleanups.
10ba6e1 [Aaron Staple] [SPARK-1087] Move python traceback utilities into new traceback_utils.py file.
2014-09-15 19:28:17 -07:00
RJ Nowling 533377621f [PySpark] Add blank line so that Python RDD.top() docstring renders correctly
Author: RJ Nowling <rnowling@gmail.com>

Closes #2370 from rnowling/python_rdd_docstrings and squashes the following commits:

5230574 [RJ Nowling] Add blank line so that Python RDD.top() docstring renders correctly
2014-09-12 09:46:21 -07:00
Sandy Ryza 16a73c2473 SPARK-2978. Transformation with MR shuffle semantics
I didn't add this to the transformations list in the docs because it's kind of obscure, but would be happy to do so if others think it would be helpful.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #2274 from sryza/sandy-spark-2978 and squashes the following commits:

4a5332a [Sandy Ryza] Fix Java test
c04b447 [Sandy Ryza] Fix Python doc and add back deleted code
433ad5b [Sandy Ryza] Add Java test
4c25a54 [Sandy Ryza] Add s at the end and a couple other fixes
9b0ba99 [Sandy Ryza] Fix compilation
36e0571 [Sandy Ryza] Fix import ordering
48c12c2 [Sandy Ryza] Add Java version and additional doc
e5381cd [Sandy Ryza] Fix python style warnings
f147634 [Sandy Ryza] SPARK-2978. Transformation with MR shuffle semantics
2014-09-08 11:20:00 -07:00
Davies Liu 110fb8b24d [SPARK-2334] fix AttributeError when call PipelineRDD.id()
The underline JavaRDD for PipelineRDD is created lazily, it's delayed until call _jrdd.

The id of JavaRDD is cached as `_id`, it saves a RPC call in py4j for later calls.

closes #1276

Author: Davies Liu <davies.liu@gmail.com>

Closes #2296 from davies/id and squashes the following commits:

e197958 [Davies Liu] fix style
9721716 [Davies Liu] fix id of PipelineRDD
2014-09-06 16:12:29 -07:00
Holden Karau da35330e83 Spark-3406 add a default storage level to python RDD persist API
Author: Holden Karau <holden@pigscanfly.ca>

Closes #2280 from holdenk/SPARK-3406-Python-RDD-persist-api-does-not-have-default-storage-level and squashes the following commits:

33eaade [Holden Karau] As Josh pointed out, sql also override persist. Make persist behave the same as in the underlying RDD as well
e658227 [Holden Karau] Fix the test I added
e95a6c5 [Holden Karau] The Python persist function did not have a default storageLevel unlike the Scala API. Noticed this issue because we got a bug report back from the book where we had documented it as if it was the same as the Scala API
2014-09-06 14:49:25 -07:00
Andrew Ash ba5bcaddec SPARK-3211 .take() is OOM-prone with empty partitions
Instead of jumping straight from 1 partition to all partitions, do exponential
growth and double the number of partitions to attempt each time instead.

Fix proposed by Paul Nepywoda

Author: Andrew Ash <andrew@andrewash.com>

Closes #2117 from ash211/SPARK-3211 and squashes the following commits:

8b2299a [Andrew Ash] Quadruple instead of double for a minor speedup
e5f7e4d [Andrew Ash] Update comment to better reflect what we're doing
09a27f7 [Andrew Ash] Update PySpark to be less OOM-prone as well
3a156b8 [Andrew Ash] SPARK-3211 .take() is OOM-prone with empty partitions
2014-09-05 18:52:05 -07:00
Davies Liu 6481d27425 [SPARK-3309] [PySpark] Put all public API in __all__
Put all public API in __all__, also put them all in pyspark.__init__.py, then we can got all the documents for public API by `pydoc pyspark`. It also can be used by other programs (such as Sphinx or Epydoc) to generate only documents for public APIs.

Author: Davies Liu <davies.liu@gmail.com>

Closes #2205 from davies/public and squashes the following commits:

c6c5567 [Davies Liu] fix message
f7b35be [Davies Liu] put SchemeRDD, Row in pyspark.sql module
7e3016a [Davies Liu] add __all__ in mllib
6281b48 [Davies Liu] fix doc for SchemaRDD
6caab21 [Davies Liu] add public interfaces into pyspark.__init__.py
2014-09-03 11:49:45 -07:00
Davies Liu e2c901b4c7 [SPARK-2871] [PySpark] add countApproxDistinct() API
RDD.countApproxDistinct(relativeSD=0.05):

        :: Experimental ::
        Return approximate number of distinct elements in the RDD.

        The algorithm used is based on streamlib's implementation of
        "HyperLogLog in Practice: Algorithmic Engineering of a State
        of The Art Cardinality Estimation Algorithm", available
        <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.

        This support all the types of objects, which is supported by
        Pyrolite, nearly all builtin types.

        param relativeSD Relative accuracy. Smaller values create
                           counters that require more space.
                           It must be greater than 0.000017.

        >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
        >>> 950 < n < 1050
        True
        >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
        >>> 18 < n < 22
        True

Author: Davies Liu <davies.liu@gmail.com>

Closes #2142 from davies/countApproxDistinct and squashes the following commits:

e20da47 [Davies Liu] remove the correction in Python
c38c4e4 [Davies Liu] fix doc tests
2ab157c [Davies Liu] fix doc tests
9d2565f [Davies Liu] add commments and link for hash collision correction
d306492 [Davies Liu] change range of hash of tuple to [0, maxint]
ded624f [Davies Liu] calculate hash in Python
4cba98f [Davies Liu] add more tests
a85a8c6 [Davies Liu] Merge branch 'master' into countApproxDistinct
e97e342 [Davies Liu] add countApproxDistinct()
2014-09-02 15:47:47 -07:00
Davies Liu 4fa2fda88f [SPARK-2871] [PySpark] add RDD.lookup(key)
RDD.lookup(key)

        Return the list of values in the RDD for key `key`. This operation
        is done efficiently if the RDD has a known partitioner by only
        searching the partition that the key maps to.

        >>> l = range(1000)
        >>> rdd = sc.parallelize(zip(l, l), 10)
        >>> rdd.lookup(42)  # slow
        [42]
        >>> sorted = rdd.sortByKey()
        >>> sorted.lookup(42)  # fast
        [42]

It also clean up the code in RDD.py, and fix several bugs (related to preservesPartitioning).

Author: Davies Liu <davies.liu@gmail.com>

Closes #2093 from davies/lookup and squashes the following commits:

1789cd4 [Davies Liu] `f` in foreach could be generator or not.
2871b80 [Davies Liu] Merge branch 'master' into lookup
c6390ea [Davies Liu] address all comments
0f1bce8 [Davies Liu] add test case for lookup()
be0e8ba [Davies Liu] fix preservesPartitioning
eb1305d [Davies Liu] add RDD.lookup(key)
2014-08-27 13:18:33 -07:00
Davies Liu f1e71d4c3b [SPARK-3073] [PySpark] use external sort in sortBy() and sortByKey()
Using external sort to support sort large datasets in reduce stage.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1978 from davies/sort and squashes the following commits:

bbcd9ba [Davies Liu] check spilled bytes in tests
b125d2f [Davies Liu] add test for external sort in rdd
eae0176 [Davies Liu] choose different disks from different processes and instances
1f075ed [Davies Liu] Merge branch 'master' into sort
eb53ca6 [Davies Liu] Merge branch 'master' into sort
644abaf [Davies Liu] add license in LICENSE
19f7873 [Davies Liu] improve tests
55602ee [Davies Liu] use external sort in sortBy() and sortByKey()
2014-08-26 16:57:40 -07:00
Davies Liu 3cedc4f4d7 [SPARK-2871] [PySpark] add histgram() API
RDD.histogram(buckets)

        Compute a histogram using the provided buckets. The buckets
        are all open to the right except for the last which is closed.
        e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
        which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
        and 50 we would have a histogram of 1,0,1.

        If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
        this can be switched from an O(log n) inseration to O(1) per
        element(where n = # buckets).

        Buckets must be sorted and not contain any duplicates, must be
        at least two elements.

        If `buckets` is a number, it will generates buckets which is
        evenly spaced between the minimum and maximum of the RDD. For
        example, if the min value is 0 and the max is 100, given buckets
        as 2, the resulting buckets will be [0,50) [50,100]. buckets must
        be at least 1 If the RDD contains infinity, NaN throws an exception
        If the elements in RDD do not vary (max == min) always returns
        a single bucket.

        It will return an tuple of buckets and histogram.

        >>> rdd = sc.parallelize(range(51))
        >>> rdd.histogram(2)
        ([0, 25, 50], [25, 26])
        >>> rdd.histogram([0, 5, 25, 50])
        ([0, 5, 25, 50], [5, 20, 26])
        >>> rdd.histogram([0, 15, 30, 45, 60], True)
        ([0, 15, 30, 45, 60], [15, 15, 15, 6])
        >>> rdd = sc.parallelize(["ab", "ac", "b", "bd", "ef"])
        >>> rdd.histogram(("a", "b", "c"))
        (('a', 'b', 'c'), [2, 2])

closes #122, it's duplicated.

Author: Davies Liu <davies.liu@gmail.com>

Closes #2091 from davies/histgram and squashes the following commits:

a322f8a [Davies Liu] fix deprecation of e.message
84e85fa [Davies Liu] remove evenBuckets, add more tests (including str)
d9a0722 [Davies Liu] address comments
0e18a2d [Davies Liu] add histgram() API
2014-08-26 13:04:30 -07:00
Davies Liu fb0db77242 [SPARK-2871] [PySpark] add zipWithIndex() and zipWithUniqueId()
RDD.zipWithIndex()

        Zips this RDD with its element indices.

        The ordering is first based on the partition index and then the
        ordering of items within each partition. So the first item in
        the first partition gets index 0, and the last item in the last
        partition receives the largest index.

        This method needs to trigger a spark job when this RDD contains
        more than one partitions.

        >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
        [(0, 0), (1, 1), (2, 2), (3, 3)]

RDD.zipWithUniqueId()

        Zips this RDD with generated unique Long ids.

        Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
        n is the number of partitions. So there may exist gaps, but this
        method won't trigger a spark job, which is different from
        L{zipWithIndex}

        >>> sc.parallelize(range(4), 2).zipWithUniqueId().collect()
        [(0, 0), (2, 1), (1, 2), (3, 3)]

Author: Davies Liu <davies.liu@gmail.com>

Closes #2092 from davies/zipWith and squashes the following commits:

cebe5bf [Davies Liu] improve test cases, reverse the order of index
0d2a128 [Davies Liu] add zipWithIndex() and zipWithUniqueId()
2014-08-24 21:16:05 -07:00
Davies Liu 8df4dad495 [SPARK-2871] [PySpark] add approx API for RDD
RDD.countApprox(self, timeout, confidence=0.95)

        :: Experimental ::
        Approximate version of count() that returns a potentially incomplete
        result within a timeout, even if not all tasks have finished.

        >>> rdd = sc.parallelize(range(1000), 10)
        >>> rdd.countApprox(1000, 1.0)
        1000

RDD.sumApprox(self, timeout, confidence=0.95)

        Approximate operation to return the sum within a timeout
        or meet the confidence.

        >>> rdd = sc.parallelize(range(1000), 10)
        >>> r = sum(xrange(1000))
        >>> (rdd.sumApprox(1000) - r) / r < 0.05

RDD.meanApprox(self, timeout, confidence=0.95)

        :: Experimental ::
        Approximate operation to return the mean within a timeout
        or meet the confidence.

        >>> rdd = sc.parallelize(range(1000), 10)
        >>> r = sum(xrange(1000)) / 1000.0
        >>> (rdd.meanApprox(1000) - r) / r < 0.05
        True

Author: Davies Liu <davies.liu@gmail.com>

Closes #2095 from davies/approx and squashes the following commits:

e8c252b [Davies Liu] add approx API for RDD
2014-08-23 19:33:34 -07:00
Davies Liu db436e36c4 [SPARK-2871] [PySpark] add key argument for max(), min() and top(n)
RDD.max(key=None)

        param key: A function used to generate key for comparing

        >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
        >>> rdd.max()
        43.0
        >>> rdd.max(key=str)
        5.0

RDD.min(key=None)

        Find the minimum item in this RDD.

        param key: A function used to generate key for comparing

        >>> rdd = sc.parallelize([2.0, 5.0, 43.0, 10.0])
        >>> rdd.min()
        2.0
        >>> rdd.min(key=str)
        10.0

RDD.top(num, key=None)

        Get the top N elements from a RDD.

        Note: It returns the list sorted in descending order.
        >>> sc.parallelize([10, 4, 2, 12, 3]).top(1)
        [12]
        >>> sc.parallelize([2, 3, 4, 5, 6], 2).top(2)
        [6, 5]
        >>> sc.parallelize([10, 4, 2, 12, 3]).top(3, key=str)
        [4, 3, 2]

Author: Davies Liu <davies.liu@gmail.com>

Closes #2094 from davies/cmp and squashes the following commits:

ccbaf25 [Davies Liu] add `key` to top()
ad7e374 [Davies Liu] fix tests
2f63512 [Davies Liu] change `comp` to `key` in min/max
dd91e08 [Davies Liu] add `comp` argument for RDD.max() and RDD.min()
2014-08-23 18:55:13 -07:00
Davies Liu 0a7ef6339f [SPARK-3141] [PySpark] fix sortByKey() with take()
Fix sortByKey() with take()

The function `f` used in mapPartitions should always return an iterator.

Author: Davies Liu <davies.liu@gmail.com>

Closes #2045 from davies/fix_sortbykey and squashes the following commits:

1160f59 [Davies Liu] fix sortByKey() with take()
2014-08-19 22:43:49 -07:00
Davies Liu d7e80c2597 [SPARK-2790] [PySpark] fix zip with serializers which have different batch sizes.
If two RDDs have different batch size in serializers, then it will try to re-serialize the one with smaller batch size, then call RDD.zip() in Spark.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1894 from davies/zip and squashes the following commits:

c4652ea [Davies Liu] add more test cases
6d05fc8 [Davies Liu] Merge branch 'master' into zip
813b1e4 [Davies Liu] add more tests for failed cases
a4aafda [Davies Liu] fix zip with serializers which have different batch sizes.
2014-08-19 14:46:32 -07:00
Josh Rosen 1f1819b20f [SPARK-3114] [PySpark] Fix Python UDFs in Spark SQL.
This fixes SPARK-3114, an issue where we inadvertently broke Python UDFs in Spark SQL.

This PR modifiers the test runner script to always run the PySpark SQL tests, irrespective of whether SparkSQL itself has been modified.  It also includes Davies' fix for the bug.

Closes #2026.

Author: Josh Rosen <joshrosen@apache.org>
Author: Davies Liu <davies.liu@gmail.com>

Closes #2027 from JoshRosen/pyspark-sql-fix and squashes the following commits:

9af2708 [Davies Liu] bugfix: disable compression of command
0d8d3a4 [Josh Rosen] Always run Python Spark SQL tests.
2014-08-18 20:42:19 -07:00
Davies Liu d1d0ee41c2 [SPARK-3103] [PySpark] fix saveAsTextFile() with utf-8
bugfix: It will raise an exception when it try to encode non-ASCII strings into unicode. It should only encode unicode as "utf-8".

Author: Davies Liu <davies.liu@gmail.com>

Closes #2018 from davies/fix_utf8 and squashes the following commits:

4db7967 [Davies Liu] fix saveAsTextFile() with utf-8
2014-08-18 13:58:35 -07:00
Davies Liu 2fc8aca086 [SPARK-1065] [PySpark] improve supporting for large broadcast
Passing large object by py4j is very slow (cost much memory), so pass broadcast objects via files (similar to parallelize()).

Add an option to keep object in driver (it's False by default) to save memory in driver.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1912 from davies/broadcast and squashes the following commits:

e06df4a [Davies Liu] load broadcast from disk in driver automatically
db3f232 [Davies Liu] fix serialization of accumulator
631a827 [Davies Liu] Merge branch 'master' into broadcast
c7baa8c [Davies Liu] compress serrialized broadcast and command
9a7161f [Davies Liu] fix doc tests
e93cf4b [Davies Liu] address comments: add test
6226189 [Davies Liu] improve large broadcast
2014-08-16 16:59:34 -07:00
Davies Liu 434bea1c00 [SPARK-2983] [PySpark] improve performance of sortByKey()
1. skip partitionBy() when numOfPartition is 1
2. use bisect_left (O(lg(N))) instread of loop (O(N)) in
rangePartitioner

Author: Davies Liu <davies.liu@gmail.com>

Closes #1898 from davies/sort and squashes the following commits:

0a9608b [Davies Liu] Merge branch 'master' into sort
1cf9565 [Davies Liu] improve performance of sortByKey()
2014-08-13 14:57:12 -07:00
RJ Nowling e537b33c63 [PySpark] Add blanklines to Python docstrings so example code renders correctly
Author: RJ Nowling <rnowling@gmail.com>

Closes #1808 from rnowling/pyspark_docs and squashes the following commits:

c06d774 [RJ Nowling] Add blanklines to Python docstrings so example code renders correctly
2014-08-06 14:12:21 -07:00
Nicholas Chammas d614967b0b [SPARK-2627] [PySpark] have the build enforce PEP 8 automatically
As described in [SPARK-2627](https://issues.apache.org/jira/browse/SPARK-2627), we'd like Python code to automatically be checked for PEP 8 compliance by Jenkins. This pull request aims to do that.

Notes:
* We may need to install [`pep8`](https://pypi.python.org/pypi/pep8) on the build server.
* I'm expecting tests to fail now that PEP 8 compliance is being checked as part of the build. I'm fine with cleaning up any remaining PEP 8 violations as part of this pull request.
* I did not understand why the RAT and scalastyle reports are saved to text files. I did the same for the PEP 8 check, but only so that the console output style can match those for the RAT and scalastyle checks. The PEP 8 report is removed right after the check is complete.
* Updates to the ["Contributing to Spark"](https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark) guide will be submitted elsewhere, as I don't believe that text is part of the Spark repo.

Author: Nicholas Chammas <nicholas.chammas@gmail.com>
Author: nchammas <nicholas.chammas@gmail.com>

Closes #1744 from nchammas/master and squashes the following commits:

274b238 [Nicholas Chammas] [SPARK-2627] [PySpark] minor indentation changes
983d963 [nchammas] Merge pull request #5 from apache/master
1db5314 [nchammas] Merge pull request #4 from apache/master
0e0245f [Nicholas Chammas] [SPARK-2627] undo erroneous whitespace fixes
bf30942 [Nicholas Chammas] [SPARK-2627] PEP8: comment spacing
6db9a44 [nchammas] Merge pull request #3 from apache/master
7b4750e [Nicholas Chammas] merge upstream changes
91b7584 [Nicholas Chammas] [SPARK-2627] undo unnecessary line breaks
44e3e56 [Nicholas Chammas] [SPARK-2627] use tox.ini to exclude files
b09fae2 [Nicholas Chammas] don't wrap comments unnecessarily
bfb9f9f [Nicholas Chammas] [SPARK-2627] keep up with the PEP 8 fixes
9da347f [nchammas] Merge pull request #2 from apache/master
aa5b4b5 [Nicholas Chammas] [SPARK-2627] follow Spark bash style for if blocks
d0a83b9 [Nicholas Chammas] [SPARK-2627] check that pep8 downloaded fine
dffb5dd [Nicholas Chammas] [SPARK-2627] download pep8 at runtime
a1ce7ae [Nicholas Chammas] [SPARK-2627] space out test report sections
21da538 [Nicholas Chammas] [SPARK-2627] it's PEP 8, not PEP8
6f4900b [Nicholas Chammas] [SPARK-2627] more misc PEP 8 fixes
fe57ed0 [Nicholas Chammas] removing merge conflict backups
9c01d4c [nchammas] Merge pull request #1 from apache/master
9a66cb0 [Nicholas Chammas] resolving merge conflicts
a31ccc4 [Nicholas Chammas] [SPARK-2627] miscellaneous PEP 8 fixes
beaa9ac [Nicholas Chammas] [SPARK-2627] fail check on non-zero status
723ed39 [Nicholas Chammas] always delete the report file
0541ebb [Nicholas Chammas] [SPARK-2627] call Python linter from run-tests
12440fa [Nicholas Chammas] [SPARK-2627] add Scala linter
61c07b9 [Nicholas Chammas] [SPARK-2627] add Python linter
75ad552 [Nicholas Chammas] make check output style consistent
2014-08-06 12:58:24 -07:00